forked from Azure/azure-service-bus-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.go
65 lines (52 loc) · 1.92 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package servicebus
import "context"
type (
// TODO: message session should be available for each of the handler methods
// TODO: write a session manager that will handle a max concurrent session
// Handler exposes the functionality required to process a Service Bus message.
Handler interface {
Handle(context.Context, *Message) error
}
// HandlerFunc is a type converter that allows a func to be used as a `Handler`
HandlerFunc func(context.Context, *Message) error
// SessionHandler exposes a manner of handling a group of messages together. Instances of SessionHandler should be
// passed to a Receiver such as a Queue or Subscription.
SessionHandler interface {
Handler
// Start is called when a Receiver is informed that has acquired a lock on a Service Bus Session.
Start(*MessageSession) error
// End is called when a Receiver is informed that the last message of a Session has been passed to it.
End()
}
)
// Handle redirects this call to the func that was provided.
func (hf HandlerFunc) Handle(ctx context.Context, msg *Message) error {
return hf(ctx, msg)
}
type defaultSessionHandler struct {
Handler
start func(*MessageSession) error
end func()
}
// NewSessionHandler is a type converter that allows three funcs to be tied together into a type that fulfills the
// SessionHandler interface.
func NewSessionHandler(base Handler, start func(*MessageSession) error, end func()) SessionHandler {
return &defaultSessionHandler{
Handler: base,
start: start,
end: end,
}
}
// Start calls the func() that was provided to `NewSessionHandler` when a new session lock is established.
func (dsh defaultSessionHandler) Start(ms *MessageSession) error {
if dsh.start != nil {
return dsh.start(ms)
}
return nil
}
// End calls the func() that was provided to `NewSessionHandler` when a session is finished processing for any reason.
func (dsh defaultSessionHandler) End() {
if dsh.end != nil {
dsh.end()
}
}