Skip to content

Commit

Permalink
Merge pull request #145 from AGScheduler/dev
Browse files Browse the repository at this point in the history
feat(Recorder): add HTTP API && bugfix
  • Loading branch information
kwkwc authored Jun 1, 2024
2 parents f97cf74 + 8722579 commit fd73430
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 28 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ records, _ := rec.GetRecords(job.Id)
|---------------|-------------|---------------------------|
| GetNodes | GET | /cluster/nodes |

## Recorder API

| gRPC Function | HTTP Method | HTTP Path |
|---------------|-------------|---------------------------|
| GetRecords | GET | /recorder/records/:job_id |
| GetAllRecords | GET | /recorder/records |

## Examples

[Complete example][examples]
Expand Down
7 changes: 7 additions & 0 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ records, _ := rec.GetRecords(job.Id)
|---------------|-------------|---------------------------|
| GetNodes | GET | /cluster/nodes |

## Recorder API

| gRPC Function | HTTP Method | HTTP Path |
|---------------|-------------|---------------------------|
| GetRecords | GET | /recorder/records/:job_id |
| GetAllRecords | GET | /recorder/records |

## 示例

[完整示例][examples]
Expand Down
4 changes: 0 additions & 4 deletions backends/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (b *MemoryBackend) Clear() error {
}

func slicePage(page, pageSize, total int) (sliceStart, sliceEnd int) {
if pageSize > total {
return 0, total
}

pageCount := int(math.Ceil(float64(total) / float64(pageSize)))
if page > pageCount {
return 0, 0
Expand Down
22 changes: 0 additions & 22 deletions recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,13 @@ func (r *Recorder) GetRecords(jId string, page, pageSize int) ([]Record, int64,
r.backendM.RLock()
defer r.backendM.RUnlock()

page = fixPositiveNum(page, 1)
pageSize = fixPositiveNumMax(fixPositiveNum(pageSize, 10), 1000)

return r.Backend.GetRecords(jId, page, pageSize)
}

func (r *Recorder) GetAllRecords(page, pageSize int) ([]Record, int64, error) {
r.backendM.RLock()
defer r.backendM.RUnlock()

page = fixPositiveNum(page, 1)
pageSize = fixPositiveNumMax(fixPositiveNum(pageSize, 10), 1000)

return r.Backend.GetAllRecords(page, pageSize)
}

Expand All @@ -142,19 +136,3 @@ func (r *Recorder) Clear() error {

return r.Backend.Clear()
}

func fixPositiveNum(num, numDef int) int {
if num < 1 {
return numDef
}

return num
}

func fixPositiveNumMax(num, numMax int) int {
if num > numMax {
return numMax
}

return num
}
5 changes: 5 additions & 0 deletions services/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (s *HTTPService) Start() error {
chs.registerRoutes(r)
}

if s.Scheduler.HasRecorder() {
rhs := &rHTTPService{recorder: agscheduler.GetRecorder(s.Scheduler)}
rhs.registerRoutes(r)
}

slog.Info(fmt.Sprintf("HTTP Service listening at: %s", s.Address))

s.srv = &http.Server{
Expand Down
11 changes: 9 additions & 2 deletions services/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/agscheduler/agscheduler"
"github.com/agscheduler/agscheduler/backends"
"github.com/agscheduler/agscheduler/stores"
)

Expand Down Expand Up @@ -50,12 +51,17 @@ func TestHTTPService(t *testing.T) {
agscheduler.FuncPkg{Func: dryRunHTTP},
)

store := &stores.MemoryStore{}

scheduler := &agscheduler.Scheduler{}

store := &stores.MemoryStore{}
err := scheduler.SetStore(store)
assert.NoError(t, err)

mb := &backends.MemoryBackend{}
recorder := &agscheduler.Recorder{Backend: mb}
err = scheduler.SetRecorder(recorder)
assert.NoError(t, err)

hservice := HTTPService{Scheduler: scheduler}
err = hservice.Start()
assert.NoError(t, err)
Expand All @@ -65,6 +71,7 @@ func TestHTTPService(t *testing.T) {
baseUrl := "http://" + hservice.Address
testHTTP(t, baseUrl)
testSchedulerHTTP(t, baseUrl)
testRecorderHTTP(t, baseUrl)

err = hservice.Stop()
assert.NoError(t, err)
Expand Down
76 changes: 76 additions & 0 deletions services/recorder_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package services

import (
"net/http"

"github.com/gin-gonic/gin"

"github.com/agscheduler/agscheduler"
)

type req struct {
Page int `form:"page"`
PageSize int `form:"page_size"`
}

type rHTTPService struct {
recorder *agscheduler.Recorder
}

func (rhs *rHTTPService) handleErr(err error) string {
if err != nil {
return err.Error()
} else {
return ""
}
}

func (rhs *rHTTPService) getRecords(c *gin.Context) {
var r req
if err := c.ShouldBindQuery(&r); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": rhs.handleErr(err)})
return
}

r.Page = fixPositiveNum(r.Page, 1)
r.PageSize = fixPositiveNumMax(fixPositiveNum(r.PageSize, 10), 1000)

var rs []agscheduler.Record
var total int64
var err error
jobId := c.Param("job_id")
if jobId != "" {
rs, total, err = rhs.recorder.GetRecords(c.Param("job_id"), r.Page, r.PageSize)
} else {
rs, total, err = rhs.recorder.GetAllRecords(r.Page, r.PageSize)
}
c.JSON(http.StatusOK, gin.H{
"data": gin.H{
"res": rs,
"page": r.Page,
"page_size": r.PageSize,
"total": total},
"error": rhs.handleErr(err),
})
}

func (rhs *rHTTPService) registerRoutes(r *gin.Engine) {
r.GET("/recorder/records/:job_id", rhs.getRecords)
r.GET("/recorder/records", rhs.getRecords)
}

func fixPositiveNum(num, numDef int) int {
if num < 1 {
return numDef
}

return num
}

func fixPositiveNumMax(num, numMax int) int {
if num > numMax {
return numMax
}

return num
}
34 changes: 34 additions & 0 deletions services/recorder_http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package services

import (
"encoding/json"
"io"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
)

func testRecorderHTTP(t *testing.T, baseUrl string) {
resp, err := http.Get(baseUrl + "/recorder/records/test")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode)
body, err := io.ReadAll(resp.Body)
assert.NoError(t, err)
rJ := &result{}
err = json.Unmarshal(body, &rJ)
assert.NoError(t, err)
total := rJ.Data.(map[string]any)["total"].(float64)

resp, err = http.Get(baseUrl + "/recorder/records")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode)
body, err = io.ReadAll(resp.Body)
assert.NoError(t, err)
rJ = &result{}
err = json.Unmarshal(body, &rJ)
assert.NoError(t, err)
totalAll := rJ.Data.(map[string]any)["total"].(float64)

assert.Less(t, total, totalAll)
}

0 comments on commit fd73430

Please sign in to comment.