forked from Azure/azure-service-bus-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue_manager.go
367 lines (313 loc) · 11.2 KB
/
queue_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
package servicebus
import (
"context"
"encoding/xml"
"errors"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/Azure/go-autorest/autorest/to"
"github.com/devigned/tab"
"github.com/Azure/azure-service-bus-go/atom"
)
type (
// QueueManager provides CRUD functionality for Service Bus Queues
QueueManager struct {
*entityManager
}
// Entity is represents the most basic form of an Azure Service Bus entity.
Entity struct {
Name string
ID string
}
// QueueEntity is the Azure Service Bus description of a Queue for management activities
QueueEntity struct {
*QueueDescription
*Entity
}
// queueFeed is a specialized feed containing QueueEntries
queueFeed struct {
*atom.Feed
Entries []queueEntry `xml:"entry"`
}
// queueEntry is a specialized Queue feed entry
queueEntry struct {
*atom.Entry
Content *queueContent `xml:"content"`
}
// QueueManagementOption represents named configuration options for queue mutation
QueueManagementOption func(*QueueDescription) error
// Targetable provides the ability to forward messages to the entity
Targetable interface {
TargetURI() string
}
)
// TargetURI provides an absolute address to a target entity
func (e Entity) TargetURI() string {
split := strings.Split(e.ID, "?")
return split[0]
}
func queueEntryToEntity(entry *queueEntry) *QueueEntity {
return &QueueEntity{
QueueDescription: &entry.Content.QueueDescription,
Entity: &Entity{
Name: entry.Title,
ID: entry.ID,
},
}
}
/*
QueueEntityWithPartitioning ensure the created queue will be a partitioned queue. Partitioned queues offer increased
storage and availability compared to non-partitioned queues with the trade-off of requiring the following to ensure
FIFO message retrieval:
SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the
partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled
by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of
session states.
PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses
the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional
messages. The partition key ensures that all messages that are sent within a transaction are handled by the same
messaging broker.
MessageId. If the queue has the RequiresDuplicationDetection property set to true, then the MessageId
property serves as the partition key if the SessionId or a PartitionKey properties are not set. This ensures that
all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and
eliminate duplicate messages
*/
func QueueEntityWithPartitioning() QueueManagementOption {
return func(queue *QueueDescription) error {
queue.EnablePartitioning = ptrBool(true)
return nil
}
}
// QueueEntityWithMaxSizeInMegabytes configures the maximum size of the queue in megabytes (1 * 1024 - 5 * 1024), which is the size of
// the memory allocated for the queue. Default is 1 MB (1 * 1024).
//
// size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku
func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption {
return func(q *QueueDescription) error {
if size < 1024 || size > 80*1024 {
return errors.New("QueueEntityWithMaxSizeInMegabytes: must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku")
}
int32Size := int32(size)
q.MaxSizeInMegabytes = &int32Size
return nil
}
}
// QueueEntityWithDuplicateDetection configures the queue to detect duplicates for a given time window. If window
// is not specified, then it uses the default of 10 minutes.
func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption {
return func(q *QueueDescription) error {
q.RequiresDuplicateDetection = ptrBool(true)
if window != nil {
q.DuplicateDetectionHistoryTimeWindow = ptrString(durationTo8601Seconds(*window))
}
return nil
}
}
// QueueEntityWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs
func QueueEntityWithRequiredSessions() QueueManagementOption {
return func(q *QueueDescription) error {
q.RequiresSession = ptrBool(true)
return nil
}
}
// QueueEntityWithDeadLetteringOnMessageExpiration will ensure the queue sends expired messages to the dead letter queue
func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption {
return func(q *QueueDescription) error {
q.DeadLetteringOnMessageExpiration = ptrBool(true)
return nil
}
}
// QueueEntityWithAutoDeleteOnIdle configures the queue to automatically delete after the specified idle interval. The
// minimum duration is 5 minutes.
func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption {
return func(q *QueueDescription) error {
if window != nil {
if window.Minutes() < 5 {
return errors.New("QueueEntityWithAutoDeleteOnIdle: window must be greater than 5 minutes")
}
q.AutoDeleteOnIdle = ptrString(durationTo8601Seconds(*window))
}
return nil
}
}
// QueueEntityWithMessageTimeToLive configures the queue to set a time to live on messages. This is the duration after which
// the message expires, starting from when the message is sent to Service Bus. This is the default value used when
// TimeToLive is not set on a message itself. If nil, defaults to 14 days.
func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption {
return func(q *QueueDescription) error {
if window == nil {
duration := time.Duration(14 * 24 * time.Hour)
window = &duration
}
q.DefaultMessageTimeToLive = ptrString(durationTo8601Seconds(*window))
return nil
}
}
// QueueEntityWithLockDuration configures the queue to have a duration of a peek-lock; that is, the amount of time that the
// message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1
// minute.
func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption {
return func(q *QueueDescription) error {
if window == nil {
duration := time.Duration(1 * time.Minute)
window = &duration
}
q.LockDuration = ptrString(durationTo8601Seconds(*window))
return nil
}
}
// QueueEntityWithAutoForward configures the queue to automatically forward messages to the specified target.
//
// The ability to AutoForward to a target requires the connection have management authorization. If the connection
// string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be
// returned on the PUT.
func QueueEntityWithAutoForward(target Targetable) QueueManagementOption {
return func(q *QueueDescription) error {
uri := target.TargetURI()
q.ForwardTo = &uri
return nil
}
}
// QueueEntityWithForwardDeadLetteredMessagesTo configures the queue to automatically forward dead letter messages to
// the specified target.
//
// The ability to forward dead letter messages to a target requires the connection have management authorization. If
// the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized
// error will be returned on the PUT.
func QueueEntityWithForwardDeadLetteredMessagesTo(target Targetable) QueueManagementOption {
return func(q *QueueDescription) error {
uri := target.TargetURI()
q.ForwardDeadLetteredMessagesTo = &uri
return nil
}
}
// QueueEntityWithMaxDeliveryCount configures the queue to have a maximum number of delivery attempts before
// dead-lettering the message
func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption {
return func(q *QueueDescription) error {
q.MaxDeliveryCount = &count
return nil
}
}
// NewQueueManager creates a new QueueManager for a Service Bus Namespace
func (ns *Namespace) NewQueueManager() *QueueManager {
return &QueueManager{
entityManager: newEntityManager(ns.getHTTPSHostURI(), ns.TokenProvider),
}
}
// Delete deletes a Service Bus Queue entity by name
func (qm *QueueManager) Delete(ctx context.Context, name string) error {
ctx, span := qm.startSpanFromContext(ctx, "sb.QueueManager.Delete")
defer span.End()
res, err := qm.entityManager.Delete(ctx, "/"+name)
defer closeRes(ctx, res)
return err
}
// Put creates or updates a Service Bus Queue
func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error) {
ctx, span := qm.startSpanFromContext(ctx, "sb.QueueManager.Put")
defer span.End()
qd := new(QueueDescription)
for _, opt := range opts {
if err := opt(qd); err != nil {
tab.For(ctx).Error(err)
return nil, err
}
}
qd.ServiceBusSchema = to.StringPtr(serviceBusSchema)
qe := &queueEntry{
Entry: &atom.Entry{
AtomSchema: atomSchema,
},
Content: &queueContent{
Type: applicationXML,
QueueDescription: *qd,
},
}
var mw []MiddlewareFunc
if qd.ForwardTo != nil {
mw = append(mw, addSupplementalAuthorization(*qd.ForwardTo, qm.TokenProvider()))
}
if qd.ForwardDeadLetteredMessagesTo != nil {
mw = append(mw, addDeadLetterSupplementalAuthorization(*qd.ForwardDeadLetteredMessagesTo, qm.TokenProvider()))
}
reqBytes, err := xml.Marshal(qe)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
reqBytes = xmlDoc(reqBytes)
res, err := qm.entityManager.Put(ctx, "/"+name, reqBytes, mw...)
defer closeRes(ctx, res)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
b, err := ioutil.ReadAll(res.Body)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
var entry queueEntry
err = xml.Unmarshal(b, &entry)
if err != nil {
return nil, formatManagementError(b)
}
return queueEntryToEntity(&entry), nil
}
// List fetches all of the queues for a Service Bus Namespace
func (qm *QueueManager) List(ctx context.Context) ([]*QueueEntity, error) {
ctx, span := qm.startSpanFromContext(ctx, "sb.QueueManager.List")
defer span.End()
res, err := qm.entityManager.Get(ctx, `/$Resources/Queues`)
defer closeRes(ctx, res)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
b, err := ioutil.ReadAll(res.Body)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
var feed queueFeed
err = xml.Unmarshal(b, &feed)
if err != nil {
return nil, formatManagementError(b)
}
qd := make([]*QueueEntity, len(feed.Entries))
for idx, entry := range feed.Entries {
qd[idx] = queueEntryToEntity(&entry)
}
return qd, nil
}
// Get fetches a Service Bus Queue entity by name
func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error) {
ctx, span := qm.startSpanFromContext(ctx, "sb.QueueManager.Get")
defer span.End()
res, err := qm.entityManager.Get(ctx, name)
defer closeRes(ctx, res)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
if res.StatusCode == http.StatusNotFound {
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
b, err := ioutil.ReadAll(res.Body)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
}
var entry queueEntry
err = xml.Unmarshal(b, &entry)
if err != nil {
if isEmptyFeed(b) {
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
return nil, formatManagementError(b)
}
return queueEntryToEntity(&entry), nil
}