From 0f18359493bcbd5f9f9d1a9b05adabfe5da23b06 Mon Sep 17 00:00:00 2001 From: Yacov Manevich Date: Mon, 20 Jun 2022 21:09:26 +0300 Subject: [PATCH] Check if inner consensus message is missing Change-Id: I06b466e6ff6d43f2b9804dd21185241716356050 Signed-off-by: Yacov Manevich (cherry picked from commit 6e5e693a2d12aabee4bcf103ab7de3df4ccc49f9) --- orderer/common/cluster/comm_test.go | 60 ++++++++++++++++++++++++++++- orderer/common/cluster/service.go | 6 ++- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/orderer/common/cluster/comm_test.go b/orderer/common/cluster/comm_test.go index fff53799466..3b9b5f0e11b 100644 --- a/orderer/common/cluster/comm_test.go +++ b/orderer/common/cluster/comm_test.go @@ -120,6 +120,11 @@ func (*mockChannelExtractor) TargetChannel(msg proto.Message) string { } } +type clusterServer interface { + // Step passes an implementation-specific message to another cluster member. + Step(server orderer.Cluster_StepServer) error +} + type clusterNode struct { lock sync.Mutex frozen bool @@ -132,6 +137,7 @@ type clusterNode struct { clientConfig comm_utils.ClientConfig serverConfig comm_utils.ServerConfig c *cluster.Comm + dispatcher clusterServer } func (cn *clusterNode) Step(stream orderer.Cluster_StepServer) error { @@ -179,7 +185,7 @@ func (cn *clusterNode) resurrect() { panic(fmt.Errorf("failed starting gRPC server: %v", err)) } cn.srv = gRPCServer - orderer.RegisterClusterServer(gRPCServer.Server(), cn) + orderer.RegisterClusterServer(gRPCServer.Server(), cn.dispatcher) go cn.srv.Start() } @@ -257,6 +263,10 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo srv: gRPCServer, } + if tstSrv.dispatcher == nil { + tstSrv.dispatcher = tstSrv + } + tstSrv.freezeCond.L = &tstSrv.lock compareCert := cluster.CachePublicKeyComparisons(func(a, b []byte) bool { @@ -275,7 +285,7 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo CompareCertificate: compareCert, } - orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv) + orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv.dispatcher) go gRPCServer.Start() return tstSrv } @@ -481,6 +491,52 @@ func TestBlockingSend(t *testing.T) { } } +func TestEmptyRequest(t *testing.T) { + // Scenario: Ensures empty messages are discarded and an error is returned + // back to the sender. + + node1 := newTestNode(t) + node2 := newTestNode(t) + + node2.srv.Stop() + svc := &cluster.Service{ + StepLogger: flogging.MustGetLogger("test"), + Logger: flogging.MustGetLogger("test"), + StreamCountReporter: &cluster.StreamCountReporter{ + Metrics: cluster.NewMetrics(&disabled.Provider{}), + }, + Dispatcher: node2.c, + } + node2.dispatcher = svc + + // Sleep to let the gRPC service be closed + time.Sleep(time.Second) + + // Resurrect the node with the new dispatcher + node2.resurrect() + + defer node1.stop() + defer node2.stop() + + config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo} + node1.c.Configure(testChannel, config) + node2.c.Configure(testChannel, config) + + assertBiDiCommunication(t, node1, node2, testReq) + + rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID) + require.NoError(t, err) + + stream, err := rm.NewStream(time.Second * 10) + require.NoError(t, err) + + err = stream.Send(&orderer.StepRequest{}) + require.NoError(t, err) + + _, err = stream.Recv() + require.Error(t, err, "message is neither a Submit nor a Consensus request") +} + func TestBasic(t *testing.T) { // Scenario: Basic test that spawns 2 nodes and sends each other // messages that are expected to be echoed back diff --git a/orderer/common/cluster/service.go b/orderer/common/cluster/service.go index 214b9525d28..cc8302028d3 100644 --- a/orderer/common/cluster/service.go +++ b/orderer/common/cluster/service.go @@ -14,6 +14,7 @@ import ( "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/util" + "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -90,10 +91,11 @@ func (s *Service) handleMessage(stream StepStream, addr string, exp *certificate nodeName := commonNameFromContext(stream.Context()) s.Logger.Debugf("Received message from %s(%s): %v", nodeName, addr, requestAsString(request)) return s.handleSubmit(submitReq, stream, addr) + } else if consensusReq := request.GetConsensusRequest(); consensusReq != nil { + return s.Dispatcher.DispatchConsensus(stream.Context(), request.GetConsensusRequest()) } - // Else, it's a consensus message. - return s.Dispatcher.DispatchConsensus(stream.Context(), request.GetConsensusRequest()) + return errors.Errorf("message is neither a Submit nor a Consensus request") } func (s *Service) handleSubmit(request *orderer.SubmitRequest, stream StepStream, addr string) error {