Skip to content

Commit

Permalink
Merge pull request #1447 from kaleido-io/ws-batch
Browse files Browse the repository at this point in the history
Enable batch delivery over WebSockets
  • Loading branch information
peterbroadhurst authored Jan 19, 2024
2 parents 57141ca + fee90bd commit fd542c0
Show file tree
Hide file tree
Showing 19 changed files with 855 additions and 388 deletions.
7 changes: 4 additions & 3 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ nav_order: 2
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|firstEvent|The first event the aggregator should process, if no previous offest is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000`
Expand All @@ -249,7 +249,7 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

Expand Down Expand Up @@ -1387,7 +1387,8 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`0`
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`50`
|batchTimeout|Default batch timeout|`int`|`50ms`

## subscription.retry

Expand Down
2 changes: 1 addition & 1 deletion docs/reference/types/wsack.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ nav_order: 24

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `id` | WSAck.id | [`UUID`](simpletypes#uuid) |
| `subscription` | WSAck.subscription | [`SubscriptionRef`](#subscriptionref) |

Expand Down
2 changes: 1 addition & 1 deletion docs/reference/types/wserror.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ nav_order: 25

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `error` | WSAck.error | `string` |

2 changes: 1 addition & 1 deletion docs/reference/types/wsstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ nav_order: 23

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `autoack` | WSStart.autoack | `bool` |
| `namespace` | WSStart.namespace | `string` |
| `name` | WSStart.name | `string` |
Expand Down
15 changes: 9 additions & 6 deletions internal/coreconfig/coreconfig.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -327,8 +327,10 @@ var (
OrgDescription = ffc("org.description")
// OrchestratorStartupAttempts is how many time to attempt to connect to core infrastructure on startup
OrchestratorStartupAttempts = ffc("orchestrator.startupAttempts")
// SubscriptionDefaultsReadAhead default read ahead to enable for subscriptions that do not explicitly configure readahead
SubscriptionDefaultsReadAhead = ffc("subscription.defaults.batchSize")
// SubscriptionDefaultsBatchSize default read ahead to enable for subscriptions that do not explicitly configure readahead
SubscriptionDefaultsBatchSize = ffc("subscription.defaults.batchSize")
// SubscriptionDefaultsBatchTimeout default batch timeout
SubscriptionDefaultsBatchTimeout = ffc("subscription.defaults.batchTimeout")
// SubscriptionMax maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)
SubscriptionMax = ffc("subscription.max")
// SubscriptionsRetryInitialDelay is the initial retry delay
Expand Down Expand Up @@ -404,7 +406,7 @@ func setDefaults() {
viper.SetDefault(string(DownloadRetryFactor), 2.0)
viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest)
viper.SetDefault(string(EventAggregatorBatchSize), 200)
viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms")
viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms")
viper.SetDefault(string(EventAggregatorPollTimeout), "30s")
viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms")
viper.SetDefault(string(EventAggregatorRewindQueueLength), 10)
Expand All @@ -414,7 +416,7 @@ func setDefaults() {
viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s")
viper.SetDefault(string(EventDBEventsBufferSize), 100)
viper.SetDefault(string(EventDispatcherBufferLength), 5)
viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms")
viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms")
viper.SetDefault(string(EventDispatcherPollTimeout), "30s")
viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"})
viper.SetDefault(string(EventTransportsDefault), "websockets")
Expand Down Expand Up @@ -451,7 +453,8 @@ func setDefaults() {
viper.SetDefault(string(PrivateMessagingBatchSize), 200)
viper.SetDefault(string(PrivateMessagingBatchTimeout), "1s")
viper.SetDefault(string(PrivateMessagingBatchPayloadLimit), "800Kb")
viper.SetDefault(string(SubscriptionDefaultsReadAhead), 0)
viper.SetDefault(string(SubscriptionDefaultsBatchSize), 50)
viper.SetDefault(string(SubscriptionDefaultsBatchTimeout), "50ms")
viper.SetDefault(string(SubscriptionMax), 500)
viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms")
viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s")
Expand Down
7 changes: 4 additions & 3 deletions internal/coremsgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -383,8 +383,9 @@ var (
ConfigPluginSharedstorageIpfsGatewayURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.url", "The URL for the IPFS Gateway", urlStringType)
ConfigPluginSharedstorageIpfsGatewayProxyURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.proxy.url", "Optional HTTP proxy server to use when connecting to the IPFS Gateway", urlStringType)

ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
ConfigSubscriptionDefaultsBatchTimeout = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout", i18n.IntType)

ConfigTokensName = ffc("config.tokens[].name", "A name to identify this token plugin", i18n.StringType)
ConfigTokensPlugin = ffc("config.tokens[].plugin", "The type of the token plugin to use", i18n.StringType)
Expand Down
153 changes: 69 additions & 84 deletions internal/events/event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -65,25 +65,21 @@ type eventDispatcher struct {
elected bool
eventPoller *eventPoller
inflight map[fftypes.UUID]*core.Event
eventDelivery chan *core.EventDelivery
eventDelivery chan []*core.EventDelivery
mux sync.Mutex
namespace string
readAhead int
batch bool
batchTimeout time.Duration
subscription *subscription
txHelper txcommon.Helper
}

func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.Plugin, di database.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, connID string, sub *subscription, en *eventNotifier, txHelper txcommon.Helper) *eventDispatcher {
ctx, cancelCtx := context.WithCancel(ctx)
readAhead := config.GetUint(coreconfig.SubscriptionDefaultsReadAhead)
readAhead := uint(0)
if sub.definition.Options.ReadAhead != nil {
readAhead = uint(*sub.definition.Options.ReadAhead)
}
if readAhead > maxReadAhead {
readAhead = maxReadAhead
}

batchTimeout := defaultBatchTimeout
if sub.definition.Options.BatchTimeout != nil && *sub.definition.Options.BatchTimeout != "" {
Expand All @@ -108,13 +104,12 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
subscription: sub,
namespace: sub.definition.Namespace,
inflight: make(map[fftypes.UUID]*core.Event),
eventDelivery: make(chan *core.EventDelivery, readAhead+1),
eventDelivery: make(chan []*core.EventDelivery, readAhead+1),
readAhead: int(readAhead),
acksNacks: make(chan ackNack),
closed: make(chan struct{}),
txHelper: txHelper,
batch: batch,
batchTimeout: batchTimeout,
}

pollerConf := &eventPollerConf{
Expand All @@ -138,6 +133,20 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
firstEvent: sub.definition.Options.FirstEvent,
}

// Users can tune the batch related settings.
// This is always true in batch:true cases, and optionally you can use the batchTimeout setting
// to tweak how we optimize ourselves for readahead / latency detection without batching
// (most likely users with this requirement would be best to just move to batch:true).
if batchTimeout > 0 {
pollerConf.eventBatchTimeout = batchTimeout
if batchTimeout > pollerConf.eventPollTimeout {
pollerConf.eventPollTimeout = batchTimeout
}
}
if batch || pollerConf.eventBatchSize < int(readAhead) {
pollerConf.eventBatchSize = ed.readAhead
}

ed.eventPoller = newEventPoller(ctx, di, en, pollerConf)
return ed
}
Expand Down Expand Up @@ -165,11 +174,8 @@ func (ed *eventDispatcher) electAndStart() {
ed.elected = true
ed.eventPoller.start()

if ed.batch {
go ed.deliverBatchedEvents()
} else {
go ed.deliverEvents()
}
go ed.deliverEvents()

// Wait until the event poller closes
<-ed.eventPoller.closed
}
Expand Down Expand Up @@ -326,7 +332,15 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo
ed.mux.Unlock()

dispatched++
ed.eventDelivery <- event
if !ed.batch {
// dispatch individually
ed.eventDelivery <- []*core.EventDelivery{event}
}
}

if ed.batch && len(dispatchable) > 0 {
// Dispatch the whole batch now marked in-flight
ed.eventDelivery <- dispatchable
}

if inflightCount == 0 {
Expand Down Expand Up @@ -384,88 +398,59 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
}
}

func (ed *eventDispatcher) deliverBatchedEvents() {
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData

var events []*core.CombinedEventDataDelivery
var batchTimeoutContext context.Context
var batchTimeoutCancel func()
for {
var timeoutContext context.Context
var timedOut bool
if batchTimeoutContext != nil {
timeoutContext = batchTimeoutContext
} else {
timeoutContext = ed.ctx
}
select {
case event, ok := <-ed.eventDelivery:
case events, ok := <-ed.eventDelivery:
if !ok {
if batchTimeoutCancel != nil {
batchTimeoutCancel()
}
return
}

if events == nil {
events = []*core.CombinedEventDataDelivery{}
batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout)
}

log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)

var data []*core.Data
// As soon as we hit an error, we need to trigger into nack mode
var err error
if withData && event.Message != nil {
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
}

events = append(events, &core.CombinedEventDataDelivery{Event: event, Data: data})

if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
}

case <-timeoutContext.Done():
timedOut = true
case <-ed.ctx.Done():
if batchTimeoutCancel != nil {
batchTimeoutCancel()
}
return
}

if len(events) == ed.readAhead || (timedOut && len(events) > 0) {
_ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events)
// If err handle all the delivery responses for all the events??
events = nil
}
}
}

// TODO issue here, we can't just call DeliveryRequest with one thing.
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
for {
select {
case event, ok := <-ed.eventDelivery:
if !ok {
return
// Loop through the events enriching them, and dispatching individually in non-batch mode
eventsWithData := make([]*core.CombinedEventDataDelivery, len(events))
for i := 0; i < len(events); i++ {
e := &core.CombinedEventDataDelivery{
Event: events[i],
}
eventsWithData[i] = e
// The first error we encounter stops us attempting to enrich or dispatch any more events
if err == nil {
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
if withData && e.Event.Message != nil {
e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
}
}
// If we are non-batched, we have to deliver each event individually...
if !ed.batch {
// .. only attempt to deliver if we've not triggered into an error scenario for one of the events already
if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
}
// ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
var data []*core.Data
var err error
if withData && event.Message != nil {
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
// In batch mode we do one dispatch of the whole set as one
if ed.batch {
// Only attempt to deliver if we're in a non error case (enrich might have failed above)
if err == nil {
err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
}
// If we're in an error case we have to nack everything immediately
if err != nil {
for _, e := range events {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data)
}
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
}
case <-ed.ctx.Done():
return
}
Expand Down
Loading

0 comments on commit fd542c0

Please sign in to comment.