Skip to content

Commit

Permalink
Wait until post-connect called before returning from Connect() and can…
Browse files Browse the repository at this point in the history
…cel RPC calls

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Oct 6, 2023
1 parent 7a06663 commit 7e346ca
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 16 deletions.
2 changes: 2 additions & 0 deletions internal/signermsgs/en_error_messges.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,6 @@ var (
MsgInvalidSigner = ffe("FF22064", "Invalid signer")
MsgResultParseFailed = ffe("FF22065", "Failed to parse result (expected=%T): %s")
MsgSubscribeResponseInvalid = ffe("FF22066", "Subscription response invalid")
MsgWebSocketReconnected = ffe("FF22067", "WebSocket reconnected during JSON/RPC call")
MsgContextCancelledWSConnect = ffe("FF22068", "Context canceled while connecting WebSocket")
)
53 changes: 44 additions & 9 deletions pkg/rpcbackend/wsbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type wsRPCClient struct {
wsConf wsclient.WSConfig
client wsclient.WSClient
requestCounter int64
connected chan struct{}
calls map[string]chan *RPCResponse
configuredSubs map[fftypes.UUID]*sub
pendingSubsByReqID map[string]*sub
Expand All @@ -95,11 +96,23 @@ func (rc *wsRPCClient) Connect(ctx context.Context) (err error) {
if err != nil {
return err
}
go rc.receiveLoop(log.WithLogField(ctx, "role", "rpc_websocket"))

// Wait until the afterConnect hook has been driven
connected := make(chan struct{})
rc.connected = connected
if err := rc.client.Connect(); err != nil {
return err
}
go rc.receiveLoop(log.WithLogField(ctx, "role", "rpc_websocket"))
return rc.waitConnected(ctx, connected)
}

// waitConnected blocks until the supplied connected channel yields (it is
// closed by handleReconnect once the post-connect processing has run), or
// until ctx is done - whichever happens first.
//
// Returns nil when the connected signal is observed, and an
// MsgContextCancelledWSConnect error when the context ends first.
func (rc *wsRPCClient) waitConnected(ctx context.Context, connected chan struct{}) error {
	select {
	case <-ctx.Done():
		// The caller gave up before the connect hook completed
		return i18n.NewError(ctx, signermsgs.MsgContextCancelledWSConnect)
	case <-connected:
		return nil
	}
}

Expand All @@ -111,16 +124,26 @@ func (rc *wsRPCClient) Close() {

// handleReconnect performs the processing required once the WebSocket is (re)connected.
// NOTE(review): presumably registered with the wsclient as its after-connect hook - Connect
// waits on rc.connected, which is closed at the end of this function; confirm against the
// client construction code.
//
// When reconnect is not disabled in the config, the active state is cleared, each in-flight
// JSON/RPC call is sent an error response (MsgWebSocketReconnected), and every configured
// subscription is re-sent. The first failure to send a resubscribe is returned immediately;
// on that path rc.connected is NOT closed.
//
// The w parameter is not referenced in this body.
func (rc *wsRPCClient) handleReconnect(ctx context.Context, w wsclient.WSClient) error {
if !rc.wsConf.DisableReconnect {
subs := rc.clearActiveReturnConfiguredSubs()
calls, subs := rc.clearActiveReturnConfiguredSubs()
// Fail each call that was in-flight - the response ID is the request ID as a JSON string
for rpcID, c := range calls {
rc.deliverCallResponse(ctx, c, &RPCResponse{
ID: fftypes.JSONAnyPtr(`"` + rpcID + `"`),
Error: NewRPCError(ctx, RPCCodeInternalError, signermsgs.MsgWebSocketReconnected),
})
}
// Re-send the subscribe request for every configured subscription
for _, s := range subs {
reqID, rpcErr := s.sendSubscribe(ctx)
log.L(ctx).Infof("RPC[%s]: Resubscribing %s after WebSocket reconnect", reqID, s.localID)
log.L(ctx).Infof("Resubscribing %s after WebSocket reconnect", s.localID)
_, rpcErr := s.sendSubscribe(ctx)
if rpcErr != nil {
log.L(ctx).Errorf("Failed to send resubscribe: %s", rpcErr)
return rpcErr.Error()
}
}
}
// Signal anyone blocked in waitConnected; the field is set to nil so the channel is only closed once
if rc.connected != nil {
close(rc.connected)
rc.connected = nil
}
return nil
}

Expand Down Expand Up @@ -213,9 +236,12 @@ func (rc *wsRPCClient) removeConfiguredSub(id *fftypes.UUID) {
delete(rc.configuredSubs, *id)
}

func (rc *wsRPCClient) clearActiveReturnConfiguredSubs() map[fftypes.UUID]*sub {
func (rc *wsRPCClient) clearActiveReturnConfiguredSubs() (map[string]chan *RPCResponse, map[fftypes.UUID]*sub) {
rc.mux.Lock()
defer rc.mux.Unlock()
// Return a copy of all the in-flight RPC calls, before we clear those (as they will all be defunct now)
calls := rc.calls
rc.calls = make(map[string]chan *RPCResponse)
// Clear the active state as considered now invalid after a reconnect
rc.activeSubsBySubID = make(map[string]*sub)
rc.pendingSubsByReqID = make(map[string]*sub)
Expand All @@ -226,7 +252,7 @@ func (rc *wsRPCClient) clearActiveReturnConfiguredSubs() map[fftypes.UUID]*sub {
s.pendingReqID = ""
subs[id] = s
}
return subs
return calls, subs
}

func (rc *wsRPCClient) getAllSubs() []*sub {
Expand Down Expand Up @@ -407,7 +433,7 @@ func (rc *wsRPCClient) handleSubscriptionNotification(ctx context.Context, rpcRe
}
}

func (rc *wsRPCClient) handleSubscriptionConfirm(ctx context.Context, rpcRes *RPCResponse, inflightSub *sub) {
func (rc *wsRPCClient) handleSubscriptionConfirm(ctx context.Context, inflightSub *sub, rpcRes *RPCResponse) {
resChl := inflightSub.newSubResponse
inflightSub.newSubResponse = nil // we only dispatch once (it's only new once, on reconnect it's old and there's nobody to tell if we fail)
if rpcRes.Error != nil && rpcRes.Error.Code != 0 {
Expand All @@ -434,6 +460,15 @@ func (rc *wsRPCClient) handleSubscriptionConfirm(ctx context.Context, rpcRes *RP
resChl <- nil
}

// deliverCallResponse hands rpcRes to the channel of an in-flight call without
// ever blocking the caller: if the channel cannot accept the value right now,
// the response is dropped.
func (rc *wsRPCClient) deliverCallResponse(ctx context.Context, inflightCall chan *RPCResponse, rpcRes *RPCResponse) {
	select {
	case inflightCall <- rpcRes:
		return
	default:
		// Nothing to do. This branch exists purely for the reconnect edge case:
		// a call is meant to stay in the map only until it has been sent one
		// response, and its channel has a slot so that send does not block.
	}
}

func (rc *wsRPCClient) receiveLoop(ctx context.Context) {
for {
bytes, ok := <-rc.client.Receive()
Expand All @@ -453,9 +488,9 @@ func (rc *wsRPCClient) receiveLoop(ctx context.Context) {
inflightSub, inflightCall := rc.popInflight(rpcRes.ID.AsString())
switch {
case inflightSub != nil:
rc.handleSubscriptionConfirm(ctx, &rpcRes, inflightSub)
rc.handleSubscriptionConfirm(ctx, inflightSub, &rpcRes)
case inflightCall != nil:
inflightCall <- &rpcRes // assured not to block as we allocate one slot, and pop first time we see it
rc.deliverCallResponse(ctx, inflightCall, &rpcRes)
default:
log.L(ctx).Warnf("RPC[%s] <-- Received unexpected RPC response: %+v", rpcRes.ID.AsString(), rpcRes)
}
Expand Down
40 changes: 33 additions & 7 deletions pkg/rpcbackend/wsbackend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func newTestWSRPC(t *testing.T) (context.Context, *wsRPCClient, chan string, cha
rc := NewWSRPCClient(wsConfig)
ctx, cancelCtx := context.WithCancel(context.Background())
return ctx, rc.(*wsRPCClient), toServer, fromServer, func() {
rc.Close()
close()
rc.Close()
cancelCtx()
}
}
Expand Down Expand Up @@ -329,13 +329,13 @@ func TestHandleSubscriptionConfirmServerError(t *testing.T) {
defer done()

errChl := make(chan *RPCError, 1)
rc.handleSubscriptionConfirm(ctx, &RPCResponse{
rc.handleSubscriptionConfirm(ctx, &sub{
newSubResponse: errChl,
}, &RPCResponse{
Error: &RPCError{
Code: int64(RPCCodeInternalError),
Message: "pop",
},
}, &sub{
newSubResponse: errChl,
})
rpcErr := <-errChl
assert.Regexp(t, "pop", rpcErr.Error())
Expand All @@ -346,7 +346,7 @@ func TestHandleSubscriptionConfirmBadSub(t *testing.T) {
defer done()

errChl := make(chan *RPCError, 1)
rc.handleSubscriptionConfirm(ctx, &RPCResponse{}, &sub{newSubResponse: errChl})
rc.handleSubscriptionConfirm(ctx, &sub{newSubResponse: errChl}, &RPCResponse{})
rpcErr := <-errChl
assert.Regexp(t, "FF22066", rpcErr.Error())
}
Expand Down Expand Up @@ -382,17 +382,23 @@ func TestHandleReonnnectOK(t *testing.T) {

s, errChl := rc.addConfiguredSub(ctx, []interface{}{"newHeads"})

inflightID, inflightRes := rc.addInflightRequest(&RPCRequest{})

go func() {
msg := <-toServer
assert.Equal(t, `{"jsonrpc":"2.0","id":"000000001","method":"eth_subscribe","params":["newHeads"]}`, msg)
fromServer <- `{"jsonrpc":"2.0","id":"000000001","result":"0x9ce59a13059e417087c02d3236a0b1cc"}`
assert.Equal(t, `{"jsonrpc":"2.0","id":"000000002","method":"eth_subscribe","params":["newHeads"]}`, msg)
fromServer <- `{"jsonrpc":"2.0","id":"000000002","result":"0x9ce59a13059e417087c02d3236a0b1cc"}`
}()

err = rc.handleReconnect(ctx, rc.client)
assert.NoError(t, err)
rpcErr := <-errChl
assert.Nil(t, rpcErr)
assert.Equal(t, "0x9ce59a13059e417087c02d3236a0b1cc", s.currentSubID)

rpcRes := <-inflightRes
assert.Regexp(t, "FF22067", rpcRes.Error.Error())
assert.Equal(t, inflightID, rpcRes.ID.AsString())
}

func TestHandleReonnnectFail(t *testing.T) {
Expand All @@ -412,3 +418,23 @@ func TestHandleReonnnectFail(t *testing.T) {
err = rc.handleReconnect(ctx, rc.client)
assert.Regexp(t, "FF22011", err)
}

// TestConnectClosedContextFail connects successfully, tears the test harness
// down via done(), and then checks that waiting on a connected channel that is
// never closed fails with FF22068.
func TestConnectClosedContextFail(t *testing.T) {
	ctx, rc, _, _, done := newTestWSRPC(t)

	assert.NoError(t, rc.Connect(ctx))
	done()

	// A fresh channel that nothing will close, so only the context can end the wait
	neverClosed := make(chan struct{})
	rc.connected = neverClosed
	assert.Regexp(t, "FF22068", rc.waitConnected(ctx, neverClosed))
}

// TestDeliverCallResponseNonBlocking checks that deliverCallResponse returns
// even when the target channel has no buffer and no receiver.
func TestDeliverCallResponseNonBlocking(t *testing.T) {
	ctx, rc, _, _, done := newTestWSRPC(t)
	defer done()

	// Unbuffered and never read from: a plain send here would block forever
	noReader := make(chan *RPCResponse)
	rc.deliverCallResponse(ctx, noReader, &RPCResponse{})
}

0 comments on commit 7e346ca

Please sign in to comment.