engine/cleanupmgr: individualize schedule data cleanup jobs #4209

Merged: 3 commits, Dec 18, 2024
19 changes: 13 additions & 6 deletions engine/cleanupmanager/queries.sql
@@ -77,21 +77,28 @@ WHERE id = ANY (
FOR UPDATE
SKIP LOCKED);

-- name: CleanupMgrScheduleData :one
-- CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. The last_cleanup_at field is used to ensure we clean up each schedule data at most once per interval.
-- name: CleanupMgrScheduleNeedsCleanup :many
-- CleanupMgrScheduleNeedsCleanup will find schedules that need to be cleaned up. The last_cleanup_at field is used to ensure we clean up each schedule data at most once per interval.
SELECT
schedule_id,
data
schedule_id
FROM
schedule_data
WHERE
data NOTNULL
AND (last_cleanup_at ISNULL
OR last_cleanup_at <= now() - '1 day'::interval * sqlc.arg(cleanup_interval_days)::int)
ORDER BY
last_cleanup_at ASC nulls FIRST
last_cleanup_at ASC nulls FIRST;

-- name: CleanupMgrScheduleData :one
-- CleanupMgrScheduleData will select the schedule data for the given schedule id.
SELECT
data
FROM
schedule_data
WHERE
schedule_id = $1
FOR UPDATE
SKIP LOCKED
LIMIT 1;

-- name: CleanupMgrUpdateScheduleData :exec
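To make the :many/:one split above concrete, here is a rough sketch of how a caller might use the two queries through the sqlc-generated gadb package. The generated signatures live in gadb/queries.sql.go and are assumed here from how scheddata.go calls them; findAndLoad itself is a hypothetical helper for illustration, not part of this PR.

// Hypothetical helper sketching the two-step flow the new queries enable:
// list stale schedules, then lock and load one schedule's raw JSON at a time.
// Assumes the sqlc-generated signatures match how scheddata.go calls them.
package cleanupmanager_example

import (
	"context"

	"github.com/google/uuid"
	"github.com/target/goalert/gadb"
)

func findAndLoad(ctx context.Context, q *gadb.Queries) (map[uuid.UUID][]byte, error) {
	// :many query -> schedule IDs past the 30-day cleanup interval.
	ids, err := q.CleanupMgrScheduleNeedsCleanup(ctx, 30)
	if err != nil {
		return nil, err
	}
	out := make(map[uuid.UUID][]byte, len(ids))
	for _, id := range ids {
		// :one query -> raw JSON for a single schedule (FOR UPDATE SKIP LOCKED).
		data, err := q.CleanupMgrScheduleData(ctx, id)
		if err != nil {
			return nil, err
		}
		out[id] = data
	}
	return out, nil
}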
86 changes: 66 additions & 20 deletions engine/cleanupmanager/scheddata.go
@@ -11,16 +11,67 @@ import (
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/target/goalert/config"
"github.com/target/goalert/gadb"
"github.com/target/goalert/schedule"
)

type SchedDataArgs struct{}
type SchedDataArgs struct {
ScheduleID uuid.UUID
}

func (SchedDataArgs) Kind() string { return "cleanup-manager-sched-data" }

type SchedDataLFW struct{}

func (SchedDataLFW) Kind() string { return "cleanup-manager-sched-data-lfw" }

// LookForWorkScheduleData will automatically look for schedules that need their JSON data cleaned up and insert them into the queue.
func (db *DB) LookForWorkScheduleData(ctx context.Context, j *river.Job[SchedDataLFW]) error {
cfg := config.FromContext(ctx)
if cfg.Maintenance.ScheduleCleanupDays <= 0 {
return nil
}
var outOfDate []uuid.UUID
err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error {
var err error
// Grab schedules that haven't been cleaned up in the last 30 days.
outOfDate, err = gadb.New(tx).CleanupMgrScheduleNeedsCleanup(ctx, 30)
return err
})
if errors.Is(err, sql.ErrNoRows) {
return nil
}
if err != nil {
return err
}

var params []river.InsertManyParams
for _, id := range outOfDate {
params = append(params, river.InsertManyParams{
Args: SchedDataArgs{ScheduleID: id},
InsertOpts: &river.InsertOpts{
Queue: QueueName,
Priority: PriorityTempSched,
UniqueOpts: river.UniqueOpts{ByArgs: true},
},
})
}

if len(params) == 0 {
return nil
}

_, err = river.ClientFromContext[pgx.Tx](ctx).InsertMany(ctx, params)
if err != nil {
return fmt.Errorf("insert many: %w", err)
}

return nil
}

// CleanupScheduleData will automatically cleanup schedule data.
// - Remove temporary-schedule shifts for users that no longer exist.
// - Remove temporary-schedule shifts that occur in the past.
@@ -29,28 +80,23 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg
if cfg.Maintenance.ScheduleCleanupDays <= 0 {
return nil
}
log := db.logger.With(slog.String("schedule_id", j.Args.ScheduleID.String()))

err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) {
err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) (err error) {
// Grab the next schedule that hasn't been cleaned up in the last 30 days.
dataRow, err := gadb.New(tx).CleanupMgrScheduleData(ctx, 30)
rawData, err := gadb.New(tx).CleanupMgrScheduleData(ctx, j.Args.ScheduleID)
if errors.Is(err, sql.ErrNoRows) {
return true, nil
return nil
}
if err != nil {
return false, fmt.Errorf("get schedule data: %w", err)
return fmt.Errorf("get schedule data: %w", err)
}
log := db.logger.With(slog.String("schedule_id", dataRow.ScheduleID.String()))
gdb := gadb.New(tx)

var data schedule.Data
err = json.Unmarshal(dataRow.Data, &data)
err = json.Unmarshal(rawData, &data)
if err != nil {
log.ErrorContext(ctx,
"failed to unmarshal schedule data, skipping.",
slog.String("error", err.Error()))

// Mark as skipped so we don't keep trying to process it.
return false, gdb.CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID)
return fmt.Errorf("unmarshal schedule data: %w", err)
}

// We want to remove shifts for users that no longer exist, so to do that we'll get the set of users from the schedule data and verify them.
@@ -59,28 +105,28 @@ if len(users) > 0 {
if len(users) > 0 {
validUsers, err = gdb.CleanupMgrVerifyUsers(ctx, users)
if err != nil {
return false, fmt.Errorf("lookup valid users: %w", err)
return fmt.Errorf("lookup valid users: %w", err)
}
}

now, err := gdb.Now(ctx)
if err != nil {
return false, fmt.Errorf("get current time: %w", err)
return fmt.Errorf("get current time: %w", err)
}
changed := cleanupData(&data, validUsers, now)
if !changed {
return false, gdb.CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID)
return gdb.CleanupMgrScheduleDataSkip(ctx, j.Args.ScheduleID)
}

rawData, err := json.Marshal(data)
rawData, err = json.Marshal(data)
if err != nil {
return false, fmt.Errorf("marshal schedule data: %w", err)
return fmt.Errorf("marshal schedule data: %w", err)
}

log.InfoContext(ctx, "Updated schedule data.")
return false, gdb.CleanupMgrUpdateScheduleData(ctx,
return gdb.CleanupMgrUpdateScheduleData(ctx,
gadb.CleanupMgrUpdateScheduleDataParams{
ScheduleID: dataRow.ScheduleID,
ScheduleID: j.Args.ScheduleID,
Data: rawData,
})
})
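Because each per-schedule job is inserted with UniqueOpts{ByArgs: true}, re-enqueueing the same schedule ID while a job is still pending should be deduplicated by River. The sketch below shows what a targeted, single-schedule enqueue could look like; this caller is hypothetical and not part of the PR, and it simply reuses the queue, priority, and uniqueness options from LookForWorkScheduleData above.

// Hypothetical example: enqueue a cleanup job for one schedule directly.
// Assumes a *river.Client[pgx.Tx] is available to the caller.
package cleanupmanager_example

import (
	"context"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"

	"github.com/target/goalert/engine/cleanupmanager"
)

func enqueueScheduleCleanup(ctx context.Context, c *river.Client[pgx.Tx], schedID uuid.UUID) error {
	_, err := c.Insert(ctx, cleanupmanager.SchedDataArgs{ScheduleID: schedID}, &river.InsertOpts{
		Queue:      cleanupmanager.QueueName,
		Priority:   cleanupmanager.PriorityTempSched,
		UniqueOpts: river.UniqueOpts{ByArgs: true}, // duplicate inserts for the same schedule are skipped
	})
	return err
}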
39 changes: 37 additions & 2 deletions engine/cleanupmanager/setup.go
@@ -15,6 +15,13 @@ var _ processinglock.Setupable = &DB{}

const QueueName = "cleanup-manager"

const (
PriorityAlertCleanup = iota + 1
PrioritySchedHistory
PriorityTempSchedLFW
PriorityTempSched
)

// whileWork will run the provided function in a loop until it returns done=true.
func (db *DB) whileWork(ctx context.Context, run func(ctx context.Context, tx *sql.Tx) (done bool, err error)) error {
var done bool
@@ -45,8 +52,9 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error {
river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts))
river.AddWorker(args.Workers, river.WorkFunc(db.CleanupShifts))
river.AddWorker(args.Workers, river.WorkFunc(db.CleanupScheduleData))
river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkScheduleData))

err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 2})
err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5})
if err != nil {
return fmt.Errorf("add queue: %w", err)
}
Expand All @@ -56,7 +64,34 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error {
river.PeriodicInterval(time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return AlertArgs{}, &river.InsertOpts{
Queue: QueueName,
Queue: QueueName,
Priority: PriorityAlertCleanup,
}
},
&river.PeriodicJobOpts{RunOnStart: true},
),
})

args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return ShiftArgs{}, &river.InsertOpts{
Queue: QueueName,
Priority: PrioritySchedHistory,
}
},
&river.PeriodicJobOpts{RunOnStart: true},
),
})

args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return SchedDataLFW{}, &river.InsertOpts{
Queue: QueueName,
Priority: PriorityTempSchedLFW,
}
},
&river.PeriodicJobOpts{RunOnStart: true},
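For reference, the iota block added to setup.go above resolves to the values below. River priorities run from 1 (highest) to 4 (lowest), so alert cleanup takes precedence over the per-schedule data jobs; the trailing comments are interpretive, not part of the PR.

// Resolved values of the priority constants in setup.go (iota + 1):
const (
	PriorityAlertCleanup = 1 // hourly alert cleanup job
	PrioritySchedHistory = 2 // daily shift-history cleanup job
	PriorityTempSchedLFW = 3 // daily look-for-work pass over schedule_data
	PriorityTempSched    = 4 // individual per-schedule data cleanup jobs
)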
62 changes: 44 additions & 18 deletions gadb/queries.sql.go

Some generated files are not rendered by default.
