From a7e1f4810001a539a514073afce4719385e7be4a Mon Sep 17 00:00:00 2001 From: Lindsey Cheng Date: Tue, 3 Sep 2024 10:44:59 +0800 Subject: [PATCH] feat: Support Postgres db in Core Keeper Resolves #4877. Support Postgres db in Core Keeper. Signed-off-by: Lindsey Cheng --- cmd/core-keeper/res/db/sql/00-utils.sql | 7 + cmd/core-keeper/res/db/sql/01-tables.sql | 13 + .../pkg/infrastructure/postgres/consts.go | 9 +- internal/pkg/infrastructure/postgres/kv.go | 263 ++++++++++++++++++ .../pkg/infrastructure/postgres/registry.go | 25 ++ internal/pkg/infrastructure/postgres/sql.go | 62 +++++ 6 files changed, 378 insertions(+), 1 deletion(-) create mode 100644 cmd/core-keeper/res/db/sql/00-utils.sql create mode 100644 cmd/core-keeper/res/db/sql/01-tables.sql create mode 100644 internal/pkg/infrastructure/postgres/kv.go create mode 100644 internal/pkg/infrastructure/postgres/registry.go diff --git a/cmd/core-keeper/res/db/sql/00-utils.sql b/cmd/core-keeper/res/db/sql/00-utils.sql new file mode 100644 index 0000000000..c064fc33e9 --- /dev/null +++ b/cmd/core-keeper/res/db/sql/00-utils.sql @@ -0,0 +1,7 @@ +-- +-- Copyright (C) 2024 IOTech Ltd +-- +-- SPDX-License-Identifier: Apache-2.0 + +-- schema for core-keeper related tables +CREATE SCHEMA IF NOT EXISTS core_keeper; diff --git a/cmd/core-keeper/res/db/sql/01-tables.sql b/cmd/core-keeper/res/db/sql/01-tables.sql new file mode 100644 index 0000000000..0283132ea9 --- /dev/null +++ b/cmd/core-keeper/res/db/sql/01-tables.sql @@ -0,0 +1,13 @@ +-- +-- Copyright (C) 2024 IOTech Ltd +-- +-- SPDX-License-Identifier: Apache-2.0 + +-- core_keeper.config is used to store the config information +CREATE TABLE IF NOT EXISTS core_keeper.config ( + id UUID PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL, + created timestamp NOT NULL DEFAULT now(), + modified timestamp NOT NULL DEFAULT now() +); diff --git a/internal/pkg/infrastructure/postgres/consts.go b/internal/pkg/infrastructure/postgres/consts.go index 86b2f7ae8d..164ae13b6d 100644 --- a/internal/pkg/infrastructure/postgres/consts.go +++ b/internal/pkg/infrastructure/postgres/consts.go @@ -7,13 +7,15 @@ package postgres // constants relate to the postgres db schema names const ( - coreDataSchema = "core_data" + coreDataSchema = "core_data" + coreKeeperSchema = "core_keeper" ) // constants relate to the postgres db table names const ( eventTableName = coreDataSchema + ".event" readingTableName = coreDataSchema + ".reading" + configTableName = coreKeeperSchema + ".config" ) // constants relate to the event/reading postgres db table column names @@ -32,3 +34,8 @@ const ( mediaTypeCol = "mediatype" objectValueCol = "objectvalue" ) + +// constants relate to the keeper postgres db table column names +const ( + keyCol = "key" +) diff --git a/internal/pkg/infrastructure/postgres/kv.go b/internal/pkg/infrastructure/postgres/kv.go new file mode 100644 index 0000000000..606aff3df5 --- /dev/null +++ b/internal/pkg/infrastructure/postgres/kv.go @@ -0,0 +1,263 @@ +// +// Copyright (C) 2024 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "path" + "time" + + pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres" + + "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" + "github.com/edgexfoundry/go-mod-core-contracts/v3/models" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/spf13/cast" +) + +// KeeperKeys returns the values stored for the specified key or with the same key prefix +func (c *Client) KeeperKeys(key string, keyOnly bool, isRaw bool) ([]models.KVResponse, errors.EdgeX) { + var result []models.KVResponse + var sqlStatement string + + if keyOnly { + sqlStatement = sqlQueryFieldsByColAndLikePat(configTableName, []string{keyCol}, keyCol) + } else { + sqlStatement = sqlQueryFieldsByColAndLikePat(configTableName, []string{keyCol, valueCol, createdCol, modifiedCol}, keyCol) + } + + rows, err := c.ConnPool.Query(context.Background(), sqlStatement, key+"%") + if err != nil { + return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query rows by key '%s'", key), err) + } + + var kvKey string + if keyOnly { + _, err = pgx.ForEachRow(rows, []any{&kvKey}, func() error { + keyOnlyModel := models.KeyOnly(kvKey) + result = append(result, &keyOnlyModel) + return nil + }) + } else { + var kvVal string + var created, modified time.Time + _, err = pgx.ForEachRow(rows, []any{&kvKey, &kvVal, &created, &modified}, func() error { + var keyValue any + if isRaw { + decodeValue, decErr := base64.StdEncoding.DecodeString(kvVal) + if decErr != nil { + return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("decode the value of key %s failed", kvKey), err) + } + keyValue = string(decodeValue) + } else { + keyValue = kvVal + } + kvStore := models.KVS{ + Key: kvKey, + StoredData: models.StoredData{ + DBTimestamp: models.DBTimestamp{Created: created.UnixMilli(), Modified: modified.UnixMilli()}, + Value: keyValue, + }, + } + result = append(result, &kvStore) + return nil + }) + } + if err != nil { + return nil, pgClient.WrapDBError("failed to scan row to models.KVResponse", err) + } + + return result, nil +} + +// AddKeeperKeys inserts or updates the key-value pair(s) based on the passed models.KVS +// if isFlatten is enabled, multiple key-value pair(s) will be updated based on the Value from models.KVS +func (c *Client) AddKeeperKeys(kv models.KVS, isFlatten bool) ([]models.KeyOnly, errors.EdgeX) { + var keyReps []models.KeyOnly + + if isFlatten { + // process the value map and convert the fields and store to multiple key-value pairs + txErr := pgx.BeginFunc(context.Background(), c.ConnPool, func(tx pgx.Tx) error { + var err error + keyReps, err = updateMultiKVSInTx(tx, kv.Key, kv.Value) + return err + }) + if txErr != nil { + return nil, errors.NewCommonEdgeXWrapper(txErr) + } + } else { + // store the value in a single key + err := updateKVS(c.ConnPool, kv.Key, kv.Value) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + + keyReps = []models.KeyOnly{models.KeyOnly(kv.Key)} + } + return keyReps, nil +} + +// DeleteKeeperKeys deletes one key or multiple keys(with isRecurse enabled) +func (c *Client) DeleteKeeperKeys(key string, isRecurse bool) ([]models.KeyOnly, errors.EdgeX) { + var exists bool + var resp []models.KeyOnly + var childKeyCount uint32 + ctx := context.Background() + queryPattern := key + "/%" + + // check if the exact same key exists + err := c.ConnPool.QueryRow( + context.Background(), + sqlCheckExistsByCol(configTableName, keyCol), + key, + ).Scan(&exists) + if err != nil { + return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query row by key '%s'", key), err) + } + + // check if the key(s) start with the keyPrefix exist and get the count of the result + err = c.ConnPool.QueryRow( + context.Background(), + sqlQueryCountByColAndLikePat(configTableName, keyCol), + queryPattern, + ).Scan(&childKeyCount) + if err != nil { + return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query row by key starts with '%s'", key), err) + } + + if exists { + // delete the exact same key + _, err = c.ConnPool.Exec(ctx, sqlDeleteByColumn(configTableName, keyCol), key) + if err != nil { + return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query row by key '%s'", key), err) + } + resp = []models.KeyOnly{models.KeyOnly(key)} + } + + if childKeyCount == 0 { + if !exists { + // key is not found and no other keys starts with key exist + return nil, errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, fmt.Sprintf("key '%s' not found", key), nil) + } + } else { + if isRecurse { + // also delete the keys starts with the same key (e.g., edgex/v3/core-data/Writable, edgex/v3/core-data/Database all starts with edgex/v3/core-data) + sqlStatement := sqlDeleteByColAndLikePat(configTableName, keyCol, keyCol) + rows, err := c.ConnPool.Query(ctx, sqlStatement, queryPattern) + if err != nil { + return nil, pgClient.WrapDBError(fmt.Sprintf("failed to delete row by key starts with '%s'", key), err) + } + + var returnedKey string + _, err = pgx.ForEachRow(rows, []any{&returnedKey}, func() error { + resp = append(resp, models.KeyOnly(returnedKey)) + return nil + }) + if err != nil { + return nil, pgClient.WrapDBError("failed to scan returned keys to models.KeyOnly", err) + } + } else { + // if isRecurse is not enabled, don't delete the keys starts with the same key + return nil, errors.NewCommonEdgeX(errors.KindStatusConflict, fmt.Sprintf("keys having the same prefix %s exist and cannot be deleted", key), nil) + } + } + + return resp, nil +} + +// updateKVS insert or update a single key-value pair with value is simply a string or a map +func updateKVS(connPool *pgxpool.Pool, key string, value any) errors.EdgeX { + ctx := context.Background() + storedValueBytes, err := json.Marshal(value) + if err != nil { + return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("failed to marshal stored value %v with key '%s'", value, key), err) + } + var encBytes []byte + base64.StdEncoding.Encode(encBytes, storedValueBytes) + + var exists bool + err = connPool.QueryRow(ctx, sqlCheckExistsByCol(configTableName, keyCol), key).Scan(&exists) + if err != nil { + return pgClient.WrapDBError(fmt.Sprintf("failed to query value by key '%s'", key), err) + } + + if exists { + _, err = connPool.Exec(ctx, sqlUpdateColsByCondCol(configTableName, keyCol, valueCol, modifiedCol), + encBytes, + time.Now().UTC(), + key, + ) + if err != nil { + return pgClient.WrapDBError(fmt.Sprintf("failed to modified value by key '%s'", key), err) + } + } else { + _, err = connPool.Exec(ctx, sqlInsert(configTableName, keyCol, valueCol), + key, + encBytes, + ) + if err != nil { + return pgClient.WrapDBError(fmt.Sprintf("failed to insert value by key '%s'", key), err) + } + } + return nil +} + +// updateMultiKVSInTx insert or update the key-value pairs in a map within a transaction +func updateMultiKVSInTx(tx pgx.Tx, currentKey string, value any) ([]models.KeyOnly, errors.EdgeX) { + ctx := context.Background() + var keyReps []models.KeyOnly + + switch v := value.(type) { + case bool, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, string, []any: + var exists bool + var sqlStatement string + + err := tx.QueryRow( + context.Background(), + sqlCheckExistsByCol(configTableName, keyCol), + currentKey, + ).Scan(&exists) + if err != nil { + return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query rows by key '%s'", currentKey), err) + } + + storedValueStr := cast.ToString(v) + encStr := base64.StdEncoding.EncodeToString([]byte(storedValueStr)) + if exists { + sqlStatement = sqlUpdateColsByCondCol(configTableName, keyCol, valueCol, modifiedCol) + _, err = tx.Exec(ctx, sqlStatement, encStr, time.Now().UTC(), currentKey) + if err != nil { + return nil, pgClient.WrapDBError(fmt.Sprintf("failed to update row by key '%s'", currentKey), err) + } + } else { + sqlStatement = sqlInsert(configTableName, keyCol, valueCol) + _, err = tx.Exec(ctx, sqlStatement, currentKey, encStr) + if err != nil { + return nil, pgClient.WrapDBError(fmt.Sprintf("failed to insert row by key '%s'", currentKey), err) + } + } + keyReps = append(keyReps, models.KeyOnly(currentKey)) + case map[string]any: + for innerKey, element := range v { + // if the element type is an empty map, do not add the inner key to the upper level Hash field + if eleMap, ok := element.(map[string]any); ok && len(eleMap) == 0 { + continue + } + + resp, err := updateMultiKVSInTx(tx, path.Join(currentKey, innerKey), element) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + keyReps = append(keyReps, resp...) + } + } + return keyReps, nil +} diff --git a/internal/pkg/infrastructure/postgres/registry.go b/internal/pkg/infrastructure/postgres/registry.go new file mode 100644 index 0000000000..4e4807bf08 --- /dev/null +++ b/internal/pkg/infrastructure/postgres/registry.go @@ -0,0 +1,25 @@ +// +// Copyright (C) 2024 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" + "github.com/edgexfoundry/go-mod-core-contracts/v3/models" +) + +func (c *Client) AddRegistration(r models.Registration) (models.Registration, errors.EdgeX) { + return models.Registration{}, nil +} + +func (c *Client) DeleteRegistrationByServiceId(id string) errors.EdgeX { return nil } + +func (c *Client) Registrations() ([]models.Registration, errors.EdgeX) { return nil, nil } + +func (c *Client) RegistrationByServiceId(id string) (models.Registration, errors.EdgeX) { + return models.Registration{}, nil +} + +func (c *Client) UpdateRegistration(r models.Registration) errors.EdgeX { return nil } diff --git a/internal/pkg/infrastructure/postgres/sql.go b/internal/pkg/infrastructure/postgres/sql.go index a624048b85..7bc41681f2 100644 --- a/internal/pkg/infrastructure/postgres/sql.go +++ b/internal/pkg/infrastructure/postgres/sql.go @@ -67,6 +67,14 @@ func sqlQueryFieldsByCol(table string, fields []string, columns ...string) strin return fmt.Sprintf("SELECT %s FROM %s WHERE %s", queryFieldStr, table, whereCondition) } +// sqlQueryFieldsByColAndLikePat returns the SQL statement for selecting the given fields of rows from the table by the conditions composed of given columns with LIKE operator +func sqlQueryFieldsByColAndLikePat(table string, fields []string, columns ...string) string { + whereCondition := constructWhereLikeCond(columns...) + queryFieldStr := strings.Join(fields, ", ") + + return fmt.Sprintf("SELECT %s FROM %s WHERE %s", queryFieldStr, table, whereCondition) +} + // sqlQueryAllWithTimeRange returns the SQL statement for selecting all rows from the table with a time range. //func sqlQueryAllWithTimeRange(table string) string { // return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2", table, createdCol, createdCol) @@ -182,6 +190,12 @@ func sqlCheckExistsByName(table string) string { // return fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM %s WHERE %s = $1)", table, idCol) //} +// sqlCheckExistsByCol returns the SQL statement for checking if a row exists in the table by where condition. +func sqlCheckExistsByCol(table string, columns ...string) string { + whereCondition := constructWhereCondition(columns...) + return fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM %s WHERE %s)", table, whereCondition) +} + // sqlQueryCount returns the SQL statement for counting the number of rows in the table. func sqlQueryCount(table string) string { return fmt.Sprintf("SELECT COUNT(*) FROM %s", table) @@ -205,6 +219,12 @@ func sqlQueryCountByTimeRangeCol(table string, timeRangeCol string, arrayColName return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s", table, whereCondition) } +// sqlQueryCountByColAndLikePat returns the SQL statement for counting the number of rows by the given column name with LIKE pattern. +func sqlQueryCountByColAndLikePat(table string, columns ...string) string { + whereCondition := constructWhereLikeCond(columns...) + return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s", table, whereCondition) +} + // sqlQueryCountByJSONField returns the SQL statement for counting the number of rows in the table by the given JSON query string //func sqlQueryCountByJSONField(table string) (string, errors.EdgeX) { // return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE content @> $1::jsonb", table), nil @@ -232,6 +252,13 @@ func sqlUpdateContentByName(table string) string { return fmt.Sprintf("UPDATE %s SET %s = $1 , %s = $2 WHERE %s = $3", table, contentCol, modifiedCol, nameCol) } +// sqlUpdateColsByCondCol returns the SQL statement for updating the passed columns of a row in the table by condCol. +func sqlUpdateColsByCondCol(table string, condCol string, cols ...string) string { + columnCount := len(cols) + updatedValues := constructUpdatedValues(cols...) + return fmt.Sprintf("UPDATE %s SET %s WHERE %s = $%d", table, updatedValues, condCol, columnCount+1) +} + // sqlUpdateContentById returns the SQL statement for updating the content and modified timestamp of a row in the table by id. //func sqlUpdateContentById(table string) string { // return fmt.Sprintf("UPDATE %s SET %s = $1 , %s = $2 WHERE %s = $3", table, contentCol, modifiedCol, idCol) @@ -267,6 +294,17 @@ func sqlDeleteByColumn(table string, column string) string { return fmt.Sprintf("DELETE FROM %s WHERE %s = $1", table, column) } +// sqlDeleteByColAndLikePat returns the SQL statement for deleting rows by the specified column with LIKE pattern +// and append returnCol as result if not empty +func sqlDeleteByColAndLikePat(table string, column string, returnCol ...string) string { + whereCond := constructWhereLikeCond(column) + deleteStmt := fmt.Sprintf("DELETE FROM %s WHERE %s", table, whereCond) + if len(returnCol) > 0 { + deleteStmt += " RETURNING " + strings.Join(returnCol, ", ") + } + return deleteStmt +} + // sqlDeleteByJSONField returns the SQL statement for deleting rows from the table by the given JSON query string //func sqlDeleteByJSONField(table string) (string, errors.EdgeX) { // return fmt.Sprintf("DELETE FROM %s WHERE content @> $1::jsonb", table), nil @@ -307,3 +345,27 @@ func constructWhereCondWithTimeRange(timeRangeCol string, arrayColNames []string return strings.Join(conditions, " AND ") } + +// constructWhereLikeCond constructs the WHERE condition for the given columns with LIKE operator +func constructWhereLikeCond(columns ...string) string { + columnCount := len(columns) + conditions := make([]string, columnCount) + + for i, column := range columns { + conditions[i] = fmt.Sprintf("%s LIKE $%d", column, i+1) + } + + return strings.Join(conditions, " AND ") +} + +// constructWhereLikeCond constructs the updated values for SET keyword composed of the given columns +func constructUpdatedValues(columns ...string) string { + columnCount := len(columns) + conditions := make([]string, columnCount) + + for i, column := range columns { + conditions[i] = fmt.Sprintf("%s = $%d", column, i+1) + } + + return strings.Join(conditions, ", ") +}