Skip to content

Commit

Permalink
find and fix error (#4596)
Browse files Browse the repository at this point in the history
Signed-off-by: Fedor Partanskiy <pfi79@mail.ru>
  • Loading branch information
pfi79 authored Jan 31, 2024
1 parent 00c58dc commit 17e0a83
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 17 deletions.
22 changes: 20 additions & 2 deletions orderer/common/follower/follower_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package follower
import (
"bytes"
"sync"
"sync/atomic"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -60,7 +61,7 @@ type BlockPullerFactory interface {
// ChainCreator defines a function that creates a new consensus.Chain for this channel, to replace the current
// follower.Chain. This interface is meant to be implemented by the multichannel.Registrar.
type ChainCreator interface {
SwitchFollowerToChain(chainName string)
SwitchFollowerToChain(chainName string) bool
}

//go:generate counterfeiter -o mocks/channel_participation_metrics_reporter.go -fake-name ChannelParticipationMetricsReporter . ChannelParticipationMetricsReporter
Expand Down Expand Up @@ -100,6 +101,7 @@ type Chain struct {
doneChan chan struct{} // The go-routine signals the 'closer' that it is done by closing this channel.
consensusRelation types.ConsensusRelation
status types.Status
removingExternal atomic.Int32 // External channel removal once.

ledgerResources LedgerResources // ledger & config resources
clusterConsenter consensus.ClusterConsenter // detects whether a block indicates channel membership
Expand Down Expand Up @@ -224,6 +226,7 @@ func (c *Chain) Start() {

// Halt signals the Chain to stop and waits for the internal go-routine to exit.
func (c *Chain) Halt() {
c.removingExternal.Store(1)
c.halt()
<-c.doneChan
}
Expand Down Expand Up @@ -354,7 +357,22 @@ func (c *Chain) pull() error {
// Trigger creation of a new consensus.Chain.
c.logger.Info("Block pulling finished successfully, going to switch from follower to a consensus.Chain")
c.halt()
c.chainCreator.SwitchFollowerToChain(c.ledgerResources.ChannelID())

ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()

// SwitchFollowerToChain is trying to take over the mutex.
// If it fails, false is returned.
// Perhaps at this moment the command "remove channel" is being executed,
// let's check it `c.removingExternal.Load() == 1`. If so, leave the loop.
// If SwitchFollowerToChain returns true, we will also leave the loop.
// In other cases, we need to repeat the loop.
for range ticker.C {
if isExec := c.chainCreator.SwitchFollowerToChain(c.ledgerResources.ChannelID()); isExec ||
c.removingExternal.Load() == 1 {
break
}
}

return nil
}
Expand Down
27 changes: 21 additions & 6 deletions orderer/common/follower/follower_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ func TestFollowerPullUpToJoin(t *testing.T) {

wgChain = sync.WaitGroup{}
wgChain.Add(1)
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() })
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
})

wgChain = sync.WaitGroup{}
wgChain.Add(1)
Expand Down Expand Up @@ -559,7 +562,10 @@ func TestFollowerPullAfterJoin(t *testing.T) {
}
return nil
})
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
}) // Stop when a new chain is created
require.Equal(t, joinNum+1, localBlockchain.Height())

chain, err := follower.NewChain(ledgerResources, mockClusterConsenter, nil, options, pullerFactory, mockChainCreator, cryptoProvider, mockChannelParticipationMetricsReporter)
Expand Down Expand Up @@ -604,7 +610,10 @@ func TestFollowerPullAfterJoin(t *testing.T) {
}
return nil
})
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
}) // Stop when a new chain is created
require.Equal(t, joinNum+1, localBlockchain.Height())

failPull := 10
Expand Down Expand Up @@ -794,7 +803,10 @@ func TestFollowerPullPastJoin(t *testing.T) {
}
return nil
})
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
}) // Stop when a new chain is created
require.Equal(t, uint64(6), localBlockchain.Height())

chain, err := follower.NewChain(ledgerResources, mockClusterConsenter, joinBlockAppRaft, options, pullerFactory, mockChainCreator, cryptoProvider, mockChannelParticipationMetricsReporter)
Expand Down Expand Up @@ -839,7 +851,10 @@ func TestFollowerPullPastJoin(t *testing.T) {
}
return nil
})
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created
mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool {
wgChain.Done()
return true
}) // Stop when a new chain is created
require.Equal(t, uint64(0), localBlockchain.Height())

failPull := 10
Expand Down Expand Up @@ -935,7 +950,7 @@ func (mbc *memoryBlockChain) fill(numBlocks uint64) {
defer mbc.lock.Unlock()

height := uint64(len(mbc.chain))
prevHash := []byte{}
var prevHash []byte

for i := height; i < height+numBlocks; i++ {
if i > 0 {
Expand Down
46 changes: 40 additions & 6 deletions orderer/common/follower/mocks/chain_creator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,10 @@ func (r *Registrar) createNewChain(configtx *cb.Envelope) *ChainSupport {
// SwitchFollowerToChain creates a consensus.Chain from the tip of the ledger, and removes the follower.
// It is called when a follower detects a config block that indicates cluster membership and halts, transferring
// execution to the consensus.Chain.
func (r *Registrar) SwitchFollowerToChain(channelID string) {
r.lock.Lock()
func (r *Registrar) SwitchFollowerToChain(channelID string) bool {
if !r.lock.TryLock() {
return false
}
defer r.lock.Unlock()

lf, err := r.ledgerFactory.GetOrCreate(channelID)
Expand All @@ -504,11 +506,13 @@ func (r *Registrar) SwitchFollowerToChain(channelID string) {
delete(r.followers, channelID)
logger.Debugf("Removed follower for channel %s", channelID)
cs := r.createNewChain(configTx(lf))
if err := r.removeJoinBlock(channelID); err != nil {
if err = r.removeJoinBlock(channelID); err != nil {
logger.Panicf("Failed removing join-block for channel: %s: %v", channelID, err)
}
cs.start()
logger.Infof("Created and started channel %s", cs.ChannelID())

return true
}

// SwitchChainToFollower creates a follower.Chain from the tip of the ledger and removes the consensus.Chain.
Expand Down

0 comments on commit 17e0a83

Please sign in to comment.