-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscription.go
90 lines (76 loc) · 1.89 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package stompy
import (
"sync"
"github.com/nu7hatch/gouuid"
)
// SubscriptionHandler is the function signature that must be supplied when
// subscribing to a queue. The handler receives a Frame as its only argument;
// in this file, dispatch invokes it for MESSAGE and ERROR frames whose
// "subscription" header matches a registered subscription id.
type SubscriptionHandler func(Frame)
// subscription bundles the data kept for one registered subscription: its id,
// the destination it was created for, the handler to call, and the headers
// that were supplied alongside it.
type subscription struct {
// Id is the key used to store and look up the subscription; newSubscription
// sets it to the string form of a freshly generated UUID.
Id string
// Destination is the destination string passed to newSubscription.
Destination string
// Handler is the callback that dispatch runs (in its own goroutine) for
// frames addressed to this subscription.
Handler SubscriptionHandler
// AddedHeaders holds the headers passed to newSubscription.
// NOTE(review): presumably extra headers sent with the subscribe request —
// confirm against the caller.
AddedHeaders StompHeaders
}
// newSubscription creates a subscription for destination with the given
// handler and headers, assigning it a newly generated UUID (v4) as its Id.
//
// If UUID generation fails, the zero-value subscription is returned together
// with the generator's error.
func newSubscription(destination string, handler SubscriptionHandler, headers StompHeaders) (subscription, error) {
	generated, err := uuid.NewV4()
	if err != nil {
		return subscription{}, err
	}

	return subscription{
		Id:           generated.String(),
		Destination:  destination,
		Handler:      handler,
		AddedHeaders: headers,
	}, nil
}
// subscriptions is a lockable registry that maps subscription ids to their
// subscription records (and therefore to their handlers).
type subscriptions struct {
// Embedded mutex; addSubscription and removeSubscription hold it while
// they read or modify subs.
sync.Mutex
// subs is keyed by subscription.Id.
subs map[string]subscription
}
// newSubscriptions returns an empty registry whose map is already allocated,
// so subscriptions can be added to it immediately.
func newSubscriptions() *subscriptions {
	registry := new(subscriptions)
	registry.subs = map[string]subscription{}
	return registry
}
// dispatch consumes frames from incoming until the channel is closed and
// routes each one according to its command string:
//
//   - MESSAGE and ERROR frames are handed to the handler of the subscription
//     named by the frame's "subscription" header. Each handler call runs in
//     its own goroutine. Frames that match no registered subscription are
//     dropped.
//   - RECEIPT frames with a "receipt-id" header signal the matching pending
//     receipt (looked up through awaitingReceipt) by sending true on its
//     receiptReceived channel.
//
// Frames with any other command are ignored.
func (s *subscriptions) dispatch(incoming chan Frame) {
	forward := func(f Frame) {
		id := f.Headers["subscription"]

		// The map is mutated by addSubscription/removeSubscription under the
		// mutex, so the lookup must take the same lock to avoid a data race.
		// The lock is released before the handler starts so a handler can
		// itself add or remove subscriptions without deadlocking.
		s.Lock()
		sub, ok := s.subs[id]
		s.Unlock()

		if ok {
			go sub.Handler(f)
		}
	}

	for f := range incoming {
		switch f.CommandString() {
		case "MESSAGE", "ERROR":
			forward(f)
		case "RECEIPT":
			if receiptId, ok := f.Headers["receipt-id"]; ok {
				if receipt := awaitingReceipt.Get(receiptId); nil != receipt {
					// Notify whoever is waiting on this receipt.
					// NOTE(review): this send blocks the dispatch loop if
					// receiptReceived is unbuffered and nobody is receiving;
					// removal of the receipt is expected to happen elsewhere —
					// confirm against the waiter.
					receipt.receiptReceived <- true
				}
			}
		}
	}
}
// addSubscription registers sub under sub.Id while holding the registry lock.
//
// It returns a ClientError and leaves the registry unchanged when a
// subscription with the same id is already present; otherwise it stores sub
// and returns nil.
func (s *subscriptions) addSubscription(sub subscription) error {
	s.Lock()
	defer s.Unlock()

	_, taken := s.subs[sub.Id]
	if !taken {
		s.subs[sub.Id] = sub
		return nil
	}
	return ClientError("subscription already exists with that id")
}
// removeSubscription deletes the subscription stored under subId while
// holding the registry lock. Removing an id that is not registered is a
// no-op.
func (s *subscriptions) removeSubscription(subId string) {
	s.Lock()
	defer s.Unlock()

	// delete is already a no-op for a missing key, so no existence check is
	// needed first.
	delete(s.subs, subId)
}