Skip to content

Commit

Permalink
Extracted DB and Object Storage Logs to display on DSPA
Browse files Browse the repository at this point in the history
Signed-off-by: Achyut Madhusudan <amadhusu@redhat.com>
  • Loading branch information
Achyut Madhusudan committed Nov 23, 2023
1 parent b8e5b06 commit 9b5afdc
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 65 deletions.
25 changes: 14 additions & 11 deletions controllers/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
b64 "encoding/base64"
"fmt"

"time"

_ "github.com/go-sql-driver/mysql"
dspav1alpha1 "github.com/opendatahub-io/data-science-pipelines-operator/api/v1alpha1"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/config"
"time"
)

const dbSecret = "mariadb/secret.yaml.tmpl"
Expand All @@ -37,47 +38,49 @@ var mariadbTemplates = []string{
}

// extract to var for mocking in testing
var ConnectAndQueryDatabase = func(host, port, username, password, dbname string, dbConnectionTimeout time.Duration) bool {
var ConnectAndQueryDatabase = func(host, port, username, password, dbname string, dbConnectionTimeout time.Duration) (bool, string) {
// Create a context with a timeout of 1 second
ctx, cancel := context.WithTimeout(context.Background(), dbConnectionTimeout)
defer cancel()

connectionString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, host, port, dbname)
db, err := sql.Open("mysql", connectionString)
if err != nil {
return false
return false, err.Error()
}
defer db.Close()

testStatement := "SELECT 1;"
_, err = db.QueryContext(ctx, testStatement)
return err == nil
return err == nil, ""
}

func (r *DSPAReconciler) isDatabaseAccessible(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams) bool {
params *DSPAParams) (bool, string) {
log := r.Log.WithValues("namespace", dsp.Namespace).WithValues("dspa_name", dsp.Name)

if params.DatabaseHealthCheckDisabled(dsp) {
log.V(1).Info("Database health check disabled, assuming database is available and ready.")
return true
infoMessage := "Database health check disabled, assuming database is available and ready."
log.V(1).Info(infoMessage)
return true, infoMessage
}

log.Info("Performing Database Health Check")
databaseSpecified := dsp.Spec.Database != nil
usingExternalDB := params.UsingExternalDB(dsp)
usingMariaDB := !databaseSpecified || dsp.Spec.Database.MariaDB != nil
if !usingMariaDB && !usingExternalDB {
log.Info("Could not connect to Database: Unsupported Type")
return false
errorMessage := "Could not connect to Database: Unsupported Type"
log.Error(nil, errorMessage)
return false, errorMessage
}

decodePass, _ := b64.StdEncoding.DecodeString(params.DBConnection.Password)
dbConnectionTimeout := config.GetDurationConfigWithDefault(config.DBConnectionTimeoutConfigName, config.DefaultDBConnectionTimeout)

log.V(1).Info(fmt.Sprintf("Database Heath Check connection timeout: %s", dbConnectionTimeout))

dbHealthCheckPassed := ConnectAndQueryDatabase(params.DBConnection.Host,
dbHealthCheckPassed, message := ConnectAndQueryDatabase(params.DBConnection.Host,
params.DBConnection.Port,
params.DBConnection.Username,
string(decodePass),
Expand All @@ -88,7 +91,7 @@ func (r *DSPAReconciler) isDatabaseAccessible(ctx context.Context, dsp *dspav1al
} else {
log.Info("Unable to connect to Database")
}
return dbHealthCheckPassed
return dbHealthCheckPassed, message
}

func (r *DSPAReconciler) ReconcileDatabase(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
Expand Down
15 changes: 9 additions & 6 deletions controllers/dspipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -224,8 +225,8 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

// Get Prereq Status (DB and ObjStore Ready)
dbAvailable := r.isDatabaseAccessible(ctx, dspa, params)
objStoreAvailable := r.isObjectStorageAccessible(ctx, dspa, params)
dbAvailable, dbAvailableMessage := r.isDatabaseAccessible(ctx, dspa, params)
objStoreAvailable, objStoreAvailableMessage := r.isObjectStorageAccessible(ctx, dspa, params)
dspaPrereqsReady := dbAvailable && objStoreAvailable

if dspaPrereqsReady {
Expand Down Expand Up @@ -269,7 +270,7 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, err
}

conditions, err := r.GenerateStatus(ctx, dspa, params, dbAvailable, objStoreAvailable)
conditions, err := r.GenerateStatus(ctx, dspa, params, dbAvailable, objStoreAvailable, dbAvailableMessage, objStoreAvailableMessage)
if err != nil {
log.Info(err.Error())
return ctrl.Result{}, err
Expand Down Expand Up @@ -411,14 +412,15 @@ func (r *DSPAReconciler) handleReadyCondition(ctx context.Context, dspa *dspav1a
}

func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams, dbAvailableStatus, objStoreAvailableStatus bool) ([]metav1.Condition, error) {
params *DSPAParams, dbAvailableStatus bool, objStoreAvailableStatus bool, dbAvailableMessage string, objStoreAvailableMessage string) ([]metav1.Condition, error) {
// Create Database Availability Condition
databaseAvailable := r.buildCondition(config.DatabaseAvailable, dspa, config.DatabaseAvailable)
if dbAvailableStatus {
databaseAvailable.Status = metav1.ConditionTrue
databaseAvailable.Message = "Database connectivity successfully verified"
} else {
databaseAvailable.Message = "Could not connect to database"
databaseAvailable.Status = metav1.ConditionFalse
databaseAvailable.Message = dbAvailableMessage
}

// Create Object Storage Availability Condition
Expand All @@ -427,7 +429,8 @@ func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.
objStoreAvailable.Status = metav1.ConditionTrue
objStoreAvailable.Message = "Object Store connectivity successfully verified"
} else {
objStoreAvailable.Message = "Could not connect to Object Store"
objStoreAvailable.Status = metav1.ConditionFalse
objStoreAvailable.Message = objStoreAvailableMessage
}

// Create APIServer Readiness Condition
Expand Down
57 changes: 33 additions & 24 deletions controllers/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"fmt"
"net/http"

"time"

"github.com/go-logr/logr"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
dspav1alpha1 "github.com/opendatahub-io/data-science-pipelines-operator/api/v1alpha1"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/config"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/util"
"time"
)

const storageSecret = "minio/secret.yaml.tmpl"
Expand Down Expand Up @@ -94,7 +95,7 @@ func getHttpsTransportWithCACert(log logr.Logger, pemCerts []byte) (*http.Transp
return transport, nil
}

var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, string) {
cred := createCredentialProvidersChain(string(accesskey), string(secretkey))

opts := &minio.Options{
Expand All @@ -105,16 +106,18 @@ var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoin
if len(pemCerts) != 0 {
tr, err := getHttpsTransportWithCACert(log, pemCerts)
if err != nil {
log.Error(err, "Encountered error when processing custom ca bundle.")
return false
errorMessage := "Encountered error when processing custom ca bundle."
log.Error(err, errorMessage)
return false, errorMessage
}
opts.Transport = tr
}

minioClient, err := minio.New(endpoint, opts)
if err != nil {
errorMessage := "Could not connect to object storage endpoint"
log.Info(fmt.Sprintf("Could not connect to object storage endpoint: %s", endpoint))
return false
return false, errorMessage
}

ctx, cancel := context.WithTimeout(ctx, objStoreConnectionTimeout)
Expand All @@ -128,68 +131,74 @@ var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoin
// In the case that the Error is NoSuchKey (or NoSuchBucket), we can verify that the endpoint worked and the object just doesn't exist
case minio.ErrorResponse:
if err.Code == "NoSuchKey" || err.Code == "NoSuchBucket" {
return true
return true, err.Error()
}
}

if util.IsX509UnknownAuthorityError(err) {
log.Error(err, "Encountered x509 UnknownAuthorityError when connecting to ObjectStore. "+
"If using an tls S3 connection with self-signed certs, you may specify a custom CABundle "+
"to mount on the DSP API Server via the DSPA cr under the spec.cABundle field. If you have already "+
"provided a CABundle, verify the validity of the provided CABundle.")
return false
errorMessage := "Encountered x509 UnknownAuthorityError when connecting to ObjectStore. " +
"If using an tls S3 connection with self-signed certs, you may specify a custom CABundle " +
"to mount on the DSP API Server via the DSPA cr under the spec.cABundle field. If you have already " +
"provided a CABundle, verify the validity of the provided CABundle."
log.Error(err, errorMessage)
return false, errorMessage
}

// Every other error means the endpoint in inaccessible, or the credentials provided do not have, at a minimum GetObject, permissions
errorMessage := err.Error()
log.Info(fmt.Sprintf("Could not connect to (%s), Error: %s", endpoint, err.Error()))
return false
return false, errorMessage
}

// Getting here means the health check passed
return true
return true, ""
}

func (r *DSPAReconciler) isObjectStorageAccessible(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams) bool {
params *DSPAParams) (bool, string) {
log := r.Log.WithValues("namespace", dsp.Namespace).WithValues("dspa_name", dsp.Name)
if params.ObjectStorageHealthCheckDisabled(dsp) {
log.V(1).Info("Object Storage health check disabled, assuming object store is available and ready.")
return true
infoMessage := "Object Storage health check disabled, assuming object store is available and ready."
log.V(1).Info(infoMessage)
return true, infoMessage
}

log.Info("Performing Object Storage Health Check")

endpoint, err := joinHostPort(params.ObjectStorageConnection.Host, params.ObjectStorageConnection.Port)
if err != nil {
log.Error(err, "Could not determine Object Storage Endpoint")
return false
errorMessage := "Could not determine Object Storage Endpoint"
log.Error(err, errorMessage)
return false, errorMessage
}

accesskey, err := base64.StdEncoding.DecodeString(params.ObjectStorageConnection.AccessKeyID)
if err != nil {
log.Error(err, "Could not decode Object Storage Access Key ID")
return false
errorMessage := "Could not decode Object Storage Access Key ID"
log.Error(err, errorMessage)
return false, errorMessage
}

secretkey, err := base64.StdEncoding.DecodeString(params.ObjectStorageConnection.SecretAccessKey)
if err != nil {
log.Error(err, "Could not decode Object Storage Secret Access Key")
return false
errorMessage := "Could not decode Object Storage Secret Access Key"
log.Error(err, errorMessage)
return false, errorMessage
}

objStoreConnectionTimeout := config.GetDurationConfigWithDefault(config.ObjStoreConnectionTimeoutConfigName, config.DefaultObjStoreConnectionTimeout)

log.V(1).Info(fmt.Sprintf("Object Store connection timeout: %s", objStoreConnectionTimeout))

verified := ConnectAndQueryObjStore(ctx, log, endpoint, params.ObjectStorageConnection.Bucket, accesskey, secretkey,
verified, message := ConnectAndQueryObjStore(ctx, log, endpoint, params.ObjectStorageConnection.Bucket, accesskey, secretkey,
*params.ObjectStorageConnection.Secure, params.APICustomPemCerts, objStoreConnectionTimeout)

if verified {
log.Info("Object Storage Health Check Successful")
} else {
log.Info("Object Storage Health Check Failed")
}
return verified
return verified, message
}

// ReconcileStorage will set up Storage Connection.
Expand Down
40 changes: 20 additions & 20 deletions controllers/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ func TestDefaultDeployBehaviorStorage(t *testing.T) {

func TestIsDatabaseAccessibleTrue(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, string) {
return true, ""
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -302,14 +302,14 @@ func TestIsDatabaseAccessibleTrue(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.True(t, verified)
verified, message := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.True(t, verified, message)
}

func TestIsDatabaseNotAccessibleFalse(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return false
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, string) {
return false, "Object Storage is not accessible"
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -340,14 +340,14 @@ func TestIsDatabaseNotAccessibleFalse(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, message := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, message)
}

func TestDisabledHealthCheckReturnsTrue(t *testing.T) {
// Override the live connection function with a mock version that would always return false if called
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return false
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, string) {
return false, "Object Storage is not accessible"
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -378,16 +378,16 @@ func TestDisabledHealthCheckReturnsTrue(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
verified, message := reconciler.isObjectStorageAccessible(ctx, dspa, params)
// if health check is disabled this should always return True
// even thought the mock connection function would return false if called
assert.True(t, verified)
assert.True(t, verified, message)
}

func TestIsDatabaseAccessibleBadAccessKey(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, string) {
return true, ""
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -418,14 +418,14 @@ func TestIsDatabaseAccessibleBadAccessKey(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, message := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, message)
}

func TestIsDatabaseAccessibleBadSecretKey(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, string) {
return true, ""
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -456,8 +456,8 @@ func TestIsDatabaseAccessibleBadSecretKey(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, message := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, message)
}

func TestJoinHostPort(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ func TestAPIs(t *testing.T) {

var _ = BeforeEach(func() {
By("Overriding the Database and Object Store live connection functions with trivial stubs")
ConnectAndQueryDatabase = func(host string, port string, username string, password string, dbname string, dbConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryDatabase = func(host string, port string, username string, password string, dbname string, dbConnectionTimeout time.Duration) (bool, string) {
return true, ""
}
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, string) {
return true, ""
}
})

Expand Down

0 comments on commit 9b5afdc

Please sign in to comment.