-
Notifications
You must be signed in to change notification settings - Fork 11
/
factory.go
185 lines (161 loc) · 4.64 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package cache
import (
"context"
"encoding/json"
"errors"
"sync"
"github.com/google/uuid"
)
var (
	// usedPrefixs records every prefix already registered through NewCache,
	// so that a later registration of the same prefix can be rejected.
	usedPrefixs = map[string]struct{}{}
	// uuidString returns a newly generated UUID string on each call. It is
	// kept as a variable (decoupling) so it can be substituted.
	//
	// The function literal is required: the method value uuid.New().String
	// would evaluate uuid.New() only once, when this package is initialized,
	// and then hand out that very same string on every call.
	uuidString = func() string { return uuid.New().String() }
)
// newFactory assembles a factory on top of the given shared and local cache
// adapters and registers the factory's handler for evict events on its
// message broker.
//
// The marshal/unmarshal pair defaults to encoding/json and is replaced by the
// pair found in options when one is given. It panics when only one function
// of the pair is given.
func newFactory(sharedCache Adapter, localCache Adapter, options ...FactoryOptions) Factory {
	opts := loadFactoryOptions(options...)

	// a custom codec is only accepted as a pair: a lone marshal or a lone
	// unmarshal function is rejected
	if (opts.marshalFunc == nil) != (opts.unmarshalFunc == nil) {
		panic(errors.New("both of Marshal and Unmarshal functions need to be specified"))
	}

	// start from the JSON codec and let the options override it
	var (
		encode MarshalFunc   = json.Marshal
		decode UnmarshalFunc = json.Unmarshal
	)
	if custom := opts.marshalFunc; custom != nil {
		encode = custom
	}
	if custom := opts.unmarshalFunc; custom != nil {
		decode = custom
	}

	// the same id is stored on the factory and handed to the message broker
	factoryID := uuidString()
	fty := &factory{
		id:            factoryID,
		sharedCache:   sharedCache,
		localCache:    localCache,
		mb:            newMessageBroker(factoryID, opts.pubsub),
		marshal:       encode,
		unmarshal:     decode,
		onCacheHit:    opts.onCacheHit,
		onCacheMiss:   opts.onCacheMiss,
		onLCCostAdd:   opts.onLCCostAdd,
		onLCCostEvict: opts.onLCCostEvict,
	}

	// subscribe to evict events
	fty.mb.listen(context.TODO(), []eventType{EventTypeEvict}, fty.subscribedEventsHandler())

	return fty
}
// factory holds what newFactory wires together and what NewCache hands over
// to the caches it builds.
type factory struct {
	// cache adapters assigned to a prefix's config in NewCache according to
	// the cache types listed in its setting
	sharedCache, localCache Adapter
	// mb is created in newFactory, shared with every cache built by NewCache
	// and closed by Close
	mb *messageBroker
	// default codec, used by a prefix unless its setting brings its own pair
	marshal   MarshalFunc
	unmarshal UnmarshalFunc
	// optional callbacks taken from the factory options; NewCache only
	// invokes the ones that are non-nil
	onCacheHit, onCacheMiss    func(prefix string, key string, count int)
	onLCCostAdd, onLCCostEvict func(prefix string, key string, cost int)
	// id is the value returned by uuidString in newFactory; the same value is
	// passed to newMessageBroker
	id string
	// closeOnce makes Close close the message broker at most once
	closeOnce sync.Once
}
// NewCache builds a Cache out of the given settings, one config per prefix.
//
// It panics when a setting has an empty prefix, when its prefix is already
// registered (by an earlier call or earlier in the same slice), when only one
// of MarshalFunc/UnmarshalFunc is set, or when none of its cache attributes
// selects the shared or the local cache.
//
// Prefixes are written to the package-level usedPrefixs only after every
// setting has passed validation. A call that panics therefore registers
// nothing, and a caller that recovers can retry with corrected settings
// without hitting "duplicated prefix".
//
// NOTE(review): usedPrefixs is read and written here without any locking;
// concurrent NewCache calls would race on it — confirm callers serialize.
func (f *factory) NewCache(settings []Setting) Cache {
	m := make(map[string]*config, len(settings))
	for _, setting := range settings {
		// check prefix
		if setting.Prefix == "" {
			panic(errors.New("not allowed empty prefix"))
		}
		if _, ok := usedPrefixs[setting.Prefix]; ok {
			panic(errors.New("duplicated prefix"))
		}
		// prefixes of this call are not in usedPrefixs yet, so a repetition
		// inside the same slice is detected through m
		if _, ok := m[setting.Prefix]; ok {
			panic(errors.New("duplicated prefix"))
		}

		// need to specify marshalFunc and unmarshalFunc at the same time
		if (setting.MarshalFunc == nil) != (setting.UnmarshalFunc == nil) {
			panic(errors.New("both of Marshal and Unmarshal functions need to be specified"))
		}

		// factory-level codec by default, per-setting codec when provided
		cfg := &config{
			mGetter:   setting.MGetter,
			marshal:   f.marshal,
			unmarshal: f.unmarshal,
		}
		if setting.MarshalFunc != nil {
			cfg.marshal = setting.MarshalFunc
		}
		if setting.UnmarshalFunc != nil {
			cfg.unmarshal = setting.UnmarshalFunc
		}

		for typ, attr := range setting.CacheAttributes {
			switch typ {
			case SharedCacheType:
				cfg.shared = f.sharedCache
				cfg.sharedTTL = attr.TTL
			case LocalCacheType:
				cfg.local = f.localCache
				cfg.localTTL = attr.TTL
			}
		}

		// need to indicate at least one cache type
		if cfg.shared == nil && cfg.local == nil {
			panic(errors.New("no cache type indicated"))
		}

		m[setting.Prefix] = cfg
	}

	// every setting is valid: only now make the prefixes globally taken
	for prefix := range m {
		usedPrefixs[prefix] = struct{}{}
	}

	return &cache{
		configs: m,
		mb:      f.mb,
		onCacheHit: func(prefix string, key string, count int) {
			// trigger the callback on cache hit if necessary
			if f.onCacheHit != nil {
				f.onCacheHit(prefix, key, count)
			}
		},
		onCacheMiss: func(prefix string, key string, count int) {
			// trigger the callback on cache miss if necessary
			if f.onCacheMiss != nil {
				f.onCacheMiss(prefix, key, count)
			}
		},
		onLCCostAdd: func(cKey string, cost int) {
			// trigger the callback on local cache added if necessary;
			// the cache key is split back into prefix and key first
			if f.onLCCostAdd != nil {
				pfx, key := getPrefixAndKey(cKey)
				f.onLCCostAdd(pfx, key, cost)
			}
		},
		onLCCostEvict: func(cKey string, cost int) {
			// trigger the callback on local cache evicted if necessary;
			// the cache key is split back into prefix and key first
			if f.onLCCostEvict != nil {
				pfx, key := getPrefixAndKey(cKey)
				f.onLCCostEvict(pfx, key, cost)
			}
		},
	}
}
// Close closes the factory's message broker. Repeated calls are allowed:
// closeOnce lets the broker be closed on the first call only.
func (f *factory) Close() {
	shutdown := func() {
		f.mb.close()
	}
	f.closeOnce.Do(shutdown)
}
// subscribedEventsHandler returns the callback that newFactory registers on
// the message broker for the events it subscribes to.
//
// The callback ignores any delivery that carries an error, as well as a nil
// event. On an evict event carrying at least one key it deletes those keys
// from the local cache, when the factory has one.
func (f *factory) subscribedEventsHandler() func(ctx context.Context, e *event, err error) {
	return func(ctx context.Context, e *event, err error) {
		switch {
		case errors.Is(err, errSelfEvent):
			// do nothing
			// NOTE(review): presumably marks an event emitted by this very
			// factory — confirm in the message broker
			return
		case err != nil:
			// TODO: forward error messages outside
			return
		}

		// without an event there is nothing to dispatch; this also keeps
		// e.Type below from dereferencing a nil pointer
		if e == nil {
			return
		}

		switch e.Type {
		case EventTypeEvict:
			keys := e.Body.Keys
			if f.localCache != nil && len(keys) > 0 {
				// evict local caches; whatever Del returns is not inspected
				f.localCache.Del(ctx, keys...)
			}
		}
	}
}