From 63730346a60aba1253998c52cd1fc9348e7b8b46 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Mon, 8 Jul 2024 16:09:02 +0100 Subject: [PATCH 1/3] add module mode and extract public interface Signed-off-by: Chengxuan Xing --- Makefile | 2 + internal/events/eventstream.go | 41 ++- internal/events/eventstream_test.go | 5 + internal/tmmsgs/en_error_messages.go | 1 + .../eventsmocks/internal_events_dispatcher.go | 48 +++ mocks/fftmmocks/module_functions.go | 48 +++ pkg/apitypes/api_types.go | 1 + pkg/fftm/manager.go | 67 ++-- pkg/fftm/manager_test.go | 36 +-- pkg/fftm/route_delete_eventstream.go | 2 +- pkg/fftm/route_delete_eventstream_listener.go | 2 +- pkg/fftm/route_delete_subscription.go | 2 +- pkg/fftm/route_get_eventstream.go | 2 +- pkg/fftm/route_get_eventstream_listener.go | 2 +- pkg/fftm/route_get_eventstreams.go | 2 +- pkg/fftm/route_get_subscription.go | 2 +- pkg/fftm/route_get_subscriptions.go | 2 +- pkg/fftm/route_patch_eventstream.go | 2 +- pkg/fftm/route_patch_eventstream_listener.go | 2 +- pkg/fftm/route_patch_subscription.go | 2 +- pkg/fftm/route_post_eventstream.go | 2 +- .../route_post_eventstream_listener_reset.go | 2 +- pkg/fftm/route_post_eventstream_listeners.go | 2 +- pkg/fftm/route_post_eventstream_resume.go | 2 +- pkg/fftm/route_post_eventstream_suspend.go | 2 +- pkg/fftm/route_post_subscription_reset.go | 2 +- pkg/fftm/route_post_subscriptions.go | 2 +- pkg/fftm/stream_management.go | 289 ++++++++++-------- pkg/fftm/stream_management_test.go | 104 +++---- pkg/fftm/transaction_management.go | 23 +- pkg/txhandler/txhandler.go | 17 +- 31 files changed, 451 insertions(+), 267 deletions(-) create mode 100644 mocks/eventsmocks/internal_events_dispatcher.go create mode 100644 mocks/fftmmocks/module_functions.go diff --git a/Makefile b/Makefile index bf9930e7..adbb2d99 100644 --- a/Makefile +++ b/Makefile @@ -30,6 +30,7 @@ mocks-$(strip $(1))-$(strip $(2)): ${MOCKERY} endef $(eval $(call makemock, pkg/ffcapi, API, ffcapimocks)) +$(eval $(call makemock, pkg/fftm, ModuleFunctions, fftmmocks)) $(eval $(call makemock, pkg/txhandler, TransactionHandler, txhandlermocks)) $(eval $(call makemock, pkg/txhandler, ManagedTxEventHandler, txhandlermocks)) $(eval $(call makemock, internal/metrics, TransactionHandlerMetrics, metricsmocks)) @@ -41,6 +42,7 @@ $(eval $(call makemock, internal/persistence, RichQuery, per $(eval $(call makemock, internal/ws, WebSocketChannels, wsmocks)) $(eval $(call makemock, internal/ws, WebSocketServer, wsmocks)) $(eval $(call makemock, internal/events, Stream, eventsmocks)) +$(eval $(call makemock, internal/events, InternalEventsDispatcher, eventsmocks)) $(eval $(call makemock, internal/apiclient, FFTMClient, apiclientmocks)) go-mod-tidy: .ALWAYS diff --git a/internal/events/eventstream.go b/internal/events/eventstream.go index 9de8b39e..81ae0161 100644 --- a/internal/events/eventstream.go +++ b/internal/events/eventstream.go @@ -79,7 +79,9 @@ func InitDefaults() { } } -type eventStreamAction func(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error +type InternalEventsDispatcher interface { + ProcessBatchedEvents(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error +} type eventStreamBatch struct { number int64 @@ -92,7 +94,7 @@ type startedStreamState struct { ctx context.Context cancelCtx func() startTime *fftypes.FFTime - action eventStreamAction + action func(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error eventLoopDone chan struct{} batchLoopDone chan struct{} blockListenerDone chan struct{} @@ -110,6 +112,7 @@ type eventStream struct { confirmations confirmations.Manager confirmationsRequired int listeners map[fftypes.UUID]*listener + internalDispatcher InternalEventsDispatcher wsChannels ws.WebSocketChannels retry *retry.Retry currentState *startedStreamState @@ -125,6 +128,7 @@ func NewEventStream( wsChannels ws.WebSocketChannels, initialListeners []*apitypes.Listener, eme metrics.EventMetricsEmitter, + internalDispatcher InternalEventsDispatcher, ) (ees Stream, err error) { esCtx := log.WithLogField(bgCtx, "eventstream", persistedSpec.ID.String()) es := &eventStream{ @@ -134,6 +138,7 @@ func NewEventStream( connector: connector, persistence: persistence, listeners: make(map[fftypes.UUID]*listener), + internalDispatcher: internalDispatcher, wsChannels: wsChannels, retry: esDefaults.retry, checkpointInterval: config.GetDuration(tmconfig.EventStreamsCheckpointInterval), @@ -162,18 +167,26 @@ func NewEventStream( func (es *eventStream) initAction(startedState *startedStreamState) error { ctx := startedState.ctx - switch *es.spec.Type { - case apitypes.EventStreamTypeWebhook: - wa, err := newWebhookAction(ctx, es.spec.Webhook) - if err != nil { - return err + if es.internalDispatcher != nil { + if es.spec.Type != &apitypes.EventStreamTypeInternal { + // TODO: need to understand why this should be panic, copied from the default switch case + panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamTypeForModuleMode, *es.spec.Type)) + } + startedState.action = es.internalDispatcher.ProcessBatchedEvents + } else { + switch *es.spec.Type { + case apitypes.EventStreamTypeWebhook: + wa, err := newWebhookAction(ctx, es.spec.Webhook) + if err != nil { + return err + } + startedState.action = wa.attemptBatch + case apitypes.EventStreamTypeWebSocket: + startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch + default: + // mergeValidateEsConfig always be called previous to this + panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type)) } - startedState.action = wa.attemptBatch - case apitypes.EventStreamTypeWebSocket: - startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch - default: - // mergeValidateEsConfig always be called previous to this - panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type)) } return nil } @@ -251,6 +264,8 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda if merged.Webhook, changed, err = mergeValidateWhConfig(ctx, changed, base.Webhook, updates.Webhook); err != nil { return nil, false, err } + case apitypes.EventStreamTypeInternal: + // no checks are required for internal listener default: return nil, false, i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *merged.Type) } diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 290d0924..225e6487 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -91,6 +91,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str &wsmocks.WebSocketChannels{}, listeners, emm, + nil, ) mfc.On("EventStreamNewCheckpointStruct").Return(&utCheckpointType{}).Maybe() if err != nil { @@ -125,12 +126,14 @@ func TestNewTestEventStreamMissingID(t *testing.T) { tmconfig.Reset() InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} + _, err := NewEventStream(context.Background(), &apitypes.EventStream{}, &ffcapimocks.API{}, &persistencemocks.Persistence{}, &wsmocks.WebSocketChannels{}, []*apitypes.Listener{}, emm, + nil, ) assert.Regexp(t, "FF21048", err) } @@ -139,12 +142,14 @@ func TestNewTestEventStreamBadConfig(t *testing.T) { tmconfig.Reset() InitDefaults() emm := &metricsmocks.EventMetricsEmitter{} + _, err := NewEventStream(context.Background(), testESConf(t, `{}`), &ffcapimocks.API{}, &persistencemocks.Persistence{}, &wsmocks.WebSocketChannels{}, []*apitypes.Listener{}, emm, + nil, ) assert.Regexp(t, "FF21028", err) } diff --git a/internal/tmmsgs/en_error_messages.go b/internal/tmmsgs/en_error_messages.go index 3d358316..ac6cd362 100644 --- a/internal/tmmsgs/en_error_messages.go +++ b/internal/tmmsgs/en_error_messages.go @@ -105,4 +105,5 @@ var ( MsgBlockListenerNotStarted = ffe("FF21088", "Block listener %s not started", http.StatusConflict) MsgBadListenerType = ffe("FF21089", "Invalid listener type: %s", http.StatusBadRequest) MsgFromBlockInvalid = ffe("FF21090", "From block invalid. Must be 'earliest', 'latest' or a decimal: %s", http.StatusBadRequest) + MsgInvalidStreamTypeForModuleMode = ffe("FF21091", "Invalid event stream type '%s', only 'internal' type is supported for module mode", http.StatusBadRequest) ) diff --git a/mocks/eventsmocks/internal_events_dispatcher.go b/mocks/eventsmocks/internal_events_dispatcher.go new file mode 100644 index 00000000..daccb299 --- /dev/null +++ b/mocks/eventsmocks/internal_events_dispatcher.go @@ -0,0 +1,48 @@ +// Code generated by mockery v2.40.2. DO NOT EDIT. + +package eventsmocks + +import ( + context "context" + + apitypes "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + + mock "github.com/stretchr/testify/mock" +) + +// InternalEventsDispatcher is an autogenerated mock type for the InternalEventsDispatcher type +type InternalEventsDispatcher struct { + mock.Mock +} + +// ProcessBatchedEvents provides a mock function with given fields: ctx, batchNumber, attempt, _a3 +func (_m *InternalEventsDispatcher) ProcessBatchedEvents(ctx context.Context, batchNumber int64, attempt int, _a3 []*apitypes.EventWithContext) error { + ret := _m.Called(ctx, batchNumber, attempt, _a3) + + if len(ret) == 0 { + panic("no return value specified for ProcessBatchedEvents") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int, []*apitypes.EventWithContext) error); ok { + r0 = rf(ctx, batchNumber, attempt, _a3) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewInternalEventsDispatcher creates a new instance of InternalEventsDispatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewInternalEventsDispatcher(t interface { + mock.TestingT + Cleanup(func()) +}) *InternalEventsDispatcher { + mock := &InternalEventsDispatcher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/fftmmocks/module_functions.go b/mocks/fftmmocks/module_functions.go new file mode 100644 index 00000000..e68defc1 --- /dev/null +++ b/mocks/fftmmocks/module_functions.go @@ -0,0 +1,48 @@ +// Code generated by mockery v2.40.2. DO NOT EDIT. + +package fftmmocks + +import ( + context "context" + + apitypes "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + + mock "github.com/stretchr/testify/mock" +) + +// ModuleFunctions is an autogenerated mock type for the ModuleFunctions type +type ModuleFunctions struct { + mock.Mock +} + +// ProcessBatchedEvents provides a mock function with given fields: ctx, batchNumber, attempt, events +func (_m *ModuleFunctions) ProcessBatchedEvents(ctx context.Context, batchNumber int64, attempt int, events []*apitypes.EventWithContext) error { + ret := _m.Called(ctx, batchNumber, attempt, events) + + if len(ret) == 0 { + panic("no return value specified for ProcessBatchedEvents") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int, []*apitypes.EventWithContext) error); ok { + r0 = rf(ctx, batchNumber, attempt, events) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewModuleFunctions creates a new instance of ModuleFunctions. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewModuleFunctions(t interface { + mock.TestingT + Cleanup(func()) +}) *ModuleFunctions { + mock := &ModuleFunctions{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/apitypes/api_types.go b/pkg/apitypes/api_types.go index d2f4b44a..10a40634 100644 --- a/pkg/apitypes/api_types.go +++ b/pkg/apitypes/api_types.go @@ -43,6 +43,7 @@ type EventStreamType = fftypes.FFEnum var ( EventStreamTypeWebhook = fftypes.FFEnumValue("estype", "webhook") EventStreamTypeWebSocket = fftypes.FFEnumValue("estype", "websocket") + EventStreamTypeInternal = fftypes.FFEnumValue("estype", "internal") ) type ErrorHandlingType = fftypes.FFEnum diff --git a/pkg/fftm/manager.go b/pkg/fftm/manager.go index 1ea07ce6..ccfd8e31 100644 --- a/pkg/fftm/manager.go +++ b/pkg/fftm/manager.go @@ -42,6 +42,8 @@ import ( type Manager interface { Start() error + StreamManager + txhandler.TransactionManager Close() } @@ -50,9 +52,6 @@ type manager struct { cancelCtx func() confirmations confirmations.Manager txHandler txhandler.TransactionHandler - apiServer httpserver.HTTPServer - metricsServer httpserver.HTTPServer - wsServer ws.WebSocketServer persistence persistence.Persistence richQueryEnabled bool @@ -65,20 +64,32 @@ type manager struct { blockListenerDone chan struct{} txHandlerDone <-chan struct{} started bool + + // configurations that are specific to FFTM running as a go module + moduleFunctions ModuleFunctions + + // configurations that are specific to FFTM running as an HTTP server + apiServer httpserver.HTTPServer + metricsServer httpserver.HTTPServer + wsServer ws.WebSocketServer apiServerDone chan error metricsServerDone chan error metricsEnabled bool metricsManager metrics.Metrics } +type ModuleFunctions interface { + events.InternalEventsDispatcher +} + func InitConfig() { tmconfig.Reset() events.InitDefaults() } -func NewManager(ctx context.Context, connector ffcapi.API) (Manager, error) { +func NewManager(ctx context.Context, connector ffcapi.API, mf ModuleFunctions) (Manager, error) { var err error - m := newManager(ctx, connector) + m := newManager(ctx, connector, mf) if err = m.initPersistence(ctx); err != nil { return nil, err } @@ -88,16 +99,18 @@ func NewManager(ctx context.Context, connector ffcapi.API) (Manager, error) { return m, nil } -func newManager(ctx context.Context, connector ffcapi.API) *manager { +func newManager(ctx context.Context, connector ffcapi.API, mf ModuleFunctions) *manager { m := &manager{ connector: connector, apiServerDone: make(chan error), metricsServerDone: make(chan error), - metricsEnabled: config.GetBool(tmconfig.MetricsEnabled), + metricsEnabled: config.GetBool(tmconfig.MetricsEnabled) && mf == nil, eventStreams: make(map[fftypes.UUID]events.Stream), streamsByName: make(map[string]*fftypes.UUID), metricsManager: metrics.NewMetricsManager(ctx), + moduleFunctions: mf, } + m.toolkit = &txhandler.Toolkit{ Connector: m.connector, MetricsManager: m.metricsManager, @@ -108,10 +121,12 @@ func newManager(ctx context.Context, connector ffcapi.API) *manager { func (m *manager) initServices(ctx context.Context) (err error) { m.confirmations = confirmations.NewBlockConfirmationManager(ctx, m.connector, "receipts", m.metricsManager) - m.wsServer = ws.NewWebSocketServer(ctx) - m.apiServer, err = httpserver.NewHTTPServer(ctx, "api", m.router(m.metricsEnabled), m.apiServerDone, tmconfig.APIConfig, tmconfig.CorsConfig) - if err != nil { - return err + if m.moduleFunctions == nil { + m.wsServer = ws.NewWebSocketServer(ctx) + m.apiServer, err = httpserver.NewHTTPServer(ctx, "api", m.router(m.metricsEnabled), m.apiServerDone, tmconfig.APIConfig, tmconfig.CorsConfig) + if err != nil { + return err + } } // check whether a policy engine name is provided @@ -129,13 +144,15 @@ func (m *manager) initServices(ctx context.Context) (err error) { m.toolkit.EventHandler = NewManagedTransactionEventHandler(ctx, m.confirmations, m.wsServer, m.txHandler) m.txHandler.Init(ctx, m.toolkit) - // metrics service must be initialized after transaction handler - // in case the transaction handler has logic in the Init function - // to add more metrics - if m.metricsEnabled { - m.metricsServer, err = httpserver.NewHTTPServer(ctx, "metrics", m.createMetricsMuxRouter(), m.metricsServerDone, tmconfig.MetricsConfig, tmconfig.CorsConfig) - if err != nil { - return err + if m.moduleFunctions == nil { + // metrics service must be initialized after transaction handler + // in case the transaction handler has logic in the Init function + // to add more metrics + if m.metricsEnabled { + m.metricsServer, err = httpserver.NewHTTPServer(ctx, "metrics", m.createMetricsMuxRouter(), m.metricsServerDone, tmconfig.MetricsConfig, tmconfig.CorsConfig) + if err != nil { + return err + } } } return nil @@ -166,9 +183,11 @@ func (m *manager) initPersistence(ctx context.Context) (err error) { } func (m *manager) Start() error { - go httpserver.RunDebugServer(m.ctx, tmconfig.DebugConfig) + if m.moduleFunctions == nil { + go httpserver.RunDebugServer(m.ctx, tmconfig.DebugConfig) + } - if err := m.restoreStreams(); err != nil { + if err := m._restoreStreams(); err != nil { return err } @@ -179,9 +198,11 @@ func (m *manager) Start() error { return err } - go m.runAPIServer() - if m.metricsEnabled { - go m.runMetricsServer() + if m.moduleFunctions == nil { + go m.runAPIServer() + if m.metricsEnabled { + go m.runMetricsServer() + } } go m.confirmations.Start() diff --git a/pkg/fftm/manager_test.go b/pkg/fftm/manager_test.go index bddffbc0..b7ab9bb0 100644 --- a/pkg/fftm/manager_test.go +++ b/pkg/fftm/manager_test.go @@ -86,7 +86,7 @@ func newTestManager(t *testing.T) (string, *manager, func()) { mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() - mm, err := NewManager(context.Background(), mca) + mm, err := NewManager(context.Background(), mca, nil) assert.NoError(t, err) m := mm.(*manager) @@ -105,9 +105,7 @@ func newTestManagerMockNoRichDB(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t, false) - mca := &ffcapimocks.API{} - - m := newManager(context.Background(), mca) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) mpm := &persistencemocks.Persistence{} mpm.On("Close", mock.Anything).Return(nil) @@ -133,9 +131,7 @@ func newTestManagerMockRichDB(t *testing.T) (string, *manager, *persistencemocks url := testManagerCommonInit(t, false) - mca := &ffcapimocks.API{} - - m := newManager(context.Background(), mca) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) mpm := &persistencemocks.Persistence{} mpm.On("Close", mock.Anything).Return(nil) @@ -170,7 +166,8 @@ func newTestManagerWithMetrics(t *testing.T) (string, *manager, func()) { mca := &ffcapimocks.API{} mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() - mm, err := NewManager(context.Background(), mca) + + mm, err := NewManager(context.Background(), mca, nil) assert.NoError(t, err) m := mm.(*manager) @@ -188,8 +185,7 @@ func newTestManagerWithMetrics(t *testing.T) (string, *manager, func()) { func newTestManagerMockPersistence(t *testing.T) (string, *manager, func()) { url := testManagerCommonInit(t, false) - - m := newManager(context.Background(), &ffcapimocks.API{}) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() m.persistence = mp @@ -210,7 +206,7 @@ func TestNewManagerBadPersistencePathConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Error(t, err) assert.Regexp(t, "FF21050", err) @@ -233,7 +229,7 @@ func TestNewManagerWithLegacyConfiguration(t *testing.T) { tmconfig.DeprecatedPolicyEngineBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - m := newManager(context.Background(), &ffcapimocks.API{}) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) mp := &persistencemocks.Persistence{} mp.On("Close", mock.Anything).Return(nil).Maybe() m.persistence = mp @@ -253,7 +249,7 @@ func TestNewManagerBadHttpConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Error(t, err) assert.Regexp(t, "FF00151", err) @@ -272,7 +268,7 @@ func TestNewManagerBadLevelDBConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err = NewManager(context.Background(), nil) + _, err = NewManager(context.Background(), nil, nil) assert.Regexp(t, "FF21049", err) } @@ -286,7 +282,7 @@ func TestNewManagerBadPersistenceConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Regexp(t, "FF21043", err) } @@ -298,7 +294,7 @@ func TestNewManagerInvalidTransactionHandlerName(t *testing.T) { config.Set(tmconfig.PersistenceLevelDBPath, dir) config.Set(tmconfig.TransactionsHandlerName, "wrong") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Regexp(t, "FF21070", err) } @@ -307,7 +303,7 @@ func TestNewManagerMetricsOffByDefault(t *testing.T) { tmconfig.Reset() - m := newManager(context.Background(), nil) + m := newManager(context.Background(), nil, nil) assert.False(t, m.metricsEnabled) } @@ -333,7 +329,7 @@ func TestNewManagerWithMetricsBadConfig(t *testing.T) { txRegistry.RegisterHandler(&simple.TransactionHandlerFactory{}) tmconfig.TransactionHandlerBaseConfig.SubSection("simple").Set(simple.FixedGasPrice, "223344556677") - _, err := NewManager(context.Background(), nil) + _, err := NewManager(context.Background(), nil, nil) assert.Error(t, err) assert.Regexp(t, "FF00151", err) } @@ -381,7 +377,7 @@ func TestPSQLInitFail(t *testing.T) { _ = testManagerCommonInit(t, false) config.Set(tmconfig.PersistenceType, "postgres") - m := newManager(context.Background(), &ffcapimocks.API{}) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) err := m.initPersistence(context.Background()) assert.Regexp(t, "FF21049", err) @@ -393,7 +389,7 @@ func TestPSQLInitRichQueryEnabled(t *testing.T) { config.Set(tmconfig.PersistenceType, "postgres") tmconfig.PostgresSection.Set(dbsql.SQLConfDatasourceURL, "unused") - m := newManager(context.Background(), &ffcapimocks.API{}) + m := newManager(context.Background(), &ffcapimocks.API{}, nil) err := m.initPersistence(context.Background()) assert.NoError(t, err) diff --git a/pkg/fftm/route_delete_eventstream.go b/pkg/fftm/route_delete_eventstream.go index 841a037c..0053acc7 100644 --- a/pkg/fftm/route_delete_eventstream.go +++ b/pkg/fftm/route_delete_eventstream.go @@ -37,7 +37,7 @@ var deleteEventStream = func(m *manager) *ffapi.Route { JSONOutputValue: nil, JSONOutputCodes: []int{http.StatusNoContent}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - err = m.deleteStream(r.Req.Context(), r.PP["streamId"]) + err = m.DeleteStream(r.Req.Context(), r.PP["streamId"]) return nil, err }, } diff --git a/pkg/fftm/route_delete_eventstream_listener.go b/pkg/fftm/route_delete_eventstream_listener.go index 7dc890d8..32fbebcd 100644 --- a/pkg/fftm/route_delete_eventstream_listener.go +++ b/pkg/fftm/route_delete_eventstream_listener.go @@ -39,7 +39,7 @@ var deleteEventStreamListener = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusNoContent}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return nil, m.deleteListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"]) + return nil, m.DeleteListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"]) }, } } diff --git a/pkg/fftm/route_delete_subscription.go b/pkg/fftm/route_delete_subscription.go index fd097996..f1f7b86d 100644 --- a/pkg/fftm/route_delete_subscription.go +++ b/pkg/fftm/route_delete_subscription.go @@ -39,7 +39,7 @@ var deleteSubscription = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusNoContent}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return nil, m.deleteListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"]) + return nil, m.DeleteListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"]) }, } } diff --git a/pkg/fftm/route_get_eventstream.go b/pkg/fftm/route_get_eventstream.go index 19694c98..6c81ecdc 100644 --- a/pkg/fftm/route_get_eventstream.go +++ b/pkg/fftm/route_get_eventstream.go @@ -38,7 +38,7 @@ var getEventStream = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.EventStreamWithStatus{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getStream(r.Req.Context(), r.PP["streamId"]) + return m.GetStream(r.Req.Context(), r.PP["streamId"]) }, } } diff --git a/pkg/fftm/route_get_eventstream_listener.go b/pkg/fftm/route_get_eventstream_listener.go index 2e80ea76..4643d279 100644 --- a/pkg/fftm/route_get_eventstream_listener.go +++ b/pkg/fftm/route_get_eventstream_listener.go @@ -39,7 +39,7 @@ var getEventStreamListener = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"]) + return m.GetListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"]) }, } } diff --git a/pkg/fftm/route_get_eventstreams.go b/pkg/fftm/route_get_eventstreams.go index ee5f6c5a..f54be14f 100644 --- a/pkg/fftm/route_get_eventstreams.go +++ b/pkg/fftm/route_get_eventstreams.go @@ -48,7 +48,7 @@ var getEventStreams = func(m *manager) *ffapi.Route { {Name: "after", Description: tmmsgs.APIParamAfter}, } route.JSONHandler = func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getStreams(r.Req.Context(), r.QP["after"], r.QP["limit"]) + return m.GetStreams(r.Req.Context(), r.QP["after"], r.QP["limit"]) } } return route diff --git a/pkg/fftm/route_get_subscription.go b/pkg/fftm/route_get_subscription.go index 68d47e05..1ba040de 100644 --- a/pkg/fftm/route_get_subscription.go +++ b/pkg/fftm/route_get_subscription.go @@ -39,7 +39,7 @@ var getSubscription = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"]) + return m.GetListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"]) }, } } diff --git a/pkg/fftm/route_get_subscriptions.go b/pkg/fftm/route_get_subscriptions.go index ad7d4910..263a3816 100644 --- a/pkg/fftm/route_get_subscriptions.go +++ b/pkg/fftm/route_get_subscriptions.go @@ -50,7 +50,7 @@ var getSubscriptions = func(m *manager) *ffapi.Route { {Name: "after", Description: tmmsgs.APIParamAfter}, } route.JSONHandler = func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getListeners(r.Req.Context(), r.QP["after"], r.QP["limit"]) + return m.GetListeners(r.Req.Context(), r.QP["after"], r.QP["limit"]) } } return route diff --git a/pkg/fftm/route_patch_eventstream.go b/pkg/fftm/route_patch_eventstream.go index 8ebb395e..736c768e 100644 --- a/pkg/fftm/route_patch_eventstream.go +++ b/pkg/fftm/route_patch_eventstream.go @@ -38,7 +38,7 @@ var patchEventStream = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.EventStream{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateStream(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.EventStream)) + return m.UpdateStream(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.EventStream)) }, } } diff --git a/pkg/fftm/route_patch_eventstream_listener.go b/pkg/fftm/route_patch_eventstream_listener.go index 76302cc4..802df769 100644 --- a/pkg/fftm/route_patch_eventstream_listener.go +++ b/pkg/fftm/route_patch_eventstream_listener.go @@ -39,7 +39,7 @@ var patchEventStreamListener = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateExistingListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"], r.Input.(*apitypes.Listener), false) + return m.UpdateListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"], r.Input.(*apitypes.Listener), false) }, } } diff --git a/pkg/fftm/route_patch_subscription.go b/pkg/fftm/route_patch_subscription.go index 9ae7ce22..12a44e75 100644 --- a/pkg/fftm/route_patch_subscription.go +++ b/pkg/fftm/route_patch_subscription.go @@ -39,7 +39,7 @@ var patchSubscription = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateExistingListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"], r.Input.(*apitypes.Listener), false) + return m.UpdateListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"], r.Input.(*apitypes.Listener), false) }, } } diff --git a/pkg/fftm/route_post_eventstream.go b/pkg/fftm/route_post_eventstream.go index c41d70cb..8d7a69cd 100644 --- a/pkg/fftm/route_post_eventstream.go +++ b/pkg/fftm/route_post_eventstream.go @@ -36,7 +36,7 @@ var postEventStream = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.EventStream{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.createAndStoreNewStream(r.Req.Context(), r.Input.(*apitypes.EventStream)) + return m.CreateAndStoreNewStream(r.Req.Context(), r.Input.(*apitypes.EventStream)) }, } } diff --git a/pkg/fftm/route_post_eventstream_listener_reset.go b/pkg/fftm/route_post_eventstream_listener_reset.go index 39741977..2ca83d72 100644 --- a/pkg/fftm/route_post_eventstream_listener_reset.go +++ b/pkg/fftm/route_post_eventstream_listener_reset.go @@ -39,7 +39,7 @@ var postEventStreamListenerReset = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateExistingListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"], r.Input.(*apitypes.Listener), true) + return m.UpdateListener(r.Req.Context(), r.PP["streamId"], r.PP["listenerId"], r.Input.(*apitypes.Listener), true) }, } } diff --git a/pkg/fftm/route_post_eventstream_listeners.go b/pkg/fftm/route_post_eventstream_listeners.go index c5e03f08..5d19c723 100644 --- a/pkg/fftm/route_post_eventstream_listeners.go +++ b/pkg/fftm/route_post_eventstream_listeners.go @@ -38,7 +38,7 @@ var postEventStreamListeners = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.createAndStoreNewStreamListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener)) + return m.CreateAndStoreNewStreamListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener)) }, } } diff --git a/pkg/fftm/route_post_eventstream_resume.go b/pkg/fftm/route_post_eventstream_resume.go index ce9c2931..64037c28 100644 --- a/pkg/fftm/route_post_eventstream_resume.go +++ b/pkg/fftm/route_post_eventstream_resume.go @@ -39,7 +39,7 @@ var postEventStreamResume = func(m *manager) *ffapi.Route { JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { falsy := false - _, err = m.updateStream(r.Req.Context(), r.PP["streamId"], &apitypes.EventStream{ + _, err = m.UpdateStream(r.Req.Context(), r.PP["streamId"], &apitypes.EventStream{ Suspended: &falsy, }) return &struct{}{}, err diff --git a/pkg/fftm/route_post_eventstream_suspend.go b/pkg/fftm/route_post_eventstream_suspend.go index 3551ee42..5a82160f 100644 --- a/pkg/fftm/route_post_eventstream_suspend.go +++ b/pkg/fftm/route_post_eventstream_suspend.go @@ -39,7 +39,7 @@ var postEventStreamSuspend = func(m *manager) *ffapi.Route { JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { truthy := true - _, err = m.updateStream(r.Req.Context(), r.PP["streamId"], &apitypes.EventStream{ + _, err = m.UpdateStream(r.Req.Context(), r.PP["streamId"], &apitypes.EventStream{ Suspended: &truthy, }) return &struct{}{}, err diff --git a/pkg/fftm/route_post_subscription_reset.go b/pkg/fftm/route_post_subscription_reset.go index d6f82f91..ced75c31 100644 --- a/pkg/fftm/route_post_subscription_reset.go +++ b/pkg/fftm/route_post_subscription_reset.go @@ -39,7 +39,7 @@ var postSubscriptionReset = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.updateExistingListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"], r.Input.(*apitypes.Listener), true) + return m.UpdateListener(r.Req.Context(), "" /* no streamId on this path */, r.PP["listenerId"], r.Input.(*apitypes.Listener), true) }, } } diff --git a/pkg/fftm/route_post_subscriptions.go b/pkg/fftm/route_post_subscriptions.go index 313a5fd9..0f4c5a30 100644 --- a/pkg/fftm/route_post_subscriptions.go +++ b/pkg/fftm/route_post_subscriptions.go @@ -37,7 +37,7 @@ var postSubscriptions = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.createAndStoreNewListener(r.Req.Context(), r.Input.(*apitypes.Listener)) + return m.createAndStoreNewListenerDeprecated(r.Req.Context(), r.Input.(*apitypes.Listener)) }, } } diff --git a/pkg/fftm/stream_management.go b/pkg/fftm/stream_management.go index 0a6e3df7..81161e60 100644 --- a/pkg/fftm/stream_management.go +++ b/pkg/fftm/stream_management.go @@ -36,7 +36,24 @@ const ( startupPaginationLimit = 25 ) -func (m *manager) restoreStreams() error { +type StreamManager interface { + CreateAndStoreNewStream(ctx context.Context, def *apitypes.EventStream) (*apitypes.EventStream, error) + GetStreams(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.EventStream, err error) + GetStream(ctx context.Context, idStr string) (*apitypes.EventStreamWithStatus, error) + UpdateStream(ctx context.Context, idStr string, updates *apitypes.EventStream) (*apitypes.EventStream, error) + DeleteStream(ctx context.Context, idStr string) error +} + +type StreamListenerManager interface { + CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) + GetListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error) + GetListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error) + UpdateListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error) + DeleteListener(ctx context.Context, streamIDStr, listenerIDStr string) error +} + +// Event stream functions +func (m *manager) _restoreStreams() error { var lastInPage *fftypes.UUID for { streamDefs, err := m.persistence.ListStreamsByCreateTime(m.ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending) @@ -54,10 +71,10 @@ func (m *manager) restoreStreams() error { } // check to see if it's already started if _, ok := m.eventStreams[*def.ID]; !ok { - closeoutName, err := m.reserveStreamName(m.ctx, *def.Name, def.ID) + closeoutName, err := m._reserveStreamName(m.ctx, *def.Name, def.ID) var s events.Stream if err == nil { - s, err = m.addRuntimeStream(def, streamListeners) + s, err = m._addRuntimeStream(def, streamListeners) } if err == nil && !*def.Suspended { err = s.Start(m.ctx) @@ -72,7 +89,7 @@ func (m *manager) restoreStreams() error { return nil } -func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftypes.UUID) error { +func (m *manager) _deleteAllStreamListeners(ctx context.Context, streamID *fftypes.UUID) error { for { // Do not specify after as we just delete everything listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, nil, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) @@ -91,8 +108,8 @@ func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftype return nil } -func (m *manager) addRuntimeStream(def *apitypes.EventStream, listeners []*apitypes.Listener) (events.Stream, error) { - s, err := events.NewEventStream(m.ctx, def, m.connector, m.persistence, m.wsServer, listeners, m.metricsManager) +func (m *manager) _addRuntimeStream(def *apitypes.EventStream, listeners []*apitypes.Listener) (events.Stream, error) { + s, err := events.NewEventStream(m.ctx, def, m.connector, m.persistence, m.wsServer, listeners, m.metricsManager, m.moduleFunctions) if err != nil { return nil, err } @@ -103,31 +120,7 @@ func (m *manager) addRuntimeStream(def *apitypes.EventStream, listeners []*apity return s, nil } -func (m *manager) deleteStream(ctx context.Context, idStr string) error { - id, err := fftypes.ParseUUID(ctx, idStr) - if err != nil { - return err - } - m.mux.Lock() - s := m.eventStreams[*id] - delete(m.eventStreams, *id) - if s != nil { - delete(m.streamsByName, *s.Spec().Name) - } - m.mux.Unlock() - if err := m.deleteAllStreamListeners(ctx, id); err != nil { - return err - } - if err := m.persistence.DeleteStream(ctx, id); err != nil { - return err - } - if s != nil { - return s.Delete(ctx) - } - return nil -} - -func (m *manager) reserveStreamName(ctx context.Context, name string, id *fftypes.UUID) (func(bool), error) { +func (m *manager) _reserveStreamName(ctx context.Context, name string, id *fftypes.UUID) (func(bool), error) { m.mux.Lock() defer m.mux.Unlock() @@ -157,7 +150,7 @@ func (m *manager) reserveStreamName(ctx context.Context, name string, id *fftype }, nil } -func (m *manager) createAndStoreNewStream(ctx context.Context, def *apitypes.EventStream) (*apitypes.EventStream, error) { +func (m *manager) CreateAndStoreNewStream(ctx context.Context, def *apitypes.EventStream) (*apitypes.EventStream, error) { def.ID = apitypes.NewULID() def.Created = nil // set to updated time by events.NewEventStream if def.Name == nil || *def.Name == "" { @@ -165,13 +158,13 @@ func (m *manager) createAndStoreNewStream(ctx context.Context, def *apitypes.Eve } stored := false - closeoutName, err := m.reserveStreamName(ctx, *def.Name, def.ID) + closeoutName, err := m._reserveStreamName(ctx, *def.Name, def.ID) if err != nil { return nil, err } defer func() { closeoutName(stored) }() - s, err := m.addRuntimeStream(def, nil /* no listeners when a new stream is first created */) + s, err := m._addRuntimeStream(def, nil /* no listeners when a new stream is first created */) if err != nil { return nil, err } @@ -192,71 +185,32 @@ func (m *manager) createAndStoreNewStream(ctx context.Context, def *apitypes.Eve return spec, nil } -func (m *manager) createAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) { - streamID, err := fftypes.ParseUUID(ctx, idStr) +func (m *manager) GetStreams(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.EventStream, err error) { + after, limit, err := m._parseAfterAndLimit(ctx, afterStr, limitStr) if err != nil { return nil, err } - def.StreamID = streamID - return m.createAndStoreNewListener(ctx, def) -} - -func (m *manager) createAndStoreNewListener(ctx context.Context, def *apitypes.Listener) (*apitypes.Listener, error) { - return m.createOrUpdateListener(ctx, apitypes.NewULID(), def, false) -} - -func (m *manager) updateExistingListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error) { - l, err := m.getListenerSpec(ctx, streamIDStr, listenerIDStr) // Verify the listener exists in storage - if err != nil { - return nil, err - } - updates.StreamID = l.StreamID - return m.createOrUpdateListener(ctx, l.ID, updates, reset) + return m.persistence.ListStreamsByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending) } -func (m *manager) createOrUpdateListener(ctx context.Context, id *fftypes.UUID, newOrUpdates *apitypes.Listener, reset bool) (*apitypes.Listener, error) { - if err := mergeEthCompatMethods(ctx, newOrUpdates); err != nil { - return nil, err - } - var s events.Stream - if newOrUpdates.StreamID != nil { - m.mux.Lock() - s = m.eventStreams[*newOrUpdates.StreamID] - m.mux.Unlock() - } - if s == nil { - return nil, i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, newOrUpdates.StreamID) - } - def, err := s.AddOrUpdateListener(ctx, id, newOrUpdates, reset) +func (m *manager) GetStream(ctx context.Context, idStr string) (*apitypes.EventStreamWithStatus, error) { + id, err := fftypes.ParseUUID(ctx, idStr) if err != nil { return nil, err } - if err := m.persistence.WriteListener(ctx, def); err != nil { - err1 := s.RemoveListener(ctx, def.ID) - log.L(ctx).Infof("Cleaned up runtime listener after write failed (err?=%v)", err1) - return nil, err - } - return def, nil -} - -func (m *manager) deleteListener(ctx context.Context, streamIDStr, listenerIDStr string) error { - spec, err := m.getListenerSpec(ctx, streamIDStr, listenerIDStr) // Verify the listener exists in storage - if err != nil { - return err - } m.mux.Lock() - s := m.eventStreams[*spec.StreamID] + s := m.eventStreams[*id] m.mux.Unlock() if s == nil { - return i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, spec.StreamID) - } - if err := s.RemoveListener(ctx, spec.ID); err != nil { - return err + return nil, i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, idStr) } - return m.persistence.DeleteListener(ctx, spec.ID) + return &apitypes.EventStreamWithStatus{ + EventStream: *s.Spec(), + Status: s.Status(), + }, nil } -func (m *manager) updateStream(ctx context.Context, idStr string, updates *apitypes.EventStream) (*apitypes.EventStream, error) { +func (m *manager) UpdateStream(ctx context.Context, idStr string, updates *apitypes.EventStream) (*apitypes.EventStream, error) { id, err := fftypes.ParseUUID(ctx, idStr) if err != nil { return nil, err @@ -270,7 +224,7 @@ func (m *manager) updateStream(ctx context.Context, idStr string, updates *apity nameChanged := false if updates.Name != nil && *updates.Name != "" { - closeoutName, err := m.reserveStreamName(ctx, *updates.Name, id) + closeoutName, err := m._reserveStreamName(ctx, *updates.Name, id) if err != nil { return nil, err } @@ -297,53 +251,71 @@ func (m *manager) updateStream(ctx context.Context, idStr string, updates *apity return spec, nil } -func (m *manager) getStream(ctx context.Context, idStr string) (*apitypes.EventStreamWithStatus, error) { +func (m *manager) DeleteStream(ctx context.Context, idStr string) error { id, err := fftypes.ParseUUID(ctx, idStr) if err != nil { - return nil, err + return err } m.mux.Lock() s := m.eventStreams[*id] + delete(m.eventStreams, *id) + if s != nil { + delete(m.streamsByName, *s.Spec().Name) + } m.mux.Unlock() - if s == nil { - return nil, i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, idStr) + if err := m._deleteAllStreamListeners(ctx, id); err != nil { + return err } - return &apitypes.EventStreamWithStatus{ - EventStream: *s.Spec(), - Status: s.Status(), - }, nil -} - -func (m *manager) parseLimit(ctx context.Context, limitStr string) (limit int, err error) { - if limitStr != "" { - if limit, err = strconv.Atoi(limitStr); err != nil { - return -1, i18n.NewError(ctx, tmmsgs.MsgInvalidLimit, limitStr, err) - } + if err := m.persistence.DeleteStream(ctx, id); err != nil { + return err } - return limit, nil + if s != nil { + return s.Delete(ctx) + } + return nil } -func (m *manager) parseAfterAndLimit(ctx context.Context, afterStr, limitStr string) (after *fftypes.UUID, limit int, err error) { - if limit, err = m.parseLimit(ctx, limitStr); err != nil { - return nil, -1, i18n.NewError(ctx, tmmsgs.MsgInvalidLimit, limitStr, err) +// Stream listener functions + +func (m *manager) _createOrUpdateListener(ctx context.Context, id *fftypes.UUID, newOrUpdates *apitypes.Listener, reset bool) (*apitypes.Listener, error) { + if err := _mergeEthCompatMethods(ctx, newOrUpdates); err != nil { + return nil, err } - if afterStr != "" { - if after, err = fftypes.ParseUUID(ctx, afterStr); err != nil { - return nil, -1, err - } + var s events.Stream + if newOrUpdates.StreamID != nil { + m.mux.Lock() + s = m.eventStreams[*newOrUpdates.StreamID] + m.mux.Unlock() } - return after, limit, nil + if s == nil { + return nil, i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, newOrUpdates.StreamID) + } + def, err := s.AddOrUpdateListener(ctx, id, newOrUpdates, reset) + if err != nil { + return nil, err + } + if err := m.persistence.WriteListener(ctx, def); err != nil { + err1 := s.RemoveListener(ctx, def.ID) + log.L(ctx).Infof("Cleaned up runtime listener after write failed (err?=%v)", err1) + return nil, err + } + return def, nil +} + +func (m *manager) createAndStoreNewListenerDeprecated(ctx context.Context, def *apitypes.Listener) (*apitypes.Listener, error) { + return m._createOrUpdateListener(ctx, apitypes.NewULID(), def, false) } -func (m *manager) getStreams(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.EventStream, err error) { - after, limit, err := m.parseAfterAndLimit(ctx, afterStr, limitStr) +func (m *manager) CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) { + streamID, err := fftypes.ParseUUID(ctx, idStr) if err != nil { return nil, err } - return m.persistence.ListStreamsByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending) + def.StreamID = streamID + return m.createAndStoreNewListenerDeprecated(ctx, def) } -func (m *manager) getListenerSpec(ctx context.Context, streamIDStr, listenerIDStr string) (spec *apitypes.Listener, err error) { +func (m *manager) _getListenerSpec(ctx context.Context, streamIDStr, listenerIDStr string) (spec *apitypes.Listener, err error) { var streamID *fftypes.UUID if streamIDStr != "" { streamID, err = fftypes.ParseUUID(ctx, streamIDStr) @@ -367,8 +339,36 @@ func (m *manager) getListenerSpec(ctx context.Context, streamIDStr, listenerIDSt return spec, nil } -func (m *manager) getListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error) { - spec, err := m.getListenerSpec(ctx, streamIDStr, listenerIDStr) +func (m *manager) getStreamListenersByCreateTime(ctx context.Context, afterStr, limitStr, idStr string) (streams []*apitypes.Listener, err error) { + after, limit, err := m._parseAfterAndLimit(ctx, afterStr, limitStr) + if err != nil { + return nil, err + } + id, err := fftypes.ParseUUID(ctx, idStr) + if err != nil { + return nil, err + } + return m.persistence.ListStreamListenersByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending, id) +} + +func (m *manager) getStreamListenersRich(ctx context.Context, streamID string, filter ffapi.AndFilter) ([]*apitypes.Listener, *ffapi.FilterResult, error) { + id, err := fftypes.ParseUUID(ctx, streamID) + if err != nil { + return nil, nil, err + } + return m.persistence.RichQuery().ListStreamListeners(ctx, id, filter) +} + +func (m *manager) GetListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error) { + after, limit, err := m._parseAfterAndLimit(ctx, afterStr, limitStr) + if err != nil { + return nil, err + } + return m.persistence.ListListenersByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending) +} + +func (m *manager) GetListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error) { + spec, err := m._getListenerSpec(ctx, streamIDStr, listenerIDStr) if err != nil { return nil, err } @@ -385,35 +385,35 @@ func (m *manager) getListener(ctx context.Context, streamIDStr, listenerIDStr st return l, nil } -func (m *manager) getListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error) { - after, limit, err := m.parseAfterAndLimit(ctx, afterStr, limitStr) +func (m *manager) UpdateListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error) { + l, err := m._getListenerSpec(ctx, streamIDStr, listenerIDStr) // Verify the listener exists in storage if err != nil { return nil, err } - return m.persistence.ListListenersByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending) + updates.StreamID = l.StreamID + return m._createOrUpdateListener(ctx, l.ID, updates, reset) } -func (m *manager) getStreamListenersByCreateTime(ctx context.Context, afterStr, limitStr, idStr string) (streams []*apitypes.Listener, err error) { - after, limit, err := m.parseAfterAndLimit(ctx, afterStr, limitStr) +func (m *manager) DeleteListener(ctx context.Context, streamIDStr, listenerIDStr string) error { + spec, err := m._getListenerSpec(ctx, streamIDStr, listenerIDStr) // Verify the listener exists in storage if err != nil { - return nil, err + return err } - id, err := fftypes.ParseUUID(ctx, idStr) - if err != nil { - return nil, err + m.mux.Lock() + s := m.eventStreams[*spec.StreamID] + m.mux.Unlock() + if s == nil { + return i18n.NewError(ctx, tmmsgs.MsgStreamNotFound, spec.StreamID) } - return m.persistence.ListStreamListenersByCreateTime(ctx, after, limit, txhandler.SortDirectionDescending, id) -} - -func (m *manager) getStreamListenersRich(ctx context.Context, streamID string, filter ffapi.AndFilter) ([]*apitypes.Listener, *ffapi.FilterResult, error) { - id, err := fftypes.ParseUUID(ctx, streamID) - if err != nil { - return nil, nil, err + if err := s.RemoveListener(ctx, spec.ID); err != nil { + return err } - return m.persistence.RichQuery().ListStreamListeners(ctx, id, filter) + return m.persistence.DeleteListener(ctx, spec.ID) } -func mergeEthCompatMethods(ctx context.Context, listener *apitypes.Listener) error { +// other internal functions + +func _mergeEthCompatMethods(ctx context.Context, listener *apitypes.Listener) error { if listener.EthCompatMethods != nil { if listener.Options == nil { listener.Options = fftypes.JSONAnyPtr("{}") @@ -434,3 +434,24 @@ func mergeEthCompatMethods(ctx context.Context, listener *apitypes.Listener) err } return nil } + +func (m *manager) _parseLimit(ctx context.Context, limitStr string) (limit int, err error) { + if limitStr != "" { + if limit, err = strconv.Atoi(limitStr); err != nil { + return -1, i18n.NewError(ctx, tmmsgs.MsgInvalidLimit, limitStr, err) + } + } + return limit, nil +} + +func (m *manager) _parseAfterAndLimit(ctx context.Context, afterStr, limitStr string) (after *fftypes.UUID, limit int, err error) { + if limit, err = m._parseLimit(ctx, limitStr); err != nil { + return nil, -1, i18n.NewError(ctx, tmmsgs.MsgInvalidLimit, limitStr, err) + } + if afterStr != "" { + if after, err = fftypes.ParseUUID(ctx, afterStr); err != nil { + return nil, -1, err + } + } + return after, limit, nil +} diff --git a/pkg/fftm/stream_management_test.go b/pkg/fftm/stream_management_test.go index 054f458a..2e8f19a1 100644 --- a/pkg/fftm/stream_management_test.go +++ b/pkg/fftm/stream_management_test.go @@ -83,7 +83,7 @@ func TestRestoreStreamsReadFailed(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("ListStreamsByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending).Return(nil, fmt.Errorf("pop")) - err := m.restoreStreams() + err := m._restoreStreams() assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -100,7 +100,7 @@ func TestRestoreListenersReadFailed(t *testing.T) { }, nil) mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), 0, txhandler.SortDirectionAscending, mock.Anything).Return(nil, fmt.Errorf("pop")) - err := m.restoreStreams() + err := m._restoreStreams() assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -116,7 +116,7 @@ func TestRestoreStreamsValidateFail(t *testing.T) { err := m.persistence.WriteStream(m.ctx, es1) assert.NoError(t, err) - err = m.restoreStreams() + err = m._restoreStreams() assert.Regexp(t, "FF21028", err) } @@ -139,7 +139,7 @@ func TestRestoreListenersStartFail(t *testing.T) { err = m.persistence.WriteListener(m.ctx, e1l1) assert.NoError(t, err) - err = m.restoreStreams() + err = m._restoreStreams() assert.Regexp(t, "pop", err) mfc.AssertExpectations(t) @@ -168,7 +168,7 @@ func TestDeleteStartedListener(t *testing.T) { err = m.Start() assert.NoError(t, err) - err = m.deleteStream(m.ctx, es1.ID.String()) + err = m.DeleteStream(m.ctx, es1.ID.String()) assert.NoError(t, err) mfc.AssertExpectations(t) @@ -188,7 +188,7 @@ func TestDeleteStartedListenerFail(t *testing.T) { }, nil) mp.On("DeleteListener", m.ctx, lID).Return(fmt.Errorf("pop")) - err := m.deleteAllStreamListeners(m.ctx, esID) + err := m._deleteAllStreamListeners(m.ctx, esID) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -219,7 +219,7 @@ func TestDeleteStartedListenerWithPagination(t *testing.T) { mp.On("DeleteListener", m.ctx, secondID).Return(nil) mp.On("DeleteListener", m.ctx, thirdID).Return(nil) - err := m.deleteAllStreamListeners(m.ctx, esID) + err := m._deleteAllStreamListeners(m.ctx, esID) assert.NoError(t, err) mp.AssertExpectations(t) @@ -230,7 +230,7 @@ func TestDeleteStreamBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - err := m.deleteStream(m.ctx, "Bad ID") + err := m.DeleteStream(m.ctx, "Bad ID") assert.Regexp(t, "FF00138", err) } @@ -244,7 +244,7 @@ func TestDeleteStreamListenerPersistenceFail(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return(nil, fmt.Errorf("pop")) - err := m.deleteStream(m.ctx, esID.String()) + err := m.DeleteStream(m.ctx, esID.String()) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -260,7 +260,7 @@ func TestDeleteStreamPersistenceFail(t *testing.T) { mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return([]*apitypes.Listener{}, nil) mp.On("DeleteStream", m.ctx, esID).Return(fmt.Errorf("pop")) - err := m.deleteStream(m.ctx, esID.String()) + err := m.DeleteStream(m.ctx, esID.String()) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -276,7 +276,7 @@ func TestDeleteStreamNotInitialized(t *testing.T) { mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return([]*apitypes.Listener{}, nil) mp.On("DeleteStream", m.ctx, esID).Return(nil) - err := m.deleteStream(m.ctx, esID.String()) + err := m.DeleteStream(m.ctx, esID.String()) assert.NoError(t, err) mp.AssertExpectations(t) @@ -297,31 +297,31 @@ func TestCreateRenameStreamNameReservation(t *testing.T) { mp.On("GetCheckpoint", m.ctx, mock.Anything).Return(nil, nil) // Reject missing name - _, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{}) + _, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{}) assert.Regexp(t, "FF21028", err) // Attempt to start and encounter a temporary error - _, err = m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) + _, err = m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) assert.Regexp(t, "temporary", err) // Ensure we still allow use of the name after the glitch is fixed - es1, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) + es1, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) assert.NoError(t, err) // Ensure we can't create another stream of same name - _, err = m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) + _, err = m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name1")}) assert.Regexp(t, "FF21047", err) // Create a second stream to test clash on rename - es2, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name2")}) + es2, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("Name2")}) assert.NoError(t, err) // Check for clash - _, err = m.updateStream(m.ctx, es1.ID.String(), &apitypes.EventStream{Name: strPtr("Name2")}) + _, err = m.UpdateStream(m.ctx, es1.ID.String(), &apitypes.EventStream{Name: strPtr("Name2")}) assert.Regexp(t, "FF21047", err) // Check for no-op rename to self - _, err = m.updateStream(m.ctx, es2.ID.String(), &apitypes.EventStream{Name: strPtr("Name2")}) + _, err = m.UpdateStream(m.ctx, es2.ID.String(), &apitypes.EventStream{Name: strPtr("Name2")}) assert.NoError(t, err) mp.AssertExpectations(t) @@ -333,7 +333,7 @@ func TestCreateStreamValidateFail(t *testing.T) { defer close() wrongType := apitypes.DistributionMode("wrong") - _, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1"), Type: &wrongType}) + _, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1"), Type: &wrongType}) assert.Regexp(t, "FF21029", err) } @@ -342,7 +342,7 @@ func TestCreateAndStoreNewStreamListenerBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.createAndStoreNewStreamListener(m.ctx, "bad", nil) + _, err := m.CreateAndStoreNewStreamListener(m.ctx, "bad", nil) assert.Regexp(t, "FF00138", err) } @@ -353,7 +353,7 @@ func TestUpdateExistingListenerNotFound(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("GetListener", m.ctx, mock.Anything).Return(nil, nil) - _, err := m.updateExistingListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String(), &apitypes.Listener{}, false) + _, err := m.UpdateListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String(), &apitypes.Listener{}, false) assert.Regexp(t, "FF21046", err) mp.AssertExpectations(t) @@ -363,7 +363,7 @@ func TestCreateOrUpdateListenerNotFound(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: apitypes.NewULID()}, false) + _, err := m._createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: apitypes.NewULID()}, false) assert.Regexp(t, "FF21045", err) } @@ -381,9 +381,9 @@ func TestCreateOrUpdateListenerFail(t *testing.T) { mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) mfc.On("EventListenerAdd", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) - _, err = m.createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) + _, err = m._createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -402,14 +402,14 @@ func TestCreateOrUpdateListenerFailMergeEthCompatMethods(t *testing.T) { mfc.On("EventListenerVerifyOptions", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerVerifyOptionsResponse{}, ffcapi.ErrorReason(""), nil) mfc.On("EventListenerAdd", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) l := &apitypes.Listener{ StreamID: es.ID, EthCompatMethods: fftypes.JSONAnyPtr(`{}`), } - _, err = m.createOrUpdateListener(m.ctx, apitypes.NewULID(), l, false) + _, err = m._createOrUpdateListener(m.ctx, apitypes.NewULID(), l, false) assert.Error(t, err) mp.AssertExpectations(t) @@ -430,9 +430,9 @@ func TestCreateOrUpdateListenerWriteFail(t *testing.T) { mfc.On("EventListenerAdd", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil) mfc.On("EventListenerRemove", mock.Anything, mock.Anything).Return(&ffcapi.EventListenerRemoveResponse{}, ffcapi.ErrorReason(""), nil) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) - _, err = m.createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) + _, err = m._createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -442,7 +442,7 @@ func TestDeleteListenerBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - err := m.deleteListener(m.ctx, "bad ID", "bad ID") + err := m.DeleteListener(m.ctx, "bad ID", "bad ID") assert.Regexp(t, "FF00138", err) } @@ -455,7 +455,7 @@ func TestDeleteListenerStreamNotFound(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("GetListener", m.ctx, mock.Anything).Return(l1, nil) - err := m.deleteListener(m.ctx, l1.StreamID.String(), l1.ID.String()) + err := m.DeleteListener(m.ctx, l1.StreamID.String(), l1.ID.String()) assert.Regexp(t, "FF21045", err) mp.AssertExpectations(t) @@ -477,14 +477,14 @@ func TestDeleteListenerFail(t *testing.T) { mfc.On("EventListenerAdd", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil) mfc.On("EventListenerRemove", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) - l1, err := m.createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) + l1, err := m._createOrUpdateListener(m.ctx, apitypes.NewULID(), &apitypes.Listener{StreamID: es.ID}, false) assert.NoError(t, err) mp.On("GetListener", m.ctx, mock.Anything).Return(l1, nil) - err = m.deleteListener(m.ctx, l1.StreamID.String(), l1.ID.String()) + err = m.DeleteListener(m.ctx, l1.StreamID.String(), l1.ID.String()) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -495,7 +495,7 @@ func TestUpdateStreamBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.updateStream(m.ctx, "bad ID", &apitypes.EventStream{}) + _, err := m.UpdateStream(m.ctx, "bad ID", &apitypes.EventStream{}) assert.Regexp(t, "FF00138", err) } @@ -504,7 +504,7 @@ func TestUpdateStreamNotFound(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.updateStream(m.ctx, apitypes.NewULID().String(), &apitypes.EventStream{}) + _, err := m.UpdateStream(m.ctx, apitypes.NewULID().String(), &apitypes.EventStream{}) assert.Regexp(t, "FF21045", err) } @@ -520,10 +520,10 @@ func TestUpdateStreamBadChanges(t *testing.T) { mp.On("WriteStream", m.ctx, mock.Anything).Return(nil) mp.On("GetCheckpoint", m.ctx, mock.Anything).Return(nil, nil) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) wrongType := apitypes.DistributionMode("wrong") - _, err = m.updateStream(m.ctx, es.ID.String(), &apitypes.EventStream{Type: &wrongType}) + _, err = m.UpdateStream(m.ctx, es.ID.String(), &apitypes.EventStream{Type: &wrongType}) assert.Regexp(t, "FF21029", err) } @@ -539,9 +539,9 @@ func TestUpdateStreamWriteFail(t *testing.T) { mp.On("WriteStream", m.ctx, mock.Anything).Return(fmt.Errorf("pop")) mp.On("GetCheckpoint", m.ctx, mock.Anything).Return(nil, nil) - es, err := m.createAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) + es, err := m.CreateAndStoreNewStream(m.ctx, &apitypes.EventStream{Name: strPtr("stream1")}) - _, err = m.updateStream(m.ctx, es.ID.String(), &apitypes.EventStream{}) + _, err = m.UpdateStream(m.ctx, es.ID.String(), &apitypes.EventStream{}) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -552,7 +552,7 @@ func TestGetStreamBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getStream(m.ctx, "bad ID") + _, err := m.GetStream(m.ctx, "bad ID") assert.Regexp(t, "FF00138", err) } @@ -561,7 +561,7 @@ func TestGetStreamNotFound(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getStream(m.ctx, apitypes.NewULID().String()) + _, err := m.GetStream(m.ctx, apitypes.NewULID().String()) assert.Regexp(t, "FF21045", err) } @@ -570,7 +570,7 @@ func TestGetStreamsBadLimit(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getStreams(m.ctx, "", "wrong") + _, err := m.GetStreams(m.ctx, "", "wrong") assert.Regexp(t, "FF21044", err) } @@ -579,7 +579,7 @@ func TestGetListenerBadAfter(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getListeners(m.ctx, "!bad UUID", "") + _, err := m.GetListeners(m.ctx, "!bad UUID", "") assert.Regexp(t, "FF00138", err) } @@ -588,7 +588,7 @@ func TestGetListenerBadStreamID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getListener(m.ctx, "bad ID", apitypes.NewULID().String()) + _, err := m.GetListener(m.ctx, "bad ID", apitypes.NewULID().String()) assert.Regexp(t, "FF00138", err) } @@ -597,7 +597,7 @@ func TestGetListenerBadListenerID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.getListener(m.ctx, apitypes.NewULID().String(), "bad ID") + _, err := m.GetListener(m.ctx, apitypes.NewULID().String(), "bad ID") assert.Regexp(t, "FF00138", err) } @@ -609,7 +609,7 @@ func TestGetListenerLookupErr(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("GetListener", m.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) - _, err := m.getListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String()) + _, err := m.GetListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String()) assert.Regexp(t, "pop", err) mp.AssertExpectations(t) @@ -623,7 +623,7 @@ func TestGetListenerNotFound(t *testing.T) { mp := m.persistence.(*persistencemocks.Persistence) mp.On("GetListener", m.ctx, mock.Anything).Return(nil, nil) - _, err := m.getListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String()) + _, err := m.GetListener(m.ctx, apitypes.NewULID().String(), apitypes.NewULID().String()) assert.Regexp(t, "FF21046", err) mp.AssertExpectations(t) @@ -662,7 +662,7 @@ func TestMergeEthCompatMethods(t *testing.T) { EthCompatMethods: fftypes.JSONAnyPtr(`[{"method1": "awesomeMethod"}]`), Options: fftypes.JSONAnyPtr(`{"otherOption": "otherValue"}`), } - err := mergeEthCompatMethods(context.Background(), l) + err := _mergeEthCompatMethods(context.Background(), l) assert.NoError(t, err) b, err := json.Marshal(l.Options) assert.NoError(t, err) @@ -673,7 +673,7 @@ func TestMergeEthCompatMethods(t *testing.T) { EthCompatMethods: fftypes.JSONAnyPtr(`[{"method1": "awesomeMethod"}]`), Options: nil, } - err = mergeEthCompatMethods(context.Background(), l) + err = _mergeEthCompatMethods(context.Background(), l) assert.NoError(t, err) b, err = json.Marshal(l.Options) assert.NoError(t, err) @@ -686,14 +686,14 @@ func TestMergeEthCompatMethodsFail(t *testing.T) { EthCompatMethods: fftypes.JSONAnyPtr(`[{"method1": "awesomeMethod"}`), Options: fftypes.JSONAnyPtr(`{"otherOption": "otherValue"}`), } - err := mergeEthCompatMethods(context.Background(), l) + err := _mergeEthCompatMethods(context.Background(), l) assert.Error(t, err) l = &apitypes.Listener{ EthCompatMethods: fftypes.JSONAnyPtr(`[{"method1": "awesomeMethod"}]`), Options: fftypes.JSONAnyPtr(`{"otherOption": "otherValue"`), } - err = mergeEthCompatMethods(context.Background(), l) + err = _mergeEthCompatMethods(context.Background(), l) assert.Error(t, err) } @@ -708,7 +708,7 @@ func TestGetListenerStatusFailStillReturn(t *testing.T) { mfc := m.connector.(*ffcapimocks.API) mfc.On("EventListenerHWM", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")).Maybe() - l, err := m.getListener(m.ctx, l1.StreamID.String(), l1.ID.String()) + l, err := m.GetListener(m.ctx, l1.StreamID.String(), l1.ID.String()) assert.NoError(t, err) assert.Nil(t, l.Checkpoint) assert.False(t, l.Catchup) diff --git a/pkg/fftm/transaction_management.go b/pkg/fftm/transaction_management.go index c8d4ebc1..4dcaa0a4 100644 --- a/pkg/fftm/transaction_management.go +++ b/pkg/fftm/transaction_management.go @@ -40,7 +40,7 @@ func (m *manager) getTransactionByIDWithStatus(ctx context.Context, txID string, } func (m *manager) getTransactions(ctx context.Context, afterStr, limitStr, signer string, pending bool, dirString string) (transactions []*apitypes.ManagedTX, err error) { - limit, err := m.parseLimit(ctx, limitStr) + limit, err := m._parseLimit(ctx, limitStr) if err != nil { return nil, err } @@ -120,3 +120,24 @@ func (m *manager) requestTransactionResume(ctx context.Context, txID string) (st return http.StatusAccepted, canceledTx, nil } + +// exposing txhandler functions through manager +func (m *manager) HandleNewTransaction(ctx context.Context, txReq *apitypes.TransactionRequest) (mtx *apitypes.ManagedTX, submissionRejected bool, err error) { + return m.txHandler.HandleNewTransaction(ctx, txReq) +} + +func (m *manager) HandleNewContractDeployment(ctx context.Context, txReq *apitypes.ContractDeployRequest) (mtx *apitypes.ManagedTX, submissionRejected bool, err error) { + return m.txHandler.HandleNewContractDeployment(ctx, txReq) +} + +func (m *manager) HandleCancelTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + return m.txHandler.HandleCancelTransaction(ctx, txID) +} + +func (m *manager) HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + return m.txHandler.HandleSuspendTransaction(ctx, txID) +} + +func (m *manager) HandleResumeTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + return m.txHandler.HandleResumeTransaction(ctx, txID) +} diff --git a/pkg/txhandler/txhandler.go b/pkg/txhandler/txhandler.go index 5802456f..882fa04f 100644 --- a/pkg/txhandler/txhandler.go +++ b/pkg/txhandler/txhandler.go @@ -154,7 +154,18 @@ type TransactionHandler interface { Start(ctx context.Context) (done <-chan struct{}, err error) // Event handling functions + // Instructional events: + TransactionManager + + // Informational events: + // HandleTransactionConfirmations - handles confirmations of blockchain transactions for a managed transaction + HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) (err error) + // HandleTransactionReceiptReceived - handles receipt of blockchain transactions for a managed transaction + HandleTransactionReceiptReceived(ctx context.Context, txID string, receipt *ffcapi.TransactionReceiptResponse) (err error) +} + +type TransactionManager interface { // HandleNewTransaction - handles event of adding new transactions onto blockchain HandleNewTransaction(ctx context.Context, txReq *apitypes.TransactionRequest) (mtx *apitypes.ManagedTX, submissionRejected bool, err error) // HandleNewContractDeployment - handles event of adding new smart contract deployment onto blockchain @@ -165,10 +176,4 @@ type TransactionHandler interface { HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) // HandleResumeTransaction - handles event of resuming a suspended managed transaction HandleResumeTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) - - // Informational events: - // HandleTransactionConfirmations - handles confirmations of blockchain transactions for a managed transaction - HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) (err error) - // HandleTransactionReceiptReceived - handles receipt of blockchain transactions for a managed transaction - HandleTransactionReceiptReceived(ctx context.Context, txID string, receipt *ffcapi.TransactionReceiptResponse) (err error) } From ddfae10759044d59a6c19027be0a4376d1619752 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 9 Jul 2024 10:00:16 +0100 Subject: [PATCH 2/3] test internal event stream Signed-off-by: Chengxuan Xing --- internal/events/eventstream.go | 34 +++-- internal/events/eventstream_test.go | 134 ++++++++++++++++++- internal/tmmsgs/en_error_messages.go | 2 +- pkg/fftm/manager.go | 1 + pkg/fftm/route_post_eventstream_listeners.go | 2 +- pkg/fftm/stream_management.go | 6 +- pkg/fftm/stream_management_test.go | 4 +- 7 files changed, 151 insertions(+), 32 deletions(-) diff --git a/internal/events/eventstream.go b/internal/events/eventstream.go index 81ae0161..2c451926 100644 --- a/internal/events/eventstream.go +++ b/internal/events/eventstream.go @@ -131,6 +131,9 @@ func NewEventStream( internalDispatcher InternalEventsDispatcher, ) (ees Stream, err error) { esCtx := log.WithLogField(bgCtx, "eventstream", persistedSpec.ID.String()) + if persistedSpec.Type != nil && *persistedSpec.Type == apitypes.EventStreamTypeInternal && internalDispatcher == nil { + return nil, i18n.NewError(esCtx, tmmsgs.MsgMissingInternalDispatcher) + } es := &eventStream{ bgCtx: esCtx, status: apitypes.EventStreamStatusStopped, @@ -167,26 +170,21 @@ func NewEventStream( func (es *eventStream) initAction(startedState *startedStreamState) error { ctx := startedState.ctx - if es.internalDispatcher != nil { - if es.spec.Type != &apitypes.EventStreamTypeInternal { - // TODO: need to understand why this should be panic, copied from the default switch case - panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamTypeForModuleMode, *es.spec.Type)) + + switch *es.spec.Type { + case apitypes.EventStreamTypeWebhook: + wa, err := newWebhookAction(ctx, es.spec.Webhook) + if err != nil { + return err } + startedState.action = wa.attemptBatch + case apitypes.EventStreamTypeWebSocket: + startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch + case apitypes.EventStreamTypeInternal: startedState.action = es.internalDispatcher.ProcessBatchedEvents - } else { - switch *es.spec.Type { - case apitypes.EventStreamTypeWebhook: - wa, err := newWebhookAction(ctx, es.spec.Webhook) - if err != nil { - return err - } - startedState.action = wa.attemptBatch - case apitypes.EventStreamTypeWebSocket: - startedState.action = newWebSocketAction(es.wsChannels, es.spec.WebSocket, *es.spec.Name).attemptBatch - default: - // mergeValidateEsConfig always be called previous to this - panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type)) - } + default: + // mergeValidateEsConfig always be called previous to this + panic(i18n.NewError(ctx, tmmsgs.MsgInvalidStreamType, *es.spec.Type)) } return nil } diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 225e6487..d465d582 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -34,6 +34,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/internal/tmconfig" "github.com/hyperledger/firefly-transaction-manager/internal/ws" "github.com/hyperledger/firefly-transaction-manager/mocks/confirmationsmocks" + "github.com/hyperledger/firefly-transaction-manager/mocks/eventsmocks" "github.com/hyperledger/firefly-transaction-manager/mocks/ffcapimocks" "github.com/hyperledger/firefly-transaction-manager/mocks/metricsmocks" "github.com/hyperledger/firefly-transaction-manager/mocks/persistencemocks" @@ -69,12 +70,18 @@ func testESConf(t *testing.T, j string) (spec *apitypes.EventStream) { func newTestEventStream(t *testing.T, conf string) (es *eventStream) { tmconfig.Reset() - es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf) + es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf, nil) + assert.NoError(t, err) + return es +} +func newTestInternalEventStream(t *testing.T, conf string, iedm InternalEventsDispatcher) (es *eventStream) { + tmconfig.Reset() + es, err := newTestEventStreamWithListener(t, &ffcapimocks.API{}, conf, iedm) assert.NoError(t, err) return es } -func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf string, listeners ...*apitypes.Listener) (es *eventStream, err error) { +func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf string, iedm InternalEventsDispatcher, listeners ...*apitypes.Listener) (es *eventStream, err error) { tmconfig.Reset() config.Set(tmconfig.EventStreamsDefaultsBatchTimeout, "1us") InitDefaults() @@ -91,7 +98,7 @@ func newTestEventStreamWithListener(t *testing.T, mfc *ffcapimocks.API, conf str &wsmocks.WebSocketChannels{}, listeners, emm, - nil, + iedm, ) mfc.On("EventStreamNewCheckpointStruct").Return(&utCheckpointType{}).Maybe() if err != nil { @@ -138,6 +145,24 @@ func TestNewTestEventStreamMissingID(t *testing.T) { assert.Regexp(t, "FF21048", err) } +func TestNewTestEventStreamMissingInternalDispatcher(t *testing.T) { + tmconfig.Reset() + InitDefaults() + emm := &metricsmocks.EventMetricsEmitter{} + + _, err := NewEventStream(context.Background(), &apitypes.EventStream{ + Type: &apitypes.EventStreamTypeInternal, + }, + &ffcapimocks.API{}, + &persistencemocks.Persistence{}, + &wsmocks.WebSocketChannels{}, + []*apitypes.Listener{}, + emm, + nil, + ) + assert.Regexp(t, "FF21091", err) +} + func TestNewTestEventStreamBadConfig(t *testing.T) { tmconfig.Reset() InitDefaults() @@ -536,6 +561,101 @@ func TestWebSocketEventStreamsE2EBlocks(t *testing.T) { mfc.AssertExpectations(t) } +func TestInternalEventStreamsE2EBlocks(t *testing.T) { + idem := &eventsmocks.InternalEventsDispatcher{} + + es := newTestInternalEventStream(t, `{ + "name": "ut_stream", + "type": "internal" + }`, idem) + + l := &apitypes.Listener{ + ID: apitypes.NewULID(), + Name: strPtr("ut_listener"), + Type: &apitypes.ListenerTypeBlocks, + FromBlock: strPtr(ffcapi.FromBlockLatest), + } + + started := make(chan (chan<- *ffcapi.ListenerEvent), 1) + mfc := es.connector.(*ffcapimocks.API) + + mfc.On("EventStreamStart", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStartRequest) bool { + return r.ID.Equals(es.spec.ID) + })).Run(func(args mock.Arguments) { + r := args[1].(*ffcapi.EventStreamStartRequest) + assert.Empty(t, r.InitialListeners) + }).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil) + + mfc.On("EventStreamStopped", mock.Anything, mock.MatchedBy(func(r *ffcapi.EventStreamStoppedRequest) bool { + return r.ID.Equals(es.spec.ID) + })).Return(&ffcapi.EventStreamStoppedResponse{}, ffcapi.ErrorReason(""), nil) + + mcm := es.confirmations.(*confirmationsmocks.Manager) + mcm.On("StartConfirmedBlockListener", mock.Anything, l.ID, "latest", mock.MatchedBy(func(cp *ffcapi.BlockListenerCheckpoint) bool { + return cp.Block == 10000 + }), mock.Anything).Run(func(args mock.Arguments) { + started <- args[4].(chan<- *ffcapi.ListenerEvent) + }).Return(nil) + mcm.On("StopConfirmedBlockListener", mock.Anything, l.ID).Return(nil) + + msp := es.persistence.(*persistencemocks.Persistence) + // load existing checkpoint on start + msp.On("GetCheckpoint", mock.Anything, mock.Anything).Return(&apitypes.EventStreamCheckpoint{ + StreamID: es.spec.ID, + Time: fftypes.Now(), + Listeners: map[fftypes.UUID]json.RawMessage{ + *l.ID: []byte(`{"block":10000}`), + }, + }, nil) + // write a valid checkpoint + msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool { + return cp.StreamID.Equals(es.spec.ID) && string(cp.Listeners[*l.ID]) == `{"block":10001}` + })).Return(nil) + // write a checkpoint when we delete + msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool { + return cp.StreamID.Equals(es.spec.ID) && cp.Listeners[*l.ID] == nil + })).Return(nil) + + _, err := es.AddOrUpdateListener(es.bgCtx, l.ID, l, false) + assert.NoError(t, err) + + err = es.Start(es.bgCtx) + assert.NoError(t, err) + + assert.Equal(t, apitypes.EventStreamStatusStarted, es.Status()) + + err = es.Start(es.bgCtx) // double start is error + assert.Regexp(t, "FF21027", err) + + r := <-started + + mockListenerEvent := &ffcapi.ListenerEvent{ + Checkpoint: &ffcapi.BlockListenerCheckpoint{Block: 10001}, + BlockEvent: &ffcapi.BlockEvent{ + ListenerID: l.ID, + BlockInfo: ffcapi.BlockInfo{ + BlockNumber: fftypes.NewFFBigInt(10001), + BlockHash: fftypes.NewRandB32().String(), + ParentHash: fftypes.NewRandB32().String(), + }, + }, + } + + idem.On("ProcessBatchedEvents", mock.Anything, 1, 1, mock.MatchedBy(func(events []*ffcapi.ListenerEvent) bool { + return events[0] == mockListenerEvent + })).Return() + + r <- mockListenerEvent + + err = es.RemoveListener(es.bgCtx, l.ID) + assert.NoError(t, err) + + err = es.Stop(es.bgCtx) + assert.NoError(t, err) + + mfc.AssertExpectations(t) +} + func TestStartEventStreamCheckpointReadFail(t *testing.T) { es := newTestEventStream(t, `{ @@ -740,7 +860,7 @@ func TestStartWithExistingStreamOk(t *testing.T) { _, err := newTestEventStreamWithListener(t, mfc, `{ "name": "ut_stream" - }`, l) + }`, nil, l) assert.NoError(t, err) mfc.AssertExpectations(t) @@ -760,7 +880,7 @@ func TestStartWithExistingStreamFail(t *testing.T) { _, err := newTestEventStreamWithListener(t, mfc, `{ "name": "ut_stream" - }`, l) + }`, nil, l) assert.Regexp(t, "pop", err) mfc.AssertExpectations(t) @@ -1166,7 +1286,7 @@ func TestStartWithExistingBlockListener(t *testing.T) { _, err := newTestEventStreamWithListener(t, mfc, `{ "name": "ut_stream" - }`, l) + }`, nil, l) assert.NoError(t, err) mfc.AssertExpectations(t) @@ -1184,7 +1304,7 @@ func TestStartAndAddBadListenerType(t *testing.T) { es, err := newTestEventStreamWithListener(t, mfc, `{ "name": "ut_stream" - }`) + }`, nil) assert.NoError(t, err) _, err = es.AddOrUpdateListener(es.bgCtx, l.ID, l, false) diff --git a/internal/tmmsgs/en_error_messages.go b/internal/tmmsgs/en_error_messages.go index ac6cd362..be1d4e39 100644 --- a/internal/tmmsgs/en_error_messages.go +++ b/internal/tmmsgs/en_error_messages.go @@ -105,5 +105,5 @@ var ( MsgBlockListenerNotStarted = ffe("FF21088", "Block listener %s not started", http.StatusConflict) MsgBadListenerType = ffe("FF21089", "Invalid listener type: %s", http.StatusBadRequest) MsgFromBlockInvalid = ffe("FF21090", "From block invalid. Must be 'earliest', 'latest' or a decimal: %s", http.StatusBadRequest) - MsgInvalidStreamTypeForModuleMode = ffe("FF21091", "Invalid event stream type '%s', only 'internal' type is supported for module mode", http.StatusBadRequest) + MsgMissingInternalDispatcher = ffe("FF21091", "'internal' type is supported for module mode") ) diff --git a/pkg/fftm/manager.go b/pkg/fftm/manager.go index ccfd8e31..cab8d747 100644 --- a/pkg/fftm/manager.go +++ b/pkg/fftm/manager.go @@ -43,6 +43,7 @@ import ( type Manager interface { Start() error StreamManager + ListenerManager txhandler.TransactionManager Close() } diff --git a/pkg/fftm/route_post_eventstream_listeners.go b/pkg/fftm/route_post_eventstream_listeners.go index 5d19c723..f401c4d0 100644 --- a/pkg/fftm/route_post_eventstream_listeners.go +++ b/pkg/fftm/route_post_eventstream_listeners.go @@ -38,7 +38,7 @@ var postEventStreamListeners = func(m *manager) *ffapi.Route { JSONOutputValue: func() interface{} { return &apitypes.Listener{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.CreateAndStoreNewStreamListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener)) + return m.CreateAndStoreNewListener(r.Req.Context(), r.PP["streamId"], r.Input.(*apitypes.Listener)) }, } } diff --git a/pkg/fftm/stream_management.go b/pkg/fftm/stream_management.go index 81161e60..8351d012 100644 --- a/pkg/fftm/stream_management.go +++ b/pkg/fftm/stream_management.go @@ -44,8 +44,8 @@ type StreamManager interface { DeleteStream(ctx context.Context, idStr string) error } -type StreamListenerManager interface { - CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) +type ListenerManager interface { + CreateAndStoreNewListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) GetListeners(ctx context.Context, afterStr, limitStr string) (streams []*apitypes.Listener, err error) GetListener(ctx context.Context, streamIDStr, listenerIDStr string) (l *apitypes.ListenerWithStatus, err error) UpdateListener(ctx context.Context, streamIDStr, listenerIDStr string, updates *apitypes.Listener, reset bool) (*apitypes.Listener, error) @@ -306,7 +306,7 @@ func (m *manager) createAndStoreNewListenerDeprecated(ctx context.Context, def * return m._createOrUpdateListener(ctx, apitypes.NewULID(), def, false) } -func (m *manager) CreateAndStoreNewStreamListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) { +func (m *manager) CreateAndStoreNewListener(ctx context.Context, idStr string, def *apitypes.Listener) (*apitypes.Listener, error) { streamID, err := fftypes.ParseUUID(ctx, idStr) if err != nil { return nil, err diff --git a/pkg/fftm/stream_management_test.go b/pkg/fftm/stream_management_test.go index 2e8f19a1..83d39534 100644 --- a/pkg/fftm/stream_management_test.go +++ b/pkg/fftm/stream_management_test.go @@ -338,11 +338,11 @@ func TestCreateStreamValidateFail(t *testing.T) { } -func TestCreateAndStoreNewStreamListenerBadID(t *testing.T) { +func TestCreateAndStoreNewListenerBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) defer close() - _, err := m.CreateAndStoreNewStreamListener(m.ctx, "bad", nil) + _, err := m.CreateAndStoreNewListener(m.ctx, "bad", nil) assert.Regexp(t, "FF00138", err) } From 7038fa9f79d46db1e5df5d31a74f7694e29793d0 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 9 Jul 2024 11:53:11 +0100 Subject: [PATCH 3/3] update module manager function reference Signed-off-by: Chengxuan Xing --- pkg/fftm/manager_test.go | 35 ++++++++++++++++++++++++++++++ pkg/fftm/route__root_command.go | 4 ++-- pkg/fftm/transaction_management.go | 6 ++--- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/pkg/fftm/manager_test.go b/pkg/fftm/manager_test.go index b7ab9bb0..79886d20 100644 --- a/pkg/fftm/manager_test.go +++ b/pkg/fftm/manager_test.go @@ -31,6 +31,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/internal/tmconfig" "github.com/hyperledger/firefly-transaction-manager/mocks/confirmationsmocks" "github.com/hyperledger/firefly-transaction-manager/mocks/ffcapimocks" + "github.com/hyperledger/firefly-transaction-manager/mocks/fftmmocks" "github.com/hyperledger/firefly-transaction-manager/mocks/persistencemocks" "github.com/hyperledger/firefly-transaction-manager/mocks/txhandlermocks" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" @@ -398,3 +399,37 @@ func TestPSQLInitRichQueryEnabled(t *testing.T) { assert.True(t, m.richQueryEnabled) assert.NotNil(t, m.toolkit.RichQuery) } + +func TestModuleMode(t *testing.T) { + + _ = testManagerCommonInit(t, false) + + dir := t.TempDir() + config.Set(tmconfig.PersistenceLevelDBPath, dir) + + m := newManager(context.Background(), &ffcapimocks.API{}, &fftmmocks.ModuleFunctions{}) + mpm := &persistencemocks.Persistence{} + mpm.On("Close", mock.Anything).Return(nil) + mpm.On("ListStreamsByCreateTime", mock.Anything, mock.Anything, startupPaginationLimit, txhandler.SortDirectionAscending).Return(nil, nil) + mrq := &persistencemocks.RichQuery{} + mpm.On("RichQuery").Return(mrq) + m.persistence = mpm + m.richQueryEnabled = true + mcm := &confirmationsmocks.Manager{} + m.confirmations = mcm + mcm.On("Start").Return().Maybe() + mca := m.connector.(*ffcapimocks.API) + mca.On("NewBlockListener", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReason(""), nil).Maybe() + + err := m.initPersistence(context.Background()) + assert.NoError(t, err) + + err = m.initServices(context.Background()) + assert.NoError(t, err) + assert.Nil(t, m.apiServer) + assert.Nil(t, m.metricsServer) + assert.Nil(t, m.wsServer) + + err = m.Start() + assert.NoError(t, err) +} diff --git a/pkg/fftm/route__root_command.go b/pkg/fftm/route__root_command.go index df94dd93..e8bb29a7 100644 --- a/pkg/fftm/route__root_command.go +++ b/pkg/fftm/route__root_command.go @@ -83,7 +83,7 @@ var postRootCommand = func(m *manager) *ffapi.Route { if err = baseReq.UnmarshalTo(&tReq); err != nil { return nil, true /* reject */, i18n.NewError(r.Req.Context(), tmmsgs.MsgInvalidRequestErr, baseReq.Headers.Type, err) } - return m.txHandler.HandleNewTransaction(r.Req.Context(), &tReq) + return m.HandleNewTransaction(r.Req.Context(), &tReq) }), nil case apitypes.RequestTypeDeploy: // We have to supply an extra submissionRejected boolean on submission errors @@ -92,7 +92,7 @@ var postRootCommand = func(m *manager) *ffapi.Route { if err = baseReq.UnmarshalTo(&tReq); err != nil { return nil, true /* reject */, i18n.NewError(r.Req.Context(), tmmsgs.MsgInvalidRequestErr, baseReq.Headers.Type, err) } - return m.txHandler.HandleNewContractDeployment(r.Req.Context(), &tReq) + return m.HandleNewContractDeployment(r.Req.Context(), &tReq) }), nil case apitypes.RequestTypeQuery: var tReq apitypes.QueryRequest diff --git a/pkg/fftm/transaction_management.go b/pkg/fftm/transaction_management.go index 4dcaa0a4..2588a89b 100644 --- a/pkg/fftm/transaction_management.go +++ b/pkg/fftm/transaction_management.go @@ -87,7 +87,7 @@ func (m *manager) getTransactions(ctx context.Context, afterStr, limitStr, signe func (m *manager) requestTransactionDeletion(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) { - canceledTx, err := m.txHandler.HandleCancelTransaction(ctx, txID) + canceledTx, err := m.HandleCancelTransaction(ctx, txID) if err != nil { return http.StatusInternalServerError, nil, err @@ -99,7 +99,7 @@ func (m *manager) requestTransactionDeletion(ctx context.Context, txID string) ( func (m *manager) requestTransactionSuspend(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) { - canceledTx, err := m.txHandler.HandleSuspendTransaction(ctx, txID) + canceledTx, err := m.HandleSuspendTransaction(ctx, txID) if err != nil { return http.StatusInternalServerError, nil, err @@ -111,7 +111,7 @@ func (m *manager) requestTransactionSuspend(ctx context.Context, txID string) (s func (m *manager) requestTransactionResume(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) { - canceledTx, err := m.txHandler.HandleResumeTransaction(ctx, txID) + canceledTx, err := m.HandleResumeTransaction(ctx, txID) if err != nil { return http.StatusInternalServerError, nil, err