Skip to content

Commit

Permalink
feat: Support Postgres db in Core Keeper (#4911)
Browse files Browse the repository at this point in the history
* feat: Support Postgres db in Core Keeper

Resolves #4877. Support Postgres db in Core Keeper.

Signed-off-by: Lindsey Cheng <beckysocute@gmail.com>

* fix: Fix the base64 encode error in updateKVS method

Relates #4877. Fix the base64 encode error in updateKVS method.

Signed-off-by: Lindsey Cheng <beckysocute@gmail.com>

---------

Signed-off-by: Lindsey Cheng <beckysocute@gmail.com>
  • Loading branch information
lindseysimple authored Sep 13, 2024
1 parent c631058 commit 2694c23
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 1 deletion.
7 changes: 7 additions & 0 deletions cmd/core-keeper/res/db/sql/00-utils.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
--
-- Copyright (C) 2024 IOTech Ltd
--
-- SPDX-License-Identifier: Apache-2.0

-- schema for core-keeper related tables
CREATE SCHEMA IF NOT EXISTS core_keeper;
13 changes: 13 additions & 0 deletions cmd/core-keeper/res/db/sql/01-tables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
--
-- Copyright (C) 2024 IOTech Ltd
--
-- SPDX-License-Identifier: Apache-2.0

-- core_keeper.config is used to store the config information
CREATE TABLE IF NOT EXISTS core_keeper.config (
id UUID PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL,
created timestamp NOT NULL DEFAULT now(),
modified timestamp NOT NULL DEFAULT now()
);
9 changes: 8 additions & 1 deletion internal/pkg/infrastructure/postgres/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ package postgres

// constants relate to the postgres db schema names
const (
coreDataSchema = "core_data"
coreDataSchema = "core_data"
coreKeeperSchema = "core_keeper"
)

// constants relate to the postgres db table names
const (
eventTableName = coreDataSchema + ".event"
readingTableName = coreDataSchema + ".reading"
configTableName = coreKeeperSchema + ".config"
)

// constants relate to the event/reading postgres db table column names
Expand All @@ -32,3 +34,8 @@ const (
mediaTypeCol = "mediatype"
objectValueCol = "objectvalue"
)

// constants relate to the keeper postgres db table column names
const (
keyCol = "key"
)
275 changes: 275 additions & 0 deletions internal/pkg/infrastructure/postgres/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
//
// Copyright (C) 2024 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package postgres

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"path"
"time"

pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres"

"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/cast"
)

// KeeperKeys returns the values stored for the specified key or with the same key prefix
func (c *Client) KeeperKeys(key string, keyOnly bool, isRaw bool) ([]models.KVResponse, errors.EdgeX) {
var result []models.KVResponse
var sqlStatement string

if keyOnly {
sqlStatement = sqlQueryFieldsByColAndLikePat(configTableName, []string{keyCol}, keyCol)
} else {
sqlStatement = sqlQueryFieldsByColAndLikePat(configTableName, []string{keyCol, valueCol, createdCol, modifiedCol}, keyCol)
}

rows, err := c.ConnPool.Query(context.Background(), sqlStatement, key+"%")
if err != nil {
return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query rows by key '%s'", key), err)
}

var kvKey string
if keyOnly {
_, err = pgx.ForEachRow(rows, []any{&kvKey}, func() error {
keyOnlyModel := models.KeyOnly(kvKey)
result = append(result, &keyOnlyModel)
return nil
})
} else {
var kvVal string
var created, modified time.Time
_, err = pgx.ForEachRow(rows, []any{&kvKey, &kvVal, &created, &modified}, func() error {
var keyValue any
if isRaw {
decodeValue, decErr := base64.StdEncoding.DecodeString(kvVal)
if decErr != nil {
return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("decode the value of key %s failed", kvKey), err)
}
keyValue = string(decodeValue)
} else {
keyValue = kvVal
}
kvStore := models.KVS{
Key: kvKey,
StoredData: models.StoredData{
DBTimestamp: models.DBTimestamp{Created: created.UnixMilli(), Modified: modified.UnixMilli()},
Value: keyValue,
},
}
result = append(result, &kvStore)
return nil
})
}
if err != nil {
return nil, pgClient.WrapDBError("failed to scan row to models.KVResponse", err)
}

return result, nil
}

// AddKeeperKeys inserts or updates the key-value pair(s) based on the passed models.KVS
// if isFlatten is enabled, multiple key-value pair(s) will be updated based on the Value from models.KVS
func (c *Client) AddKeeperKeys(kv models.KVS, isFlatten bool) ([]models.KeyOnly, errors.EdgeX) {
var keyReps []models.KeyOnly

if isFlatten {
// process the value map and convert the fields and store to multiple key-value pairs
txErr := pgx.BeginFunc(context.Background(), c.ConnPool, func(tx pgx.Tx) error {
var err error
keyReps, err = updateMultiKVSInTx(tx, kv.Key, kv.Value)
return err
})
if txErr != nil {
return nil, errors.NewCommonEdgeXWrapper(txErr)
}
} else {
// store the value in a single key
err := updateKVS(c.ConnPool, kv.Key, kv.Value)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}

keyReps = []models.KeyOnly{models.KeyOnly(kv.Key)}
}
return keyReps, nil
}

// DeleteKeeperKeys deletes one key or multiple keys(with isRecurse enabled)
func (c *Client) DeleteKeeperKeys(key string, isRecurse bool) ([]models.KeyOnly, errors.EdgeX) {
var exists bool
var resp []models.KeyOnly
var childKeyCount uint32
ctx := context.Background()
queryPattern := key + "/%"

// check if the exact same key exists
err := c.ConnPool.QueryRow(
context.Background(),
sqlCheckExistsByCol(configTableName, keyCol),
key,
).Scan(&exists)
if err != nil {
return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query row by key '%s'", key), err)
}

// check if the key(s) start with the keyPrefix exist and get the count of the result
err = c.ConnPool.QueryRow(
context.Background(),
sqlQueryCountByColAndLikePat(configTableName, keyCol),
queryPattern,
).Scan(&childKeyCount)
if err != nil {
return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query row by key starts with '%s'", key), err)
}

if exists {
// delete the exact same key
_, err = c.ConnPool.Exec(ctx, sqlDeleteByColumn(configTableName, keyCol), key)
if err != nil {
return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query row by key '%s'", key), err)
}
resp = []models.KeyOnly{models.KeyOnly(key)}
}

if childKeyCount == 0 {
if !exists {
// key is not found and no other keys starts with key exist
return nil, errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, fmt.Sprintf("key '%s' not found", key), nil)
}
} else {
if isRecurse {
// also delete the keys starts with the same key (e.g., edgex/v3/core-data/Writable, edgex/v3/core-data/Database all starts with edgex/v3/core-data)
sqlStatement := sqlDeleteByColAndLikePat(configTableName, keyCol, keyCol)
rows, err := c.ConnPool.Query(ctx, sqlStatement, queryPattern)
if err != nil {
return nil, pgClient.WrapDBError(fmt.Sprintf("failed to delete row by key starts with '%s'", key), err)
}

var returnedKey string
_, err = pgx.ForEachRow(rows, []any{&returnedKey}, func() error {
resp = append(resp, models.KeyOnly(returnedKey))
return nil
})
if err != nil {
return nil, pgClient.WrapDBError("failed to scan returned keys to models.KeyOnly", err)
}
} else {
// if isRecurse is not enabled, don't delete the keys starts with the same key
return nil, errors.NewCommonEdgeX(errors.KindStatusConflict, fmt.Sprintf("keys having the same prefix %s exist and cannot be deleted", key), nil)
}
}

return resp, nil
}

// updateKVS insert or update a single key-value pair with value is simply a string or a map
func updateKVS(connPool *pgxpool.Pool, key string, value any) errors.EdgeX {
ctx := context.Background()
var storedValueBytes []byte

switch v := value.(type) {
case bool, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, string:
storedValueStr := cast.ToString(v)
storedValueBytes = []byte(storedValueStr)
default:
encBytes, err := json.Marshal(v)
if err != nil {
return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("failed to marshal stored value %v with key '%s'", value, key), err)
}
storedValueBytes = encBytes
}

// encode the value to a base64 string
storedValue := base64.StdEncoding.EncodeToString(storedValueBytes)

var exists bool
err := connPool.QueryRow(ctx, sqlCheckExistsByCol(configTableName, keyCol), key).Scan(&exists)
if err != nil {
return pgClient.WrapDBError(fmt.Sprintf("failed to query value by key '%s'", key), err)
}

if exists {
// update the key
_, err = connPool.Exec(ctx, sqlUpdateColsByCondCol(configTableName, keyCol, valueCol, modifiedCol),
storedValue,
time.Now().UTC(),
key,
)
if err != nil {
return pgClient.WrapDBError(fmt.Sprintf("failed to modified value by key '%s'", key), err)
}
} else {
// insert the key
_, err = connPool.Exec(ctx, sqlInsert(configTableName, keyCol, valueCol),
key,
storedValue,
)
if err != nil {
return pgClient.WrapDBError(fmt.Sprintf("failed to insert value by key '%s'", key), err)
}
}
return nil
}

// updateMultiKVSInTx insert or update the key-value pairs in a map within a transaction
func updateMultiKVSInTx(tx pgx.Tx, currentKey string, value any) ([]models.KeyOnly, errors.EdgeX) {
ctx := context.Background()
var keyReps []models.KeyOnly

switch v := value.(type) {
case bool, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, string, []any:
var exists bool
var sqlStatement string

err := tx.QueryRow(
context.Background(),
sqlCheckExistsByCol(configTableName, keyCol),
currentKey,
).Scan(&exists)
if err != nil {
return nil, pgClient.WrapDBError(fmt.Sprintf("failed to query rows by key '%s'", currentKey), err)
}

storedValueStr := cast.ToString(v)
encStr := base64.StdEncoding.EncodeToString([]byte(storedValueStr))
if exists {
sqlStatement = sqlUpdateColsByCondCol(configTableName, keyCol, valueCol, modifiedCol)
_, err = tx.Exec(ctx, sqlStatement, encStr, time.Now().UTC(), currentKey)
if err != nil {
return nil, pgClient.WrapDBError(fmt.Sprintf("failed to update row by key '%s'", currentKey), err)
}
} else {
sqlStatement = sqlInsert(configTableName, keyCol, valueCol)
_, err = tx.Exec(ctx, sqlStatement, currentKey, encStr)
if err != nil {
return nil, pgClient.WrapDBError(fmt.Sprintf("failed to insert row by key '%s'", currentKey), err)
}
}
keyReps = append(keyReps, models.KeyOnly(currentKey))
case map[string]any:
for innerKey, element := range v {
// if the element type is an empty map, do not add the inner key to the upper level Hash field
if eleMap, ok := element.(map[string]any); ok && len(eleMap) == 0 {
continue
}

resp, err := updateMultiKVSInTx(tx, path.Join(currentKey, innerKey), element)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}
keyReps = append(keyReps, resp...)
}
}
return keyReps, nil
}
25 changes: 25 additions & 0 deletions internal/pkg/infrastructure/postgres/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//
// Copyright (C) 2024 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package postgres

import (
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"
)

func (c *Client) AddRegistration(r models.Registration) (models.Registration, errors.EdgeX) {
return models.Registration{}, nil
}

func (c *Client) DeleteRegistrationByServiceId(id string) errors.EdgeX { return nil }

func (c *Client) Registrations() ([]models.Registration, errors.EdgeX) { return nil, nil }

func (c *Client) RegistrationByServiceId(id string) (models.Registration, errors.EdgeX) {
return models.Registration{}, nil
}

func (c *Client) UpdateRegistration(r models.Registration) errors.EdgeX { return nil }
Loading

0 comments on commit 2694c23

Please sign in to comment.