Skip to content

Commit

Permalink
Merge pull request #20 from kaleido-io/eventstreams-fix
Browse files Browse the repository at this point in the history
Fix event stream topic selection
  • Loading branch information
peterbroadhurst authored Aug 19, 2022
2 parents c1bd3e8 + dca4589 commit 50290f6
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
5 changes: 2 additions & 3 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ var esDefaults struct {
blockedRetryDelay fftypes.FFDuration
webhookRequestTimeout fftypes.FFDuration
websocketDistributionMode apitypes.DistributionMode
topic string
retry *retry.Retry
}

Expand Down Expand Up @@ -164,7 +163,7 @@ func (es *eventStream) initAction(startedState *startedStreamState) {
case apitypes.EventStreamTypeWebhook:
startedState.action = newWebhookAction(ctx, es.spec.Webhook).attemptBatch
case apitypes.EventStreamTypeWebSocket:
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.WebSocket.Topic).attemptBatch
default:
// mergeValidateEsConfig always be called previous to this
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type))
Expand Down Expand Up @@ -237,7 +236,7 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda
changed = apitypes.CheckUpdateEnum(changed, &merged.Type, base.Type, updates.Type, apitypes.EventStreamTypeWebSocket)
switch *merged.Type {
case apitypes.EventStreamTypeWebSocket:
if merged.WebSocket, changed, err = mergeValidateWsConfig(ctx, changed, base.WebSocket, updates.WebSocket); err != nil {
if merged.WebSocket, changed, err = mergeValidateWsConfig(ctx, changed, *merged.Name, base.WebSocket, updates.WebSocket); err != nil {
return nil, false, err
}
case apitypes.EventStreamTypeWebhook:
Expand Down
15 changes: 11 additions & 4 deletions internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ func TestConfigNewDefaultsUpdate(t *testing.T) {
InitDefaults()

es := testESConf(t, `{
"name": "test1"
"name": "test1",
"websocket": {
"topic": "test1"
}
}`)
es, changed, err := mergeValidateEsConfig(context.Background(), nil, es)
assert.NoError(t, err)
Expand All @@ -157,7 +160,7 @@ func TestConfigNewDefaultsUpdate(t *testing.T) {
"type":"websocket",
"websocket": {
"distributionMode":"load_balance",
"topic":""
"topic":"test1"
}
}`, string(b))

Expand Down Expand Up @@ -297,7 +300,10 @@ func TestInitActionBadAction(t *testing.T) {
func TestWebSocketEventStreamsE2EMigrationThenStart(t *testing.T) {

es := newTestEventStream(t, `{
"name": "ut_stream"
"name": "ut_stream",
"websocket": {
"topic": "ut_stream"
}
}`)

addr := "0x12345"
Expand Down Expand Up @@ -1198,7 +1204,8 @@ func TestWebSocketBroadcastActionCloseDuringCheckpoint(t *testing.T) {
es := newTestEventStream(t, `{
"name": "ut_stream",
"websocket": {
"distributionMode": "broadcast"
"distributionMode": "broadcast",
"topic": "ut_stream"
}
}`)

Expand Down
4 changes: 2 additions & 2 deletions internal/events/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

func mergeValidateWsConfig(ctx context.Context, changed bool, base *apitypes.WebSocketConfig, updates *apitypes.WebSocketConfig) (*apitypes.WebSocketConfig, bool, error) {
func mergeValidateWsConfig(ctx context.Context, changed bool, esName string, base *apitypes.WebSocketConfig, updates *apitypes.WebSocketConfig) (*apitypes.WebSocketConfig, bool, error) {

if base == nil {
base = &apitypes.WebSocketConfig{}
Expand All @@ -48,7 +48,7 @@ func mergeValidateWsConfig(ctx context.Context, changed bool, base *apitypes.Web
}

// Topic
changed = apitypes.CheckUpdateString(changed, &merged.Topic, base.Topic, updates.Topic, esDefaults.topic)
changed = apitypes.CheckUpdateString(changed, &merged.Topic, base.Topic, updates.Topic, esName /* default to the ES name */)

return merged, changed, nil
}
Expand Down

0 comments on commit 50290f6

Please sign in to comment.