Skip to content

Commit

Permalink
Merge pull request #165 from AGScheduler/dev
Browse files Browse the repository at this point in the history
feat(Listener): support event listening
  • Loading branch information
kwkwc authored Jun 20, 2024
2 parents aac2882 + 7035b12 commit 83b3ac0
Show file tree
Hide file tree
Showing 23 changed files with 401 additions and 83 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,9 @@ examples-backend:
go run examples/backends/base.go examples/backends/gorm.go
go run examples/backends/base.go examples/backends/mongodb.go

.PHONY: examples-event
examples-event:
go run examples/event/event.go

.PHONY: examples-all
examples-all: examples-store examples-api examples-queue examples-backend
examples-all: examples-store examples-api examples-queue examples-backend examples-event
43 changes: 32 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
![GitHub go.mod Go version (subdirectory of monorepo)](https://img.shields.io/github/go-mod/go-version/agscheduler/agscheduler)
[![license](https://img.shields.io/github/license/agscheduler/agscheduler)](https://github.com/agscheduler/agscheduler/blob/main/LICENSE)

> Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, job queues, job result collection, remote call, and cluster
> Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, job queues, job result collection, event listening, remote call, and cluster
English | [简体中文](README.zh-CN.md)

Expand Down Expand Up @@ -36,6 +36,9 @@ English | [简体中文](README.zh-CN.md)
- [x] Memory (Cluster mode is not supported)
- [x] [GORM](https://gorm.io/) (any RDBMS supported by GORM works)
- [x] [MongoDB](https://www.mongodb.com/)
- Supports event listening
- [x] Scheduler event
- [x] Job event
- Supports remote call
- [x] [gRPC](https://grpc.io/)
- [x] HTTP
Expand Down Expand Up @@ -78,8 +81,9 @@ func main() {
agscheduler.FuncPkg{Func: printMsg},
)

store := &stores.MemoryStore{}
scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
scheduler.SetStore(store)

job1 := agscheduler.Job{
Expand Down Expand Up @@ -132,7 +136,7 @@ func main() {

```go
mq := &queues.MemoryQueue{}
brk := &agscheduler.Broker{
broker := &agscheduler.Broker{
Queues: map[string]agscheduler.QueuePkg{
"default": {
Queue: mq,
Expand All @@ -141,21 +145,40 @@ brk := &agscheduler.Broker{
},
}

scheduler.SetStore(store)
scheduler.SetBroker(brk)
scheduler.SetBroker(broker)
```

## Result Collection

```go
mb := &backends.MemoryBackend{}
rec := &agscheduler.Recorder{Backend: mb}
recorder := &agscheduler.Recorder{Backend: mb}

scheduler.SetStore(store)
scheduler.SetRecorder(rec)
scheduler.SetRecorder(recorder)

job, _ = scheduler.AddJob(job)
records, _ := rec.GetRecords(job.Id)
records, _ := recorder.GetRecords(job.Id)
```

## Event listening

```go
func jobCallback(ep agscheduler.EventPkg) {
slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId))
}

......

listener := &agscheduler.Listener{
Callbacks: []agscheduler.CallbackPkg{
{
Callback: jobCallback,
Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
},
},
}

scheduler.SetListener(listener)
```

## gRPC
Expand Down Expand Up @@ -202,7 +225,6 @@ cnMain := &agscheduler.ClusterNode{
EndpointHTTP: "127.0.0.1:36370",
Queue: "default",
}
schedulerMain.SetStore(storeMain)
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()
Expand All @@ -215,7 +237,6 @@ cnNode := &agscheduler.ClusterNode{
EndpointHTTP: "127.0.0.1:36371",
Queue: "worker",
}
schedulerNode.SetStore(storeNode)
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()
Expand Down
42 changes: 31 additions & 11 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
![GitHub go.mod Go version (subdirectory of monorepo)](https://img.shields.io/github/go-mod/go-version/agscheduler/agscheduler)
[![license](https://img.shields.io/github/license/agscheduler/agscheduler)](https://github.com/agscheduler/agscheduler/blob/main/LICENSE)

> Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持作业队列,支持作业结果回收,支持远程调用,支持集群
> Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持作业队列,支持作业结果回收,支持事件监听, 支持远程调用,支持集群
[English](README.md) | 简体中文

Expand Down Expand Up @@ -36,6 +36,9 @@
- [x] Memory (不支持集群模式)
- [x] [GORM](https://gorm.io/) (任何 GORM 支持的 RDBMS 都能运行)
- [x] [MongoDB](https://www.mongodb.com/)
- 支持事件监听
- [x] 调度器事件
- [x] 作业事件
- 支持远程调用
- [x] [gRPC](https://grpc.io/)
- [x] HTTP
Expand Down Expand Up @@ -78,8 +81,9 @@ func main() {
agscheduler.FuncPkg{Func: printMsg},
)

store := &stores.MemoryStore{}
scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
scheduler.SetStore(store)

job1 := agscheduler.Job{
Expand Down Expand Up @@ -132,7 +136,7 @@ func main() {

```go
mq := &queues.MemoryQueue{}
brk := &agscheduler.Broker{
broker := &agscheduler.Broker{
Queues: map[string]agscheduler.QueuePkg{
"default": {
Queue: mq,
Expand All @@ -141,21 +145,39 @@ brk := &agscheduler.Broker{
},
}

scheduler.SetStore(store)
scheduler.SetBroker(brk)
scheduler.SetBroker(broker)
```

## 结果回收

```go
mb := &backends.MemoryBackend{}
rec := &agscheduler.Recorder{Backend: mb}
recorder := &agscheduler.Recorder{Backend: mb}

scheduler.SetStore(store)
scheduler.SetRecorder(rec)
scheduler.SetRecorder(recorder)

job, _ = scheduler.AddJob(job)
records, _ := rec.GetRecords(job.Id)
records, _ := recorder.GetRecords(job.Id)
```

## 事件监听
```go
func jobCallback(ep agscheduler.EventPkg) {
slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId))
}

......

listener := &agscheduler.Listener{
Callbacks: []agscheduler.CallbackPkg{
{
Callback: jobCallback,
Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
},
},
}

scheduler.SetListener(listener)
```

## gRPC
Expand Down Expand Up @@ -202,7 +224,6 @@ cnMain := &agscheduler.ClusterNode{
EndpointHTTP: "127.0.0.1:36370",
Queue: "default",
}
schedulerMain.SetStore(storeMain)
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()
Expand All @@ -215,7 +236,6 @@ cnNode := &agscheduler.ClusterNode{
EndpointHTTP: "127.0.0.1:36371",
Queue: "worker",
}
schedulerNode.SetStore(storeNode)
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()
Expand Down
10 changes: 6 additions & 4 deletions backends/base_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package backends

import (
"context"
"log/slog"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/agscheduler/agscheduler"
"github.com/agscheduler/agscheduler/examples"
"github.com/agscheduler/agscheduler/stores"
)

func dryRunRecorder(ctx context.Context, j agscheduler.Job) (result string) { return }

func runTest(t *testing.T, rec *agscheduler.Recorder) {
agscheduler.RegisterFuncs(
agscheduler.FuncPkg{Func: examples.PrintMsg},
agscheduler.FuncPkg{Func: dryRunRecorder},
)

s := &agscheduler.Scheduler{}
Expand All @@ -28,7 +30,7 @@ func runTest(t *testing.T, rec *agscheduler.Recorder) {
Name: "Job",
Type: agscheduler.JOB_TYPE_INTERVAL,
Interval: "2s",
Func: examples.PrintMsg,
Func: dryRunRecorder,
}
job, err = s.AddJob(job)
assert.NoError(t, err)
Expand All @@ -37,7 +39,7 @@ func runTest(t *testing.T, rec *agscheduler.Recorder) {
Name: "Job2",
Type: agscheduler.JOB_TYPE_DATETIME,
StartAt: "2023-09-22 07:30:08",
Func: examples.PrintMsg,
Func: dryRunRecorder,
}
_, err = s.AddJob(job2)
assert.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions examples/cluster/cluster_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ func main() {

flag.Parse()

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
os.Exit(1)
}

cn := &agscheduler.ClusterNode{
EndpointMain: *endpointMain,
Expand All @@ -52,13 +59,6 @@ func main() {
Queue: *queue,
Mode: *mode,
}

scheduler := &agscheduler.Scheduler{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
os.Exit(1)
}
err = scheduler.SetClusterNode(context.TODO(), cn)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set cluster node: %s", err))
Expand Down
61 changes: 61 additions & 0 deletions examples/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// go run examples/event/event.go

package main

import (
"fmt"
"log/slog"
"os"
"time"

"github.com/agscheduler/agscheduler"
"github.com/agscheduler/agscheduler/examples"
"github.com/agscheduler/agscheduler/stores"
)

func jobCallback(ep agscheduler.EventPkg) {
slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId))
}

func main() {
agscheduler.RegisterFuncs(
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

s := &agscheduler.Scheduler{}

sto := &stores.MemoryStore{}
err := s.SetStore(sto)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
os.Exit(1)
}

lis := &agscheduler.Listener{
Callbacks: []agscheduler.CallbackPkg{
{
Callback: jobCallback,
Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
},
},
}
err = s.SetListener(lis)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set listener: %s", err))
os.Exit(1)
}

job := agscheduler.Job{
Name: "Job",
Type: agscheduler.JOB_TYPE_INTERVAL,
Interval: "2s",
Func: examples.PrintMsg,
}
job, _ = s.AddJob(job)

job, _ = s.PauseJob(job.Id)

_ = s.DeleteJob(job.Id)

time.Sleep(1 * time.Second)
}
4 changes: 2 additions & 2 deletions examples/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ func main() {
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions examples/grpc/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func main() {
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions examples/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func main() {
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions examples/http/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func main() {
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
Expand Down
Loading

0 comments on commit 83b3ac0

Please sign in to comment.