Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a mode for transaction manager to be initiated as a golang module #125

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mocks-$(strip $(1))-$(strip $(2)): ${MOCKERY}
endef

$(eval $(call makemock, pkg/ffcapi, API, ffcapimocks))
$(eval $(call makemock, pkg/fftm, ModuleFunctions, fftmmocks))
$(eval $(call makemock, pkg/txhandler, TransactionHandler, txhandlermocks))
$(eval $(call makemock, pkg/txhandler, ManagedTxEventHandler, txhandlermocks))
$(eval $(call makemock, internal/metrics, TransactionHandlerMetrics, metricsmocks))
Expand All @@ -41,6 +42,7 @@ $(eval $(call makemock, internal/persistence, RichQuery, per
$(eval $(call makemock, internal/ws, WebSocketChannels, wsmocks))
$(eval $(call makemock, internal/ws, WebSocketServer, wsmocks))
$(eval $(call makemock, internal/events, Stream, eventsmocks))
$(eval $(call makemock, internal/events, InternalEventsDispatcher, eventsmocks))
$(eval $(call makemock, internal/apiclient, FFTMClient, apiclientmocks))

go-mod-tidy: .ALWAYS
Expand Down
17 changes: 15 additions & 2 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func InitDefaults() {
}
}

type eventStreamAction func(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error
type InternalEventsDispatcher interface {
ProcessBatchedEvents(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error
}

type eventStreamBatch struct {
number int64
Expand All @@ -92,7 +94,7 @@ type startedStreamState struct {
ctx context.Context
cancelCtx func()
startTime *fftypes.FFTime
action eventStreamAction
action func(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error
eventLoopDone chan struct{}
batchLoopDone chan struct{}
blockListenerDone chan struct{}
Expand All @@ -110,6 +112,7 @@ type eventStream struct {
confirmations confirmations.Manager
confirmationsRequired int
listeners map[fftypes.UUID]*listener
internalDispatcher InternalEventsDispatcher
wsChannels ws.WebSocketChannels
retry *retry.Retry
currentState *startedStreamState
Expand All @@ -125,15 +128,20 @@ func NewEventStream(
wsChannels ws.WebSocketChannels,
initialListeners []*apitypes.Listener,
eme metrics.EventMetricsEmitter,
internalDispatcher InternalEventsDispatcher,
) (ees Stream, err error) {
esCtx := log.WithLogField(bgCtx, "eventstream", persistedSpec.ID.String())
if persistedSpec.Type != nil && *persistedSpec.Type == apitypes.EventStreamTypeInternal && internalDispatcher == nil {
return nil, i18n.NewError(esCtx, tmmsgs.MsgMissingInternalDispatcher)
}
es := &eventStream{
bgCtx: esCtx,
status: apitypes.EventStreamStatusStopped,
spec: persistedSpec,
connector: connector,
persistence: persistence,
listeners: make(map[fftypes.UUID]*listener),
internalDispatcher: internalDispatcher,
wsChannels: wsChannels,
retry: esDefaults.retry,
checkpointInterval: config.GetDuration(tmconfig.EventStreamsCheckpointInterval),
Expand Down Expand Up @@ -162,6 +170,7 @@ func NewEventStream(

func (es *eventStream) initAction(startedState *startedStreamState) error {
ctx := startedState.ctx

switch *es.spec.Type {
case apitypes.EventStreamTypeWebhook:
wa, err := newWebhookAction(ctx, es.spec.Webhook)
Expand All @@ -171,6 +180,8 @@ func (es *eventStream) initAction(startedState *startedStreamState) error {
startedState.action = wa.attemptBatch
case apitypes.EventStreamTypeWebSocket:
startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch
case apitypes.EventStreamTypeInternal:
startedState.action = es.internalDispatcher.ProcessBatchedEvents
default:
// mergeValidateEsConfig always be called previous to this
panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type))
Expand Down Expand Up @@ -251,6 +262,8 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda
if merged.Webhook, changed, err = mergeValidateWhConfig(ctx, changed, base.Webhook, updates.Webhook); err != nil {
return nil, false, err
}
case apitypes.EventStreamTypeInternal:
// no checks are required for internal listener
default:
return nil, false, i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *merged.Type)
}
Expand Down
137 changes: 131 additions & 6 deletions internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/internal/tmconfig"
"github.com/hyperledger/firefly-transaction-manager/internal/ws"
"github.com/hyperledger/firefly-transaction-manager/mocks/confirmationsmocks"
"github.com/hyperledger/firefly-transaction-manager/mocks/eventsmocks"
"github.com/hyperledger/firefly-transaction-manager/mocks/ffcapimocks"
"github.com/hyperledger/firefly-transaction-manager/mocks/metricsmocks"
"github.com/hyperledger/firefly-transaction-manager/mocks/persistencemocks"
Expand Down Expand Up @@ -69,12 +70,18 @@ func testESConf(t *testing.T, j string) (spec *apitypes.EventStream) {

func newTestEventStream(t *testing.T, conf string) (es *eventStream) {
tmconfig.Reset()
es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf)
es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf, nil)
assert.NoError(t, err)
return es
}
func newTestInternalEventStream(t *testing.T, conf string, iedm InternalEventsDispatcher) (es *eventStream) {
tmconfig.Reset()
es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf, iedm)
assert.NoError(t, err)
return es
}

func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf string, listeners ...*apitypes.Listener) (es *eventStream, err error) {
func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf string, iedm InternalEventsDispatcher, listeners ...*apitypes.Listener) (es *eventStream, err error) {
tmconfig.Reset()
config.Set(tmconfig.EventStreamsDefaultsBatchTimeout, "1us")
InitDefaults()
Expand All @@ -91,6 +98,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str
&wsmocks.WebSocketChannels{},
listeners,
emm,
iedm,
)
mfc.On("EventStreamNewCheckpointStruct").Return(&utCheckpointType{}).Maybe()
if err != nil {
Expand Down Expand Up @@ -125,26 +133,48 @@ func TestNewTestEventStreamMissingID(t *testing.T) {
tmconfig.Reset()
InitDefaults()
emm := &metricsmocks.EventMetricsEmitter{}

_, err := NewEventStream(context.Background(), &apitypes.EventStream{},
&ffcapimocks.API{},
&persistencemocks.Persistence{},
&wsmocks.WebSocketChannels{},
[]*apitypes.Listener{},
emm,
nil,
)
assert.Regexp(t, "FF21048", err)
}

func TestNewTestEventStreamMissingInternalDispatcher(t *testing.T) {
tmconfig.Reset()
InitDefaults()
emm := &metricsmocks.EventMetricsEmitter{}

_, err := NewEventStream(context.Background(), &apitypes.EventStream{
Type: &apitypes.EventStreamTypeInternal,
},
&ffcapimocks.API{},
&persistencemocks.Persistence{},
&wsmocks.WebSocketChannels{},
[]*apitypes.Listener{},
emm,
nil,
)
assert.Regexp(t, "FF21091", err)
}

func TestNewTestEventStreamBadConfig(t *testing.T) {
tmconfig.Reset()
InitDefaults()
emm := &metricsmocks.EventMetricsEmitter{}

_, err := NewEventStream(context.Background(), testESConf(t, `{}`),
&ffcapimocks.API{},
&persistencemocks.Persistence{},
&wsmocks.WebSocketChannels{},
[]*apitypes.Listener{},
emm,
nil,
)
assert.Regexp(t, "FF21028", err)
}
Expand Down Expand Up @@ -531,6 +561,101 @@ func TestWebSocketEventStreamsE2EBlocks(t *testing.T) {
mfc.AssertExpectations(t)
}

func TestInternalEventStreamsE2EBlocks(t *testing.T) {
idem := &eventsmocks.InternalEventsDispatcher{}

es := newTestInternalEventStream(t, `{
"name": "ut_stream",
"type": "internal"
}`, idem)

l := &apitypes.Listener{
ID: apitypes.NewULID(),
Name: strPtr("ut_listener"),
Type: &apitypes.ListenerTypeBlocks,
FromBlock: strPtr(ffcapi.FromBlockLatest),
}

started := make(chan (chan<- *ffcapi.ListenerEvent), 1)
mfc := es.connector.(*ffcapimocks.API)

mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool {
return r.ID.Equals(es.spec.ID)
})).Run(func(args mock.Arguments) {
r := args[1].(*ffcapi.EventStreamStartRequest)
assert.Empty(t, r.InitialListeners)
}).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil)

mfc.On("EventStreamStopped", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStoppedRequest) bool {
return r.ID.Equals(es.spec.ID)
})).Return(&ffcapi.EventStreamStoppedResponse{}, ffcapi.ErrorReason(""), nil)

mcm := es.confirmations.(*confirmationsmocks.Manager)
mcm.On("StartConfirmedBlockListener", mock.Anything, l.ID, "latest", mock.MatchedBy(func(cp *ffcapi.BlockListenerCheckpoint) bool {
return cp.Block == 10000
}), mock.Anything).Run(func(args mock.Arguments) {
started <- args[4].(chan<- *ffcapi.ListenerEvent)
}).Return(nil)
mcm.On("StopConfirmedBlockListener", mock.Anything, l.ID).Return(nil)

msp := es.persistence.(*persistencemocks.Persistence)
// load existing checkpoint on start
msp.On("GetCheckpoint", mock.Anything, mock.Anything).Return(&apitypes.EventStreamCheckpoint{
StreamID: es.spec.ID,
Time: fftypes.Now(),
Listeners: map[fftypes.UUID]json.RawMessage{
*l.ID: []byte(`{"block":10000}`),
},
}, nil)
// write a valid checkpoint
msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool {
return cp.StreamID.Equals(es.spec.ID) && string(cp.Listeners[*l.ID]) == `{"block":10001}`
})).Return(nil)
// write a checkpoint when we delete
msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool {
return cp.StreamID.Equals(es.spec.ID) && cp.Listeners[*l.ID] == nil
})).Return(nil)

_, err := es.AddOrUpdateListener(es.bgCtx, l.ID, l, false)
assert.NoError(t, err)

err = es.Start(es.bgCtx)
assert.NoError(t, err)

assert.Equal(t, apitypes.EventStreamStatusStarted, es.Status())

err = es.Start(es.bgCtx) // double start is error
assert.Regexp(t, "FF21027", err)

r := <-started

mockListenerEvent := &ffcapi.ListenerEvent{
Checkpoint: &ffcapi.BlockListenerCheckpoint{Block: 10001},
BlockEvent: &ffcapi.BlockEvent{
ListenerID: l.ID,
BlockInfo: ffcapi.BlockInfo{
BlockNumber: fftypes.NewFFBigInt(10001),
BlockHash: fftypes.NewRandB32().String(),
ParentHash: fftypes.NewRandB32().String(),
},
},
}

idem.On("ProcessBatchedEvents", mock.Anything, 1, 1, mock.MatchedBy(func(events []*ffcapi.ListenerEvent) bool {
return events[0] == mockListenerEvent
})).Return()

r <- mockListenerEvent

err = es.RemoveListener(es.bgCtx, l.ID)
assert.NoError(t, err)

err = es.Stop(es.bgCtx)
assert.NoError(t, err)

mfc.AssertExpectations(t)
}

func TestStartEventStreamCheckpointReadFail(t *testing.T) {

es := newTestEventStream(t, `{
Expand Down Expand Up @@ -735,7 +860,7 @@ func TestStartWithExistingStreamOk(t *testing.T) {

_, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`, l)
}`, nil, l)
assert.NoError(t, err)

mfc.AssertExpectations(t)
Expand All @@ -755,7 +880,7 @@ func TestStartWithExistingStreamFail(t *testing.T) {

_, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`, l)
}`, nil, l)
assert.Regexp(t, "pop", err)

mfc.AssertExpectations(t)
Expand Down Expand Up @@ -1161,7 +1286,7 @@ func TestStartWithExistingBlockListener(t *testing.T) {

_, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`, l)
}`, nil, l)
assert.NoError(t, err)

mfc.AssertExpectations(t)
Expand All @@ -1179,7 +1304,7 @@ func TestStartAndAddBadListenerType(t *testing.T) {

es, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`)
}`, nil)
assert.NoError(t, err)

_, err = es.AddOrUpdateListener(es.bgCtx, l.ID, l, false)
Expand Down
1 change: 1 addition & 0 deletions internal/tmmsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,5 @@ var (
MsgBlockListenerNotStarted = ffe("FF21088", "Block listener %s not started", http.StatusConflict)
MsgBadListenerType = ffe("FF21089", "Invalid listener type: %s", http.StatusBadRequest)
MsgFromBlockInvalid = ffe("FF21090", "From block invalid. Must be 'earliest', 'latest' or a decimal: %s", http.StatusBadRequest)
MsgMissingInternalDispatcher = ffe("FF21091", "'internal' type is supported for module mode")
)
48 changes: 48 additions & 0 deletions mocks/eventsmocks/internal_events_dispatcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading