-
Notifications
You must be signed in to change notification settings - Fork 11
/
event.go
141 lines (113 loc) · 2.38 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//go:generate go-enum -f=$GOFILE --nocase
package cache
import (
"context"
"encoding/json"
"errors"
"sync"
)
var (
// errSelfEvent indicates event triggered by itself.
errSelfEvent = errors.New("event triggered by itself")
// errNoEventType indicates no event types
errNoEventType = errors.New("no event types")
)
// eventType is an enumeration of events used to communicate with each other via Pubsub.
/*
ENUM(
None // Not registered Event by default.
Evict // Evict presents eviction event.
)
*/
type eventType int32
var regTopicEventMap map[string]eventType
func init() {
regTopicEventMap = map[string]eventType{}
for typ := range _eventTypeMap {
if typ == EventTypeNone {
continue
}
regTopicEventMap[typ.Topic()] = typ
}
}
// Topic generates the topic for specified event.
func (x eventType) Topic() string {
return getTopic(x.String())
}
type event struct {
Type eventType
Body eventBody
}
type eventBody struct {
FID string
Keys []string
}
type messageBroker struct {
pubsub Pubsub
fid string
wg sync.WaitGroup
}
func newMessageBroker(fid string, pb Pubsub) *messageBroker {
return &messageBroker{
fid: fid,
pubsub: pb,
}
}
func (mb *messageBroker) registered() bool {
return mb.pubsub != nil
}
func (mb *messageBroker) close() {
if !mb.registered() {
return
}
// close s
mb.pubsub.Close()
mb.wg.Wait()
}
func (mb *messageBroker) send(ctx context.Context, e event) error {
if !mb.registered() {
return nil
}
e.Body.FID = mb.fid
bs, err := json.Marshal(e.Body)
if err != nil {
return err
}
return mb.pubsub.Pub(ctx, e.Type.Topic(), bs)
}
func (mb *messageBroker) listen(
ctx context.Context, types []eventType, cb func(context.Context, *event, error),
) error {
if !mb.registered() {
return nil
}
if len(types) == 0 {
return errNoEventType
}
topics := make([]string, len(types))
for i := 0; i < len(types); i++ {
topics[i] = types[i].Topic()
}
mb.wg.Add(1)
go func() {
defer mb.wg.Done()
for mess := range mb.pubsub.Sub(ctx, topics...) {
typ, ok := regTopicEventMap[mess.Topic()]
if !ok {
cb(ctx, nil, errors.New("no such topic registered"))
continue
}
e := event{Type: typ}
if err := json.Unmarshal(mess.Content(), &e.Body); err != nil {
cb(ctx, nil, err)
continue
}
if e.Body.FID == mb.fid {
cb(ctx, &e, errSelfEvent)
continue
}
cb(ctx, &e, nil)
}
}()
return nil
}