From 7dde6d7381c27b6c6b9820ccaca99520767c6f35 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Wed, 20 Apr 2022 04:16:30 -0400 Subject: [PATCH] Hook fully up websocket change events Signed-off-by: Peter Broadhurst --- internal/manager/manager.go | 8 +++---- internal/manager/manager_test.go | 40 -------------------------------- 2 files changed, 4 insertions(+), 44 deletions(-) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index ecbef5c2..dd55a15b 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -48,7 +48,6 @@ type Manager interface { type manager struct { ctx context.Context cancelCtx func() - changeEvents chan *fftypes.ChangeEvent connectorAPI ffcapi.API confirmations confirmations.Manager policyEngine policyengine.PolicyEngine @@ -80,7 +79,6 @@ type manager struct { func NewManager(ctx context.Context) (Manager, error) { var err error m := &manager{ - changeEvents: make(chan *fftypes.ChangeEvent), connectorAPI: ffcapi.NewFFCAPI(ctx), ffCoreClient: ffresty.New(ctx, tmconfig.FFCorePrefix), fullScanRequests: make(chan bool, 1), @@ -128,7 +126,6 @@ type pendingState struct { } func (m *manager) startChangeListener(ctx context.Context, w wsclient.WSClient) error { - log.L(m.ctx).Infof("Change listener connected") cmd := fftypes.WSChangeEventCommand{ Type: fftypes.WSChangeEventCommandTypeStart, Collections: []string{"operations"}, @@ -137,6 +134,7 @@ func (m *manager) startChangeListener(ctx context.Context, w wsclient.WSClient) }, } b, _ := json.Marshal(&cmd) + log.L(m.ctx).Infof("Change listener connected. Sent: %s", b) return w.Send(ctx, b) } @@ -291,7 +289,9 @@ func (m *manager) changeEventLoop() { defer close(m.changeEventLoopDone) for { select { - case ce := <-m.changeEvents: + case b := <-m.wsClient.Receive(): + var ce *fftypes.ChangeEvent + _ = json.Unmarshal(b, &ce) m.handleEvent(ce) case <-m.ctx.Done(): log.L(m.ctx).Infof("Change event loop exiting") diff --git a/internal/manager/manager_test.go b/internal/manager/manager_test.go index 2b95dc03..0082888b 100644 --- a/internal/manager/manager_test.go +++ b/internal/manager/manager_test.go @@ -137,46 +137,6 @@ func TestNewManagerBadPolicyEngine(t *testing.T) { } -func TestChangeEventsNewTracked(t *testing.T) { - - ce := &fftypes.ChangeEvent{ - ID: fftypes.NewUUID(), - Type: fftypes.ChangeEventTypeUpdated, - Collection: "operations", - Namespace: "ns1", - } - - var m *manager - _, m, cancel := newTestManager(t, - func(w http.ResponseWriter, r *http.Request) {}, - func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, http.MethodGet, r.Method) - assert.Equal(t, fmt.Sprintf("/admin/api/v1/operations/%s", ce.ID), r.URL.Path) - b, err := json.Marshal(newTestOperation(t, &fftm.ManagedTXOutput{ - ID: ce.ID, - FFTMName: testManagerName, - Request: &fftm.TransactionRequest{}, - }, fftypes.OpStatusPending)) - assert.NoError(t, err) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(200) - w.Write(b) - // Cancel context here so loop ends - m.cancelCtx() - }, - ) - defer cancel() - - m.changeEvents = make(chan *fftypes.ChangeEvent, 1) - m.changeEventLoopDone = make(chan struct{}) - m.changeEvents <- ce - - m.changeEventLoop() - - assert.Equal(t, ce.ID, m.pendingOpsByID[*ce.ID].mtx.ID) - -} - func TestChangeEventsNewBadOutput(t *testing.T) { ce := &fftypes.ChangeEvent{