Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

found and fixed a bug in raft integration tests #4596

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions orderer/common/follower/follower_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package follower
import (
"bytes"
"sync"
"sync/atomic"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -60,7 +61,7 @@ type BlockPullerFactory interface {
// ChainCreator defines a function that creates a new consensus.Chain for this channel, to replace the current
// follower.Chain. This interface is meant to be implemented by the multichannel.Registrar.
type ChainCreator interface {
SwitchFollowerToChain(chainName string)
SwitchFollowerToChain(chainName string) bool
}

//go:generate counterfeiter -o mocks/channel_participation_metrics_reporter.go -fake-name ChannelParticipationMetricsReporter . ChannelParticipationMetricsReporter
Expand Down Expand Up @@ -100,6 +101,7 @@ type Chain struct {
doneChan chan struct{} // The go-routine signals the 'closer' that it is done by closing this channel.
consensusRelation types.ConsensusRelation
status types.Status
removingExternal atomic.Int32 // External channel removal once.

ledgerResources LedgerResources // ledger & config resources
clusterConsenter consensus.ClusterConsenter // detects whether a block indicates channel membership
Expand Down Expand Up @@ -224,6 +226,7 @@ func (c *Chain) Start() {

// Halt signals the Chain to stop and waits for the internal go-routine to exit.
func (c *Chain) Halt() {
c.removingExternal.Store(1)
c.halt()
<-c.doneChan
}
Expand Down Expand Up @@ -354,7 +357,22 @@ func (c *Chain) pull() error {
// Trigger creation of a new consensus.Chain.
c.logger.Info("Block pulling finished successfully, going to switch from follower to a consensus.Chain")
c.halt()
c.chainCreator.SwitchFollowerToChain(c.ledgerResources.ChannelID())

ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()

// SwitchFollowerToChain is trying to take over the mutex.
// If it fails, false is returned.
// Perhaps at this moment the command "remove channel" is being executed,
// let's check it `c.removingExternal.Load() == 1`. If so, leave the loop.
// If SwitchFollowerToChain returns true, we will also leave the loop.
// In other cases, we need to repeat the loop.
for range ticker.C {
if isExec := c.chainCreator.SwitchFollowerToChain(c.ledgerResources.ChannelID()); isExec ||
c.removingExternal.Load() == 1 {
break
}
}

return nil
}
Expand Down
27 changes: 21 additions & 6 deletions orderer/common/follower/follower_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ func TestFollowerPullUpToJoin(t *testing.T) {

wgChain = sync.WaitGroup{}
wgChain.Add(1)
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() })
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
})

wgChain = sync.WaitGroup{}
wgChain.Add(1)
Expand Down Expand Up @@ -559,7 +562,10 @@ func TestFollowerPullAfterJoin(t *testing.T) {
}
return nil
})
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
}) // Stop when a new chain is created
require.Equal(t, joinNum+1, localBlockchain.Height())

chain, err := follower.NewChain(ledgerResources, mockClusterConsenter, nil, options, pullerFactory, mockChainCreator, cryptoProvider, mockChannelParticipationMetricsReporter)
Expand Down Expand Up @@ -604,7 +610,10 @@ func TestFollowerPullAfterJoin(t *testing.T) {
}
return nil
})
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
}) // Stop when a new chain is created
require.Equal(t, joinNum+1, localBlockchain.Height())

failPull := 10
Expand Down Expand Up @@ -794,7 +803,10 @@ func TestFollowerPullPastJoin(t *testing.T) {
}
return nil
})
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
}) // Stop when a new chain is created
require.Equal(t, uint64(6), localBlockchain.Height())

chain, err := follower.NewChain(ledgerResources, mockClusterConsenter, joinBlockAppRaft, options, pullerFactory, mockChainCreator, cryptoProvider, mockChannelParticipationMetricsReporter)
Expand Down Expand Up @@ -839,7 +851,10 @@ func TestFollowerPullPastJoin(t *testing.T) {
}
return nil
})
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
}) // Stop when a new chain is created
require.Equal(t, uint64(0), localBlockchain.Height())

failPull := 10
Expand Down Expand Up @@ -935,7 +950,7 @@ func (mbc *memoryBlockChain) fill(numBlocks uint64) {
defer mbc.lock.Unlock()

height := uint64(len(mbc.chain))
prevHash := []byte{}
var prevHash []byte

for i := height; i < height+numBlocks; i++ {
if i > 0 {
Expand Down
46 changes: 40 additions & 6 deletions orderer/common/follower/mocks/chain_creator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,10 @@ func (r *Registrar) createNewChain(configtx *cb.Envelope) *ChainSupport {
// SwitchFollowerToChain creates a consensus.Chain from the tip of the ledger, and removes the follower.
// It is called when a follower detects a config block that indicates cluster membership and halts, transferring
// execution to the consensus.Chain.
func (r *Registrar) SwitchFollowerToChain(channelID string) {
r.lock.Lock()
func (r *Registrar) SwitchFollowerToChain(channelID string) bool {
if !r.lock.TryLock() {
return false
}
defer r.lock.Unlock()

lf, err := r.ledgerFactory.GetOrCreate(channelID)
Expand All @@ -504,11 +506,13 @@ func (r *Registrar) SwitchFollowerToChain(channelID string) {
delete(r.followers, channelID)
logger.Debugf("Removed follower for channel %s", channelID)
cs := r.createNewChain(configTx(lf))
if err := r.removeJoinBlock(channelID); err != nil {
if err = r.removeJoinBlock(channelID); err != nil {
logger.Panicf("Failed removing join-block for channel: %s: %v", channelID, err)
}
cs.start()
logger.Infof("Created and started channel %s", cs.ChannelID())

return true
}

// SwitchChainToFollower creates a follower.Chain from the tip of the ledger and removes the consensus.Chain.
Expand Down