Skip to content

Commit

Permalink
Hook fully up websocket change events
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Apr 20, 2022
1 parent 4554050 commit 7dde6d7
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 44 deletions.
8 changes: 4 additions & 4 deletions internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Manager interface {
type manager struct {
ctx context.Context
cancelCtx func()
changeEvents chan *fftypes.ChangeEvent
connectorAPI ffcapi.API
confirmations confirmations.Manager
policyEngine policyengine.PolicyEngine
Expand Down Expand Up @@ -80,7 +79,6 @@ type manager struct {
func NewManager(ctx context.Context) (Manager, error) {
var err error
m := &manager{
changeEvents: make(chan *fftypes.ChangeEvent),
connectorAPI: ffcapi.NewFFCAPI(ctx),
ffCoreClient: ffresty.New(ctx, tmconfig.FFCorePrefix),
fullScanRequests: make(chan bool, 1),
Expand Down Expand Up @@ -128,7 +126,6 @@ type pendingState struct {
}

func (m *manager) startChangeListener(ctx context.Context, w wsclient.WSClient) error {
log.L(m.ctx).Infof("Change listener connected")
cmd := fftypes.WSChangeEventCommand{
Type: fftypes.WSChangeEventCommandTypeStart,
Collections: []string{"operations"},
Expand All @@ -137,6 +134,7 @@ func (m *manager) startChangeListener(ctx context.Context, w wsclient.WSClient)
},
}
b, _ := json.Marshal(&cmd)
log.L(m.ctx).Infof("Change listener connected. Sent: %s", b)
return w.Send(ctx, b)
}

Expand Down Expand Up @@ -291,7 +289,9 @@ func (m *manager) changeEventLoop() {
defer close(m.changeEventLoopDone)
for {
select {
case ce := <-m.changeEvents:
case b := <-m.wsClient.Receive():
var ce *fftypes.ChangeEvent
_ = json.Unmarshal(b, &ce)
m.handleEvent(ce)
case <-m.ctx.Done():
log.L(m.ctx).Infof("Change event loop exiting")
Expand Down
40 changes: 0 additions & 40 deletions internal/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,46 +137,6 @@ func TestNewManagerBadPolicyEngine(t *testing.T) {

}

func TestChangeEventsNewTracked(t *testing.T) {

ce := &fftypes.ChangeEvent{
ID: fftypes.NewUUID(),
Type: fftypes.ChangeEventTypeUpdated,
Collection: "operations",
Namespace: "ns1",
}

var m *manager
_, m, cancel := newTestManager(t,
func(w http.ResponseWriter, r *http.Request) {},
func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodGet, r.Method)
assert.Equal(t, fmt.Sprintf("/admin/api/v1/operations/%s", ce.ID), r.URL.Path)
b, err := json.Marshal(newTestOperation(t, &fftm.ManagedTXOutput{
ID: ce.ID,
FFTMName: testManagerName,
Request: &fftm.TransactionRequest{},
}, fftypes.OpStatusPending))
assert.NoError(t, err)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
w.Write(b)
// Cancel context here so loop ends
m.cancelCtx()
},
)
defer cancel()

m.changeEvents = make(chan *fftypes.ChangeEvent, 1)
m.changeEventLoopDone = make(chan struct{})
m.changeEvents <- ce

m.changeEventLoop()

assert.Equal(t, ce.ID, m.pendingOpsByID[*ce.ID].mtx.ID)

}

func TestChangeEventsNewBadOutput(t *testing.T) {

ce := &fftypes.ChangeEvent{
Expand Down

0 comments on commit 7dde6d7

Please sign in to comment.