Skip to content

Commit

Permalink
Follower: check if join block different from fetched block
Browse files Browse the repository at this point in the history
- before fetched block is committed
- don't panic
- continue to retry

Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: I7089df4f1bfc200a5f17582df0da7d4172f8f09b
  • Loading branch information
tock-ibm committed Jun 6, 2023
1 parent 9e6efe9 commit eebdca3
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 14 deletions.
27 changes: 18 additions & 9 deletions orderer/common/follower/follower_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/golang/protobuf/proto"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/flogging"
Expand Down Expand Up @@ -297,7 +296,7 @@ func (c *Chain) run() {
}()

if err := c.pull(); err != nil {
c.logger.Warnf("Pull failed, error: %s", err)
c.logger.Warnf("Pull failed, follower chain stopped, error: %s", err)
// TODO set the status to StatusError (see FAB-18106)
}
}
Expand Down Expand Up @@ -347,10 +346,6 @@ func (c *Chain) pull() error {
c.logger.Info("Onboarding finished successfully, pulled blocks up to join-block")
}

if c.joinBlock != nil && !proto.Equal(c.ledgerResources.Block(c.joinBlock.Header.Number).Data, c.joinBlock.Data) {
c.logger.Panicf("Join block (%d) we pulled mismatches block we joined with", c.joinBlock.Header.Number)
}

err = c.pullAfterJoin()
if err != nil {
return errors.WithMessage(err, "failed to pull after join block")
Expand Down Expand Up @@ -479,8 +474,9 @@ func (c *Chain) pullUntilLatestWithRetry(latestNetworkHeight uint64, updateEndpo
break
}

c.logger.Debugf("Error while trying to pull to latest height: %d; going to try again in %v",
latestNetworkHeight, retryInterval)
c.logger.Debugf("Error while trying to pull to latest height: %d; going to try again in %v; error: %s",
latestNetworkHeight, retryInterval, errPull)

select {
case <-c.stopChan:
c.logger.Debug("Received a stop signal")
Expand Down Expand Up @@ -527,11 +523,24 @@ func (c *Chain) pullUntilTarget(targetHeight uint64, updateEndpoints bool) (uint
if nextBlock == nil {
return n, errors.WithMessagef(cluster.ErrRetryCountExhausted, "failed to pull block %d", seq)
}

reportedPrevHash := nextBlock.Header.PreviousHash
if (nextBlock.Header.Number > 0) && !bytes.Equal(reportedPrevHash, actualPrevHash) {
return n, errors.Errorf("block header mismatch on sequence %d, expected %x, got %x",
return n, errors.Errorf("block header previous hash mismatch on sequence %d, expected %x, got %x",
nextBlock.Header.Number, actualPrevHash, reportedPrevHash)
}

if c.joinBlock != nil && c.joinBlock.Header.Number == nextBlock.Header.Number {
if !proto.Equal(nextBlock.Header, c.joinBlock.Header) {
c.logger.Errorf("Block header mismatch between the block we pulled and the block we joined with, sequence %d", c.joinBlock.Header.Number)
return n, errors.Errorf("block header mismatch between the block we pulled and the block we joined with, sequence %d", c.joinBlock.Header.Number)
}
if !proto.Equal(nextBlock.Data, c.joinBlock.Data) {
c.logger.Errorf("Block data mismatch between the block we pulled and the block we joined with, sequence %d", c.joinBlock.Header.Number)
return n, errors.Errorf("block data mismatch between the block we pulled and the block we joined with, sequence %d", c.joinBlock.Header.Number)
}
}

actualPrevHash = protoutil.BlockHeaderHash(nextBlock.Header)
if err := c.ledgerResources.Append(nextBlock); err != nil {
return n, errors.WithMessagef(err, "failed to append block %d to the ledger", nextBlock.Header.Number)
Expand Down
88 changes: 83 additions & 5 deletions orderer/common/follower/follower_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,15 @@ func TestFollowerNewChain(t *testing.T) {

func TestFollowerPullUpToJoin(t *testing.T) {
joinNum := uint64(10)
joinBlockAppRaft := makeConfigBlock(joinNum, []byte{}, 1)
require.NotNil(t, joinBlockAppRaft)
var joinBlockAppRaft *common.Block

var wgChain sync.WaitGroup

setup := func() {
globalSetup(t)
remoteBlockchain.fill(joinNum)
remoteBlockchain.appendConfig(1)
joinBlockAppRaft = remoteBlockchain.Block(joinNum)

ledgerResources.AppendCalls(localBlockchain.Append)

Expand Down Expand Up @@ -341,6 +341,76 @@ func TestFollowerPullUpToJoin(t *testing.T) {
require.Equal(t, 50, timeAfterCount.AfterCallCount())
require.Equal(t, int64(5000), atomic.LoadInt64(&maxDelay))
})
t.Run("join block header mismatch", func(t *testing.T) {
setup()
mockClusterConsenter.IsChannelMemberCalls(amIReallyInChannel)

joinBlockAppRaftBad := proto.Clone(joinBlockAppRaft).(*common.Block)
joinBlockAppRaftBad.Header.DataHash = []byte("bogus")

chain, err := follower.NewChain(ledgerResources, mockClusterConsenter, joinBlockAppRaftBad, options, pullerFactory, mockChainCreator, cryptoProvider, mockChannelParticipationMetricsReporter)
require.NoError(t, err)

consensusRelation, status := chain.StatusReport()
require.Equal(t, types.ConsensusRelationConsenter, consensusRelation)
require.Equal(t, types.StatusOnBoarding, status)

require.NotPanics(t, chain.Start)
require.Eventually(t,
func() bool {
return puller.PullBlockCallCount() > int(joinNum*3) // it will retry forever unless we stop it
},
10*time.Second, time.Millisecond)

require.NotPanics(t, chain.Halt)
require.False(t, chain.IsRunning())

consensusRelation, status = chain.StatusReport()
require.Equal(t, types.ConsensusRelationConsenter, consensusRelation)
require.Equal(t, types.StatusOnBoarding, status)

require.Equal(t, 10, ledgerResources.AppendCallCount())
require.Equal(t, uint64(10), ledgerResources.Height())
for i := uint64(0); i < joinNum; i++ {
require.Equal(t, remoteBlockchain.Block(i).Header, localBlockchain.Block(i).Header, "failed block i=%d", i)
}
require.Equal(t, 0, mockChainCreator.SwitchFollowerToChainCallCount())
})
t.Run("join block data mismatch", func(t *testing.T) {
setup()
mockClusterConsenter.IsChannelMemberCalls(amIReallyInChannel)

joinBlockAppRaftBad := proto.Clone(joinBlockAppRaft).(*common.Block)
joinBlockAppRaftBad.Data.Data = append(joinBlockAppRaftBad.Data.Data, []byte("bogus"))

chain, err := follower.NewChain(ledgerResources, mockClusterConsenter, joinBlockAppRaftBad, options, pullerFactory, mockChainCreator, cryptoProvider, mockChannelParticipationMetricsReporter)
require.NoError(t, err)

consensusRelation, status := chain.StatusReport()
require.Equal(t, types.ConsensusRelationConsenter, consensusRelation)
require.Equal(t, types.StatusOnBoarding, status)

require.NotPanics(t, chain.Start)
require.Eventually(t,
func() bool {
return puller.PullBlockCallCount() > int(joinNum*3) // it will retry forever unless we stop it
},
10*time.Second, time.Millisecond)

require.NotPanics(t, chain.Halt)
require.False(t, chain.IsRunning())

consensusRelation, status = chain.StatusReport()
require.Equal(t, types.ConsensusRelationConsenter, consensusRelation)
require.Equal(t, types.StatusOnBoarding, status)

require.Equal(t, 10, ledgerResources.AppendCallCount())
require.Equal(t, uint64(10), ledgerResources.Height())
for i := uint64(0); i < joinNum; i++ {
require.Equal(t, remoteBlockchain.Block(i).Header, localBlockchain.Block(i).Header, "failed block i=%d", i)
}
require.Equal(t, 0, mockChainCreator.SwitchFollowerToChainCallCount())
})
}

func TestFollowerPullAfterJoin(t *testing.T) {
Expand Down Expand Up @@ -739,8 +809,10 @@ func TestFollowerPullPastJoin(t *testing.T) {
})
t.Run("Configs in the middle, latest height increasing", func(t *testing.T) {
setup()
localBlockchain.fill(5)
localBlockchain.appendConfig(0)
for i := uint64(0); i < 6; i++ {
localBlockchain.Append(remoteBlockchain.Block(i))
}

ledgerResources.AppendCalls(func(block *common.Block) error {
_ = localBlockchain.Append(block)

Expand Down Expand Up @@ -908,8 +980,11 @@ func (mbc *memoryBlockChain) fill(numBlocks uint64) {
var block *common.Block
if i == 0 {
block = makeConfigBlock(i, prevHash, 0)
block.Header.DataHash = protoutil.BlockDataHash(block.Data)
} else {
block = protoutil.NewBlock(i, prevHash)
block.Data.Data = [][]byte{{uint8(i)}, {uint8(i)}}
block.Header.DataHash = protoutil.BlockDataHash(block.Data)
protoutil.CopyBlockMetadata(mbc.chain[i-1], block)
}

Expand All @@ -923,6 +998,7 @@ func (mbc *memoryBlockChain) appendConfig(isMember uint8) {

h := uint64(len(mbc.chain))
configBlock := makeConfigBlock(h, protoutil.BlockHeaderHash(mbc.chain[h-1].Header), isMember)
configBlock.Header.DataHash = protoutil.BlockDataHash(configBlock.Data)
mbc.chain = append(mbc.chain, configBlock)
}

Expand All @@ -949,14 +1025,16 @@ func makeConfigBlock(num uint64, prevHash []byte, isMember uint8) *common.Block
env := &common.Envelope{
Payload: protoutil.MarshalOrPanic(&common.Payload{
Header: protoutil.MakePayloadHeader(
protoutil.MakeChannelHeader(common.HeaderType_CONFIG, 0, "my-chennel", 0),
protoutil.MakeChannelHeader(common.HeaderType_CONFIG, 0, "my-channel", 0),
protoutil.MakeSignatureHeader([]byte{}, []byte{}),
),
Data: []byte{isMember},
},
),
}
block.Data.Data = append(block.Data.Data, protoutil.MarshalOrPanic(env))
block.Header.DataHash = protoutil.BlockDataHash(block.Data)

protoutil.InitBlockMetadata(block)
obm := &common.OrdererBlockMetadata{LastConfig: &common.LastConfig{Index: num}}
block.Metadata.Metadata[common.BlockMetadataIndex_SIGNATURES] = protoutil.MarshalOrPanic(
Expand Down

0 comments on commit eebdca3

Please sign in to comment.