Skip to content

Commit

Permalink
Ensure nacks sent for whole batch
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Jan 19, 2024
1 parent 7a26e8b commit fee90bd
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions internal/events/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
}
}

// TODO issue here, we can't just call DeliveryRequest with one thing.
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
for {
Expand All @@ -410,30 +409,42 @@ func (ed *eventDispatcher) deliverEvents() {

// As soon as we hit an error, we need to trigger into nack mode
var err error

// Loop through the events enriching them, and dispatching individually in non-batch mode
eventsWithData := make([]*core.CombinedEventDataDelivery, len(events))
for i := 0; i < len(events) && err == nil; i++ {
for i := 0; i < len(events); i++ {
e := &core.CombinedEventDataDelivery{
Event: events[i],
}
eventsWithData[i] = e
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
if withData && e.Event.Message != nil {
e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
}
// Individual events (in reality there is only ever i==0 for this case)
if err == nil && !ed.batch {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
// The first error we encounter stops us attempting to enrich or dispatch any more events
if err == nil {
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
if withData && e.Event.Message != nil {
e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
}
}
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
// If we are non-batched, we have to deliver each event individually...
if !ed.batch {
// .. only attempt to deliver if we've not triggered into an error scenario for one of the events already
if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
}
// ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

// In batch mode we do one dispatch of the whole set as one
if err == nil && ed.batch {
err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
if ed.batch {
// Only attempt to deliver if we're in a non error case (enrich might have failed above)
if err == nil {
err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
}
// If we're in an error case we have to nack everything immediately
if err != nil {
// nack everything on behalf of the failed delivery
for _, e := range events {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
Expand Down

0 comments on commit fee90bd

Please sign in to comment.