Skip to content

Commit

Permalink
BroadcastTimeout for Gateway
Browse files Browse the repository at this point in the history
The Gateway has retry logic for failing ordering nodes; however, if an ordering node doesn’t respond, the Submit call will time out without allowing other ordering nodes to be tried.
This commit implements a BroadcastTimeout that allows individual ordering service nodes (OSNs) to time out before the overall Submit timeout expires.
Similar logic already exists for calls to endorser peers.

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
(cherry picked from commit 867fbed)
  • Loading branch information
andrew-coleman authored and denyeart committed Jan 25, 2023
1 parent 3fd9891 commit 218d1e9
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 2 deletions.
1 change: 1 addition & 0 deletions core/peer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ func TestGlobalConfig(t *testing.T) {
GatewayOptions: config.Options{
Enabled: true,
EndorsementTimeout: 10 * time.Second,
BroadcastTimeout: 10 * time.Second,
DialTimeout: 60 * time.Second,
},
}
Expand Down
23 changes: 21 additions & 2 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (gs *Server) planFromFirstEndorser(ctx context.Context, channel string, cha
}()
select {
case <-done:
// Endorser completedLayout normally
// Endorser completed normally
case <-ctx.Done():
// Overall endorsement timeout expired
logger.Warn("Endorse call timed out while collecting first endorsement")
Expand Down Expand Up @@ -438,7 +438,26 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
for _, index := range rand.Perm(len(orderers)) {
orderer := orderers[index]
logger.Infow("Sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress)
response, err := gs.broadcast(ctx, orderer, txn)

var response *ab.BroadcastResponse
var err error
done := make(chan struct{})
go func() {
defer close(done)
ctx, cancel := context.WithTimeout(ctx, gs.options.BroadcastTimeout)
defer cancel()

response, err = gs.broadcast(ctx, orderer, txn)
}()
select {
case <-done:
// Broadcast completed normally
case <-ctx.Done():
// Overall submit timeout expired
logger.Warn("Submit call timed out while broadcasting to ordering service")
return nil, newRpcError(codes.DeadlineExceeded, "submit timeout expired while broadcasting to ordering service")
}

if err != nil {
errDetails = append(errDetails, errorDetail(orderer.endpointConfig, err.Error()))
logger.Warnw("Error sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress, "err", err)
Expand Down
111 changes: 111 additions & 0 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ const (
testChannel = "test_channel"
testChaincode = "test_chaincode"
endorsementTimeout = -1 * time.Second
broadcastTimeout = 100 * time.Millisecond
)

type testDef struct {
Expand Down Expand Up @@ -1306,6 +1307,115 @@ func TestSubmit(t *testing.T) {
}
},
},
{
name: "orderer timeout - retry",
plan: endorsementPlan{
"g1": {{endorser: localhostMock}},
},
config: &dp.ConfigResult{
Orderers: map[string]*dp.Endpoints{
"msp1": {
Endpoint: []*dp.Endpoint{
{Host: "orderer1", Port: 7050},
{Host: "orderer2", Port: 7050},
{Host: "orderer3", Port: 7050},
},
},
},
},
postSetup: func(t *testing.T, def *preparedTest) {
def.ctx, def.cancel = context.WithTimeout(def.ctx, 300*time.Millisecond)
broadcastTime := 200 * time.Millisecond // first invocation exceeds BroadcastTimeout

abc := &mocks.ABClient{}
abbc := &mocks.ABBClient{}
abbc.SendReturns(nil)
abbc.RecvReturns(&ab.BroadcastResponse{
Info: "success",
Status: cp.Status(200),
}, nil)
abc.BroadcastStub = func(ctx context.Context, co ...grpc.CallOption) (ab.AtomicBroadcast_BroadcastClient, error) {
defer func() {
broadcastTime = time.Millisecond // subsequent invocations will not timeout
}()
select {
case <-time.After(broadcastTime):
return abbc, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
def.server.registry.endpointFactory = &endpointFactory{
timeout: 5 * time.Second,
connectEndorser: func(conn *grpc.ClientConn) peer.EndorserClient {
return &mocks.EndorserClient{}
},
connectOrderer: func(_ *grpc.ClientConn) ab.AtomicBroadcastClient {
return abc
},
dialer: func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return nil, nil
},
}
},
postTest: func(t *testing.T, def *preparedTest) {
def.cancel()
},
},
{
name: "submit timeout",
plan: endorsementPlan{
"g1": {{endorser: localhostMock}},
},
config: &dp.ConfigResult{
Orderers: map[string]*dp.Endpoints{
"msp1": {
Endpoint: []*dp.Endpoint{
{Host: "orderer1", Port: 7050},
{Host: "orderer2", Port: 7050},
{Host: "orderer3", Port: 7050},
},
},
},
},
postSetup: func(t *testing.T, def *preparedTest) {
def.ctx, def.cancel = context.WithTimeout(def.ctx, 50*time.Millisecond)
broadcastTime := 200 * time.Millisecond // invocation exceeds BroadcastTimeout

abc := &mocks.ABClient{}
abbc := &mocks.ABBClient{}
abbc.SendReturns(nil)
abbc.RecvReturns(&ab.BroadcastResponse{
Info: "success",
Status: cp.Status(200),
}, nil)
abc.BroadcastStub = func(ctx context.Context, co ...grpc.CallOption) (ab.AtomicBroadcast_BroadcastClient, error) {
select {
case <-time.After(broadcastTime):
return abbc, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
def.server.registry.endpointFactory = &endpointFactory{
timeout: 5 * time.Second,
connectEndorser: func(conn *grpc.ClientConn) peer.EndorserClient {
return &mocks.EndorserClient{}
},
connectOrderer: func(_ *grpc.ClientConn) ab.AtomicBroadcastClient {
return abc
},
dialer: func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return nil, nil
},
}
},
postTest: func(t *testing.T, def *preparedTest) {
def.cancel()
},
errCode: codes.DeadlineExceeded,
errString: "submit timeout expired while broadcasting to ordering service",
},
{
name: "multiple orderers all fail",
plan: endorsementPlan{
Expand Down Expand Up @@ -2284,6 +2394,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest {
options := config.Options{
Enabled: true,
EndorsementTimeout: endorsementTimeout,
BroadcastTimeout: broadcastTimeout,
}

member := gdiscovery.NetworkMember{
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/gateway/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ type Options struct {
Enabled bool
// EndorsementTimeout is used to specify the maximum time to wait for endorsement responses from external peers.
EndorsementTimeout time.Duration
// BroadcastTimeout is used to specify the maximum time to wait for responses from ordering nodes.
BroadcastTimeout time.Duration
// DialTimeout is used to specify the maximum time to wait for connecting to external peers and orderer nodes.
DialTimeout time.Duration
}

// defaultOptions holds the gateway option values declared in this file: the
// gateway is enabled, the endorsement and broadcast timeouts are 10 seconds
// each, and the dial timeout is 30 seconds.
// NOTE(review): presumably GetOptions starts from these values and overrides
// individual fields when the matching peer.gateway.* key is set — confirm
// against the full body of GetOptions.
var defaultOptions = Options{
Enabled: true,
EndorsementTimeout: 10 * time.Second,
BroadcastTimeout: 10 * time.Second,
DialTimeout: 30 * time.Second,
}

Expand All @@ -37,6 +40,9 @@ func GetOptions(v *viper.Viper) Options {
if v.IsSet("peer.gateway.endorsementTimeout") {
options.EndorsementTimeout = v.GetDuration("peer.gateway.endorsementTimeout")
}
if v.IsSet("peer.gateway.broadcastTimeout") {
options.BroadcastTimeout = v.GetDuration("peer.gateway.broadcastTimeout")
}
if v.IsSet("peer.gateway.dialTimeout") {
options.DialTimeout = v.GetDuration("peer.gateway.dialTimeout")
}
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/gateway/config/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ peer:
gateway:
enabled: true
endorsementTimeout: 30s
broadcastTimeout: 20s
dialTimeout: 2m
`)

Expand All @@ -44,6 +45,7 @@ func TestOverriddenOptions(t *testing.T) {
expectedOptions := Options{
Enabled: true,
EndorsementTimeout: 30 * time.Second,
BroadcastTimeout: 20 * time.Second,
DialTimeout: 2 * time.Minute,
}
require.Equal(t, expectedOptions, options)
Expand All @@ -58,6 +60,7 @@ func TestDisabledGatewayOption(t *testing.T) {
expectedOptions := Options{
Enabled: false,
EndorsementTimeout: 10 * time.Second,
BroadcastTimeout: 10 * time.Second,
DialTimeout: 30 * time.Second,
}
require.Equal(t, expectedOptions, options)
Expand Down
3 changes: 3 additions & 0 deletions sampleconfig/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ peer:
# endorsementTimeout is the duration the gateway waits for a response
# from other endorsing peers before returning a timeout error to the client.
endorsementTimeout: 30s
# broadcastTimeout is the duration the gateway waits for a response
# from ordering nodes before returning a timeout error to the client.
broadcastTimeout: 30s
# dialTimeout is the duration the gateway waits for a connection
# to other network nodes.
dialTimeout: 2m
Expand Down

0 comments on commit 218d1e9

Please sign in to comment.