Skip to content

Commit

Permalink
Check if inner consensus message is missing
Browse files Browse the repository at this point in the history
Change-Id: I06b466e6ff6d43f2b9804dd21185241716356050
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm authored and C0rWin committed Jun 21, 2022
1 parent e7dc57d commit 80bcc18
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
61 changes: 59 additions & 2 deletions orderer/common/cluster/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
Expand Down Expand Up @@ -128,6 +129,11 @@ func (*mockChannelExtractor) TargetChannel(msg proto.Message) string {
}
}

type clusterServer interface {
// Step passes an implementation-specific message to another cluster member.
Step(server orderer.Cluster_StepServer) error
}

type clusterNode struct {
lock sync.Mutex
frozen bool
Expand All @@ -140,6 +146,7 @@ type clusterNode struct {
clientConfig comm_utils.ClientConfig
serverConfig comm_utils.ServerConfig
c *cluster.Comm
dispatcher clusterServer
}

func (cn *clusterNode) Step(stream orderer.Cluster_StepServer) error {
Expand Down Expand Up @@ -187,7 +194,7 @@ func (cn *clusterNode) resurrect() {
panic(fmt.Errorf("failed starting gRPC server: %v", err))
}
cn.srv = gRPCServer
orderer.RegisterClusterServer(gRPCServer.Server(), cn)
orderer.RegisterClusterServer(gRPCServer.Server(), cn.dispatcher)
go cn.srv.Start()
}

Expand Down Expand Up @@ -265,6 +272,10 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo
srv: gRPCServer,
}

if tstSrv.dispatcher == nil {
tstSrv.dispatcher = tstSrv
}

tstSrv.freezeCond.L = &tstSrv.lock

compareCert := cluster.CachePublicKeyComparisons(func(a, b []byte) bool {
Expand All @@ -283,7 +294,7 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo
CompareCertificate: compareCert,
}

orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv)
orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv.dispatcher)
go gRPCServer.Start()
return tstSrv
}
Expand Down Expand Up @@ -489,6 +500,52 @@ func TestBlockingSend(t *testing.T) {
}
}

func TestEmptyRequest(t *testing.T) {
// Scenario: Ensures empty messages are discarded and an error is returned
// back to the sender.

node1 := newTestNode(t)
node2 := newTestNode(t)

node2.srv.Stop()
svc := &cluster.Service{
StepLogger: flogging.MustGetLogger("test"),
Logger: flogging.MustGetLogger("test"),
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: cluster.NewMetrics(&disabled.Provider{}),
},
Dispatcher: node2.c,
}
node2.dispatcher = svc

// Sleep to let the gRPC service be closed
time.Sleep(time.Second)

// Resurrect the node with the new dispatcher
node2.resurrect()

defer node1.stop()
defer node2.stop()

config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)

assertBiDiCommunication(t, node1, node2, testReq)

rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)

stream, err := rm.NewStream(time.Second * 10)
require.NoError(t, err)

err = stream.Send(&orderer.StepRequest{})
require.NoError(t, err)

_, err = stream.Recv()
require.Error(t, err, "message is neither a Submit nor a Consensus request")
}

func TestBasic(t *testing.T) {
// Scenario: Basic test that spawns 2 nodes and sends each other
// messages that are expected to be echoed back
Expand Down
6 changes: 4 additions & 2 deletions orderer/common/cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -91,10 +92,11 @@ func (s *Service) handleMessage(stream StepStream, addr string, exp *certificate
nodeName := commonNameFromContext(stream.Context())
s.Logger.Debugf("Received message from %s(%s): %v", nodeName, addr, requestAsString(request))
return s.handleSubmit(submitReq, stream, addr)
} else if consensusReq := request.GetConsensusRequest(); consensusReq != nil {
return s.Dispatcher.DispatchConsensus(stream.Context(), request.GetConsensusRequest())
}

// Else, it's a consensus message.
return s.Dispatcher.DispatchConsensus(stream.Context(), request.GetConsensusRequest())
return errors.Errorf("message is neither a Submit nor a Consensus request")
}

func (s *Service) handleSubmit(request *orderer.SubmitRequest, stream StepStream, addr string) error {
Expand Down

0 comments on commit 80bcc18

Please sign in to comment.