-
Notifications
You must be signed in to change notification settings - Fork 0
/
receipts.go
76 lines (64 loc) · 1.41 KB
/
receipts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package stompy
import (
"sync"
"time"
)
type Receipt struct {
Received chan bool
receiptReceived chan bool
Timeout time.Duration
}
func (rec *Receipt) cleanUp(id string) {
awaitingReceipt.Remove(id)
close(rec.receiptReceived)
close(rec.Received)
}
func NewReceipt(timeout time.Duration) *Receipt {
return &Receipt{make(chan bool, 1), make(chan bool, 1), timeout}
}
type receipts struct {
sync.RWMutex
receipts map[string]*Receipt
}
func (r *receipts) Add(id string, rec *Receipt) error {
r.Lock()
defer r.Unlock()
if _, ok := r.receipts[id]; ok {
return ClientError("already a receipt with that id " + id)
}
r.receipts[id] = rec
//make sure we clean up. if we receive our receipt forward it on to the client
//after the timeout send received as false.
go func(receipt *Receipt, id string) {
defer receipt.cleanUp(id)
select {
case <-rec.receiptReceived:
rec.Received <- true
break
case <-time.After(rec.Timeout):
rec.Received <- false
break
}
}(rec, id)
return nil
}
func (r *receipts) Remove(id string) {
r.Lock()
defer r.Unlock()
if _, ok := r.receipts[id]; ok {
delete(r.receipts, id)
return
}
return
}
func (r *receipts) Count() int {
r.RLock()
defer r.RUnlock()
return len(r.receipts)
}
func (r *receipts) Get(id string) *Receipt {
r.RLock()
defer r.RUnlock()
return r.receipts[id]
}
var awaitingReceipt = &receipts{receipts: make(map[string]*Receipt)}