Skip to content

Commit

Permalink
Merge pull request #4919 from jackchenjc/issue-4915
Browse files Browse the repository at this point in the history
feat: Enhance Support Cron Scheduler persistence layer with PostgreSQL
  • Loading branch information
cloudxxx8 authored Sep 18, 2024
2 parents 01f2b53 + 669a647 commit 39fb801
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 224 deletions.
4 changes: 2 additions & 2 deletions cmd/support-cron-scheduler/res/db/sql/00-utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
--
-- SPDX-License-Identifier: Apache-2.0

-- schema for scheduler related tables
CREATE SCHEMA IF NOT EXISTS scheduler;
-- schema for support_scheduler related tables
CREATE SCHEMA IF NOT EXISTS support_scheduler;
13 changes: 5 additions & 8 deletions cmd/support-cron-scheduler/res/db/sql/01-tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@
--
-- SPDX-License-Identifier: Apache-2.0

-- scheduler.schedule_job is used to store the schedule job information
CREATE TABLE IF NOT EXISTS scheduler.schedule_job (
-- support_scheduler.job is used to store the schedule job information
CREATE TABLE IF NOT EXISTS support_scheduler.job (
id UUID PRIMARY KEY,
name TEXT NOT NULL,
content JSONB NOT NULL,
created timestamp NOT NULL DEFAULT now(),
modified timestamp NOT NULL DEFAULT now()
content JSONB NOT NULL
);

-- scheduler.schedule_action_record is used to store the schedule action record
CREATE TABLE IF NOT EXISTS scheduler.schedule_action_record (
-- support_scheduler.record is used to store the schedule action record
CREATE TABLE IF NOT EXISTS support_scheduler.record (
id UUID PRIMARY KEY,
action_id UUID NOT NULL,
job_name TEXT NOT NULL,
Expand Down
39 changes: 33 additions & 6 deletions internal/pkg/infrastructure/postgres/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,28 @@ package postgres

// constants relate to the postgres db schema names
const (
coreDataSchema = "core_data"
coreKeeperSchema = "core_keeper"
coreDataSchema = "core_data"
coreKeeperSchema = "core_keeper"
supportSchedulerSchema = "support_scheduler"
)

// constants relate to the postgres db table names
const (
eventTableName = coreDataSchema + ".event"
readingTableName = coreDataSchema + ".reading"
configTableName = coreKeeperSchema + ".config"
registryTableName = coreKeeperSchema + ".registry"
configTableName = coreKeeperSchema + ".config"
eventTableName = coreDataSchema + ".event"
readingTableName = coreDataSchema + ".reading"
registryTableName = coreKeeperSchema + ".registry"
scheduleActionRecordTableName = supportSchedulerSchema + ".record"
scheduleJobTableName = supportSchedulerSchema + ".job"
)

// constants relate to the common db table column names
const (
contentCol = "content"
createdCol = "created"
idCol = "id"
modifiedCol = "modified"
statusCol = "status"
)

// constants relate to the event/reading postgres db table column names
Expand All @@ -40,3 +52,18 @@ const (
const (
keyCol = "key"
)

// constants relate to the schedule action record postgres db table column names
const (
actionCol = "action"
actionIdCol = "action_id"
jobNameCol = "job_name"
scheduledAtCol = "scheduled_at"
)

// constants relate to the field names in the content column
const (
labelsField = "Labels"
nameField = "Name"
serviceIdField = "ServiceId"
)
2 changes: 0 additions & 2 deletions internal/pkg/infrastructure/postgres/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)

const serviceIdField = "ServiceId"

func (c *Client) AddRegistration(r models.Registration) (models.Registration, errors.EdgeX) {
ctx := context.Background()
exists, edgexErr := checkRegistrationExists(c.ConnPool, ctx, r.ServiceId)
Expand Down
36 changes: 14 additions & 22 deletions internal/pkg/infrastructure/postgres/scheduleactionrecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@ import (
pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres"
)

const (
scheduleActionRecordTable = "scheduler.schedule_action_record"
actionIdCol = "action_id"
jobNameCol = "job_name"
actionCol = "action"
scheduledAtCol = "scheduled_at"
)

// AddScheduleActionRecord adds a new schedule action record to the database
// Note: the scheduledAt field should be set manually before calling this function.
func (c *Client) AddScheduleActionRecord(ctx context.Context, scheduleActionRecord model.ScheduleActionRecord) (model.ScheduleActionRecord, errors.EdgeX) {
Expand Down Expand Up @@ -59,7 +51,7 @@ func (c *Client) AllScheduleActionRecords(ctx context.Context, start, end int64,
}

startTime, endTime := getUTCStartAndEndTime(start, end)
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllWithPaginationAndTimeRange(scheduleActionRecordTable), startTime, endTime, offset, limit)
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllWithPaginationAndTimeRange(scheduleActionRecordTableName), startTime, endTime, offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all schedule action records", err)
}
Expand All @@ -69,20 +61,20 @@ func (c *Client) AllScheduleActionRecords(ctx context.Context, start, end int64,

// LatestScheduleActionRecordsByJobName queries the latest schedule action records by job name
func (c *Client) LatestScheduleActionRecordsByJobName(ctx context.Context, jobName string) ([]model.ScheduleActionRecord, errors.EdgeX) {
sqlQueryLatestScheduleActionRecords := `
sqlQueryLatestScheduleActionRecords := fmt.Sprintf(`
SELECT id, action_id, job_name, action, status, scheduled_at, created
FROM(
SELECT *
FROM (
SELECT *,
RANK() OVER (PARTITION BY job_name, action_id ORDER BY created DESC) AS rnk
FROM scheduler.schedule_action_record
FROM %s
WHERE job_name = $1
) subquery
WHERE rnk = 1
)
ORDER BY job_name, created DESC;
`
`, scheduleActionRecordTableName)

records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryLatestScheduleActionRecords, jobName)
if err != nil {
Expand All @@ -101,7 +93,7 @@ func (c *Client) ScheduleActionRecordsByStatus(ctx context.Context, status strin
}

startTime, endTime := getUTCStartAndEndTime(start, end)
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByStatusWithPaginationAndTimeRange(scheduleActionRecordTable), status, startTime, endTime, offset, limit)
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByStatusWithPaginationAndTimeRange(scheduleActionRecordTableName), status, startTime, endTime, offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule action records by status %s", status), err)
}
Expand All @@ -118,7 +110,7 @@ func (c *Client) ScheduleActionRecordsByJobName(ctx context.Context, jobName str
}

startTime, endTime := getUTCStartAndEndTime(start, end)
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByColWithPaginationAndTimeRange(scheduleActionRecordTable, jobNameCol), jobName, startTime, endTime, offset, limit)
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByColWithPaginationAndTimeRange(scheduleActionRecordTableName, jobNameCol), jobName, startTime, endTime, offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule action records by job name %s", jobName), err)
}
Expand All @@ -135,7 +127,7 @@ func (c *Client) ScheduleActionRecordsByJobNameAndStatus(ctx context.Context, jo
}

startTime, endTime := getUTCStartAndEndTime(start, end)
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByColWithPaginationAndTimeRange(scheduleActionRecordTable, jobNameCol, statusCol), jobName, status, startTime, endTime, offset, limit)
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByColWithPaginationAndTimeRange(scheduleActionRecordTableName, jobNameCol, statusCol), jobName, status, startTime, endTime, offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule action records by job name %s and status %s", jobName, status), err)
}
Expand All @@ -146,30 +138,30 @@ func (c *Client) ScheduleActionRecordsByJobNameAndStatus(ctx context.Context, jo
// ScheduleActionRecordTotalCount returns the total count of all the schedule action records
func (c *Client) ScheduleActionRecordTotalCount(ctx context.Context, start, end int64) (uint32, errors.EdgeX) {
startTime, endTime := getUTCStartAndEndTime(start, end)
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByTimeRangeCol(scheduleActionRecordTable, createdCol, nil), startTime, endTime)
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByTimeRangeCol(scheduleActionRecordTableName, createdCol, nil), startTime, endTime)
}

// ScheduleActionRecordCountByStatus returns the total count of the schedule action records by status
func (c *Client) ScheduleActionRecordCountByStatus(ctx context.Context, status string, start, end int64) (uint32, errors.EdgeX) {
startTime, endTime := getUTCStartAndEndTime(start, end)
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByTimeRangeCol(scheduleActionRecordTable, createdCol, nil, statusCol), startTime, endTime, status)
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByTimeRangeCol(scheduleActionRecordTableName, createdCol, nil, statusCol), startTime, endTime, status)
}

// ScheduleActionRecordCountByJobName returns the total count of the schedule action records by job name
func (c *Client) ScheduleActionRecordCountByJobName(ctx context.Context, jobName string, start, end int64) (uint32, errors.EdgeX) {
startTime, endTime := getUTCStartAndEndTime(start, end)
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByTimeRangeCol(scheduleActionRecordTable, createdCol, nil, jobNameCol), startTime, endTime, jobName)
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByTimeRangeCol(scheduleActionRecordTableName, createdCol, nil, jobNameCol), startTime, endTime, jobName)
}

// ScheduleActionRecordCountByJobNameAndStatus returns the total count of the schedule action records by job name and status
func (c *Client) ScheduleActionRecordCountByJobNameAndStatus(ctx context.Context, jobName, status string, start, end int64) (uint32, errors.EdgeX) {
startTime, endTime := getUTCStartAndEndTime(start, end)
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByTimeRangeCol(scheduleActionRecordTable, createdCol, nil, jobNameCol, statusCol), startTime, endTime, jobName, status)
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByTimeRangeCol(scheduleActionRecordTableName, createdCol, nil, jobNameCol, statusCol), startTime, endTime, jobName, status)
}

// DeleteScheduleActionRecordByAge deletes the schedule action records by age
func (c *Client) DeleteScheduleActionRecordByAge(ctx context.Context, age int64) errors.EdgeX {
return deleteScheduleActionRecord(ctx, c.ConnPool, sqlDeleteByAge(scheduleActionRecordTable), age)
return deleteScheduleActionRecord(ctx, c.ConnPool, sqlDeleteByAge(scheduleActionRecordTableName), age)
}

func addScheduleActionRecord(ctx context.Context, connPool *pgxpool.Pool, scheduleActionRecord model.ScheduleActionRecord) (model.ScheduleActionRecord, errors.EdgeX) {
Expand All @@ -185,7 +177,7 @@ func addScheduleActionRecord(ctx context.Context, connPool *pgxpool.Pool, schedu

_, err = connPool.Exec(
ctx,
sqlInsert(scheduleActionRecordTable, idCol, actionIdCol, jobNameCol, actionCol, statusCol, scheduledAtCol),
sqlInsert(scheduleActionRecordTableName, idCol, actionIdCol, jobNameCol, actionCol, statusCol, scheduledAtCol),
scheduleActionRecord.Id,
actionId,
scheduleActionRecord.JobName,
Expand Down Expand Up @@ -232,7 +224,7 @@ func queryScheduleActionRecords(ctx context.Context, connPool *pgxpool.Pool, sql
}

if readErr := rows.Err(); readErr != nil {
return nil, pgClient.WrapDBError("error occurred while query scheduler.schedule_action_record table", readErr)
return nil, pgClient.WrapDBError("error occurred while query support_scheduler.record table", readErr)
}
return scheduleActionRecords, nil
}
Expand Down
Loading

0 comments on commit 39fb801

Please sign in to comment.