From 42b02080ff91a05b52ef5a1b02c7991a4307c968 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Thu, 14 Nov 2024 20:26:22 +0530 Subject: [PATCH] feat: add maintenance mode for upgrades (#2211) - Introduces Maintenance mode (status is available via dynamic config: `PEERDB_MAINTENANCE_MODE_ENABLED`) - Maintenance mode consists of 2 Workflows: - `StartMaintenance` - for pre-upgrade, responsible for - Waiting for running snapshots - Updating dynamic config to true - Pausing and backing up currently running mirrors - `EndMaintenance` - for post-upgrade, responsible for - Resuming backed up mirrors - Updating dynamic config to false - During the upgrade (between `Start` and `End`), mirrors cannot be mutated/created in any way. - There is also an instance info API which returns `Ready`/`Maintenance` which can be used for UI changes later. There are 2 ways to trigger these 2 workflows: 1. API call to flow-api 2. Running the new `maintenance` entrypoint with the respective args A new task queue is added so that the maintenance tasks can be spun up even during pre-upgrade hooks (from versions earlier than the ones containing this PR); this also ensures that the latest version of the maintenance flows always runs, irrespective of the old version. 
--- docker-bake.hcl | 30 ++ flow/activities/maintenance_activity.go | 284 ++++++++++++++++ flow/alerting/alerting.go | 6 +- flow/cmd/api.go | 41 +-- flow/cmd/handler.go | 66 ++++ flow/cmd/maintenance.go | 246 ++++++++++++++ flow/cmd/mirror_status.go | 15 +- flow/cmd/settings.go | 3 +- flow/cmd/validate_mirror.go | 12 + flow/cmd/worker.go | 14 +- flow/go.mod | 47 ++- flow/go.sum | 103 +++++- flow/main.go | 88 +++++ flow/peerdbenv/config.go | 6 + flow/peerdbenv/dynamicconf.go | 32 ++ flow/shared/constants.go | 5 +- flow/shared/telemetry/event_types.go | 10 +- flow/shared/worklow.go | 27 ++ flow/workflows/activities.go | 5 +- flow/workflows/maintenance_flow.go | 305 ++++++++++++++++++ flow/workflows/register.go | 3 + .../migrations/V40__maintenance_flows.sql | 29 ++ protos/flow.proto | 25 ++ protos/route.proto | 40 +++ stacks/flow.Dockerfile | 15 + stacks/peerdb-server.Dockerfile | 4 + stacks/peerdb-ui.Dockerfile | 3 + 27 files changed, 1393 insertions(+), 71 deletions(-) create mode 100644 flow/activities/maintenance_activity.go create mode 100644 flow/cmd/maintenance.go create mode 100644 flow/shared/worklow.go create mode 100644 flow/workflows/maintenance_flow.go create mode 100644 nexus/catalog/migrations/V40__maintenance_flows.sql diff --git a/docker-bake.hcl b/docker-bake.hcl index 6e6098ca14..4927cd5505 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -16,6 +16,7 @@ group "default" { "flow-worker", "flow-api", "flow-snapshot-worker", + "flow-maintenance", "peerdb-ui" ] } @@ -45,6 +46,9 @@ target "flow-snapshot-worker" { "linux/amd64", "linux/arm64", ] + args = { + PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}" + } tags = [ "${REGISTRY}/flow-snapshot-worker:${TAG}", "${REGISTRY}/flow-snapshot-worker:${SHA_SHORT}", @@ -59,12 +63,32 @@ target "flow-worker" { "linux/amd64", "linux/arm64", ] + args = { + PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}" + } tags = [ "${REGISTRY}/flow-worker:${TAG}", "${REGISTRY}/flow-worker:${SHA_SHORT}", ] } +target "flow-maintenance" { + 
context = "." + dockerfile = "stacks/flow.Dockerfile" + target = "flow-maintenance" + platforms = [ + "linux/amd64", + "linux/arm64", + ] + args = { + PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}" + } + tags = [ + "${REGISTRY}/flow-maintenance:${TAG}", + "${REGISTRY}/flow-maintenance:${SHA_SHORT}", + ] +} + target "peerdb" { context = "." dockerfile = "stacks/peerdb-server.Dockerfile" @@ -72,6 +96,9 @@ target "peerdb" { "linux/amd64", "linux/arm64", ] + args = { + PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}" + } tags = [ "${REGISTRY}/peerdb-server:${TAG}", "${REGISTRY}/peerdb-server:${SHA_SHORT}", @@ -85,6 +112,9 @@ target "peerdb-ui" { "linux/amd64", "linux/arm64", ] + args = { + PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}" + } tags = [ "${REGISTRY}/peerdb-ui:${TAG}", "${REGISTRY}/peerdb-ui:${SHA_SHORT}", diff --git a/flow/activities/maintenance_activity.go b/flow/activities/maintenance_activity.go new file mode 100644 index 0000000000..be42cc8e56 --- /dev/null +++ b/flow/activities/maintenance_activity.go @@ -0,0 +1,284 @@ +package activities + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/PeerDB-io/peer-flow/alerting" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/shared/telemetry" +) + +const ( + mirrorStateBackup = "backup" + mirrorStateRestored = "restore" +) + +type MaintenanceActivity struct { + CatalogPool *pgxpool.Pool + Alerter *alerting.Alerter + TemporalClient client.Client +} + +func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (*protos.MaintenanceMirrors, error) { + rows, err := a.CatalogPool.Query(ctx, ` + select distinct on(name) + id, name, workflow_id, + created_at, 
coalesce(query_string, '')='' is_cdc + from flows + `) + if err != nil { + return &protos.MaintenanceMirrors{}, err + } + + maintenanceMirrorItems, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.MaintenanceMirror, error) { + var info protos.MaintenanceMirror + var createdAt time.Time + err := row.Scan(&info.MirrorId, &info.MirrorName, &info.WorkflowId, &createdAt, &info.IsCdc) + info.MirrorCreatedAt = timestamppb.New(createdAt) + return &info, err + }) + return &protos.MaintenanceMirrors{ + Mirrors: maintenanceMirrorItems, + }, err +} + +func (a *MaintenanceActivity) getMirrorStatus(ctx context.Context, mirror *protos.MaintenanceMirror) (protos.FlowStatus, error) { + return shared.GetWorkflowStatus(ctx, a.TemporalClient, mirror.WorkflowId) +} + +func (a *MaintenanceActivity) WaitForRunningSnapshots(ctx context.Context) (*protos.MaintenanceMirrors, error) { + mirrors, err := a.GetAllMirrors(ctx) + if err != nil { + return &protos.MaintenanceMirrors{}, err + } + + slog.Info("Found mirrors for snapshot check", "mirrors", mirrors, "len", len(mirrors.Mirrors)) + + for _, mirror := range mirrors.Mirrors { + lastStatus, err := a.checkAndWaitIfSnapshot(ctx, mirror, 2*time.Minute) + if err != nil { + return &protos.MaintenanceMirrors{}, err + } + slog.Info("Finished checking and waiting for snapshot", + "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "lastStatus", lastStatus.String()) + } + slog.Info("Finished checking and waiting for all mirrors to finish snapshot") + return mirrors, nil +} + +func (a *MaintenanceActivity) checkAndWaitIfSnapshot( + ctx context.Context, + mirror *protos.MaintenanceMirror, + logEvery time.Duration, +) (protos.FlowStatus, error) { + // In case a mirror was just kicked off, it shows up in the running state, we wait for a bit before checking for snapshot + if mirror.MirrorCreatedAt.AsTime().After(time.Now().Add(-30 * time.Second)) { + slog.Info("Mirror was created less than 30 seconds ago, waiting for it to be 
ready before checking for snapshot", + "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId) + time.Sleep(30 * time.Second) + } + + flowStatus, err := RunEveryIntervalUntilFinish(ctx, func() (bool, protos.FlowStatus, error) { + activity.RecordHeartbeat(ctx, fmt.Sprintf("Waiting for mirror %s to finish snapshot", mirror.MirrorName)) + mirrorStatus, err := a.getMirrorStatus(ctx, mirror) + if err != nil { + return false, mirrorStatus, err + } + if mirrorStatus == protos.FlowStatus_STATUS_SNAPSHOT || mirrorStatus == protos.FlowStatus_STATUS_SETUP { + return false, mirrorStatus, nil + } + return true, mirrorStatus, nil + }, 10*time.Second, fmt.Sprintf("Waiting for mirror %s to finish snapshot", mirror.MirrorName), logEvery) + return flowStatus, err +} + +func (a *MaintenanceActivity) EnableMaintenanceMode(ctx context.Context) error { + slog.Info("Enabling maintenance mode") + return peerdbenv.UpdatePeerDBMaintenanceModeEnabled(ctx, a.CatalogPool, true) +} + +func (a *MaintenanceActivity) BackupAllPreviouslyRunningFlows(ctx context.Context, mirrors *protos.MaintenanceMirrors) error { + tx, err := a.CatalogPool.Begin(ctx) + if err != nil { + return err + } + defer shared.RollbackTx(tx, slog.Default()) + + for _, mirror := range mirrors.Mirrors { + _, err := tx.Exec(ctx, ` + insert into maintenance.maintenance_flows + (flow_id, flow_name, workflow_id, flow_created_at, is_cdc, state, from_version) + values + ($1, $2, $3, $4, $5, $6, $7) + `, mirror.MirrorId, mirror.MirrorName, mirror.WorkflowId, mirror.MirrorCreatedAt.AsTime(), mirror.IsCdc, mirrorStateBackup, + peerdbenv.PeerDBVersionShaShort()) + if err != nil { + return err + } + } + return tx.Commit(ctx) +} + +func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirror *protos.MaintenanceMirror) (bool, error) { + mirrorStatus, err := a.getMirrorStatus(ctx, mirror) + if err != nil { + return false, err + } + + slog.Info("Checking if mirror is running", "mirror", mirror.MirrorName, "workflowId", 
mirror.WorkflowId, "status", mirrorStatus.String()) + + if mirrorStatus != protos.FlowStatus_STATUS_RUNNING { + return false, nil + } + + slog.Info("Pausing mirror for maintenance", "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId) + + if err := model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "", model.PauseSignal); err != nil { + slog.Error("Error signaling mirror running to pause for maintenance", + "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "error", err) + return false, err + } + + return RunEveryIntervalUntilFinish(ctx, func() (bool, bool, error) { + updatedMirrorStatus, statusErr := a.getMirrorStatus(ctx, mirror) + if statusErr != nil { + return false, false, statusErr + } + activity.RecordHeartbeat(ctx, "Waiting for mirror to pause with current status "+updatedMirrorStatus.String()) + if statusErr := model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "", + model.PauseSignal); statusErr != nil { + return false, false, statusErr + } + if updatedMirrorStatus == protos.FlowStatus_STATUS_PAUSED { + return true, true, nil + } + return false, false, nil + }, 10*time.Second, "Waiting for mirror to pause", 30*time.Second) +} + +func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error { + _, err := a.CatalogPool.Exec(ctx, ` + update maintenance.maintenance_flows + set state = $1, + restored_at = now(), + to_version = $2 + where state = $3 + `, mirrorStateRestored, peerdbenv.PeerDBVersionShaShort(), mirrorStateBackup) + return err +} + +func (a *MaintenanceActivity) GetBackedUpFlows(ctx context.Context) (*protos.MaintenanceMirrors, error) { + rows, err := a.CatalogPool.Query(ctx, ` + select flow_id, flow_name, workflow_id, flow_created_at, is_cdc + from maintenance.maintenance_flows + where state = $1 + `, mirrorStateBackup) + if err != nil { + return nil, err + } + + maintenanceMirrorItems, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) 
(*protos.MaintenanceMirror, error) { + var info protos.MaintenanceMirror + var createdAt time.Time + err := row.Scan(&info.MirrorId, &info.MirrorName, &info.WorkflowId, &createdAt, &info.IsCdc) + info.MirrorCreatedAt = timestamppb.New(createdAt) + return &info, err + }) + if err != nil { + return nil, err + } + + return &protos.MaintenanceMirrors{ + Mirrors: maintenanceMirrorItems, + }, nil +} + +func (a *MaintenanceActivity) ResumeMirror(ctx context.Context, mirror *protos.MaintenanceMirror) error { + mirrorStatus, err := a.getMirrorStatus(ctx, mirror) + if err != nil { + return err + } + + if mirrorStatus != protos.FlowStatus_STATUS_PAUSED { + slog.Error("Cannot resume mirror that is not paused", + "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "status", mirrorStatus.String()) + return nil + } + + // There can also be "workflow already completed" errors, what should we do in that case? + if err := model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "", model.NoopSignal); err != nil { + slog.Error("Error signaling mirror to resume for maintenance", + "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "error", err) + return err + } + return nil +} + +func (a *MaintenanceActivity) DisableMaintenanceMode(ctx context.Context) error { + slog.Info("Disabling maintenance mode") + return peerdbenv.UpdatePeerDBMaintenanceModeEnabled(ctx, a.CatalogPool, false) +} + +func (a *MaintenanceActivity) BackgroundAlerter(ctx context.Context) error { + heartbeatTicker := time.NewTicker(30 * time.Second) + defer heartbeatTicker.Stop() + + alertTicker := time.NewTicker(time.Duration(peerdbenv.PeerDBMaintenanceModeWaitAlertSeconds()) * time.Second) + defer alertTicker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-heartbeatTicker.C: + activity.RecordHeartbeat(ctx, "Maintenance Workflow is still running") + case <-alertTicker.C: + slog.Warn("Maintenance Workflow is still running") + 
a.Alerter.LogNonFlowWarning(ctx, telemetry.MaintenanceWait, "Waiting", "Maintenance mode is still running") + } + } +} + +func RunEveryIntervalUntilFinish[T any]( + ctx context.Context, + runFunc func() (finished bool, result T, err error), + runInterval time.Duration, + logMessage string, + logInterval time.Duration, +) (T, error) { + runTicker := time.NewTicker(runInterval) + defer runTicker.Stop() + + logTicker := time.NewTicker(logInterval) + defer logTicker.Stop() + var lastResult T + for { + select { + case <-ctx.Done(): + return lastResult, ctx.Err() + case <-runTicker.C: + finished, result, err := runFunc() + lastResult = result + if err != nil { + return lastResult, err + } + if finished { + return lastResult, err + } + case <-logTicker.C: + slog.Info(logMessage, "lastResult", lastResult) + } + } +} diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index e9df410f91..5f05005d14 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -377,10 +377,10 @@ func (a *Alerter) sendTelemetryMessage( } if a.snsTelemetrySender != nil { - if status, err := a.snsTelemetrySender.SendMessage(ctx, details, details, attributes); err != nil { + if response, err := a.snsTelemetrySender.SendMessage(ctx, details, details, attributes); err != nil { logger.Warn("failed to send message to snsTelemetrySender", slog.Any("error", err)) } else { - logger.Info("received status from snsTelemetrySender", slog.String("status", status)) + logger.Info("received response from snsTelemetrySender", slog.String("response", response)) } } @@ -388,7 +388,7 @@ func (a *Alerter) sendTelemetryMessage( if status, err := a.incidentIoTelemetrySender.SendMessage(ctx, details, details, attributes); err != nil { logger.Warn("failed to send message to incidentIoTelemetrySender", slog.Any("error", err)) } else { - logger.Info("received status from incident.io", slog.String("status", status)) + logger.Info("received response from incident.io", slog.String("response", 
status)) } } } diff --git a/flow/cmd/api.go b/flow/cmd/api.go index ca225e4292..f81f9d923d 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -191,24 +191,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error { Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))), } - if peerdbenv.PeerDBTemporalEnableCertAuth() { - slog.Info("Using temporal certificate/key for authentication") - - certs, err := parseTemporalCertAndKey(ctx) - if err != nil { - return fmt.Errorf("unable to base64 decode certificate and key: %w", err) - } - - connOptions := client.ConnectionOptions{ - TLS: &tls.Config{ - Certificates: certs, - MinVersion: tls.VersionTLS13, - }, - } - clientOptions.ConnectionOptions = connOptions - } - - tc, err := client.Dial(clientOptions) + tc, err := setupTemporalClient(ctx, clientOptions) if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) } @@ -309,3 +292,25 @@ func APIMain(ctx context.Context, args *APIServerParams) error { return nil } + +func setupTemporalClient(ctx context.Context, clientOptions client.Options) (client.Client, error) { + if peerdbenv.PeerDBTemporalEnableCertAuth() { + slog.Info("Using temporal certificate/key for authentication") + + certs, err := parseTemporalCertAndKey(ctx) + if err != nil { + return nil, fmt.Errorf("unable to base64 decode certificate and key: %w", err) + } + + connOptions := client.ConnectionOptions{ + TLS: &tls.Config{ + Certificates: certs, + MinVersion: tls.VersionTLS13, + }, + } + clientOptions.ConnectionOptions = connOptions + } + + tc, err := client.Dial(clientOptions) + return tc, err +} diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index e2d1da2e39..6caefaf47e 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -19,6 +19,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" 
"github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -327,6 +328,17 @@ func (h *FlowRequestHandler) FlowStateChange( ) (*protos.FlowStateChangeResponse, error) { logs := slog.String("flowJobName", req.FlowJobName) slog.Info("FlowStateChange called", logs, slog.Any("req", req)) + underMaintenance, err := peerdbenv.PeerDBMaintenanceModeEnabled(ctx, nil) + if err != nil { + slog.Error("unable to check maintenance mode", logs, slog.Any("error", err)) + return nil, fmt.Errorf("unable to load dynamic config: %w", err) + } + + if underMaintenance { + slog.Warn("Flow state change request denied due to maintenance", logs) + return nil, errors.New("PeerDB is under maintenance") + } + workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) if err != nil { slog.Error("[flow-state-change] unable to get workflowID", logs, slog.Any("error", err)) @@ -488,6 +500,14 @@ func (h *FlowRequestHandler) ResyncMirror( ctx context.Context, req *protos.ResyncMirrorRequest, ) (*protos.ResyncMirrorResponse, error) { + underMaintenance, err := peerdbenv.PeerDBMaintenanceModeEnabled(ctx, nil) + if err != nil { + return nil, fmt.Errorf("unable to get maintenance mode status: %w", err) + } + if underMaintenance { + return nil, errors.New("PeerDB is under maintenance") + } + isCDC, err := h.isCDCFlow(ctx, req.FlowJobName) if err != nil { return nil, err @@ -521,3 +541,49 @@ func (h *FlowRequestHandler) ResyncMirror( } return &protos.ResyncMirrorResponse{}, nil } + +func (h *FlowRequestHandler) GetInstanceInfo(ctx context.Context, in *protos.InstanceInfoRequest) (*protos.InstanceInfoResponse, error) { + enabled, err := peerdbenv.PeerDBMaintenanceModeEnabled(ctx, nil) + if err != nil { + slog.Error("unable to get maintenance mode status", slog.Any("error", err)) + return &protos.InstanceInfoResponse{ + Status: protos.InstanceStatus_INSTANCE_STATUS_UNKNOWN, + }, fmt.Errorf("unable to get maintenance mode status: %w", err) + } + if enabled { + return 
&protos.InstanceInfoResponse{ + Status: protos.InstanceStatus_INSTANCE_STATUS_MAINTENANCE, + }, nil + } + return &protos.InstanceInfoResponse{ + Status: protos.InstanceStatus_INSTANCE_STATUS_READY, + }, nil +} + +func (h *FlowRequestHandler) Maintenance(ctx context.Context, in *protos.MaintenanceRequest) (*protos.MaintenanceResponse, error) { + taskQueueId := shared.MaintenanceFlowTaskQueue + if in.UsePeerflowTaskQueue { + taskQueueId = shared.PeerFlowTaskQueue + } + switch { + case in.Status == protos.MaintenanceStatus_MAINTENANCE_STATUS_START: + workflowRun, err := peerflow.RunStartMaintenanceWorkflow(ctx, h.temporalClient, &protos.StartMaintenanceFlowInput{}, taskQueueId) + if err != nil { + return nil, err + } + return &protos.MaintenanceResponse{ + WorkflowId: workflowRun.GetID(), + RunId: workflowRun.GetRunID(), + }, nil + case in.Status == protos.MaintenanceStatus_MAINTENANCE_STATUS_END: + workflowRun, err := peerflow.RunEndMaintenanceWorkflow(ctx, h.temporalClient, &protos.EndMaintenanceFlowInput{}, taskQueueId) + if err != nil { + return nil, err + } + return &protos.MaintenanceResponse{ + WorkflowId: workflowRun.GetID(), + RunId: workflowRun.GetRunID(), + }, nil + } + return nil, errors.New("invalid maintenance status") +} diff --git a/flow/cmd/maintenance.go b/flow/cmd/maintenance.go new file mode 100644 index 0000000000..474a67db37 --- /dev/null +++ b/flow/cmd/maintenance.go @@ -0,0 +1,246 @@ +package cmd + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "log/slog" + "os" + + "github.com/aws/smithy-go/ptr" + "go.temporal.io/sdk/client" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared" + 
peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +type MaintenanceCLIParams struct { + TemporalHostPort string + TemporalNamespace string + Mode string + FlowGrpcAddress string + SkipIfK8sServiceMissing string + FlowTlsEnabled bool + SkipOnApiVersionMatch bool + SkipOnNoMirrors bool + UseMaintenanceTaskQueue bool + AssumeSkippedMaintenanceWorkflows bool +} + +type StartMaintenanceResult struct { + SkippedReason *string `json:"skippedReason,omitempty"` + APIVersion string `json:"apiVersion,omitempty"` + CLIVersion string `json:"cliVersion,omitempty"` + Skipped bool `json:"skipped,omitempty"` +} + +// MaintenanceMain is the entry point for the maintenance command, requires access to Temporal client, will exit after +// running the requested maintenance workflow +func MaintenanceMain(ctx context.Context, args *MaintenanceCLIParams) error { + slog.Info("Starting Maintenance Mode CLI") + clientOptions := client.Options{ + HostPort: args.TemporalHostPort, + Namespace: args.TemporalNamespace, + Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))), + } + tc, err := setupTemporalClient(ctx, clientOptions) + if err != nil { + return fmt.Errorf("unable to create Temporal client: %w", err) + } + + taskQueueId := shared.MaintenanceFlowTaskQueue + if !args.UseMaintenanceTaskQueue { + taskQueueId = shared.PeerFlowTaskQueue + } + + if args.Mode == "start" { + if args.AssumeSkippedMaintenanceWorkflows { + slog.Info("Assuming maintenance workflows were skipped") + return WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{ + Skipped: true, + SkippedReason: ptr.String("Assumed skipped by CLI Flag"), + CLIVersion: peerdbenv.PeerDBVersionShaShort(), + }) + } + skipped, err := skipStartMaintenanceIfNeeded(ctx, args) + if err != nil { + return err + } + if skipped { + return nil + } + slog.Info("Running start maintenance workflow") + workflowRun, err := peerflow.RunStartMaintenanceWorkflow(ctx, tc, &protos.StartMaintenanceFlowInput{}, taskQueueId) 
+ if err != nil { + slog.Error("Error running start maintenance workflow", "error", err) + return err + } + var output *protos.StartMaintenanceFlowOutput + if err := workflowRun.Get(ctx, &output); err != nil { + slog.Error("Error in start maintenance workflow", "error", err) + return err + } + slog.Info("Start maintenance workflow completed", "output", output) + return WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{ + Skipped: false, + CLIVersion: peerdbenv.PeerDBVersionShaShort(), + }) + } else if args.Mode == "end" { + if input, err := ReadLastMaintenanceOutput(ctx); input != nil || err != nil { + if err != nil { + return err + } + slog.Info("Checking if end maintenance workflow should be skipped", "input", input) + if input.Skipped { + slog.Info("Skipping end maintenance workflow as start maintenance was skipped", "reason", input.SkippedReason) + return nil + } + } + workflowRun, err := peerflow.RunEndMaintenanceWorkflow(ctx, tc, &protos.EndMaintenanceFlowInput{}, taskQueueId) + if err != nil { + slog.Error("Error running end maintenance workflow", "error", err) + return err + } + var output *protos.EndMaintenanceFlowOutput + if err := workflowRun.Get(ctx, &output); err != nil { + slog.Error("Error in end maintenance workflow", "error", err) + return err + } + slog.Info("End maintenance workflow completed", "output", output) + } else { + return fmt.Errorf("unknown flow type %s", args.Mode) + } + slog.Info("Maintenance workflow completed with type", "type", args.Mode) + return nil +} + +func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParams) (bool, error) { + if args.SkipIfK8sServiceMissing != "" { + slog.Info("Checking if k8s service exists", "service", args.SkipIfK8sServiceMissing) + exists, err := CheckK8sServiceExistence(ctx, args.SkipIfK8sServiceMissing) + if err != nil { + return false, err + } + if !exists { + slog.Info("Skipping maintenance workflow due to missing k8s service", "service", 
args.SkipIfK8sServiceMissing) + return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{ + Skipped: true, + SkippedReason: ptr.String(fmt.Sprintf("K8s service %s missing", args.SkipIfK8sServiceMissing)), + CLIVersion: peerdbenv.PeerDBVersionShaShort(), + }) + } + } + if args.SkipOnApiVersionMatch || args.SkipOnNoMirrors { + if args.FlowGrpcAddress == "" { + return false, errors.New("flow address is required when skipping based on API") + } + slog.Info("Constructing flow client") + transportCredentials := credentials.NewTLS(&tls.Config{ + MinVersion: tls.VersionTLS12, + }) + if !args.FlowTlsEnabled { + transportCredentials = insecure.NewCredentials() + } + conn, err := grpc.NewClient(args.FlowGrpcAddress, + grpc.WithTransportCredentials(transportCredentials), + ) + if err != nil { + return false, fmt.Errorf("unable to dial grpc flow server: %w", err) + } + peerFlowClient := protos.NewFlowServiceClient(conn) + if args.SkipOnApiVersionMatch { + slog.Info("Checking if CLI version matches API version", "cliVersion", peerdbenv.PeerDBVersionShaShort()) + version, err := peerFlowClient.GetVersion(ctx, &protos.PeerDBVersionRequest{}) + if err != nil { + return false, err + } + slog.Info("Got version from flow", "version", version.Version) + if version.Version == peerdbenv.PeerDBVersionShaShort() { + slog.Info("Skipping maintenance workflow due to matching versions") + return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{ + Skipped: true, + SkippedReason: ptr.String(fmt.Sprintf("CLI version %s matches API version %s", peerdbenv.PeerDBVersionShaShort(), + version.Version)), + APIVersion: version.Version, + CLIVersion: peerdbenv.PeerDBVersionShaShort(), + }) + } + } + if args.SkipOnNoMirrors { + slog.Info("Checking if there are any mirrors") + mirrors, err := peerFlowClient.ListMirrors(ctx, &protos.ListMirrorsRequest{}) + if err != nil { + return false, err + } + slog.Info("Got mirrors from flow", "mirrors", mirrors.Mirrors) + if 
len(mirrors.Mirrors) == 0 { + slog.Info("Skipping maintenance workflow due to no mirrors") + return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{ + Skipped: true, + SkippedReason: ptr.String("No mirrors found"), + }) + } + } + } + return false, nil +} + +func WriteMaintenanceOutputToCatalog(ctx context.Context, result StartMaintenanceResult) error { + pool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + return err + } + _, err = pool.Exec(ctx, ` + insert into maintenance.start_maintenance_outputs + (cli_version, api_version, skipped, skipped_reason) + values + ($1, $2, $3, $4) + `, result.CLIVersion, result.APIVersion, result.Skipped, result.SkippedReason) + return err +} + +func ReadLastMaintenanceOutput(ctx context.Context) (*StartMaintenanceResult, error) { + pool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + return nil, err + } + var result StartMaintenanceResult + if err := pool.QueryRow(ctx, ` + select cli_version, api_version, skipped, skipped_reason + from maintenance.start_maintenance_outputs + order by created_at desc + limit 1 + `).Scan(&result.CLIVersion, &result.APIVersion, &result.Skipped, &result.SkippedReason); err != nil { + return nil, err + } + return &result, nil +} + +func CheckK8sServiceExistence(ctx context.Context, serviceName string) (bool, error) { + config, err := rest.InClusterConfig() + if err != nil { + return false, err + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return false, err + } + _, err = clientset.CoreV1().Services(peerdbenv.GetEnvString("POD_NAMESPACE", "")).Get(ctx, serviceName, v1.GetOptions{}) + if err != nil { + if k8sErrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil +} diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index a0c4a989e2..58cf20a80a 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -447,20 +447,7 @@ func (h 
*FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) } func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) { - res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery) - if err != nil { - slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) - return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) - } - var state protos.FlowStatus - err = res.Get(&state) - if err != nil { - slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) - return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) - } - return state, nil + return shared.GetWorkflowStatus(ctx, h.temporalClient, workflowID) } func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context, diff --git a/flow/cmd/settings.go b/flow/cmd/settings.go index 12e0728590..dd4755f4ae 100644 --- a/flow/cmd/settings.go +++ b/flow/cmd/settings.go @@ -55,8 +55,7 @@ func (h *FlowRequestHandler) PostDynamicSetting( ctx context.Context, req *protos.PostDynamicSettingRequest, ) (*protos.PostDynamicSettingResponse, error) { - _, err := h.pool.Exec(ctx, `insert into dynamic_settings (config_name, config_value) values ($1, $2) - on conflict (config_name) do update set config_value = $2`, req.Name, req.Value) + err := peerdbenv.UpdateDynamicSetting(ctx, h.pool, req.Name, req.Value) if err != nil { slog.Error("[PostDynamicConfig] failed to execute update setting", slog.Any("error", err)) return nil, err diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 3e870aa667..83c9d2a073 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -14,6 +14,7 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" 
"github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared/telemetry" ) @@ -25,6 +26,17 @@ var ( func (h *FlowRequestHandler) ValidateCDCMirror( ctx context.Context, req *protos.CreateCDCFlowRequest, ) (*protos.ValidateCDCMirrorResponse, error) { + underMaintenance, err := peerdbenv.PeerDBMaintenanceModeEnabled(ctx, nil) + if err != nil { + slog.Error("unable to check maintenance mode", slog.Any("error", err)) + return nil, fmt.Errorf("unable to load dynamic config: %w", err) + } + + if underMaintenance { + slog.Warn("Validate request denied due to maintenance", "flowName", req.ConnectionConfigs.FlowJobName) + return nil, errors.New("PeerDB is under maintenance") + } + if !req.ConnectionConfigs.Resync { mirrorExists, existCheckErr := h.CheckIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName) if existCheckErr != nil { diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 9db97288cc..5c16376a12 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -30,6 +30,7 @@ type WorkerSetupOptions struct { TemporalMaxConcurrentWorkflowTasks int EnableProfiling bool EnableOtelMetrics bool + UseMaintenanceTaskQueue bool } type workerSetupResponse struct { @@ -124,8 +125,11 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { return nil, fmt.Errorf("unable to create Temporal client: %w", err) } slog.Info("Created temporal client") - - taskQueue := peerdbenv.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue) + queueId := shared.PeerFlowTaskQueue + if opts.UseMaintenanceTaskQueue { + queueId = shared.MaintenanceFlowTaskQueue + } + taskQueue := peerdbenv.PeerFlowTaskQueueName(queueId) slog.Info( fmt.Sprintf("Creating temporal worker for queue %v: %v workflow workers %v activity workers", taskQueue, @@ -170,6 +174,12 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { OtelManager: otelManager, }) + 
w.RegisterActivity(&activities.MaintenanceActivity{ + CatalogPool: conn, + Alerter: alerting.NewAlerter(context.Background(), conn), + TemporalClient: c, + }) + return &workerSetupResponse{ Client: c, Worker: w, diff --git a/flow/go.mod b/flow/go.mod index b7eb9d1d65..a11ffb5a7e 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -53,22 +53,28 @@ require ( github.com/urfave/cli/v3 v3.0.0-alpha9.2 github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 github.com/yuin/gopher-lua v1.1.1 - go.opentelemetry.io/otel v1.31.0 + go.opentelemetry.io/otel v1.32.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 - go.opentelemetry.io/otel/metric v1.31.0 - go.opentelemetry.io/otel/sdk v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 + go.opentelemetry.io/otel/metric v1.32.0 + go.opentelemetry.io/otel/sdk v1.32.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 + go.opentelemetry.io/otel/trace v1.32.0 go.temporal.io/api v1.41.0 go.temporal.io/sdk v1.30.0 go.temporal.io/sdk/contrib/opentelemetry v0.6.0 go.uber.org/automaxprocs v1.6.0 golang.org/x/crypto v0.28.0 - golang.org/x/sync v0.8.0 + golang.org/x/sync v0.9.0 google.golang.org/api v0.204.0 google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 google.golang.org/grpc v1.67.1 google.golang.org/protobuf v1.35.1 + k8s.io/apimachinery v0.31.2 + k8s.io/client-go v0.31.2 ) require ( @@ -105,18 +111,29 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect github.com/dvsekhvalnov/jose2go v1.7.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/envoyproxy/go-control-plane v0.13.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.1.0 // 
indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect github.com/getsentry/sentry-go v0.29.1 // indirect github.com/go-faster/city v1.0.1 // indirect github.com/go-faster/errors v0.7.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-openapi/jsonpointer v0.19.6 // indirect + github.com/go-openapi/jsonreference v0.20.2 // indirect + github.com/go-openapi/swag v0.22.4 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect @@ -124,6 +141,9 @@ require ( github.com/lestrrat-go/httprc v1.0.6 // indirect github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nexus-rpc/sdk-go v0.0.11 // indirect @@ -138,14 +158,23 @@ require ( github.com/segmentio/asm v1.2.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect + github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.31.0 // indirect 
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect - go.opentelemetry.io/otel/trace v1.31.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/term v0.25.0 // indirect google.golang.org/grpc/stats/opentelemetry v0.0.0-20241028142157-ada6787961b3 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + k8s.io/api v0.31.2 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect + k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) require ( @@ -165,7 +194,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.3 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/djherbis/buffer v1.2.0 github.com/djherbis/nio/v3 v3.0.1 github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect @@ -193,7 +222,7 @@ require ( github.com/pborman/uuid v1.2.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect @@ -201,8 +230,8 @@ require ( golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/net v0.30.0 // indirect 
golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sys v0.26.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.26.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect diff --git a/flow/go.sum b/flow/go.sum index 71299452d2..7a0380da03 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -180,8 +180,9 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/danieljoos/wincred v1.2.2 h1:774zMFJrqaeYCK2W57BgAem/MLi6mtSE47MB6BOJ0i0= github.com/danieljoos/wincred v1.2.2/go.mod h1:w7w4Utbrz8lqeMbDAK0lkNJUv5sAOkFi7nd/ogr0Uh8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -197,6 +198,8 @@ github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHo github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk= github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8= +github.com/emicklei/go-restful/v3 v3.11.0 
h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= +github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -211,6 +214,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= +github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= github.com/getsentry/sentry-go v0.29.1 h1:DyZuChN8Hz3ARxGVV8ePaNXh1dQ7d76AiB117xcREwA= @@ -228,9 +233,18 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= +github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= +github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= +github.com/go-openapi/jsonreference v0.20.2/go.mod 
h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= +github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= +github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogBU= +github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-test/deep v1.0.4 h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho= github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= @@ -270,6 +284,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -279,10 +295,16 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc= github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/s2a-go v0.1.8 h1:zZDs9gcbt9ZPLV0ndSyQk6Kacx2g/X+SKYovpnz3SMM= github.com/google/s2a-go v0.1.8/go.mod h1:6iNWHTpQ+nfNRN5E00MSdfDwVesa8hhS32PhPO8deJA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -330,6 +352,10 @@ github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= 
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6 h1:IsMZxCuZqKuao2vNdfD82fjjgPLfyHLpR41Z88viRWs= github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6/go.mod h1:3VeWNIJaW+O5xpRQbPp0Ybqu1vJd/pm7s2F473HRrkw= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -341,6 +367,7 @@ github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -365,10 +392,17 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linkedin/goavro/v2 v2.13.0 h1:L8eI8GcuciwUkt41Ej62joSZS4kKaYIUdze+6for9NU= github.com/linkedin/goavro/v2 v2.13.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/microsoft/go-mssqldb v1.7.2 h1:CHkFJiObW7ItKTJfHo1QX7QBBD1iV+mn1eOyRP3b/PA= github.com/microsoft/go-mssqldb v1.7.2/go.mod 
h1:kOvZKUdrhhFQmxLZqbwUV0rHkNkZpthMITIb2Ko1IoA= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= @@ -377,6 +411,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/nexus-rpc/sdk-go v0.0.11 h1:qH3Us3spfp50t5ca775V1va2eE6z1zMQDZY4mvbw0CI= github.com/nexus-rpc/sdk-go v0.0.11/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= +github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= +github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= @@ -397,8 +435,9 @@ github.com/pkg/errors v0.9.1 
h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= @@ -428,6 +467,8 @@ github.com/slack-go/slack v0.15.0 h1:LE2lj2y9vqqiOf+qIIy0GvEoxgF1N5yLGZffmEZykt0 github.com/slack-go/slack v0.15.0/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/snowflakedb/gosnowflake v1.12.0 h1:Saez8egtn5xAoVMBxFaMu9MYfAG9SS9dpAEXD1/ECIo= github.com/snowflakedb/gosnowflake v1.12.0/go.mod h1:wHfYmZi3zvtWItojesAhWWXBN7+niex2R1h/S7QCZYg= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -458,6 +499,8 @@ github.com/twpayne/go-geos v0.19.0 h1:V7vnLe7gY7JOHLTg8+2oykZOw6wpBLHVNlcnzS2FlG github.com/twpayne/go-geos 
v0.19.0/go.mod h1:XGpUjCtZf4Ul6BMii6KA4EmJ9JCNhVP1mohdoReopZ8= github.com/urfave/cli/v3 v3.0.0-alpha9.2 h1:CL8llQj3dGRLVQQzHxS+ZYRLanOuhyK1fXgLKD+qV+Y= github.com/urfave/cli/v3 v3.0.0-alpha9.2/go.mod h1:FnIeEMYu+ko8zP1F9Ypr3xkZMIDqW3DR92yUtY39q1Y= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= @@ -486,20 +529,26 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.5 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0/go.mod h1:n8MR6/liuGB5EmTETUBeU5ZgqMOlqKRxUaqPQBOANZ8= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6ei8GFW7kyPYdxJaV2rgI6M+4tvZzhYsQ2wgyVC08= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod 
h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 h1:9kV11HXBHZAvuPUZxmMWrH8hZn/6UnHX4K0mu36vNsU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0/go.mod h1:JyA0FHXe22E1NeNiHmVp7kFHglnexDQ7uRWDiiJ1hKQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace 
v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.temporal.io/api v1.41.0 h1:VYzyWJjJk1jeB9urntA/t7Hiyo2tHdM5xEdtdib4EO8= @@ -512,6 +561,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -554,8 +605,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -570,8 +621,8 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= @@ -579,8 +630,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -645,6 +696,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= @@ -655,5 +708,23 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/api v0.31.2 h1:3wLBbL5Uom/8Zy98GRPXpJ254nEFpl+hwndmk9RwmL0= +k8s.io/api v0.31.2/go.mod h1:bWmGvrGPssSK1ljmLzd3pwCQ9MgoTsRCuK35u6SygUk= +k8s.io/apimachinery v0.31.2 h1:i4vUt2hPK56W6mlT7Ry+AO8eEsyxMD1U44NR22CLTYw= +k8s.io/apimachinery v0.31.2/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= +k8s.io/client-go v0.31.2 h1:Y2F4dxU5d3AQj+ybwSMqQnpZH9F30//1ObxOKlTI9yc= +k8s.io/client-go v0.31.2/go.mod h1:NPa74jSVR/+eez2dFsEIHNa+3o09vtNaWwWwb1qSxSs= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= +k8s.io/kube-openapi 
v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/flow/main.go b/flow/main.go index 4001a88912..9d499e957d 100644 --- a/flow/main.go +++ b/flow/main.go @@ -70,6 +70,60 @@ func main() { Sources: cli.EnvVars("TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASKS"), } + maintenanceModeWorkflowFlag := &cli.StringFlag{ + Name: "run-maintenance-flow", + Value: "", + Usage: "Run a maintenance flow. 
Options are 'start' or 'end'", + Sources: cli.EnvVars("RUN_MAINTENANCE_FLOW"), + } + + maintenanceSkipOnApiVersionMatchFlag := &cli.BoolFlag{ + Name: "skip-on-api-version-match", + Value: false, + Usage: "Skip maintenance flow if the API version matches", + Sources: cli.EnvVars("MAINTENANCE_SKIP_ON_API_VERSION_MATCH"), + } + + maintenanceSkipOnNoMirrorsFlag := &cli.BoolFlag{ + Name: "skip-on-no-mirrors", + Value: false, + Usage: "Skip maintenance flow if there are no mirrors", + Sources: cli.EnvVars("MAINTENANCE_SKIP_ON_NO_MIRRORS"), + } + + flowGrpcAddressFlag := &cli.StringFlag{ + Name: "flow-grpc-address", + Value: "", + Usage: "Address of the flow gRPC server", + Sources: cli.EnvVars("FLOW_GRPC_ADDRESS"), + } + + flowTlsEnabledFlag := &cli.BoolFlag{ + Name: "flow-tls-enabled", + Value: false, + Usage: "Enable TLS for the flow gRPC server", + Sources: cli.EnvVars("FLOW_TLS_ENABLED"), + } + + useMaintenanceTaskQueueFlag := &cli.BoolFlag{ + Name: "use-maintenance-task-queue", + Value: false, + Usage: "Use the maintenance task queue for the worker", + Sources: cli.EnvVars("USE_MAINTENANCE_TASK_QUEUE"), + } + + assumedSkippedMaintenanceWorkflowsFlag := &cli.BoolFlag{ + Name: "assume-skipped-workflow", + Value: false, + Usage: "Skip running maintenance workflows and simply output to catalog", + } + + skipIfK8sServiceMissingFlag := &cli.StringFlag{ + Name: "skip-if-k8s-service-missing", + Value: "", + Usage: "Skip maintenance if the k8s service is missing, generally used during pre-upgrade hook", + } + app := &cli.Command{ Name: "PeerDB Flows CLI", Commands: []*cli.Command{ @@ -85,6 +139,7 @@ func main() { TemporalNamespace: clicmd.String("temporal-namespace"), TemporalMaxConcurrentActivities: int(clicmd.Int("temporal-max-concurrent-activities")), TemporalMaxConcurrentWorkflowTasks: int(clicmd.Int("temporal-max-concurrent-workflow-tasks")), + UseMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name), }) if err != nil { return err @@ -100,6 +155,7 @@ func 
main() { temporalNamespaceFlag, temporalMaxConcurrentActivitiesFlag, temporalMaxConcurrentWorkflowTasksFlag, + useMaintenanceTaskQueueFlag, }, }, { @@ -148,6 +204,37 @@ func main() { }) }, }, + { + Name: "maintenance", + Flags: []cli.Flag{ + temporalHostPortFlag, + temporalNamespaceFlag, + maintenanceModeWorkflowFlag, + maintenanceSkipOnApiVersionMatchFlag, + maintenanceSkipOnNoMirrorsFlag, + flowGrpcAddressFlag, + flowTlsEnabledFlag, + useMaintenanceTaskQueueFlag, + assumedSkippedMaintenanceWorkflowsFlag, + skipIfK8sServiceMissingFlag, + }, + Action: func(ctx context.Context, clicmd *cli.Command) error { + temporalHostPort := clicmd.String("temporal-host-port") + + return cmd.MaintenanceMain(ctx, &cmd.MaintenanceCLIParams{ + TemporalHostPort: temporalHostPort, + TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name), + Mode: clicmd.String(maintenanceModeWorkflowFlag.Name), + SkipOnApiVersionMatch: clicmd.Bool(maintenanceSkipOnApiVersionMatchFlag.Name), + SkipOnNoMirrors: clicmd.Bool(maintenanceSkipOnNoMirrorsFlag.Name), + FlowGrpcAddress: clicmd.String(flowGrpcAddressFlag.Name), + FlowTlsEnabled: clicmd.Bool(flowTlsEnabledFlag.Name), + UseMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name), + AssumeSkippedMaintenanceWorkflows: clicmd.Bool(assumedSkippedMaintenanceWorkflowsFlag.Name), + SkipIfK8sServiceMissing: clicmd.String(skipIfK8sServiceMissingFlag.Name), + }) + }, + }, }, } @@ -164,5 +251,6 @@ func main() { if err := app.Run(appCtx, os.Args); err != nil { log.Printf("error running app: %+v", err) + panic(err) } } diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index e033b87195..9aa9d2c5ed 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -166,3 +166,9 @@ func PeerDBRAPIRequestLoggingEnabled() bool { } return requestLoggingEnabled } + +// PEERDB_MAINTENANCE_MODE_WAIT_ALERT_SECONDS tells how long to wait before alerting that peerdb has been stuck in maintenance mode +// for too long +func 
PeerDBMaintenanceModeWaitAlertSeconds() int { + return getEnvInt("PEERDB_MAINTENANCE_MODE_WAIT_ALERT_SECONDS", 600) +} diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index c86c4616a1..566c8ead11 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -8,8 +8,10 @@ import ( "strconv" "time" + "github.com/aws/smithy-go/ptr" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/exp/constraints" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -186,6 +188,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, + { + Name: "PEERDB_MAINTENANCE_MODE_ENABLED", + Description: "Whether PeerDB is in maintenance mode, which disables any modifications to mirrors", + DefaultValue: "false", + ValueType: protos.DynconfValueType_BOOL, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, + TargetForSetting: protos.DynconfTarget_ALL, + }, } var DynamicIndex = func() map[string]int { @@ -267,6 +277,20 @@ func dynamicConfBool(ctx context.Context, env map[string]string, key string) (bo return value, nil } +func UpdateDynamicSetting(ctx context.Context, pool *pgxpool.Pool, name string, value *string) error { + if pool == nil { + var err error + pool, err = GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + shared.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool for dynamic setting update", slog.Any("error", err)) + return fmt.Errorf("failed to get catalog connection pool: %w", err) + } + } + _, err := pool.Exec(ctx, `insert into dynamic_settings (config_name, config_value) values ($1, $2) + on conflict (config_name) do update set config_value = $2`, name, value) + return err +} + // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely func PeerDBSlotLagMBAlertThreshold(ctx context.Context, env map[string]string) (uint32, 
error) { return dynamicConfUnsigned[uint32](ctx, env, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD") @@ -364,3 +388,11 @@ func PeerDBIntervalSinceLastNormalizeThresholdMinutes(ctx context.Context, env m func PeerDBApplicationNamePerMirrorName(ctx context.Context, env map[string]string) (bool, error) { return dynamicConfBool(ctx, env, "PEERDB_APPLICATION_NAME_PER_MIRROR_NAME") } + +func PeerDBMaintenanceModeEnabled(ctx context.Context, env map[string]string) (bool, error) { + return dynamicConfBool(ctx, env, "PEERDB_MAINTENANCE_MODE_ENABLED") +} + +func UpdatePeerDBMaintenanceModeEnabled(ctx context.Context, pool *pgxpool.Pool, enabled bool) error { + return UpdateDynamicSetting(ctx, pool, "PEERDB_MAINTENANCE_MODE_ENABLED", ptr.String(strconv.FormatBool(enabled))) +} diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 2dc5a8a64e..955ecfc4b5 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -11,8 +11,9 @@ type ( const ( // Task Queues - PeerFlowTaskQueue TaskQueueID = "peer-flow-task-queue" - SnapshotFlowTaskQueue TaskQueueID = "snapshot-flow-task-queue" + PeerFlowTaskQueue TaskQueueID = "peer-flow-task-queue" + SnapshotFlowTaskQueue TaskQueueID = "snapshot-flow-task-queue" + MaintenanceFlowTaskQueue TaskQueueID = "maintenance-flow-task-queue" // Queries CDCFlowStateQuery = "q-cdc-flow-state" diff --git a/flow/shared/telemetry/event_types.go b/flow/shared/telemetry/event_types.go index 0d87ba3540..a68fab869f 100644 --- a/flow/shared/telemetry/event_types.go +++ b/flow/shared/telemetry/event_types.go @@ -3,7 +3,11 @@ package telemetry type EventType string const ( - CreatePeer EventType = "CreatePeer" - CreateMirror EventType = "CreateMirror" - Other EventType = "Other" + CreatePeer EventType = "CreatePeer" + CreateMirror EventType = "CreateMirror" + StartMaintenance EventType = "StartMaintenance" + EndMaintenance EventType = "EndMaintenance" + MaintenanceWait EventType = "MaintenanceWait" + + Other EventType = "Other" ) diff --git 
a/flow/shared/worklow.go b/flow/shared/worklow.go new file mode 100644 index 0000000000..c9cafc37e2 --- /dev/null +++ b/flow/shared/worklow.go @@ -0,0 +1,27 @@ +package shared + +import ( + "context" + "fmt" + "log/slog" + + "go.temporal.io/sdk/client" + + "github.com/PeerDB-io/peer-flow/generated/protos" +) + +func GetWorkflowStatus(ctx context.Context, temporalClient client.Client, workflowID string) (protos.FlowStatus, error) { + res, err := temporalClient.QueryWorkflow(ctx, workflowID, "", FlowStatusQuery) + if err != nil { + slog.Error("failed to query status in workflow with ID "+workflowID, slog.Any("error", err)) + return protos.FlowStatus_STATUS_UNKNOWN, + fmt.Errorf("failed to query status in workflow with ID %s: %w", workflowID, err) + } + var state protos.FlowStatus + if err := res.Get(&state); err != nil { + slog.Error("failed to get status in workflow with ID "+workflowID, slog.Any("error", err)) + return protos.FlowStatus_STATUS_UNKNOWN, + fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) + } + return state, nil +} diff --git a/flow/workflows/activities.go b/flow/workflows/activities.go index 0b23d10dd1..5fe699419c 100644 --- a/flow/workflows/activities.go +++ b/flow/workflows/activities.go @@ -3,6 +3,7 @@ package peerflow import "github.com/PeerDB-io/peer-flow/activities" var ( - flowable *activities.FlowableActivity - snapshot *activities.SnapshotActivity + flowable *activities.FlowableActivity + snapshot *activities.SnapshotActivity + maintenance *activities.MaintenanceActivity ) diff --git a/flow/workflows/maintenance_flow.go b/flow/workflows/maintenance_flow.go new file mode 100644 index 0000000000..c48750a807 --- /dev/null +++ b/flow/workflows/maintenance_flow.go @@ -0,0 +1,305 @@ +package peerflow + +import ( + "context" + "log/slog" + "time" + + tEnums "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" + + 
"github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared" +) + +func getMaintenanceWorkflowOptions(workflowIDPrefix string, taskQueueId shared.TaskQueueID) client.StartWorkflowOptions { + maintenanceWorkflowOptions := client.StartWorkflowOptions{ + WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, + TaskQueue: peerdbenv.PeerFlowTaskQueueName(taskQueueId), + ID: workflowIDPrefix, + } + if deploymentUid := peerdbenv.PeerDBDeploymentUID(); deploymentUid != "" { + maintenanceWorkflowOptions.ID += "-" + deploymentUid + } + return maintenanceWorkflowOptions +} + +// RunStartMaintenanceWorkflow is a helper function to start the StartMaintenanceWorkflow with sane defaults +func RunStartMaintenanceWorkflow( + ctx context.Context, + temporalClient client.Client, + input *protos.StartMaintenanceFlowInput, + taskQueueId shared.TaskQueueID, +) (client.WorkflowRun, error) { + workflowOptions := getMaintenanceWorkflowOptions("start-maintenance", taskQueueId) + workflowRun, err := temporalClient.ExecuteWorkflow(ctx, workflowOptions, StartMaintenanceWorkflow, input) + if err != nil { + return nil, err + } + return workflowRun, nil +} + +// RunEndMaintenanceWorkflow is a helper function to start the EndMaintenanceWorkflow with sane defaults +func RunEndMaintenanceWorkflow( + ctx context.Context, + temporalClient client.Client, + input *protos.EndMaintenanceFlowInput, + taskQueueId shared.TaskQueueID, +) (client.WorkflowRun, error) { + workflowOptions := getMaintenanceWorkflowOptions("end-maintenance", taskQueueId) + workflowRun, err := temporalClient.ExecuteWorkflow(ctx, workflowOptions, EndMaintenanceWorkflow, &protos.EndMaintenanceFlowInput{}) + if err != nil { + return nil, err + } + return workflowRun, nil +} + +func StartMaintenanceWorkflow(ctx workflow.Context, input *protos.StartMaintenanceFlowInput) 
(*protos.StartMaintenanceFlowOutput, error) { + logger := workflow.GetLogger(ctx) + logger.Info("Starting StartMaintenance workflow", "input", input) + defer runBackgroundAlerter(ctx)() + + maintenanceFlowOutput, err := startMaintenance(ctx, logger) + if err != nil { + slog.Error("Error in StartMaintenance workflow", "error", err) + return nil, err + } + return maintenanceFlowOutput, nil +} + +func startMaintenance(ctx workflow.Context, logger log.Logger) (*protos.StartMaintenanceFlowOutput, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 24 * time.Hour, + }) + + snapshotWaitCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 24 * time.Hour, + HeartbeatTimeout: 1 * time.Minute, + }) + waitSnapshotsFuture := workflow.ExecuteActivity(snapshotWaitCtx, + maintenance.WaitForRunningSnapshots, + ) + err := waitSnapshotsFuture.Get(snapshotWaitCtx, nil) + if err != nil { + return nil, err + } + + enableCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + enableMaintenanceFuture := workflow.ExecuteActivity(enableCtx, maintenance.EnableMaintenanceMode) + + if err := enableMaintenanceFuture.Get(enableCtx, nil); err != nil { + return nil, err + } + + logger.Info("Waiting for all snapshot mirrors to finish snapshotting") + waitSnapshotsPostEnableFuture := workflow.ExecuteActivity(snapshotWaitCtx, + maintenance.WaitForRunningSnapshots, + ) + + if err := waitSnapshotsPostEnableFuture.Get(snapshotWaitCtx, nil); err != nil { + return nil, err + } + + mirrorsList, err := getAllMirrors(ctx) + if err != nil { + return nil, err + } + + runningMirrors, err := pauseAndGetRunningMirrors(ctx, mirrorsList, logger) + if err != nil { + return nil, err + } + + backupCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 2 * time.Minute, + }) + future := workflow.ExecuteActivity(backupCtx, 
maintenance.BackupAllPreviouslyRunningFlows, runningMirrors) + + if err := future.Get(backupCtx, nil); err != nil { + return nil, err + } + version, err := GetPeerDBVersion(ctx) + if err != nil { + return nil, err + } + logger.Info("StartMaintenance workflow completed", "version", version) + return &protos.StartMaintenanceFlowOutput{ + Version: version, + }, nil +} + +func pauseAndGetRunningMirrors( + ctx workflow.Context, + mirrorsList *protos.MaintenanceMirrors, + logger log.Logger, +) (*protos.MaintenanceMirrors, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 24 * time.Hour, + HeartbeatTimeout: 1 * time.Minute, + }) + selector := workflow.NewSelector(ctx) + runningMirrors := make([]bool, len(mirrorsList.Mirrors)) + for i, mirror := range mirrorsList.Mirrors { + f := workflow.ExecuteActivity( + ctx, + maintenance.PauseMirrorIfRunning, + mirror, + ) + + selector.AddFuture(f, func(f workflow.Future) { + var wasRunning bool + err := f.Get(ctx, &wasRunning) + if err != nil { + logger.Error("Error checking and pausing mirror", "mirror", mirror, "error", err) + } else { + logger.Info("Finished check and pause for mirror", "mirror", mirror, "wasRunning", wasRunning) + runningMirrors[i] = wasRunning + } + }) + } + onlyRunningMirrors := make([]*protos.MaintenanceMirror, 0, len(mirrorsList.Mirrors)) + for range mirrorsList.Mirrors { + selector.Select(ctx) + if err := ctx.Err(); err != nil { + return nil, err + } + } + for i, mirror := range mirrorsList.Mirrors { + if runningMirrors[i] { + onlyRunningMirrors = append(onlyRunningMirrors, mirror) + } + } + return &protos.MaintenanceMirrors{ + Mirrors: onlyRunningMirrors, + }, nil +} + +func getAllMirrors(ctx workflow.Context) (*protos.MaintenanceMirrors, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 2 * time.Minute, + }) + getMirrorsFuture := workflow.ExecuteActivity(ctx, maintenance.GetAllMirrors) + var mirrorsList 
protos.MaintenanceMirrors + err := getMirrorsFuture.Get(ctx, &mirrorsList) + return &mirrorsList, err +} + +func EndMaintenanceWorkflow(ctx workflow.Context, input *protos.EndMaintenanceFlowInput) (*protos.EndMaintenanceFlowOutput, error) { + logger := workflow.GetLogger(ctx) + logger.Info("Starting EndMaintenance workflow", "input", input) + defer runBackgroundAlerter(ctx)() + + flowOutput, err := endMaintenance(ctx, logger) + if err != nil { + slog.Error("Error in EndMaintenance workflow", "error", err) + return nil, err + } + return flowOutput, nil +} + +func endMaintenance(ctx workflow.Context, logger log.Logger) (*protos.EndMaintenanceFlowOutput, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 24 * time.Hour, + HeartbeatTimeout: 1 * time.Minute, + }) + + mirrorsList, err := resumeBackedUpMirrors(ctx, logger) + if err != nil { + return nil, err + } + + clearBackupsFuture := workflow.ExecuteActivity(ctx, maintenance.CleanBackedUpFlows) + if err := clearBackupsFuture.Get(ctx, nil); err != nil { + return nil, err + } + + logger.Info("Resumed backed up mirrors", "mirrors", mirrorsList) + + disableCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + future := workflow.ExecuteActivity(disableCtx, maintenance.DisableMaintenanceMode) + if err := future.Get(disableCtx, nil); err != nil { + return nil, err + } + logger.Info("Disabled maintenance mode") + version, err := GetPeerDBVersion(ctx) + if err != nil { + return nil, err + } + + logger.Info("EndMaintenance workflow completed", "version", version) + return &protos.EndMaintenanceFlowOutput{ + Version: version, + }, nil +} + +func resumeBackedUpMirrors(ctx workflow.Context, logger log.Logger) (*protos.MaintenanceMirrors, error) { + future := workflow.ExecuteActivity(ctx, maintenance.GetBackedUpFlows) + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * 
time.Minute, + }) + var mirrorsList *protos.MaintenanceMirrors + err := future.Get(ctx, &mirrorsList) + if err != nil { + return nil, err + } + + selector := workflow.NewSelector(ctx) + for _, mirror := range mirrorsList.Mirrors { + activityInput := mirror + f := workflow.ExecuteActivity( + ctx, + maintenance.ResumeMirror, + activityInput, + ) + + selector.AddFuture(f, func(f workflow.Future) { + err := f.Get(ctx, nil) + if err != nil { + logger.Error("Error resuming mirror", "mirror", mirror, "error", err) + } else { + logger.Info("Finished resuming mirror", "mirror", mirror) + } + }) + } + + for range mirrorsList.Mirrors { + selector.Select(ctx) + if err := ctx.Err(); err != nil { + return nil, err + } + } + return mirrorsList, nil +} + +// runBackgroundAlerter Alerts every few minutes regarding currently running maintenance workflows +func runBackgroundAlerter(ctx workflow.Context) workflow.CancelFunc { + activityCtx, cancelActivity := workflow.WithCancel(ctx) + alerterCtx := workflow.WithActivityOptions(activityCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 24 * time.Hour, + HeartbeatTimeout: 1 * time.Minute, + }) + workflow.ExecuteActivity(alerterCtx, maintenance.BackgroundAlerter) + return cancelActivity +} + +func GetPeerDBVersion(wCtx workflow.Context) (string, error) { + activityCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{ + StartToCloseTimeout: time.Minute, + }) + getVersionActivity := func(ctx context.Context) (string, error) { + return peerdbenv.PeerDBVersionShaShort(), nil + } + var version string + future := workflow.ExecuteLocalActivity(activityCtx, getVersionActivity) + err := future.Get(activityCtx, &version) + return version, err +} diff --git a/flow/workflows/register.go b/flow/workflows/register.go index 35adf135bf..2c4b32ba3c 100644 --- a/flow/workflows/register.go +++ b/flow/workflows/register.go @@ -18,4 +18,7 @@ func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry) { 
w.RegisterWorkflow(GlobalScheduleManagerWorkflow) w.RegisterWorkflow(HeartbeatFlowWorkflow) w.RegisterWorkflow(RecordSlotSizeWorkflow) + + w.RegisterWorkflow(StartMaintenanceWorkflow) + w.RegisterWorkflow(EndMaintenanceWorkflow) } diff --git a/nexus/catalog/migrations/V40__maintenance_flows.sql b/nexus/catalog/migrations/V40__maintenance_flows.sql new file mode 100644 index 0000000000..e43e8eb927 --- /dev/null +++ b/nexus/catalog/migrations/V40__maintenance_flows.sql @@ -0,0 +1,29 @@ +CREATE SCHEMA IF NOT EXISTS maintenance; + +CREATE TABLE IF NOT EXISTS maintenance.maintenance_flows +( + id SERIAL PRIMARY KEY, + flow_id BIGINT NOT NULL, + flow_name TEXT NOT NULL, + workflow_id TEXT NOT NULL, + flow_created_at TIMESTAMP NOT NULL, + is_cdc BOOLEAN NOT NULL, + state TEXT NOT NULL, + restored_at TIMESTAMP, + from_version TEXT, + to_version TEXT +); + +CREATE INDEX IF NOT EXISTS idx_maintenance_flows_state ON maintenance.maintenance_flows (state); + +CREATE TABLE IF NOT EXISTS maintenance.start_maintenance_outputs +( + id SERIAL PRIMARY KEY, + api_version TEXT NOT NULL, + cli_version TEXT NOT NULL, + skipped BOOLEAN NOT NULL, + skipped_reason TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_start_maintenance_outputs_created_at ON maintenance.start_maintenance_outputs (created_at DESC); diff --git a/protos/flow.proto b/protos/flow.proto index d1681fd8d5..de7bf740d0 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -466,3 +466,28 @@ message DropFlowActivityInput { string peer_name = 2; } +message StartMaintenanceFlowInput { +} + +message StartMaintenanceFlowOutput { + string version = 1; +} + +message EndMaintenanceFlowInput { +} + +message EndMaintenanceFlowOutput { + string version = 1; +} + +message MaintenanceMirror { + int64 mirror_id = 1; + string mirror_name = 2; + string workflow_id = 3; + bool is_cdc = 4; + google.protobuf.Timestamp mirror_created_at = 5; +} + +message MaintenanceMirrors { + repeated 
MaintenanceMirror mirrors = 1; +} diff --git a/protos/route.proto b/protos/route.proto index 0265f221ee..1c6d38ed69 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -441,6 +441,38 @@ message ResyncMirrorRequest { message ResyncMirrorResponse { } +message PeerDBStateRequest { +} + +enum InstanceStatus { + INSTANCE_STATUS_UNKNOWN = 0; + INSTANCE_STATUS_READY = 1; + INSTANCE_STATUS_MAINTENANCE = 3; +} + +message InstanceInfoRequest { +} + +message InstanceInfoResponse { + InstanceStatus status = 1; +} + +enum MaintenanceStatus { + MAINTENANCE_STATUS_UNKNOWN = 0; + MAINTENANCE_STATUS_START = 1; + MAINTENANCE_STATUS_END = 2; +} + +message MaintenanceRequest { + MaintenanceStatus status = 1; + bool use_peerflow_task_queue = 2; +} + +message MaintenanceResponse { + string workflow_id = 1; + string run_id = 2; +} + service FlowService { rpc ValidatePeer(ValidatePeerRequest) returns (ValidatePeerResponse) { option (google.api.http) = { @@ -595,4 +627,12 @@ service FlowService { rpc ResyncMirror(ResyncMirrorRequest) returns (ResyncMirrorResponse) { option (google.api.http) = { post: "/v1/mirrors/resync", body: "*" }; } + + rpc GetInstanceInfo(InstanceInfoRequest) returns (InstanceInfoResponse) { + option (google.api.http) = { get: "/v1/instance/info" }; + } + + rpc Maintenance(MaintenanceRequest) returns (MaintenanceResponse) { + option (google.api.http) = { post: "/v1/instance/maintenance", body: "*" }; + } } diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index 0f997777e9..13fc5b0895 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -45,6 +45,8 @@ FROM flow-base AS flow-worker # Sane defaults for OpenTelemetry ENV OTEL_METRIC_EXPORT_INTERVAL=10000 ENV OTEL_EXPORTER_OTLP_COMPRESSION=gzip +ARG PEERDB_VERSION_SHA_SHORT +ENV PEERDB_VERSION_SHA_SHORT=${PEERDB_VERSION_SHA_SHORT} ENTRYPOINT [\ "./peer-flow",\ @@ -52,7 +54,20 @@ ENTRYPOINT [\ ] FROM flow-base AS flow-snapshot-worker + +ARG PEERDB_VERSION_SHA_SHORT +ENV 
PEERDB_VERSION_SHA_SHORT=${PEERDB_VERSION_SHA_SHORT} ENTRYPOINT [\ "./peer-flow",\ "snapshot-worker"\ ] + + +FROM flow-base AS flow-maintenance + +ARG PEERDB_VERSION_SHA_SHORT +ENV PEERDB_VERSION_SHA_SHORT=${PEERDB_VERSION_SHA_SHORT} +ENTRYPOINT [\ + "./peer-flow",\ + "maintenance"\ + ] diff --git a/stacks/peerdb-server.Dockerfile b/stacks/peerdb-server.Dockerfile index 689e3cf5b9..3e9db5240d 100644 --- a/stacks/peerdb-server.Dockerfile +++ b/stacks/peerdb-server.Dockerfile @@ -29,4 +29,8 @@ RUN apk add --no-cache ca-certificates postgresql-client curl iputils && \ USER peerdb WORKDIR /home/peerdb COPY --from=builder --chown=peerdb /root/nexus/target/release/peerdb-server . + +ARG PEERDB_VERSION_SHA_SHORT +ENV PEERDB_VERSION_SHA_SHORT=${PEERDB_VERSION_SHA_SHORT} + ENTRYPOINT ["./peerdb-server"] diff --git a/stacks/peerdb-ui.Dockerfile b/stacks/peerdb-ui.Dockerfile index cd99e61a5f..def0aad72e 100644 --- a/stacks/peerdb-ui.Dockerfile +++ b/stacks/peerdb-ui.Dockerfile @@ -35,5 +35,8 @@ ENV PORT 3000 # set hostname to localhost ENV HOSTNAME "0.0.0.0" +ARG PEERDB_VERSION_SHA_SHORT +ENV PEERDB_VERSION_SHA_SHORT=${PEERDB_VERSION_SHA_SHORT} + ENTRYPOINT ["/app/entrypoint.sh"] CMD ["node", "server.js"]