diff --git a/Makefile b/Makefile index f61b995..2925bfc 100644 --- a/Makefile +++ b/Makefile @@ -119,5 +119,9 @@ examples-backend: go run examples/backends/base.go examples/backends/gorm.go go run examples/backends/base.go examples/backends/mongodb.go +.PHONY: examples-event +examples-event: + go run examples/event/event.go + .PHONY: examples-all -examples-all: examples-store examples-api examples-queue examples-backend +examples-all: examples-store examples-api examples-queue examples-backend examples-event diff --git a/README.md b/README.md index da62b08..434f0f9 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ ![GitHub go.mod Go version (subdirectory of monorepo)](https://img.shields.io/github/go-mod/go-version/agscheduler/agscheduler) [![license](https://img.shields.io/github/license/agscheduler/agscheduler)](https://github.com/agscheduler/agscheduler/blob/main/LICENSE) -> Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, job queues, job result collection, remote call, and cluster +> Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, job queues, job result collection, event listening, remote call, and cluster English | [简体中文](README.zh-CN.md) @@ -36,6 +36,9 @@ English | [简体中文](README.zh-CN.md) - [x] Memory (Cluster mode is not supported) - [x] [GORM](https://gorm.io/) (any RDBMS supported by GORM works) - [x] [MongoDB](https://www.mongodb.com/) +- Supports event listening + - [x] Scheduler event + - [x] Job event - Supports remote call - [x] [gRPC](https://grpc.io/) - [x] HTTP @@ -78,8 +81,9 @@ func main() { agscheduler.FuncPkg{Func: printMsg}, ) - store := &stores.MemoryStore{} scheduler := &agscheduler.Scheduler{} + + store := &stores.MemoryStore{} scheduler.SetStore(store) job1 := agscheduler.Job{ @@ -132,7 +136,7 @@ func main() { ```go mq := &queues.MemoryQueue{} -brk := &agscheduler.Broker{ +broker := &agscheduler.Broker{ Queues: map[string]agscheduler.QueuePkg{ "default": { Queue: mq, @@ -141,21 +145,40 @@ brk := &agscheduler.Broker{ }, } -scheduler.SetStore(store) -scheduler.SetBroker(brk) +scheduler.SetBroker(broker) ``` ## Result Collection ```go mb := &backends.MemoryBackend{} -rec := &agscheduler.Recorder{Backend: mb} +recorder := &agscheduler.Recorder{Backend: mb} -scheduler.SetStore(store) -scheduler.SetRecorder(rec) +scheduler.SetRecorder(recorder) job, _ = scheduler.AddJob(job) -records, _ := rec.GetRecords(job.Id) +records, _ := recorder.GetRecords(job.Id) +``` + +## Event listening + +```go +func jobCallback(ep agscheduler.EventPkg) { + slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId)) +} + +...... + +listener := &agscheduler.Listener{ + Callbacks: []agscheduler.CallbackPkg{ + { + Callback: jobCallback, + Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED, + }, + }, +} + +scheduler.SetListener(listener) ``` ## gRPC @@ -202,7 +225,6 @@ cnMain := &agscheduler.ClusterNode{ EndpointHTTP: "127.0.0.1:36370", Queue: "default", } -schedulerMain.SetStore(storeMain) schedulerMain.SetClusterNode(ctx, cnMain) cserviceMain := &services.ClusterService{Cn: cnMain} cserviceMain.Start() @@ -215,7 +237,6 @@ cnNode := &agscheduler.ClusterNode{ EndpointHTTP: "127.0.0.1:36371", Queue: "worker", } -schedulerNode.SetStore(storeNode) schedulerNode.SetClusterNode(ctx, cnNode) cserviceNode := &services.ClusterService{Cn: cnNode} cserviceNode.Start() diff --git a/README.zh-CN.md b/README.zh-CN.md index 05d34c7..816594a 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -8,7 +8,7 @@ ![GitHub go.mod Go version (subdirectory of monorepo)](https://img.shields.io/github/go-mod/go-version/agscheduler/agscheduler) [![license](https://img.shields.io/github/license/agscheduler/agscheduler)](https://github.com/agscheduler/agscheduler/blob/main/LICENSE) -> Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持作业队列,支持作业结果回收,支持远程调用,支持集群 +> Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持作业队列,支持作业结果回收,支持事件监听, 支持远程调用,支持集群 [English](README.md) | 简体中文 @@ -36,6 +36,9 @@ - [x] Memory (不支持集群模式) - [x] [GORM](https://gorm.io/) (任何 GORM 支持的 RDBMS 都能运行) - [x] [MongoDB](https://www.mongodb.com/) +- 支持事件监听 + - [x] 调度器事件 + - [x] 作业事件 - 支持远程调用 - [x] [gRPC](https://grpc.io/) - [x] HTTP @@ -78,8 +81,9 @@ func main() { agscheduler.FuncPkg{Func: printMsg}, ) - store := &stores.MemoryStore{} scheduler := &agscheduler.Scheduler{} + + store := &stores.MemoryStore{} scheduler.SetStore(store) job1 := agscheduler.Job{ @@ -132,7 +136,7 @@ func main() { ```go mq := &queues.MemoryQueue{} -brk := &agscheduler.Broker{ +broker := &agscheduler.Broker{ Queues: map[string]agscheduler.QueuePkg{ "default": { Queue: mq, @@ -141,21 +145,39 @@ brk := &agscheduler.Broker{ }, } -scheduler.SetStore(store) -scheduler.SetBroker(brk) +scheduler.SetBroker(broker) ``` ## 结果回收 ```go mb := &backends.MemoryBackend{} -rec := &agscheduler.Recorder{Backend: mb} +recorder := &agscheduler.Recorder{Backend: mb} -scheduler.SetStore(store) -scheduler.SetRecorder(rec) +scheduler.SetRecorder(recorder) job, _ = scheduler.AddJob(job) -records, _ := rec.GetRecords(job.Id) +records, _ := recorder.GetRecords(job.Id) +``` + +## 事件监听 +```go +func jobCallback(ep agscheduler.EventPkg) { + slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId)) +} + +...... + +listener := &agscheduler.Listener{ + Callbacks: []agscheduler.CallbackPkg{ + { + Callback: jobCallback, + Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED, + }, + }, +} + +scheduler.SetListener(listener) ``` ## gRPC @@ -202,7 +224,6 @@ cnMain := &agscheduler.ClusterNode{ EndpointHTTP: "127.0.0.1:36370", Queue: "default", } -schedulerMain.SetStore(storeMain) schedulerMain.SetClusterNode(ctx, cnMain) cserviceMain := &services.ClusterService{Cn: cnMain} cserviceMain.Start() @@ -215,7 +236,6 @@ cnNode := &agscheduler.ClusterNode{ EndpointHTTP: "127.0.0.1:36371", Queue: "worker", } -schedulerNode.SetStore(storeNode) schedulerNode.SetClusterNode(ctx, cnNode) cserviceNode := &services.ClusterService{Cn: cnNode} cserviceNode.Start() diff --git a/backends/base_test.go b/backends/base_test.go index 6fb3a49..b75d2c9 100644 --- a/backends/base_test.go +++ b/backends/base_test.go @@ -1,6 +1,7 @@ package backends import ( + "context" "log/slog" "testing" "time" @@ -8,13 +9,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/agscheduler/agscheduler" - "github.com/agscheduler/agscheduler/examples" "github.com/agscheduler/agscheduler/stores" ) +func dryRunRecorder(ctx context.Context, j agscheduler.Job) (result string) { return } + func runTest(t *testing.T, rec *agscheduler.Recorder) { agscheduler.RegisterFuncs( - agscheduler.FuncPkg{Func: examples.PrintMsg}, + agscheduler.FuncPkg{Func: dryRunRecorder}, ) s := &agscheduler.Scheduler{} @@ -28,7 +30,7 @@ func runTest(t *testing.T, rec *agscheduler.Recorder) { Name: "Job", Type: agscheduler.JOB_TYPE_INTERVAL, Interval: "2s", - Func: examples.PrintMsg, + Func: dryRunRecorder, } job, err = s.AddJob(job) assert.NoError(t, err) @@ -37,7 +39,7 @@ func runTest(t *testing.T, rec *agscheduler.Recorder) { Name: "Job2", Type: agscheduler.JOB_TYPE_DATETIME, StartAt: "2023-09-22 07:30:08", - Func: examples.PrintMsg, + Func: dryRunRecorder, } _, err = s.AddJob(job2) assert.NoError(t, err) diff --git a/examples/cluster/cluster_node.go b/examples/cluster/cluster_node.go index 0275182..1450388 100644 --- a/examples/cluster/cluster_node.go +++ b/examples/cluster/cluster_node.go @@ -42,7 +42,14 @@ func main() { flag.Parse() + scheduler := &agscheduler.Scheduler{} + store := &stores.MemoryStore{} + err := scheduler.SetStore(store) + if err != nil { + slog.Error(fmt.Sprintf("Failed to set store: %s", err)) + os.Exit(1) + } cn := &agscheduler.ClusterNode{ EndpointMain: *endpointMain, @@ -52,13 +59,6 @@ func main() { Queue: *queue, Mode: *mode, } - - scheduler := &agscheduler.Scheduler{} - err := scheduler.SetStore(store) - if err != nil { - slog.Error(fmt.Sprintf("Failed to set store: %s", err)) - os.Exit(1) - } err = scheduler.SetClusterNode(context.TODO(), cn) if err != nil { slog.Error(fmt.Sprintf("Failed to set cluster node: %s", err)) diff --git a/examples/event/event.go b/examples/event/event.go new file mode 100644 index 0000000..c575913 --- /dev/null +++ b/examples/event/event.go @@ -0,0 +1,61 @@ +// go run examples/event/event.go + +package main + +import ( + "fmt" + "log/slog" + "os" + "time" + + "github.com/agscheduler/agscheduler" + "github.com/agscheduler/agscheduler/examples" + "github.com/agscheduler/agscheduler/stores" +) + +func jobCallback(ep agscheduler.EventPkg) { + slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId)) +} + +func main() { + agscheduler.RegisterFuncs( + agscheduler.FuncPkg{Func: examples.PrintMsg}, + ) + + s := &agscheduler.Scheduler{} + + sto := &stores.MemoryStore{} + err := s.SetStore(sto) + if err != nil { + slog.Error(fmt.Sprintf("Failed to set store: %s", err)) + os.Exit(1) + } + + lis := &agscheduler.Listener{ + Callbacks: []agscheduler.CallbackPkg{ + { + Callback: jobCallback, + Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED, + }, + }, + } + err = s.SetListener(lis) + if err != nil { + slog.Error(fmt.Sprintf("Failed to set listener: %s", err)) + os.Exit(1) + } + + job := agscheduler.Job{ + Name: "Job", + Type: agscheduler.JOB_TYPE_INTERVAL, + Interval: "2s", + Func: examples.PrintMsg, + } + job, _ = s.AddJob(job) + + job, _ = s.PauseJob(job.Id) + + _ = s.DeleteJob(job.Id) + + time.Sleep(1 * time.Second) +} diff --git a/examples/grpc/grpc.go b/examples/grpc/grpc.go index ca8ad73..c326a50 100644 --- a/examples/grpc/grpc.go +++ b/examples/grpc/grpc.go @@ -123,9 +123,9 @@ func main() { agscheduler.FuncPkg{Func: examples.PrintMsg}, ) - store := &stores.MemoryStore{} - scheduler := &agscheduler.Scheduler{} + + store := &stores.MemoryStore{} err := scheduler.SetStore(store) if err != nil { slog.Error(fmt.Sprintf("Failed to set store: %s", err)) diff --git a/examples/grpc/grpc_server.go b/examples/grpc/grpc_server.go index 591a6b2..485f2b0 100644 --- a/examples/grpc/grpc_server.go +++ b/examples/grpc/grpc_server.go @@ -18,9 +18,9 @@ func main() { agscheduler.FuncPkg{Func: examples.PrintMsg}, ) - store := &stores.MemoryStore{} - scheduler := &agscheduler.Scheduler{} + + store := &stores.MemoryStore{} err := scheduler.SetStore(store) if err != nil { slog.Error(fmt.Sprintf("Failed to set store: %s", err)) diff --git a/examples/http/http.go b/examples/http/http.go index 93fb773..6e26b6a 100644 --- a/examples/http/http.go +++ b/examples/http/http.go @@ -145,9 +145,9 @@ func main() { agscheduler.FuncPkg{Func: examples.PrintMsg}, ) - store := &stores.MemoryStore{} - scheduler := &agscheduler.Scheduler{} + + store := &stores.MemoryStore{} err := scheduler.SetStore(store) if err != nil { slog.Error(fmt.Sprintf("Failed to set store: %s", err)) diff --git a/examples/http/http_server.go b/examples/http/http_server.go index 038c28d..645787d 100644 --- a/examples/http/http_server.go +++ b/examples/http/http_server.go @@ -18,9 +18,9 @@ func main() { agscheduler.FuncPkg{Func: examples.PrintMsg}, ) - store := &stores.MemoryStore{} - scheduler := &agscheduler.Scheduler{} + + store := &stores.MemoryStore{} err := scheduler.SetStore(store) if err != nil { slog.Error(fmt.Sprintf("Failed to set store: %s", err)) diff --git a/examples/queues/base.go b/examples/queues/base.go index 4d0ba3c..0d5c8b7 100644 --- a/examples/queues/base.go +++ b/examples/queues/base.go @@ -22,13 +22,15 @@ func runExample(brk *agscheduler.Broker) { agscheduler.FuncPkg{Func: examples.PrintMsgSleep}, ) - sto := &stores.MemoryStore{} s := &agscheduler.Scheduler{} + + sto := &stores.MemoryStore{} err := s.SetStore(sto) if err != nil { slog.Error(fmt.Sprintf("Failed to set store: %s", err)) os.Exit(1) } + ctx, cancel := context.WithCancel(ctx) err = s.SetBroker(ctx, brk) if err != nil { diff --git a/examples/stores/base.go b/examples/stores/base.go index 2d32a1e..bf12d38 100644 --- a/examples/stores/base.go +++ b/examples/stores/base.go @@ -19,6 +19,7 @@ func runExample(sto agscheduler.Store) { ) s := &agscheduler.Scheduler{} + err := s.SetStore(sto) if err != nil { slog.Error(fmt.Sprintf("Failed to set store: %s", err)) diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..b193287 --- /dev/null +++ b/listener.go @@ -0,0 +1,77 @@ +package agscheduler + +import ( + "fmt" + "log/slog" + "runtime/debug" +) + +type event uint32 + +// constant indicating the event. +const ( + EVENT_SCHEDULER_STARTED event = 1 << iota + EVENT_SCHEDULER_STOPPED + + EVENT_JOB_ADDED + EVENT_JOB_UPDATED + EVENT_JOB_DELETED + EVENT_ALL_JOBS_DELETED + EVENT_JOB_PAUSED + EVENT_JOB_RESUMED + EVENT_JOB_EXECUTED + EVENT_JOB_ERROR + EVENT_JOB_TIMEOUT + + EVENT_ALL event = EVENT_SCHEDULER_STARTED | EVENT_SCHEDULER_STOPPED | + EVENT_JOB_ADDED | EVENT_JOB_UPDATED | + EVENT_JOB_DELETED | EVENT_ALL_JOBS_DELETED | + EVENT_JOB_PAUSED | EVENT_JOB_RESUMED | + EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_TIMEOUT +) + +type EventPkg struct { + Event event + JobId string + Data any +} + +// Event listener. +type Listener struct { + Callbacks []CallbackPkg +} + +type CallbackPkg struct { + Callback func(ep EventPkg) + Event event +} + +// Initialization functions for each Listener, +// called when the scheduler run `SetListener`. +func (l *Listener) init() error { + slog.Info("Listener init...") + + return nil +} + +// Event handler. +func (l *Listener) handleEvent(eP EventPkg) error { + for _, cP := range l.Callbacks { + if cP.Event&eP.Event == 0 { + continue + } + + go func(cP CallbackPkg) { + defer func() { + if err := recover(); err != nil { + slog.Error(fmt.Sprintf("Listener handle event error: %s", err)) + slog.Debug(string(debug.Stack())) + } + }() + + cP.Callback(eP) + }(cP) + } + + return nil +} diff --git a/listener_test.go b/listener_test.go new file mode 100644 index 0000000..a5c290a --- /dev/null +++ b/listener_test.go @@ -0,0 +1,56 @@ +package agscheduler_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/agscheduler/agscheduler" + "github.com/agscheduler/agscheduler/stores" +) + +func dryRunListener(ctx context.Context, j agscheduler.Job) (result string) { return } + +func dryCallbackListener(ep agscheduler.EventPkg) {} + +func TestListener(t *testing.T) { + agscheduler.RegisterFuncs( + agscheduler.FuncPkg{Func: dryRunListener}, + ) + + s := &agscheduler.Scheduler{} + + sto := &stores.MemoryStore{} + err := s.SetStore(sto) + assert.NoError(t, err) + + lis := &agscheduler.Listener{ + Callbacks: []agscheduler.CallbackPkg{ + { + Callback: dryCallbackListener, + Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED, + }, + }, + } + err = s.SetListener(lis) + assert.NoError(t, err) + + job := agscheduler.Job{ + Name: "Job", + Type: agscheduler.JOB_TYPE_INTERVAL, + Interval: "2s", + Func: dryRunListener, + } + job, err = s.AddJob(job) + assert.NoError(t, err) + + job, err = s.PauseJob(job.Id) + assert.NoError(t, err) + + err = s.DeleteJob(job.Id) + assert.NoError(t, err) + + time.Sleep(1 * time.Second) +} diff --git a/queues/base_test.go b/queues/base_test.go index df14dd9..7df23f4 100644 --- a/queues/base_test.go +++ b/queues/base_test.go @@ -24,8 +24,9 @@ func runTest(t *testing.T, brk *agscheduler.Broker) { agscheduler.FuncPkg{Func: runQueuesSleep}, ) - sto := &stores.MemoryStore{} s := &agscheduler.Scheduler{} + + sto := &stores.MemoryStore{} err := s.SetStore(sto) assert.NoError(t, err) diff --git a/raft_test.go b/raft_test.go index 58acbc5..aa7db0e 100644 --- a/raft_test.go +++ b/raft_test.go @@ -21,6 +21,9 @@ func TestRaft(t *testing.T) { store := &stores.MemoryStore{} + schedulerMain := &agscheduler.Scheduler{} + err := schedulerMain.SetStore(store) + assert.NoError(t, err) cnMain := &agscheduler.ClusterNode{ EndpointMain: "127.0.0.1:36387", Endpoint: "127.0.0.1:36387", @@ -28,9 +31,6 @@ func TestRaft(t *testing.T) { EndpointHTTP: "127.0.0.1:36377", Mode: "HA", } - schedulerMain := &agscheduler.Scheduler{} - err := schedulerMain.SetStore(store) - assert.NoError(t, err) err = schedulerMain.SetClusterNode(ctx, cnMain) assert.NoError(t, err) cserviceMain := &services.ClusterService{Cn: cnMain} @@ -39,6 +39,9 @@ func TestRaft(t *testing.T) { time.Sleep(2 * time.Second) + schedulerNode := &agscheduler.Scheduler{} + err = schedulerNode.SetStore(store) + assert.NoError(t, err) cnNode := &agscheduler.ClusterNode{ EndpointMain: cnMain.Endpoint, Endpoint: "127.0.0.1:36388", @@ -46,15 +49,15 @@ func TestRaft(t *testing.T) { EndpointHTTP: "127.0.0.1:36378", Mode: "HA", } - schedulerNode := &agscheduler.Scheduler{} - err = schedulerNode.SetStore(store) - assert.NoError(t, err) err = schedulerNode.SetClusterNode(ctx, cnNode) assert.NoError(t, err) cserviceNode := &services.ClusterService{Cn: cnNode} err = cserviceNode.Start() assert.NoError(t, err) + schedulerNode2 := &agscheduler.Scheduler{} + err = schedulerNode2.SetStore(store) + assert.NoError(t, err) cnNode2 := &agscheduler.ClusterNode{ EndpointMain: cnMain.Endpoint, Endpoint: "127.0.0.1:36389", @@ -62,9 +65,6 @@ func TestRaft(t *testing.T) { EndpointHTTP: "127.0.0.1:36379", Mode: "HA", } - schedulerNode2 := &agscheduler.Scheduler{} - err = schedulerNode2.SetStore(store) - assert.NoError(t, err) err = schedulerNode2.SetClusterNode(ctx, cnNode2) assert.NoError(t, err) cserviceNode2 := &services.ClusterService{Cn: cnNode2} diff --git a/recorder_test.go b/recorder_test.go index 88399ea..b21065c 100644 --- a/recorder_test.go +++ b/recorder_test.go @@ -85,8 +85,8 @@ func TestPbRecordsPtrToRecords(t *testing.T) { func TestRecorderRecordMetadata(t *testing.T) { j := agscheduler.Job{Id: "1"} - rec := getRecorder() s := &agscheduler.Scheduler{} + rec := getRecorder() err := s.SetRecorder(rec) assert.NoError(t, err) @@ -101,8 +101,8 @@ func TestRecorderRecordMetadata(t *testing.T) { func TestRecorderRecordResult(t *testing.T) { j := agscheduler.Job{Id: "1"} - rec := getRecorder() s := &agscheduler.Scheduler{} + rec := getRecorder() err := s.SetRecorder(rec) assert.NoError(t, err) @@ -119,8 +119,8 @@ func TestRecorderRecordResult(t *testing.T) { func TestRecorderGetRecords(t *testing.T) { j := agscheduler.Job{Id: "1"} - rec := getRecorder() s := &agscheduler.Scheduler{} + rec := getRecorder() err := s.SetRecorder(rec) assert.NoError(t, err) @@ -141,8 +141,8 @@ func TestRecorderGetRecords(t *testing.T) { func TestRecorderGetAllRecords(t *testing.T) { j := agscheduler.Job{Id: "1"} j2 := agscheduler.Job{Id: "2"} - rec := getRecorder() s := &agscheduler.Scheduler{} + rec := getRecorder() err := s.SetRecorder(rec) assert.NoError(t, err) @@ -176,8 +176,8 @@ func TestRecorderGetAllRecords(t *testing.T) { func TestRecorderDeleteRecords(t *testing.T) { j := agscheduler.Job{Id: "1"} - rec := getRecorder() s := &agscheduler.Scheduler{} + rec := getRecorder() err := s.SetRecorder(rec) assert.NoError(t, err) @@ -196,8 +196,8 @@ func TestRecorderDeleteRecords(t *testing.T) { func TestRecorderDeleteAllRecords(t *testing.T) { j := agscheduler.Job{Id: "1"} - rec := getRecorder() s := &agscheduler.Scheduler{} + rec := getRecorder() err := s.SetRecorder(rec) assert.NoError(t, err) @@ -216,8 +216,8 @@ func TestRecorderDeleteAllRecords(t *testing.T) { func TestRecorderClear(t *testing.T) { j := agscheduler.Job{Id: "1"} - rec := getRecorder() s := &agscheduler.Scheduler{} + rec := getRecorder() err := s.SetRecorder(rec) assert.NoError(t, err) diff --git a/scheduler.go b/scheduler.go index a0c7d96..4b332c4 100644 --- a/scheduler.go +++ b/scheduler.go @@ -19,6 +19,7 @@ var GetStore = (*Scheduler).getStore var GetClusterNode = (*Scheduler).getClusterNode var GetBroker = (*Scheduler).getBroker var GetRecorder = (*Scheduler).getRecorder +var GetListener = (*Scheduler).getListener // In standalone mode, the scheduler only needs to run jobs on a regular basis. // In cluster mode, the scheduler also needs to be responsible for allocating jobs to cluster nodes. @@ -39,6 +40,7 @@ type Scheduler struct { broker *Broker // When recorder exist, record the results of job runs. recorder *Recorder + listener *Listener statusM sync.RWMutex storeM sync.RWMutex @@ -129,6 +131,26 @@ func (s *Scheduler) HasRecorder() bool { return s.recorder != nil } +// Bind the listener +func (s *Scheduler) SetListener(lis *Listener) error { + slog.Info("Scheduler set Listener.") + + s.listener = lis + if err := s.listener.init(); err != nil { + return err + } + + return nil +} + +func (s *Scheduler) getListener() *Listener { + return s.listener +} + +func (s *Scheduler) HasListener() bool { + return s.listener != nil +} + // Calculate the next run time, different job type will be calculated in different ways, // when the job is paused, will return `9999-09-09 09:09:09`. func CalcNextRunTime(j Job) (time.Time, error) { @@ -189,6 +211,7 @@ func (s *Scheduler) AddJob(j Job) (Job, error) { s.wakeup() } + s.dispatchEvent(EventPkg{EVENT_JOB_ADDED, j.Id, nil}) return j, nil } @@ -238,6 +261,7 @@ func (s *Scheduler) _updateJob(j Job) (Job, error) { s.wakeup() } + s.dispatchEvent(EventPkg{EVENT_JOB_UPDATED, j.Id, nil}) return j, nil } @@ -260,7 +284,12 @@ func (s *Scheduler) _deleteJob(id string) error { return err } - return s.store.DeleteJob(id) + if err := s.store.DeleteJob(id); err != nil { + return err + } + + s.dispatchEvent(EventPkg{EVENT_JOB_DELETED, id, nil}) + return nil } func (s *Scheduler) DeleteJob(id string) error { @@ -276,7 +305,12 @@ func (s *Scheduler) DeleteAllJobs() error { slog.Info("Scheduler delete all jobs.") - return s.store.DeleteAllJobs() + if err := s.store.DeleteAllJobs(); err != nil { + return err + } + + s.dispatchEvent(EventPkg{EVENT_ALL_JOBS_DELETED, "", nil}) + return nil } func (s *Scheduler) PauseJob(id string) (Job, error) { @@ -297,6 +331,7 @@ func (s *Scheduler) PauseJob(id string) (Job, error) { return Job{}, err } + s.dispatchEvent(EventPkg{EVENT_JOB_PAUSED, j.Id, nil}) return j, nil } @@ -318,6 +353,7 @@ func (s *Scheduler) ResumeJob(id string) (Job, error) { return Job{}, err } + s.dispatchEvent(EventPkg{EVENT_JOB_RESUMED, j.Id, nil}) return j, nil } @@ -374,6 +410,7 @@ func (s *Scheduler) _runJob(j Job) { defer func() { if err := recover(); err != nil { slog.Error(fmt.Sprintf("Job `%s` run error: %s", j.FullName(), err)) + s.dispatchEvent(EventPkg{EVENT_JOB_ERROR, j.Id, err}) slog.Debug(string(debug.Stack())) status = RECORD_STATUS_ERROR result = fmt.Sprintf("%s", err) @@ -386,11 +423,13 @@ func (s *Scheduler) _runJob(j Job) { select { case <-ch: + s.dispatchEvent(EventPkg{EVENT_JOB_EXECUTED, j.Id, nil}) if status == "" { status = RECORD_STATUS_COMPLETED } case <-ctx.Done(): slog.Warn(fmt.Sprintf("Job `%s` run timeout", j.FullName())) + s.dispatchEvent(EventPkg{EVENT_JOB_TIMEOUT, j.Id, nil}) status = RECORD_STATUS_TIMEOUT } @@ -588,6 +627,7 @@ func (s *Scheduler) Start() { go s.run() slog.Info("Scheduler start.") + s.dispatchEvent(EventPkg{EVENT_SCHEDULER_STARTED, "", nil}) } // In addition to being called manually, @@ -605,9 +645,10 @@ func (s *Scheduler) Stop() { s.isRunning = false slog.Info("Scheduler stop.") + s.dispatchEvent(EventPkg{EVENT_SCHEDULER_STOPPED, "", nil}) } -// Dynamically calculate the next wakeup interval, avoid frequent wakeup of the scheduler +// Dynamically calculate the next wakeup interval, avoid frequent wakeup of the scheduler. func (s *Scheduler) getNextWakeupInterval() time.Duration { nextRunTimeMin, err := s.store.GetNextRunTime() if err != nil { @@ -630,6 +671,14 @@ func (s *Scheduler) wakeup() { } } +// Send an event to the listener. +func (s *Scheduler) dispatchEvent(eP EventPkg) { + if !s.HasListener() { + return + } + s.listener.handleEvent(eP) +} + func (s *Scheduler) Info() map[string]any { info := map[string]any{ "scheduler": map[string]any{ diff --git a/scheduler_test.go b/scheduler_test.go index 004dacc..169de9c 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -18,9 +18,11 @@ func dryRunScheduler(ctx context.Context, j agscheduler.Job) (result string) { r func runSchedulerPanic(ctx context.Context, j agscheduler.Job) (result string) { panic(nil); return } +func dryCallbackScheduler(ep agscheduler.EventPkg) {} + func getSchedulerWithStore(t *testing.T) *agscheduler.Scheduler { - store := &stores.MemoryStore{} scheduler := &agscheduler.Scheduler{} + store := &stores.MemoryStore{} err := scheduler.SetStore(store) assert.NoError(t, err) @@ -76,12 +78,22 @@ func getRecorder() *agscheduler.Recorder { return &agscheduler.Recorder{Backend: mb} } +func getListener() *agscheduler.Listener { + return &agscheduler.Listener{ + Callbacks: []agscheduler.CallbackPkg{ + { + Callback: dryCallbackScheduler, + Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED, + }, + }, + } +} + func TestSchedulerSetStore(t *testing.T) { - store := &stores.MemoryStore{} s := &agscheduler.Scheduler{} - assert.Nil(t, agscheduler.GetStore(s)) + store := &stores.MemoryStore{} err := s.SetStore(store) assert.NoError(t, err) @@ -89,13 +101,12 @@ func TestSchedulerSetStore(t *testing.T) { } func TestSchedulerSetClusterNode(t *testing.T) { - cn := getClusterNode() s := &agscheduler.Scheduler{} - assert.Nil(t, agscheduler.GetClusterNode(s)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + cn := getClusterNode() err := s.SetClusterNode(ctx, cn) assert.NoError(t, err) @@ -103,13 +114,12 @@ func TestSchedulerSetClusterNode(t *testing.T) { } func TestSchedulerSetBroker(t *testing.T) { - brk := &agscheduler.Broker{} s := &agscheduler.Scheduler{} - assert.Nil(t, agscheduler.GetBroker(s)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + brk := &agscheduler.Broker{} err := s.SetBroker(ctx, brk) assert.NoError(t, err) @@ -117,17 +127,27 @@ func TestSchedulerSetBroker(t *testing.T) { } func TestSchedulerSetRecorder(t *testing.T) { - rec := getRecorder() s := &agscheduler.Scheduler{} - assert.Nil(t, agscheduler.GetRecorder(s)) + rec := getRecorder() err := s.SetRecorder(rec) assert.NoError(t, err) assert.NotNil(t, agscheduler.GetRecorder(s)) } +func TestSchedulerSetListener(t *testing.T) { + s := &agscheduler.Scheduler{} + assert.Nil(t, agscheduler.GetListener(s)) + + lis := getListener() + err := s.SetListener(lis) + assert.NoError(t, err) + + assert.NotNil(t, agscheduler.GetListener(s)) +} + func TestSchedulerAddJob(t *testing.T) { s := getSchedulerWithStore(t) defer s.Stop() diff --git a/services/cluster_test.go b/services/cluster_test.go index b4fc51c..121be3b 100644 --- a/services/cluster_test.go +++ b/services/cluster_test.go @@ -19,14 +19,15 @@ import ( func TestClusterService(t *testing.T) { gin.SetMode(gin.ReleaseMode) - store := &stores.MemoryStore{} - cnMain := &agscheduler.ClusterNode{ - EndpointMain: "127.0.0.1:36380", - } scheduler := &agscheduler.Scheduler{} + + store := &stores.MemoryStore{} err := scheduler.SetStore(store) assert.NoError(t, err) + cnMain := &agscheduler.ClusterNode{ + EndpointMain: "127.0.0.1:36380", + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() err = scheduler.SetClusterNode(ctx, cnMain) diff --git a/services/proxy_test.go b/services/proxy_test.go index 0bda035..00866fd 100644 --- a/services/proxy_test.go +++ b/services/proxy_test.go @@ -23,7 +23,11 @@ func TestClusterProxy(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + schedulerMain := &agscheduler.Scheduler{} + store := &stores.MemoryStore{} + err := schedulerMain.SetStore(store) + assert.NoError(t, err) cnMain := &agscheduler.ClusterNode{ EndpointMain: "127.0.0.1:36380", @@ -31,9 +35,6 @@ func TestClusterProxy(t *testing.T) { EndpointGRPC: "127.0.0.1:36360", EndpointHTTP: "127.0.0.1:36370", } - schedulerMain := &agscheduler.Scheduler{} - err := schedulerMain.SetStore(store) - assert.NoError(t, err) err = schedulerMain.SetClusterNode(ctx, cnMain) assert.NoError(t, err) cserviceMain := &ClusterService{Cn: cnMain} @@ -42,6 +43,10 @@ func TestClusterProxy(t *testing.T) { time.Sleep(2 * time.Second) + scheduler := &agscheduler.Scheduler{} + err = scheduler.SetStore(store) + assert.NoError(t, err) + cnNode := &agscheduler.ClusterNode{ EndpointMain: cnMain.Endpoint, Endpoint: "127.0.0.1:36381", @@ -49,9 +54,6 @@ func TestClusterProxy(t *testing.T) { EndpointHTTP: "127.0.0.1:36371", Queue: "node", } - scheduler := &agscheduler.Scheduler{} - err = scheduler.SetStore(store) - assert.NoError(t, err) err = scheduler.SetClusterNode(ctx, cnNode) assert.NoError(t, err) cservice := &ClusterService{Cn: cnNode} diff --git a/stores/base_test.go b/stores/base_test.go index 9492ce1..c621456 100644 --- a/stores/base_test.go +++ b/stores/base_test.go @@ -17,6 +17,7 @@ func runTest(t *testing.T, sto agscheduler.Store) { ) s := &agscheduler.Scheduler{} + err := s.SetStore(sto) assert.NoError(t, err) diff --git a/version.go b/version.go index f148d26..8e4c7fd 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package agscheduler -const Version = "0.10.0" +const Version = "0.11.0"