From f9410d8e48031d9b2c5c7b7296d0f47082553a3f Mon Sep 17 00:00:00 2001 From: Achyut Madhusudan Date: Wed, 22 Nov 2023 23:04:54 +0530 Subject: [PATCH] Extracted DB and Object Storage Logs to display on DSPA Signed-off-by: Achyut Madhusudan --- controllers/database.go | 35 +++++++++------ controllers/dspipeline_controller.go | 13 +++--- controllers/storage.go | 67 ++++++++++++++++------------ controllers/storage_test.go | 41 ++++++++--------- controllers/suite_test.go | 8 ++-- 5 files changed, 91 insertions(+), 73 deletions(-) diff --git a/controllers/database.go b/controllers/database.go index b0e6d891f..d2c0c6fdc 100644 --- a/controllers/database.go +++ b/controllers/database.go @@ -21,10 +21,13 @@ import ( b64 "encoding/base64" "fmt" + "time" + + "errors" + _ "github.com/go-sql-driver/mysql" dspav1alpha1 "github.com/opendatahub-io/data-science-pipelines-operator/api/v1alpha1" "github.com/opendatahub-io/data-science-pipelines-operator/controllers/config" - "time" ) const dbSecret = "mariadb/secret.yaml.tmpl" @@ -37,7 +40,7 @@ var mariadbTemplates = []string{ } // extract to var for mocking in testing -var ConnectAndQueryDatabase = func(host, port, username, password, dbname string, dbConnectionTimeout time.Duration) bool { +var ConnectAndQueryDatabase = func(host, port, username, password, dbname string, dbConnectionTimeout time.Duration) (bool, error) { // Create a context with a timeout of 1 second ctx, cancel := context.WithTimeout(context.Background(), dbConnectionTimeout) defer cancel() @@ -45,22 +48,23 @@ var ConnectAndQueryDatabase = func(host, port, username, password, dbname string connectionString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, host, port, dbname) db, err := sql.Open("mysql", connectionString) if err != nil { - return false + return false, err } defer db.Close() testStatement := "SELECT 1;" _, err = db.QueryContext(ctx, testStatement) - return err == nil + return err == nil, nil } func (r *DSPAReconciler) isDatabaseAccessible(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication, - params *DSPAParams) bool { + params *DSPAParams) (bool, error) { log := r.Log.WithValues("namespace", dsp.Namespace).WithValues("dspa_name", dsp.Name) if params.DatabaseHealthCheckDisabled(dsp) { - log.V(1).Info("Database health check disabled, assuming database is available and ready.") - return true + infoMessage := "Database health check disabled, assuming database is available and ready." + log.V(1).Info(infoMessage) + return true, nil } log.Info("Performing Database Health Check") @@ -68,8 +72,9 @@ func (r *DSPAReconciler) isDatabaseAccessible(ctx context.Context, dsp *dspav1al usingExternalDB := params.UsingExternalDB(dsp) usingMariaDB := !databaseSpecified || dsp.Spec.Database.MariaDB != nil if !usingMariaDB && !usingExternalDB { - log.Info("Could not connect to Database: Unsupported Type") - return false + errorMessage := "Could not connect to Database: Unsupported Type" + log.Error(nil, errorMessage) + return false, errors.New(errorMessage) } decodePass, _ := b64.StdEncoding.DecodeString(params.DBConnection.Password) @@ -77,18 +82,20 @@ func (r *DSPAReconciler) isDatabaseAccessible(ctx context.Context, dsp *dspav1al log.V(1).Info(fmt.Sprintf("Database Heath Check connection timeout: %s", dbConnectionTimeout)) - dbHealthCheckPassed := ConnectAndQueryDatabase(params.DBConnection.Host, + dbHealthCheckPassed, err := ConnectAndQueryDatabase(params.DBConnection.Host, params.DBConnection.Port, params.DBConnection.Username, string(decodePass), params.DBConnection.DBName, dbConnectionTimeout) - if dbHealthCheckPassed { - log.Info("Database Health Check Successful") - } else { + + if err != nil { log.Info("Unable to connect to Database") + } else { + log.Info("Database Health Check Successful") } - return dbHealthCheckPassed + + return dbHealthCheckPassed, err } func (r *DSPAReconciler) ReconcileDatabase(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication, diff --git a/controllers/dspipeline_controller.go b/controllers/dspipeline_controller.go index 14faf6b27..38eaad295 100644 --- a/controllers/dspipeline_controller.go +++ b/controllers/dspipeline_controller.go @@ -19,6 +19,7 @@ package controllers import ( "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/go-logr/logr" @@ -224,8 +225,8 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. } // Get Prereq Status (DB and ObjStore Ready) - dbAvailable := r.isDatabaseAccessible(ctx, dspa, params) - objStoreAvailable := r.isObjectStorageAccessible(ctx, dspa, params) + dbAvailable, dbAvailableError := r.isDatabaseAccessible(ctx, dspa, params) + objStoreAvailable, objStoreAvailableError := r.isObjectStorageAccessible(ctx, dspa, params) dspaPrereqsReady := dbAvailable && objStoreAvailable if dspaPrereqsReady { @@ -269,7 +270,7 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. return ctrl.Result{}, err } - conditions, err := r.GenerateStatus(ctx, dspa, params, dbAvailable, objStoreAvailable) + conditions, err := r.GenerateStatus(ctx, dspa, params, dbAvailable, objStoreAvailable, dbAvailableError, objStoreAvailableError) if err != nil { log.Info(err.Error()) return ctrl.Result{}, err @@ -411,14 +412,14 @@ func (r *DSPAReconciler) handleReadyCondition(ctx context.Context, dspa *dspav1a } func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.DataSciencePipelinesApplication, - params *DSPAParams, dbAvailableStatus, objStoreAvailableStatus bool) ([]metav1.Condition, error) { + params *DSPAParams, dbAvailableStatus bool, objStoreAvailableStatus bool, dbAvailableError error, objStoreAvailableError error) ([]metav1.Condition, error) { // Create Database Availability Condition databaseAvailable := r.buildCondition(config.DatabaseAvailable, dspa, config.DatabaseAvailable) if dbAvailableStatus { databaseAvailable.Status = metav1.ConditionTrue databaseAvailable.Message = "Database connectivity successfully verified" } else { - databaseAvailable.Message = "Could not connect to database" + databaseAvailable.Message = error.Error(dbAvailableError) } // Create Object Storage Availability Condition @@ -427,7 +428,7 @@ func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1. objStoreAvailable.Status = metav1.ConditionTrue objStoreAvailable.Message = "Object Store connectivity successfully verified" } else { - objStoreAvailable.Message = "Could not connect to Object Store" + objStoreAvailable.Message = error.Error(objStoreAvailableError) } // Create APIServer Readiness Condition diff --git a/controllers/storage.go b/controllers/storage.go index 90606da30..52844f6d4 100644 --- a/controllers/storage.go +++ b/controllers/storage.go @@ -24,13 +24,14 @@ import ( "fmt" "net/http" + "time" + "github.com/go-logr/logr" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" dspav1alpha1 "github.com/opendatahub-io/data-science-pipelines-operator/api/v1alpha1" "github.com/opendatahub-io/data-science-pipelines-operator/controllers/config" "github.com/opendatahub-io/data-science-pipelines-operator/controllers/util" - "time" ) const storageSecret = "minio/secret.yaml.tmpl" @@ -94,7 +95,7 @@ func getHttpsTransportWithCACert(log logr.Logger, pemCerts []byte) (*http.Transp return transport, nil } -var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool { +var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) { cred := createCredentialProvidersChain(string(accesskey), string(secretkey)) opts := &minio.Options{ @@ -105,16 +106,18 @@ var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoin if len(pemCerts) != 0 { tr, err := getHttpsTransportWithCACert(log, pemCerts) if err != nil { - log.Error(err, "Encountered error when processing custom ca bundle.") - return false + errorMessage := "Encountered error when processing custom ca bundle." + log.Error(err, errorMessage) + return false, errors.New(errorMessage) } opts.Transport = tr } minioClient, err := minio.New(endpoint, opts) if err != nil { - log.Info(fmt.Sprintf("Could not connect to object storage endpoint: %s", endpoint)) - return false + errorMessage := fmt.Sprintf("Could not connect to object storage endpoint: %s", endpoint) + log.Error(err, errorMessage) + return false, errors.New(errorMessage) } ctx, cancel := context.WithTimeout(ctx, objStoreConnectionTimeout) @@ -128,68 +131,74 @@ var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoin // In the case that the Error is NoSuchKey (or NoSuchBucket), we can verify that the endpoint worked and the object just doesn't exist case minio.ErrorResponse: if err.Code == "NoSuchKey" || err.Code == "NoSuchBucket" { - return true + return true, err } } if util.IsX509UnknownAuthorityError(err) { - log.Error(err, "Encountered x509 UnknownAuthorityError when connecting to ObjectStore. "+ - "If using an tls S3 connection with self-signed certs, you may specify a custom CABundle "+ - "to mount on the DSP API Server via the DSPA cr under the spec.cABundle field. If you have already "+ - "provided a CABundle, verify the validity of the provided CABundle.") - return false + errorMessage := "Encountered x509 UnknownAuthorityError when connecting to ObjectStore. " + + "If using an tls S3 connection with self-signed certs, you may specify a custom CABundle " + + "to mount on the DSP API Server via the DSPA cr under the spec.cABundle field. If you have already " + + "provided a CABundle, verify the validity of the provided CABundle." + log.Error(err, errorMessage) + return false, errors.New(errorMessage) } // Every other error means the endpoint in inaccessible, or the credentials provided do not have, at a minimum GetObject, permissions - log.Info(fmt.Sprintf("Could not connect to (%s), Error: %s", endpoint, err.Error())) - return false + errorMessage := fmt.Sprintf("Could not connect to (%s), Error: %s", endpoint, err.Error()) + log.Error(err, errorMessage) + return false, errors.New(errorMessage) } // Getting here means the health check passed - return true + return true, nil } func (r *DSPAReconciler) isObjectStorageAccessible(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication, - params *DSPAParams) bool { + params *DSPAParams) (bool, error) { log := r.Log.WithValues("namespace", dsp.Namespace).WithValues("dspa_name", dsp.Name) if params.ObjectStorageHealthCheckDisabled(dsp) { - log.V(1).Info("Object Storage health check disabled, assuming object store is available and ready.") - return true + infoMessage := "Object Storage health check disabled, assuming object store is available and ready." + log.V(1).Info(infoMessage) + return true, nil } log.Info("Performing Object Storage Health Check") endpoint, err := joinHostPort(params.ObjectStorageConnection.Host, params.ObjectStorageConnection.Port) if err != nil { - log.Error(err, "Could not determine Object Storage Endpoint") - return false + errorMessage := "Could not determine Object Storage Endpoint" + log.Error(err, errorMessage) + return false, errors.New(errorMessage) } accesskey, err := base64.StdEncoding.DecodeString(params.ObjectStorageConnection.AccessKeyID) if err != nil { - log.Error(err, "Could not decode Object Storage Access Key ID") - return false + errorMessage := "Could not decode Object Storage Access Key ID" + log.Error(err, errorMessage) + return false, errors.New(errorMessage) } secretkey, err := base64.StdEncoding.DecodeString(params.ObjectStorageConnection.SecretAccessKey) if err != nil { - log.Error(err, "Could not decode Object Storage Secret Access Key") - return false + errorMessage := "Could not decode Object Storage Secret Access Key" + log.Error(err, errorMessage) + return false, errors.New(errorMessage) } objStoreConnectionTimeout := config.GetDurationConfigWithDefault(config.ObjStoreConnectionTimeoutConfigName, config.DefaultObjStoreConnectionTimeout) log.V(1).Info(fmt.Sprintf("Object Store connection timeout: %s", objStoreConnectionTimeout)) - verified := ConnectAndQueryObjStore(ctx, log, endpoint, params.ObjectStorageConnection.Bucket, accesskey, secretkey, + verified, err := ConnectAndQueryObjStore(ctx, log, endpoint, params.ObjectStorageConnection.Bucket, accesskey, secretkey, *params.ObjectStorageConnection.Secure, params.APICustomPemCerts, objStoreConnectionTimeout) - if verified { - log.Info("Object Storage Health Check Successful") - } else { + if err != nil { log.Info("Object Storage Health Check Failed") + } else { + log.Info("Object Storage Health Check Successful") } - return verified + return verified, err } // ReconcileStorage will set up Storage Connection. diff --git a/controllers/storage_test.go b/controllers/storage_test.go index 36d863b97..308f90e41 100644 --- a/controllers/storage_test.go +++ b/controllers/storage_test.go @@ -20,6 +20,7 @@ package controllers import ( "context" "encoding/base64" + "errors" "testing" "time" @@ -270,8 +271,8 @@ func TestDefaultDeployBehaviorStorage(t *testing.T) { func TestIsDatabaseAccessibleTrue(t *testing.T) { // Override the live connection function with a mock version - ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool { - return true + ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) { + return true, nil } testNamespace := "testnamespace" @@ -302,14 +303,14 @@ func TestIsDatabaseAccessibleTrue(t *testing.T) { }, } - verified := reconciler.isObjectStorageAccessible(ctx, dspa, params) - assert.True(t, verified) + verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params) + assert.True(t, verified, err) } func TestIsDatabaseNotAccessibleFalse(t *testing.T) { // Override the live connection function with a mock version - ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool { - return false + ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) { + return false, errors.New("Object Store is not Accessible") } testNamespace := "testnamespace" @@ -340,14 +341,14 @@ func TestIsDatabaseNotAccessibleFalse(t *testing.T) { }, } - verified := reconciler.isObjectStorageAccessible(ctx, dspa, params) - assert.False(t, verified) + verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params) + assert.False(t, verified, err) } func TestDisabledHealthCheckReturnsTrue(t *testing.T) { // Override the live connection function with a mock version that would always return false if called - ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool { - return false + ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) { + return false, errors.New("Object Store is not Accessible") } testNamespace := "testnamespace" @@ -378,16 +379,16 @@ func TestDisabledHealthCheckReturnsTrue(t *testing.T) { }, } - verified := reconciler.isObjectStorageAccessible(ctx, dspa, params) + verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params) // if health check is disabled this should always return True // even thought the mock connection function would return false if called - assert.True(t, verified) + assert.True(t, verified, err) } func TestIsDatabaseAccessibleBadAccessKey(t *testing.T) { // Override the live connection function with a mock version - ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool { - return true + ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) { + return true, nil } testNamespace := "testnamespace" @@ -418,14 +419,14 @@ func TestIsDatabaseAccessibleBadAccessKey(t *testing.T) { }, } - verified := reconciler.isObjectStorageAccessible(ctx, dspa, params) - assert.False(t, verified) + verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params) + assert.False(t, verified, err) } func TestIsDatabaseAccessibleBadSecretKey(t *testing.T) { // Override the live connection function with a mock version - ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool { - return true + ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) { + return true, nil } testNamespace := "testnamespace" @@ -456,8 +457,8 @@ func TestIsDatabaseAccessibleBadSecretKey(t *testing.T) { }, } - verified := reconciler.isObjectStorageAccessible(ctx, dspa, params) - assert.False(t, verified) + verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params) + assert.False(t, verified, err) } func TestJoinHostPort(t *testing.T) { diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 5697550f4..34886937e 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -73,11 +73,11 @@ func TestAPIs(t *testing.T) { var _ = BeforeEach(func() { By("Overriding the Database and Object Store live connection functions with trivial stubs") - ConnectAndQueryDatabase = func(host string, port string, username string, password string, dbname string, dbConnectionTimeout time.Duration) bool { - return true + ConnectAndQueryDatabase = func(host string, port string, username string, password string, dbname string, dbConnectionTimeout time.Duration) (bool, error) { + return true, nil } - ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool { - return true + ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) { + return true, nil } })