This repository has been archived by the owner on Jun 5, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy pathagent.go
228 lines (209 loc) · 5.96 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package stun
import (
"errors"
"sync"
"time"
)
// NoopHandler just discards any event.
var NoopHandler Handler = func(e Event) {}
// NewAgent initializes and returns new Agent with provided handler.
// If h is nil, the NoopHandler will be used.
func NewAgent(h Handler) *Agent {
if h == nil {
h = NoopHandler
}
a := &Agent{
transactions: make(map[transactionID]agentTransaction),
handler: h,
}
return a
}
// Agent is low-level abstraction over transaction list that
// handles concurrency (all calls are goroutine-safe) and
// time outs (via Collect call).
type Agent struct {
// transactions is map of transactions that are currently
// in progress. Event handling is done in such way when
// transaction is unregistered before agentTransaction access,
// minimizing mux lock and protecting agentTransaction from
// data races via unexpected concurrent access.
transactions map[transactionID]agentTransaction
closed bool // all calls are invalid if true
mux sync.Mutex // protects transactions and closed
handler Handler // handles transactions
}
// Handler handles state changes of transaction.
//
// Handler is called on transaction state change.
// Usage of e is valid only during call, user must
// copy needed fields explicitly.
type Handler func(e Event)
// Event is passed to Handler describing the transaction event.
// Do not reuse outside Handler.
type Event struct {
TransactionID [TransactionIDSize]byte
Message *Message
Error error
}
// agentTransaction represents transaction in progress.
// Concurrent access is invalid.
type agentTransaction struct {
id transactionID
deadline time.Time
}
var (
// ErrTransactionStopped indicates that transaction was manually stopped.
ErrTransactionStopped = errors.New("transaction is stopped")
// ErrTransactionNotExists indicates that agent failed to find transaction.
ErrTransactionNotExists = errors.New("transaction not exists")
// ErrTransactionExists indicates that transaction with same id is already
// registered.
ErrTransactionExists = errors.New("transaction exists with same id")
)
// StopWithError removes transaction from list and calls handler with
// provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
func (a *Agent) StopWithError(id [TransactionIDSize]byte, err error) error {
a.mux.Lock()
if a.closed {
a.mux.Unlock()
return ErrAgentClosed
}
t, exists := a.transactions[id]
delete(a.transactions, id)
h := a.handler
a.mux.Unlock()
if !exists {
return ErrTransactionNotExists
}
h(Event{
TransactionID: t.id,
Error: err,
})
return nil
}
// Stop stops transaction by id with ErrTransactionStopped, blocking
// until handler returns.
func (a *Agent) Stop(id [TransactionIDSize]byte) error {
return a.StopWithError(id, ErrTransactionStopped)
}
// ErrAgentClosed indicates that agent is in closed state and is unable
// to handle transactions.
var ErrAgentClosed = errors.New("agent is closed")
// Start registers transaction with provided id and deadline.
// Could return ErrAgentClosed, ErrTransactionExists.
//
// Agent handler is guaranteed to be eventually called.
func (a *Agent) Start(id [TransactionIDSize]byte, deadline time.Time) error {
a.mux.Lock()
defer a.mux.Unlock()
if a.closed {
return ErrAgentClosed
}
_, exists := a.transactions[id]
if exists {
return ErrTransactionExists
}
a.transactions[id] = agentTransaction{
id: id,
deadline: deadline,
}
return nil
}
// agentCollectCap is initial capacity for Agent.Collect slices,
// sufficient to make function zero-alloc in most cases.
const agentCollectCap = 100
// ErrTransactionTimeOut indicates that transaction has reached deadline.
var ErrTransactionTimeOut = errors.New("transaction is timed out")
// Collect terminates all transactions that have deadline before provided
// time, blocking until all handlers will process ErrTransactionTimeOut.
// Will return ErrAgentClosed if agent is already closed.
//
// It is safe to call Collect concurrently but makes no sense.
func (a *Agent) Collect(gcTime time.Time) error {
toRemove := make([]transactionID, 0, agentCollectCap)
a.mux.Lock()
if a.closed {
// Doing nothing if agent is closed.
// All transactions should be already closed
// during Close() call.
a.mux.Unlock()
return ErrAgentClosed
}
// Adding all transactions with deadline before gcTime
// to toCall and toRemove slices.
// No allocs if there are less than agentCollectCap
// timed out transactions.
for id, t := range a.transactions {
if t.deadline.Before(gcTime) {
toRemove = append(toRemove, id)
}
}
// Un-registering timed out transactions.
for _, id := range toRemove {
delete(a.transactions, id)
}
// Calling handler does not require locked mutex,
// reducing lock time.
h := a.handler
a.mux.Unlock()
// Sending ErrTransactionTimeOut to handler for all transactions,
// blocking until last one.
event := Event{
Error: ErrTransactionTimeOut,
}
for _, id := range toRemove {
event.TransactionID = id
h(event)
}
return nil
}
// Process incoming message, synchronously passing it to handler.
func (a *Agent) Process(m *Message) error {
e := Event{
TransactionID: m.TransactionID,
Message: m,
}
a.mux.Lock()
if a.closed {
a.mux.Unlock()
return ErrAgentClosed
}
h := a.handler
delete(a.transactions, m.TransactionID)
a.mux.Unlock()
h(e)
return nil
}
// SetHandler sets agent handler to h.
func (a *Agent) SetHandler(h Handler) error {
a.mux.Lock()
if a.closed {
a.mux.Unlock()
return ErrAgentClosed
}
a.handler = h
a.mux.Unlock()
return nil
}
// Close terminates all transactions with ErrAgentClosed and renders Agent to
// closed state.
func (a *Agent) Close() error {
e := Event{
Error: ErrAgentClosed,
}
a.mux.Lock()
if a.closed {
a.mux.Unlock()
return ErrAgentClosed
}
for _, t := range a.transactions {
e.TransactionID = t.id
a.handler(e)
}
a.transactions = nil
a.closed = true
a.handler = nil
a.mux.Unlock()
return nil
}
type transactionID [TransactionIDSize]byte