Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Listener): support event listening #165

Merged
merged 4 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,9 @@ examples-backend:
go run examples/backends/base.go examples/backends/gorm.go
go run examples/backends/base.go examples/backends/mongodb.go

.PHONY: examples-event
examples-event:
go run examples/event/event.go

.PHONY: examples-all
examples-all: examples-store examples-api examples-queue examples-backend
examples-all: examples-store examples-api examples-queue examples-backend examples-event
43 changes: 32 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
![GitHub go.mod Go version (subdirectory of monorepo)](https://img.shields.io/github/go-mod/go-version/agscheduler/agscheduler)
[![license](https://img.shields.io/github/license/agscheduler/agscheduler)](https://github.com/agscheduler/agscheduler/blob/main/LICENSE)

> Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, job queues, job result collection, remote call, and cluster
> Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, job queues, job result collection, event listening, remote call, and cluster

English | [简体中文](README.zh-CN.md)

Expand Down Expand Up @@ -36,6 +36,9 @@ English | [简体中文](README.zh-CN.md)
- [x] Memory (Cluster mode is not supported)
- [x] [GORM](https://gorm.io/) (any RDBMS supported by GORM works)
- [x] [MongoDB](https://www.mongodb.com/)
- Supports event listening
- [x] Scheduler event
- [x] Job event
- Supports remote call
- [x] [gRPC](https://grpc.io/)
- [x] HTTP
Expand Down Expand Up @@ -78,8 +81,9 @@ func main() {
agscheduler.FuncPkg{Func: printMsg},
)

store := &stores.MemoryStore{}
scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
scheduler.SetStore(store)

job1 := agscheduler.Job{
Expand Down Expand Up @@ -132,7 +136,7 @@ func main() {

```go
mq := &queues.MemoryQueue{}
brk := &agscheduler.Broker{
broker := &agscheduler.Broker{
Queues: map[string]agscheduler.QueuePkg{
"default": {
Queue: mq,
Expand All @@ -141,21 +145,40 @@ brk := &agscheduler.Broker{
},
}

scheduler.SetStore(store)
scheduler.SetBroker(brk)
scheduler.SetBroker(broker)
```

## Result Collection

```go
mb := &backends.MemoryBackend{}
rec := &agscheduler.Recorder{Backend: mb}
recorder := &agscheduler.Recorder{Backend: mb}

scheduler.SetStore(store)
scheduler.SetRecorder(rec)
scheduler.SetRecorder(recorder)

job, _ = scheduler.AddJob(job)
records, _ := rec.GetRecords(job.Id)
records, _ := recorder.GetRecords(job.Id)
```

## Event listening

```go
func jobCallback(ep agscheduler.EventPkg) {
slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId))
}

......

listener := &agscheduler.Listener{
Callbacks: []agscheduler.CallbackPkg{
{
Callback: jobCallback,
Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
},
},
}

scheduler.SetListener(listener)
```

## gRPC
Expand Down Expand Up @@ -202,7 +225,6 @@ cnMain := &agscheduler.ClusterNode{
EndpointHTTP: "127.0.0.1:36370",
Queue: "default",
}
schedulerMain.SetStore(storeMain)
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()
Expand All @@ -215,7 +237,6 @@ cnNode := &agscheduler.ClusterNode{
EndpointHTTP: "127.0.0.1:36371",
Queue: "worker",
}
schedulerNode.SetStore(storeNode)
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()
Expand Down
42 changes: 31 additions & 11 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
![GitHub go.mod Go version (subdirectory of monorepo)](https://img.shields.io/github/go-mod/go-version/agscheduler/agscheduler)
[![license](https://img.shields.io/github/license/agscheduler/agscheduler)](https://github.com/agscheduler/agscheduler/blob/main/LICENSE)

> Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持作业队列,支持作业结果回收,支持远程调用,支持集群
> Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持作业队列,支持作业结果回收,支持事件监听, 支持远程调用,支持集群

[English](README.md) | 简体中文

Expand Down Expand Up @@ -36,6 +36,9 @@
- [x] Memory (不支持集群模式)
- [x] [GORM](https://gorm.io/) (任何 GORM 支持的 RDBMS 都能运行)
- [x] [MongoDB](https://www.mongodb.com/)
- 支持事件监听
- [x] 调度器事件
- [x] 作业事件
- 支持远程调用
- [x] [gRPC](https://grpc.io/)
- [x] HTTP
Expand Down Expand Up @@ -78,8 +81,9 @@ func main() {
agscheduler.FuncPkg{Func: printMsg},
)

store := &stores.MemoryStore{}
scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
scheduler.SetStore(store)

job1 := agscheduler.Job{
Expand Down Expand Up @@ -132,7 +136,7 @@ func main() {

```go
mq := &queues.MemoryQueue{}
brk := &agscheduler.Broker{
broker := &agscheduler.Broker{
Queues: map[string]agscheduler.QueuePkg{
"default": {
Queue: mq,
Expand All @@ -141,21 +145,39 @@ brk := &agscheduler.Broker{
},
}

scheduler.SetStore(store)
scheduler.SetBroker(brk)
scheduler.SetBroker(broker)
```

## 结果回收

```go
mb := &backends.MemoryBackend{}
rec := &agscheduler.Recorder{Backend: mb}
recorder := &agscheduler.Recorder{Backend: mb}

scheduler.SetStore(store)
scheduler.SetRecorder(rec)
scheduler.SetRecorder(recorder)

job, _ = scheduler.AddJob(job)
records, _ := rec.GetRecords(job.Id)
records, _ := recorder.GetRecords(job.Id)
```

## 事件监听
```go
func jobCallback(ep agscheduler.EventPkg) {
slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId))
}

......

listener := &agscheduler.Listener{
Callbacks: []agscheduler.CallbackPkg{
{
Callback: jobCallback,
Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
},
},
}

scheduler.SetListener(listener)
```

## gRPC
Expand Down Expand Up @@ -202,7 +224,6 @@ cnMain := &agscheduler.ClusterNode{
EndpointHTTP: "127.0.0.1:36370",
Queue: "default",
}
schedulerMain.SetStore(storeMain)
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()
Expand All @@ -215,7 +236,6 @@ cnNode := &agscheduler.ClusterNode{
EndpointHTTP: "127.0.0.1:36371",
Queue: "worker",
}
schedulerNode.SetStore(storeNode)
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()
Expand Down
10 changes: 6 additions & 4 deletions backends/base_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package backends

import (
"context"
"log/slog"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/agscheduler/agscheduler"
"github.com/agscheduler/agscheduler/examples"
"github.com/agscheduler/agscheduler/stores"
)

func dryRunRecorder(ctx context.Context, j agscheduler.Job) (result string) { return }

func runTest(t *testing.T, rec *agscheduler.Recorder) {
agscheduler.RegisterFuncs(
agscheduler.FuncPkg{Func: examples.PrintMsg},
agscheduler.FuncPkg{Func: dryRunRecorder},
)

s := &agscheduler.Scheduler{}
Expand All @@ -28,7 +30,7 @@ func runTest(t *testing.T, rec *agscheduler.Recorder) {
Name: "Job",
Type: agscheduler.JOB_TYPE_INTERVAL,
Interval: "2s",
Func: examples.PrintMsg,
Func: dryRunRecorder,
}
job, err = s.AddJob(job)
assert.NoError(t, err)
Expand All @@ -37,7 +39,7 @@ func runTest(t *testing.T, rec *agscheduler.Recorder) {
Name: "Job2",
Type: agscheduler.JOB_TYPE_DATETIME,
StartAt: "2023-09-22 07:30:08",
Func: examples.PrintMsg,
Func: dryRunRecorder,
}
_, err = s.AddJob(job2)
assert.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions examples/cluster/cluster_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ func main() {

flag.Parse()

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
os.Exit(1)
}

cn := &agscheduler.ClusterNode{
EndpointMain: *endpointMain,
Expand All @@ -52,13 +59,6 @@ func main() {
Queue: *queue,
Mode: *mode,
}

scheduler := &agscheduler.Scheduler{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
os.Exit(1)
}
err = scheduler.SetClusterNode(context.TODO(), cn)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set cluster node: %s", err))
Expand Down
61 changes: 61 additions & 0 deletions examples/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// go run examples/event/event.go

package main

import (
"fmt"
"log/slog"
"os"
"time"

"github.com/agscheduler/agscheduler"
"github.com/agscheduler/agscheduler/examples"
"github.com/agscheduler/agscheduler/stores"
)

func jobCallback(ep agscheduler.EventPkg) {
slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId))
}

func main() {
agscheduler.RegisterFuncs(
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

s := &agscheduler.Scheduler{}

sto := &stores.MemoryStore{}
err := s.SetStore(sto)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
os.Exit(1)
}

lis := &agscheduler.Listener{
Callbacks: []agscheduler.CallbackPkg{
{
Callback: jobCallback,
Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
},
},
}
err = s.SetListener(lis)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set listener: %s", err))
os.Exit(1)
}

job := agscheduler.Job{
Name: "Job",
Type: agscheduler.JOB_TYPE_INTERVAL,
Interval: "2s",
Func: examples.PrintMsg,
}
job, _ = s.AddJob(job)

job, _ = s.PauseJob(job.Id)

_ = s.DeleteJob(job.Id)

time.Sleep(1 * time.Second)
}
4 changes: 2 additions & 2 deletions examples/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ func main() {
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions examples/grpc/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func main() {
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions examples/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func main() {
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions examples/http/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func main() {
agscheduler.FuncPkg{Func: examples.PrintMsg},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set store: %s", err))
Expand Down
Loading