Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extracted DB and Object Storage Error Logs to display on DSPA #487

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions controllers/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
b64 "encoding/base64"
"fmt"

"time"

"errors"

_ "github.com/go-sql-driver/mysql"
dspav1alpha1 "github.com/opendatahub-io/data-science-pipelines-operator/api/v1alpha1"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/config"
"time"
)

const dbSecret = "mariadb/secret.yaml.tmpl"
Expand All @@ -37,58 +40,62 @@ var mariadbTemplates = []string{
}

// extract to var for mocking in testing
var ConnectAndQueryDatabase = func(host, port, username, password, dbname string, dbConnectionTimeout time.Duration) bool {
var ConnectAndQueryDatabase = func(host, port, username, password, dbname string, dbConnectionTimeout time.Duration) (bool, error) {
// Create a context that times out after dbConnectionTimeout
ctx, cancel := context.WithTimeout(context.Background(), dbConnectionTimeout)
defer cancel()

connectionString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, host, port, dbname)
db, err := sql.Open("mysql", connectionString)
if err != nil {
return false
return false, err
}
defer db.Close()

testStatement := "SELECT 1;"
_, err = db.QueryContext(ctx, testStatement)
return err == nil
return err == nil, nil
}

func (r *DSPAReconciler) isDatabaseAccessible(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams) bool {
params *DSPAParams) (bool, error) {
log := r.Log.WithValues("namespace", dsp.Namespace).WithValues("dspa_name", dsp.Name)

if params.DatabaseHealthCheckDisabled(dsp) {
log.V(1).Info("Database health check disabled, assuming database is available and ready.")
return true
infoMessage := "Database health check disabled, assuming database is available and ready."
log.V(1).Info(infoMessage)
return true, nil
}

log.Info("Performing Database Health Check")
databaseSpecified := dsp.Spec.Database != nil
usingExternalDB := params.UsingExternalDB(dsp)
usingMariaDB := !databaseSpecified || dsp.Spec.Database.MariaDB != nil
if !usingMariaDB && !usingExternalDB {
log.Info("Could not connect to Database: Unsupported Type")
return false
errorMessage := "Could not connect to Database: Unsupported Type"
log.Error(nil, errorMessage)
return false, errors.New(errorMessage)
}

decodePass, _ := b64.StdEncoding.DecodeString(params.DBConnection.Password)
dbConnectionTimeout := config.GetDurationConfigWithDefault(config.DBConnectionTimeoutConfigName, config.DefaultDBConnectionTimeout)

log.V(1).Info(fmt.Sprintf("Database Heath Check connection timeout: %s", dbConnectionTimeout))

dbHealthCheckPassed := ConnectAndQueryDatabase(params.DBConnection.Host,
dbHealthCheckPassed, err := ConnectAndQueryDatabase(params.DBConnection.Host,
params.DBConnection.Port,
params.DBConnection.Username,
string(decodePass),
params.DBConnection.DBName,
dbConnectionTimeout)
if dbHealthCheckPassed {
log.Info("Database Health Check Successful")
} else {

if err != nil {
log.Info("Unable to connect to Database")
} else {
log.Info("Database Health Check Successful")
}
return dbHealthCheckPassed

return dbHealthCheckPassed, err
}

func (r *DSPAReconciler) ReconcileDatabase(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
Expand Down
13 changes: 7 additions & 6 deletions controllers/dspipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -224,8 +225,8 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

// Get Prereq Status (DB and ObjStore Ready)
dbAvailable := r.isDatabaseAccessible(ctx, dspa, params)
objStoreAvailable := r.isObjectStorageAccessible(ctx, dspa, params)
dbAvailable, dbAvailableError := r.isDatabaseAccessible(ctx, dspa, params)
objStoreAvailable, objStoreAvailableError := r.isObjectStorageAccessible(ctx, dspa, params)
dspaPrereqsReady := dbAvailable && objStoreAvailable

if dspaPrereqsReady {
Expand Down Expand Up @@ -269,7 +270,7 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, err
}

conditions, err := r.GenerateStatus(ctx, dspa, params, dbAvailable, objStoreAvailable)
conditions, err := r.GenerateStatus(ctx, dspa, params, dbAvailable, objStoreAvailable, dbAvailableError, objStoreAvailableError)
if err != nil {
log.Info(err.Error())
return ctrl.Result{}, err
Expand Down Expand Up @@ -411,14 +412,14 @@ func (r *DSPAReconciler) handleReadyCondition(ctx context.Context, dspa *dspav1a
}

func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams, dbAvailableStatus, objStoreAvailableStatus bool) ([]metav1.Condition, error) {
params *DSPAParams, dbAvailableStatus bool, objStoreAvailableStatus bool, dbAvailableError error, objStoreAvailableError error) ([]metav1.Condition, error) {
// Create Database Availability Condition
databaseAvailable := r.buildCondition(config.DatabaseAvailable, dspa, config.DatabaseAvailable)
if dbAvailableStatus {
databaseAvailable.Status = metav1.ConditionTrue
databaseAvailable.Message = "Database connectivity successfully verified"
} else {
databaseAvailable.Message = "Could not connect to database"
databaseAvailable.Message = error.Error(dbAvailableError)
}

// Create Object Storage Availability Condition
Expand All @@ -427,7 +428,7 @@ func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.
objStoreAvailable.Status = metav1.ConditionTrue
objStoreAvailable.Message = "Object Store connectivity successfully verified"
} else {
objStoreAvailable.Message = "Could not connect to Object Store"
objStoreAvailable.Message = error.Error(objStoreAvailableError)
}

// Create APIServer Readiness Condition
Expand Down
67 changes: 38 additions & 29 deletions controllers/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"fmt"
"net/http"

"time"

"github.com/go-logr/logr"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
dspav1alpha1 "github.com/opendatahub-io/data-science-pipelines-operator/api/v1alpha1"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/config"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/util"
"time"
)

const storageSecret = "minio/secret.yaml.tmpl"
Expand Down Expand Up @@ -94,7 +95,7 @@ func getHttpsTransportWithCACert(log logr.Logger, pemCerts []byte) (*http.Transp
return transport, nil
}

var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
cred := createCredentialProvidersChain(string(accesskey), string(secretkey))

opts := &minio.Options{
Expand All @@ -105,16 +106,18 @@ var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoin
if len(pemCerts) != 0 {
tr, err := getHttpsTransportWithCACert(log, pemCerts)
if err != nil {
log.Error(err, "Encountered error when processing custom ca bundle.")
return false
errorMessage := "Encountered error when processing custom ca bundle."
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}
opts.Transport = tr
}

minioClient, err := minio.New(endpoint, opts)
if err != nil {
log.Info(fmt.Sprintf("Could not connect to object storage endpoint: %s", endpoint))
return false
errorMessage := fmt.Sprintf("Could not connect to object storage endpoint: %s", endpoint)
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

ctx, cancel := context.WithTimeout(ctx, objStoreConnectionTimeout)
Expand All @@ -128,68 +131,74 @@ var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoin
// In the case that the Error is NoSuchKey (or NoSuchBucket), we can verify that the endpoint worked and the object just doesn't exist
case minio.ErrorResponse:
if err.Code == "NoSuchKey" || err.Code == "NoSuchBucket" {
return true
return true, err
}
}

if util.IsX509UnknownAuthorityError(err) {
log.Error(err, "Encountered x509 UnknownAuthorityError when connecting to ObjectStore. "+
"If using an tls S3 connection with self-signed certs, you may specify a custom CABundle "+
"to mount on the DSP API Server via the DSPA cr under the spec.cABundle field. If you have already "+
"provided a CABundle, verify the validity of the provided CABundle.")
return false
errorMessage := "Encountered x509 UnknownAuthorityError when connecting to ObjectStore. " +
"If using an tls S3 connection with self-signed certs, you may specify a custom CABundle " +
"to mount on the DSP API Server via the DSPA cr under the spec.cABundle field. If you have already " +
"provided a CABundle, verify the validity of the provided CABundle."
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

// Every other error means the endpoint is inaccessible, or the credentials provided do not have, at a minimum, GetObject permissions
log.Info(fmt.Sprintf("Could not connect to (%s), Error: %s", endpoint, err.Error()))
return false
errorMessage := fmt.Sprintf("Could not connect to (%s), Error: %s", endpoint, err.Error())
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

// Getting here means the health check passed
return true
return true, nil
}

func (r *DSPAReconciler) isObjectStorageAccessible(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams) bool {
params *DSPAParams) (bool, error) {
log := r.Log.WithValues("namespace", dsp.Namespace).WithValues("dspa_name", dsp.Name)
if params.ObjectStorageHealthCheckDisabled(dsp) {
log.V(1).Info("Object Storage health check disabled, assuming object store is available and ready.")
return true
infoMessage := "Object Storage health check disabled, assuming object store is available and ready."
log.V(1).Info(infoMessage)
return true, nil
}

log.Info("Performing Object Storage Health Check")

endpoint, err := joinHostPort(params.ObjectStorageConnection.Host, params.ObjectStorageConnection.Port)
if err != nil {
log.Error(err, "Could not determine Object Storage Endpoint")
return false
errorMessage := "Could not determine Object Storage Endpoint"
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

accesskey, err := base64.StdEncoding.DecodeString(params.ObjectStorageConnection.AccessKeyID)
if err != nil {
log.Error(err, "Could not decode Object Storage Access Key ID")
return false
errorMessage := "Could not decode Object Storage Access Key ID"
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

secretkey, err := base64.StdEncoding.DecodeString(params.ObjectStorageConnection.SecretAccessKey)
if err != nil {
log.Error(err, "Could not decode Object Storage Secret Access Key")
return false
errorMessage := "Could not decode Object Storage Secret Access Key"
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

objStoreConnectionTimeout := config.GetDurationConfigWithDefault(config.ObjStoreConnectionTimeoutConfigName, config.DefaultObjStoreConnectionTimeout)

log.V(1).Info(fmt.Sprintf("Object Store connection timeout: %s", objStoreConnectionTimeout))

verified := ConnectAndQueryObjStore(ctx, log, endpoint, params.ObjectStorageConnection.Bucket, accesskey, secretkey,
verified, err := ConnectAndQueryObjStore(ctx, log, endpoint, params.ObjectStorageConnection.Bucket, accesskey, secretkey,
*params.ObjectStorageConnection.Secure, params.APICustomPemCerts, objStoreConnectionTimeout)

if verified {
log.Info("Object Storage Health Check Successful")
} else {
if err != nil {
log.Info("Object Storage Health Check Failed")
} else {
log.Info("Object Storage Health Check Successful")
}
return verified
return verified, err
}

// ReconcileStorage will set up Storage Connection.
Expand Down
41 changes: 21 additions & 20 deletions controllers/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package controllers
import (
"context"
"encoding/base64"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -270,8 +271,8 @@ func TestDefaultDeployBehaviorStorage(t *testing.T) {

func TestIsDatabaseAccessibleTrue(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return true, nil
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -302,14 +303,14 @@ func TestIsDatabaseAccessibleTrue(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.True(t, verified)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.True(t, verified, err)
}

func TestIsDatabaseNotAccessibleFalse(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return false
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return false, errors.New("Object Store is not Accessible")
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -340,14 +341,14 @@ func TestIsDatabaseNotAccessibleFalse(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, err)
}

func TestDisabledHealthCheckReturnsTrue(t *testing.T) {
// Override the live connection function with a mock version that would always return false if called
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return false
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return false, errors.New("Object Store is not Accessible")
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -378,16 +379,16 @@ func TestDisabledHealthCheckReturnsTrue(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
// if health check is disabled this should always return True
// even though the mock connection function would return false if called
assert.True(t, verified)
assert.True(t, verified, err)
}

func TestIsDatabaseAccessibleBadAccessKey(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return true, nil
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -418,14 +419,14 @@ func TestIsDatabaseAccessibleBadAccessKey(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, err)
}

func TestIsDatabaseAccessibleBadSecretKey(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return true, nil
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -456,8 +457,8 @@ func TestIsDatabaseAccessibleBadSecretKey(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, err)
}

func TestJoinHostPort(t *testing.T) {
Expand Down
Loading
Loading