From 218d1e9bc97df4eac21214e910784dca6b188b11 Mon Sep 17 00:00:00 2001 From: andrew-coleman Date: Tue, 24 Jan 2023 16:38:25 +0000 Subject: [PATCH] BroadcastTimeout for Gateway MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Gateway has retry logic for failing ordering nodes; however, if an ordering node doesn’t respond, then the Submit call will time out without allowing other ordering nodes to be tried. This commit implements a BroadcastTimeout that allows individual OSNs to time out before the overall Submit timeout expires. Similar logic already exists for calls to endorser peers. Signed-off-by: andrew-coleman (cherry picked from commit 867fbedd06c667ac880ebe82b5a18eddc059ec33) --- core/peer/config_test.go | 1 + internal/pkg/gateway/api.go | 23 +++- internal/pkg/gateway/api_test.go | 111 ++++++++++++++++++++ internal/pkg/gateway/config/options.go | 6 ++ internal/pkg/gateway/config/options_test.go | 3 + sampleconfig/core.yaml | 3 + 6 files changed, 145 insertions(+), 2 deletions(-) diff --git a/core/peer/config_test.go b/core/peer/config_test.go index c2092eee888..533923a8ee7 100644 --- a/core/peer/config_test.go +++ b/core/peer/config_test.go @@ -387,6 +387,7 @@ func TestGlobalConfig(t *testing.T) { GatewayOptions: config.Options{ Enabled: true, EndorsementTimeout: 10 * time.Second, + BroadcastTimeout: 10 * time.Second, DialTimeout: 60 * time.Second, }, } diff --git a/internal/pkg/gateway/api.go b/internal/pkg/gateway/api.go index cda71fe340e..a725383d95f 100644 --- a/internal/pkg/gateway/api.go +++ b/internal/pkg/gateway/api.go @@ -320,7 +320,7 @@ func (gs *Server) planFromFirstEndorser(ctx context.Context, channel string, cha }() select { case <-done: - // Endorser completedLayout normally + // Endorser completed normally case <-ctx.Done(): // Overall endorsement timeout expired logger.Warn("Endorse call timed out while collecting first endorsement") @@ -438,7 +438,26 @@ func (gs *Server) Submit(ctx context.Context, 
request *gp.SubmitRequest) (*gp.Su for _, index := range rand.Perm(len(orderers)) { orderer := orderers[index] logger.Infow("Sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress) - response, err := gs.broadcast(ctx, orderer, txn) + + var response *ab.BroadcastResponse + var err error + done := make(chan struct{}) + go func() { + defer close(done) + ctx, cancel := context.WithTimeout(ctx, gs.options.BroadcastTimeout) + defer cancel() + + response, err = gs.broadcast(ctx, orderer, txn) + }() + select { + case <-done: + // Broadcast completed normally + case <-ctx.Done(): + // Overall submit timeout expired + logger.Warn("Submit call timed out while broadcasting to ordering service") + return nil, newRpcError(codes.DeadlineExceeded, "submit timeout expired while broadcasting to ordering service") + } + if err != nil { errDetails = append(errDetails, errorDetail(orderer.endpointConfig, err.Error())) logger.Warnw("Error sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress, "err", err) diff --git a/internal/pkg/gateway/api_test.go b/internal/pkg/gateway/api_test.go index e65e25f891e..58ae2b70a62 100644 --- a/internal/pkg/gateway/api_test.go +++ b/internal/pkg/gateway/api_test.go @@ -121,6 +121,7 @@ const ( testChannel = "test_channel" testChaincode = "test_chaincode" endorsementTimeout = -1 * time.Second + broadcastTimeout = 100 * time.Millisecond ) type testDef struct { @@ -1306,6 +1307,115 @@ func TestSubmit(t *testing.T) { } }, }, + { + name: "orderer timeout - retry", + plan: endorsementPlan{ + "g1": {{endorser: localhostMock}}, + }, + config: &dp.ConfigResult{ + Orderers: map[string]*dp.Endpoints{ + "msp1": { + Endpoint: []*dp.Endpoint{ + {Host: "orderer1", Port: 7050}, + {Host: "orderer2", Port: 7050}, + {Host: "orderer3", Port: 7050}, + }, + }, + }, + }, + postSetup: func(t *testing.T, def *preparedTest) { + def.ctx, def.cancel = context.WithTimeout(def.ctx, 300*time.Millisecond) + 
broadcastTime := 200 * time.Millisecond // first invocation exceeds BroadcastTimeout + + abc := &mocks.ABClient{} + abbc := &mocks.ABBClient{} + abbc.SendReturns(nil) + abbc.RecvReturns(&ab.BroadcastResponse{ + Info: "success", + Status: cp.Status(200), + }, nil) + abc.BroadcastStub = func(ctx context.Context, co ...grpc.CallOption) (ab.AtomicBroadcast_BroadcastClient, error) { + defer func() { + broadcastTime = time.Millisecond // subsequent invocations will not timeout + }() + select { + case <-time.After(broadcastTime): + return abbc, nil + case <-ctx.Done(): + return nil, ctx.Err() + } + } + def.server.registry.endpointFactory = &endpointFactory{ + timeout: 5 * time.Second, + connectEndorser: func(conn *grpc.ClientConn) peer.EndorserClient { + return &mocks.EndorserClient{} + }, + connectOrderer: func(_ *grpc.ClientConn) ab.AtomicBroadcastClient { + return abc + }, + dialer: func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return nil, nil + }, + } + }, + postTest: func(t *testing.T, def *preparedTest) { + def.cancel() + }, + }, + { + name: "submit timeout", + plan: endorsementPlan{ + "g1": {{endorser: localhostMock}}, + }, + config: &dp.ConfigResult{ + Orderers: map[string]*dp.Endpoints{ + "msp1": { + Endpoint: []*dp.Endpoint{ + {Host: "orderer1", Port: 7050}, + {Host: "orderer2", Port: 7050}, + {Host: "orderer3", Port: 7050}, + }, + }, + }, + }, + postSetup: func(t *testing.T, def *preparedTest) { + def.ctx, def.cancel = context.WithTimeout(def.ctx, 50*time.Millisecond) + broadcastTime := 200 * time.Millisecond // invocation exceeds BroadcastTimeout + + abc := &mocks.ABClient{} + abbc := &mocks.ABBClient{} + abbc.SendReturns(nil) + abbc.RecvReturns(&ab.BroadcastResponse{ + Info: "success", + Status: cp.Status(200), + }, nil) + abc.BroadcastStub = func(ctx context.Context, co ...grpc.CallOption) (ab.AtomicBroadcast_BroadcastClient, error) { + select { + case <-time.After(broadcastTime): + return abbc, nil + case 
<-ctx.Done(): + return nil, ctx.Err() + } + } + def.server.registry.endpointFactory = &endpointFactory{ + timeout: 5 * time.Second, + connectEndorser: func(conn *grpc.ClientConn) peer.EndorserClient { + return &mocks.EndorserClient{} + }, + connectOrderer: func(_ *grpc.ClientConn) ab.AtomicBroadcastClient { + return abc + }, + dialer: func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return nil, nil + }, + } + }, + postTest: func(t *testing.T, def *preparedTest) { + def.cancel() + }, + errCode: codes.DeadlineExceeded, + errString: "submit timeout expired while broadcasting to ordering service", + }, { name: "multiple orderers all fail", plan: endorsementPlan{ @@ -2284,6 +2394,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { options := config.Options{ Enabled: true, EndorsementTimeout: endorsementTimeout, + BroadcastTimeout: broadcastTimeout, } member := gdiscovery.NetworkMember{ diff --git a/internal/pkg/gateway/config/options.go b/internal/pkg/gateway/config/options.go index 0e044b00153..704d859d3cb 100644 --- a/internal/pkg/gateway/config/options.go +++ b/internal/pkg/gateway/config/options.go @@ -18,6 +18,8 @@ type Options struct { Enabled bool // EndorsementTimeout is used to specify the maximum time to wait for endorsement responses from external peers. EndorsementTimeout time.Duration + // BroadcastTimeout is used to specify the maximum time to wait for responses from ordering nodes. + BroadcastTimeout time.Duration // DialTimeout is used to specify the maximum time to wait for connecting to external peers and orderer nodes. 
DialTimeout time.Duration } @@ -25,6 +27,7 @@ type Options struct { var defaultOptions = Options{ Enabled: true, EndorsementTimeout: 10 * time.Second, + BroadcastTimeout: 10 * time.Second, DialTimeout: 30 * time.Second, } @@ -37,6 +40,9 @@ func GetOptions(v *viper.Viper) Options { if v.IsSet("peer.gateway.endorsementTimeout") { options.EndorsementTimeout = v.GetDuration("peer.gateway.endorsementTimeout") } + if v.IsSet("peer.gateway.broadcastTimeout") { + options.BroadcastTimeout = v.GetDuration("peer.gateway.broadcastTimeout") + } if v.IsSet("peer.gateway.dialTimeout") { options.DialTimeout = v.GetDuration("peer.gateway.dialTimeout") } diff --git a/internal/pkg/gateway/config/options_test.go b/internal/pkg/gateway/config/options_test.go index 375f10d866e..576f67e0e07 100644 --- a/internal/pkg/gateway/config/options_test.go +++ b/internal/pkg/gateway/config/options_test.go @@ -20,6 +20,7 @@ peer: gateway: enabled: true endorsementTimeout: 30s + broadcastTimeout: 20s dialTimeout: 2m `) @@ -44,6 +45,7 @@ func TestOverriddenOptions(t *testing.T) { expectedOptions := Options{ Enabled: true, EndorsementTimeout: 30 * time.Second, + BroadcastTimeout: 20 * time.Second, DialTimeout: 2 * time.Minute, } require.Equal(t, expectedOptions, options) @@ -58,6 +60,7 @@ func TestDisabledGatewayOption(t *testing.T) { expectedOptions := Options{ Enabled: false, EndorsementTimeout: 10 * time.Second, + BroadcastTimeout: 10 * time.Second, DialTimeout: 30 * time.Second, } require.Equal(t, expectedOptions, options) diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 475df1218b9..8ad1d1f7195 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -53,6 +53,9 @@ peer: # endorsementTimeout is the duration the gateway waits for a response # from other endorsing peers before returning a timeout error to the client. 
endorsementTimeout: 30s + # broadcastTimeout is the duration the gateway waits for a response + # from an ordering node before giving up on it and trying another one. + broadcastTimeout: 30s # dialTimeout is the duration the gateway waits for a connection # to other network nodes. dialTimeout: 2m