From b6822cb015bf94e3d686d2816565f075e0205791 Mon Sep 17 00:00:00 2001 From: Yacov Manevich Date: Thu, 18 Feb 2021 18:40:01 +0200 Subject: [PATCH] Report correct reason of stream abort in orderer cluster This commit fixes a bug that makes the cluster communication infrastructure always report an "aborted" reason after a stream terminates. The reason for the bug is that the serviceStream() method was always overriding the real reason the stream was terminated, with the same "aborted" reason. Moving the stream termination reason to reside inside the sync.Once block along with the rest of the termination logic solved this bug. Change-Id: I7060a3c5630c6d28c73f025de8b85061077638d6 Signed-off-by: Yacov Manevich (cherry picked from commit f0584c610a7b4308bbb2bf1746101d6f2d08e93e) --- orderer/common/cluster/comm.go | 28 ++++++++--------- orderer/common/cluster/comm_test.go | 49 +++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 15 deletions(-) diff --git a/orderer/common/cluster/comm.go b/orderer/common/cluster/comm.go index d92e8f7c804..57d4fed303d 100644 --- a/orderer/common/cluster/comm.go +++ b/orderer/common/cluster/comm.go @@ -557,10 +557,9 @@ func (stream *Stream) sendMessage(request *orderer.StepRequest) { func (stream *Stream) serviceStream() { streamStartTime := time.Now() defer func() { - stream.Logger.Debugf("Stream %d to (%s) terminating at total lifetime of %s", - stream.ID, stream.Endpoint, time.Since(streamStartTime)) - stream.Cancel(errAborted) + stream.Logger.Debugf("Stream %d to (%s) terminated with total lifetime of %s", + stream.ID, stream.Endpoint, time.Since(streamStartTime)) }() for { @@ -666,21 +665,20 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) { var canceled uint32 abortChan := make(chan struct{}) - - abort := func() { - cancel() - rc.streamsByID.Delete(streamID) - rc.Metrics.reportEgressStreamCount(rc.Channel, atomic.LoadUint32(&rc.streamsByID.size)) - rc.Logger.Debugf("Stream %d to %s(%s) is aborted", streamID, nodeName, rc.endpoint) - atomic.StoreUint32(&canceled, 1) - close(abortChan) - } + abortReason := &atomic.Value{} once := &sync.Once{} - abortReason := &atomic.Value{} + cancelWithReason := func(err error) { - abortReason.Store(err.Error()) - once.Do(abort) + once.Do(func() { + abortReason.Store(err.Error()) + cancel() + rc.streamsByID.Delete(streamID) + rc.Metrics.reportEgressStreamCount(rc.Channel, atomic.LoadUint32(&rc.streamsByID.size)) + rc.Logger.Debugf("Stream %d to %s(%s) is aborted", streamID, nodeName, rc.endpoint) + atomic.StoreUint32(&canceled, 1) + close(abortChan) + }) } logger := flogging.MustGetLogger("orderer.common.cluster.step") diff --git a/orderer/common/cluster/comm_test.go b/orderer/common/cluster/comm_test.go index a0ff62d213f..ec653301b8b 100644 --- a/orderer/common/cluster/comm_test.go +++ b/orderer/common/cluster/comm_test.go @@ -533,6 +533,55 @@ func TestUnavailableHosts(t *testing.T) { assert.Contains(t, err.Error(), "connection") } +func TestStreamAbortReportCorrectError(t *testing.T) { + // Scenario: node 1 acquires a stream to node 2 and then the stream + // encounters an error and as a result, the stream is aborted. + // We ensure the error reported is the first error, even after + // multiple attempts of using it. + + node1 := newTestNode(t) + defer node1.stop() + + node2 := newTestNode(t) + defer node2.stop() + + node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo}) + node2.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo}) + + node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(errors.Errorf("whoops")).Once() + + rm1, err := node1.c.Remote(testChannel, node2.nodeInfo.ID) + assert.NoError(t, err) + var streamTerminated sync.WaitGroup + streamTerminated.Add(1) + + stream := assertEventualEstablishStream(t, rm1) + + l, err := zap.NewDevelopment() + assert.NoError(t, err) + stream.Logger = flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error { + if strings.Contains(entry.Message, "Stream 1 to") && strings.Contains(entry.Message, "terminated") { + streamTerminated.Done() + } + return nil + })) + + // Probe the stream for the first time + err = stream.Send(wrapSubmitReq(testReq)) + assert.NoError(t, err) + + // We should receive back the crafted error + _, err = stream.Recv() + assert.Contains(t, err.Error(), "whoops") + + // Wait for the stream to be terminated from within the communication infrastructure + streamTerminated.Wait() + + // We should still receive the original crafted error despite the stream being terminated + err = stream.Send(wrapSubmitReq(testReq)) + assert.Contains(t, err.Error(), "whoops") +} + func TestStreamAbort(t *testing.T) { t.Parallel()