-
Notifications
You must be signed in to change notification settings - Fork 4
/
delayq.go
70 lines (59 loc) · 1.89 KB
/
delayq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package delayq
import (
"context"
"encoding/json"
"errors"
"time"
)
var ErrNoItem = errors.New("no ready items")
// DelayQ is a queue with a time constraint on each item. Items in
// the queue should be ready for dequeue only at/after the At timestamp
// set for each item.
type DelayQ interface {
// Enqueue should enqueue all items atomically into the queue storage.
// If any item fails, all items should be dequeued.
Enqueue(ctx context.Context, items ...Item) error
// Dequeue should dequeue available items relative to the given time
// and invoke 'fn' for each item safely. Dequeue must ensure that in
// case of failures or context cancellation, the items being dequeued
// are released back to the queue storage.
Dequeue(ctx context.Context, relativeTo time.Time, fn Process) error
// Delete should delete the given item from the queue storage.
Delete(ctx context.Context, items ...Item) error
}
// Process function is invoked for every item that becomes ready. An item
// remains on the queue until this function returns without error.
type Process func(ctx context.Context, item Item) error
// Item represents an item to be pushed to the queue.
type Item struct {
At time.Time `json:"at"`
Value string `json:"value"`
}
// JSON returns the JSON representation of the item.
func (itm *Item) JSON() string {
b, err := json.Marshal(itm)
if err != nil {
// this should never happen since Item is meant for JSON.
panic(err)
}
return string(b)
}
func (itm *Item) MarshalJSON() ([]byte, error) {
return json.Marshal(itemJSONModel{
At: itm.At.Unix(),
Value: itm.Value,
})
}
func (itm *Item) UnmarshalJSON(bytes []byte) error {
var model itemJSONModel
if err := json.Unmarshal(bytes, &model); err != nil {
return err
}
itm.At = time.Unix(model.At, 0).UTC()
itm.Value = model.Value
return nil
}
type itemJSONModel struct {
At int64 `json:"at"`
Value string `json:"value"`
}