Skip to content

Commit

Permalink
feat(Recorder): supports pagination (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwkwc authored Jun 1, 2024
1 parent d9669ea commit f97cf74
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 49 deletions.
30 changes: 24 additions & 6 deletions backends/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,61 @@ func runTest(t *testing.T, rec *agscheduler.Recorder) {
assert.NoError(t, err)

job2 := agscheduler.Job{
Name: "Job",
Name: "Job2",
Type: agscheduler.JOB_TYPE_DATETIME,
StartAt: "2023-09-22 07:30:08",
Func: examples.PrintMsg,
}
_, err = s.AddJob(job2)
assert.NoError(t, err)

records, err := rec.GetRecords(job.Id)
records, total, err := rec.GetRecords(job.Id, 1, 10)
assert.NoError(t, err)
assert.Len(t, records, 0)
assert.Equal(t, 0, int(total))

s.Start()

slog.Info("Sleep 5s......\n\n")
time.Sleep(5 * time.Second)

records, err = rec.GetRecords(job.Id)
records, total, err = rec.GetRecords(job.Id, 1, 10)
assert.NoError(t, err)
assert.Len(t, records, 2)
assert.Equal(t, 2, int(total))
assert.Equal(t, agscheduler.RECORD_STATUS_COMPLETED, records[0].Status)

records, err = rec.GetAllRecords()
records, total, err = rec.GetRecords(job.Id, 2, 1)
assert.NoError(t, err)
assert.Len(t, records, 1)
assert.Equal(t, 2, int(total))

records, total, err = rec.GetAllRecords(1, 10)
assert.NoError(t, err)
assert.Len(t, records, 3)
assert.Equal(t, 3, int(total))

records, total, err = rec.GetAllRecords(2, 2)
assert.NoError(t, err)
assert.Len(t, records, 1)
assert.Equal(t, 3, int(total))
assert.Equal(t, "Job2", records[0].JobName)
_, _, err = rec.GetAllRecords(10, 10)
assert.NoError(t, err)

err = rec.DeleteRecords(job.Id)
assert.NoError(t, err)
records, err = rec.GetAllRecords()
records, total, err = rec.GetAllRecords(1, 10)
assert.NoError(t, err)
assert.Len(t, records, 1)
assert.Equal(t, 1, int(total))

err = rec.DeleteAllRecords()
assert.NoError(t, err)
records, err = rec.GetAllRecords()
records, total, err = rec.GetAllRecords(1, 10)
assert.NoError(t, err)
assert.Len(t, records, 0)
assert.Equal(t, 0, int(total))

err = s.DeleteAllJobs()
assert.NoError(t, err)
Expand Down
22 changes: 15 additions & 7 deletions backends/gorm.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,22 @@ func (b *GORMBackend) RecordResult(id uint64, status string, result string) erro
Error
}

func (b *GORMBackend) _getRecords(query any, args ...any) ([]agscheduler.Record, error) {
func (b *GORMBackend) _getRecords(page, pageSize int, query any, args ...any) ([]agscheduler.Record, int64, error) {
var rsList []*Records
total := int64(0)

err := b.DB.Table(b.TableName).Where(query, args...).
Order("start_at desc").
Limit(pageSize).Offset((page - 1) * pageSize).
Find(&rsList).Error
if err != nil {
return nil, err
return nil, total, err
}

err = b.DB.Table(b.TableName).Where(query, args...).
Count(&total).Error
if err != nil {
return nil, total, err
}

var recordList []agscheduler.Record
Expand All @@ -86,15 +94,15 @@ func (b *GORMBackend) _getRecords(query any, args ...any) ([]agscheduler.Record,
})
}

return recordList, nil
return recordList, total, nil
}

func (b *GORMBackend) GetRecords(jId string) ([]agscheduler.Record, error) {
return b._getRecords("job_id = ?", jId)
func (b *GORMBackend) GetRecords(jId string, page, pageSize int) ([]agscheduler.Record, int64, error) {
return b._getRecords(page, pageSize, "job_id = ?", jId)
}

func (b *GORMBackend) GetAllRecords() ([]agscheduler.Record, error) {
return b._getRecords("1 = 1")
func (b *GORMBackend) GetAllRecords(page, pageSize int) ([]agscheduler.Record, int64, error) {
return b._getRecords(page, pageSize, "1 = 1")
}

func (b *GORMBackend) DeleteRecords(jId string) error {
Expand Down
33 changes: 29 additions & 4 deletions backends/memory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backends

import (
"math"
"sort"
"time"

Expand Down Expand Up @@ -36,24 +37,30 @@ func (b *MemoryBackend) RecordResult(id uint64, status string, result string) er
return nil
}

func (b *MemoryBackend) GetRecords(jId string) ([]agscheduler.Record, error) {
func (b *MemoryBackend) GetRecords(jId string, page, pageSize int) ([]agscheduler.Record, int64, error) {
rs := []agscheduler.Record{}
for _, r := range b.records {
if r.JobId == jId {
rs = append(rs, r)
}
}
sort.Sort(agscheduler.RecordSlice(rs))
total := len(rs)
start, end := slicePage(page, pageSize, total)
rs = rs[start:end]

return rs, nil
return rs, int64(total), nil
}

func (b *MemoryBackend) GetAllRecords() ([]agscheduler.Record, error) {
func (b *MemoryBackend) GetAllRecords(page, pageSize int) ([]agscheduler.Record, int64, error) {
rs := make([]agscheduler.Record, len(b.records))
copy(rs, b.records)
sort.Sort(agscheduler.RecordSlice(rs))
total := len(rs)
start, end := slicePage(page, pageSize, total)
rs = rs[start:end]

return rs, nil
return rs, int64(total), nil
}

func (b *MemoryBackend) DeleteRecords(jId string) error {
Expand All @@ -77,3 +84,21 @@ func (b *MemoryBackend) DeleteAllRecords() error {
func (b *MemoryBackend) Clear() error {
return b.DeleteAllRecords()
}

func slicePage(page, pageSize, total int) (sliceStart, sliceEnd int) {
if pageSize > total {
return 0, total
}

pageCount := int(math.Ceil(float64(total) / float64(pageSize)))
if page > pageCount {
return 0, 0
}
sliceStart = (page - 1) * pageSize
sliceEnd = sliceStart + pageSize

if sliceEnd > total {
sliceEnd = total
}
return sliceStart, sliceEnd
}
29 changes: 19 additions & 10 deletions backends/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,28 @@ func (b *MongoDBBackend) RecordResult(id uint64, status string, result string) e
return err
}

func (b *MongoDBBackend) _getRecords(filter any) ([]agscheduler.Record, error) {
opts := options.Find().SetSort(bson.M{"start_at": -1})
cursor, err := b.coll.Find(ctx, filter, opts)
func (b *MongoDBBackend) _getRecords(page, pageSize int, filter any) ([]agscheduler.Record, int64, error) {
total := int64(0)

optsFind := options.Find().SetSort(bson.M{"start_at": -1}).
SetLimit(int64(pageSize)).SetSkip(int64((page - 1) * pageSize))
cursor, err := b.coll.Find(ctx, filter, optsFind)
if err != nil {
return nil, total, err
}

optsCount := options.Count().SetHint("_id_")
total, err = b.coll.CountDocuments(ctx, filter, optsCount)
if err != nil {
return nil, err
return nil, total, err
}

var recordList []agscheduler.Record
for cursor.Next(ctx) {
var result bson.M
err := cursor.Decode(&result)
if err != nil {
return nil, err
return nil, total, err
}
recordList = append(recordList, agscheduler.Record{
Id: uint64(result["_id"].(int64)),
Expand All @@ -109,15 +118,15 @@ func (b *MongoDBBackend) _getRecords(filter any) ([]agscheduler.Record, error) {
})
}

return recordList, nil
return recordList, total, nil
}

func (b *MongoDBBackend) GetRecords(jId string) ([]agscheduler.Record, error) {
return b._getRecords(bson.M{"job_id": jId})
func (b *MongoDBBackend) GetRecords(jId string, page, pageSize int) ([]agscheduler.Record, int64, error) {
return b._getRecords(page, pageSize, bson.M{"job_id": jId})
}

func (b *MongoDBBackend) GetAllRecords() ([]agscheduler.Record, error) {
return b._getRecords(bson.M{})
func (b *MongoDBBackend) GetAllRecords(page, pageSize int) ([]agscheduler.Record, int64, error) {
return b._getRecords(page, pageSize, bson.M{})
}

func (b *MongoDBBackend) DeleteRecords(jId string) error {
Expand Down
4 changes: 2 additions & 2 deletions examples/backends/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func runExample(rec *agscheduler.Recorder) {
slog.Info("Sleep 5s......\n\n")
time.Sleep(5 * time.Second)

records, _ := rec.GetRecords(job.Id)
records, _, _ := rec.GetRecords(job.Id, 1, 10)
slog.Info(fmt.Sprintf("Scheduler recorder get records %v.\n\n", records))

records, _ = rec.GetAllRecords()
records, _, _ = rec.GetAllRecords(1, 10)
slog.Info(fmt.Sprintf("Scheduler recorder get all records %v.\n\n", records))

rec.DeleteRecords(job.Id)
Expand Down
6 changes: 4 additions & 2 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ type Backend interface {
RecordResult(id uint64, status string, result string) error

// Get records by job id from this backend.
GetRecords(jId string) ([]Record, error)
// @return records, total, error.
GetRecords(jId string, page, pageSize int) ([]Record, int64, error)

// Get all records from this backend.
GetAllRecords() ([]Record, error)
// @return records, total, error.
GetAllRecords(page, pageSize int) ([]Record, int64, error)

// Delete records by job id from this backend.
DeleteRecords(jId string) error
Expand Down
30 changes: 26 additions & 4 deletions recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,24 @@ func (r *Recorder) RecordResult(id uint64, status string, result string) error {
return r.Backend.RecordResult(id, status, result)
}

func (r *Recorder) GetRecords(jId string) ([]Record, error) {
func (r *Recorder) GetRecords(jId string, page, pageSize int) ([]Record, int64, error) {
r.backendM.RLock()
defer r.backendM.RUnlock()

return r.Backend.GetRecords(jId)
page = fixPositiveNum(page, 1)
pageSize = fixPositiveNumMax(fixPositiveNum(pageSize, 10), 1000)

return r.Backend.GetRecords(jId, page, pageSize)
}

func (r *Recorder) GetAllRecords() ([]Record, error) {
func (r *Recorder) GetAllRecords(page, pageSize int) ([]Record, int64, error) {
r.backendM.RLock()
defer r.backendM.RUnlock()

return r.Backend.GetAllRecords()
page = fixPositiveNum(page, 1)
pageSize = fixPositiveNumMax(fixPositiveNum(pageSize, 10), 1000)

return r.Backend.GetAllRecords(page, pageSize)
}

func (r *Recorder) DeleteRecords(jId string) error {
Expand All @@ -136,3 +142,19 @@ func (r *Recorder) Clear() error {

return r.Backend.Clear()
}

func fixPositiveNum(num, numDef int) int {
if num < 1 {
return numDef
}

return num
}

func fixPositiveNumMax(num, numMax int) int {
if num > numMax {
return numMax
}

return num
}
Loading

0 comments on commit f97cf74

Please sign in to comment.