From fee90bd3f04ecf63b1d8e0025542c84d99592d99 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 19 Jan 2024 12:45:49 -0500 Subject: [PATCH] Ensure nacks sent for whole batch Signed-off-by: Peter Broadhurst --- internal/events/event_dispatcher.go | 39 ++++++++++++++++++----------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go index f667336d6..8afd99e39 100644 --- a/internal/events/event_dispatcher.go +++ b/internal/events/event_dispatcher.go @@ -398,7 +398,6 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) { } } -// TODO issue here, we can't just call DeliveryRequest with one thing. func (ed *eventDispatcher) deliverEvents() { withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData for { @@ -410,30 +409,42 @@ func (ed *eventDispatcher) deliverEvents() { // As soon as we hit an error, we need to trigger into nack mode var err error + + // Loop through the events enriching them, and dispatching individually in non-batch mode eventsWithData := make([]*core.CombinedEventDataDelivery, len(events)) - for i := 0; i < len(events) && err == nil; i++ { + for i := 0; i < len(events); i++ { e := &core.CombinedEventDataDelivery{ Event: events[i], } eventsWithData[i] = e - log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference) - if withData && e.Event.Message != nil { - e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message) - } - // Individual events (in reality there is only ever i==0 for this case) - if err == nil && !ed.batch { - err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data) + // The first error we encounter stops us attempting to enrich or dispatch any more events + if err == nil { + log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference) + if withData && e.Event.Message != nil { + e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message) + } } - if err != nil { - ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true}) + // If we are non-batched, we have to deliver each event individually... + if !ed.batch { + // .. only attempt to deliver if we've not triggered into an error scenario for one of the events already + if err == nil { + err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data) + } + // ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events + if err != nil { + ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true}) + } } } // In batch mode we do one dispatch of the whole set as one - if err == nil && ed.batch { - err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData) + if ed.batch { + // Only attempt to deliver if we're in a non error case (enrich might have failed above) + if err == nil { + err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData) + } + // If we're in an error case we have to nack everything immediately if err != nil { - // nack everything on behalf of the failed delivery for _, e := range events { ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true}) }