From a9b369eb7c4364e01cf19781aee789c1eddab647 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Thu, 18 Jan 2024 10:21:24 +0200 Subject: [PATCH] BFT synchronizer Signed-off-by: Yoav Tock Change-Id: If590c9d4ed727f7c2805142dde36cef3c3f14fb1 --- common/deliver/mock/block_iterator.go | 10 +- common/deliver/mock/block_reader.go | 65 ++++++++ common/ledger/blockledger/fileledger/impl.go | 10 ++ .../blockledger/fileledger/impl_test.go | 14 +- common/ledger/blockledger/ledger.go | 7 +- core/peer/deliverevents_test.go | 9 +- orderer/common/cluster/deliver.go | 2 +- .../common/multichannel/mocks/read_writer.go | 65 ++++++++ orderer/consensus/smartbft/chain.go | 6 +- .../consensus/smartbft/synchronizer_bft.go | 152 ++++++++++++++++++ 10 files changed, 324 insertions(+), 16 deletions(-) create mode 100644 orderer/consensus/smartbft/synchronizer_bft.go diff --git a/common/deliver/mock/block_iterator.go b/common/deliver/mock/block_iterator.go index 57936d996bc..4c6934631c4 100644 --- a/common/deliver/mock/block_iterator.go +++ b/common/deliver/mock/block_iterator.go @@ -32,9 +32,10 @@ func (fake *BlockIterator) Close() { fake.closeMutex.Lock() fake.closeArgsForCall = append(fake.closeArgsForCall, struct { }{}) + stub := fake.CloseStub fake.recordInvocation("Close", []interface{}{}) fake.closeMutex.Unlock() - if fake.CloseStub != nil { + if stub != nil { fake.CloseStub() } } @@ -56,15 +57,16 @@ func (fake *BlockIterator) Next() (*common.Block, common.Status) { ret, specificReturn := fake.nextReturnsOnCall[len(fake.nextArgsForCall)] fake.nextArgsForCall = append(fake.nextArgsForCall, struct { }{}) + stub := fake.NextStub + fakeReturns := fake.nextReturns fake.recordInvocation("Next", []interface{}{}) fake.nextMutex.Unlock() - if fake.NextStub != nil { - return fake.NextStub() + if stub != nil { + return stub() } if specificReturn { return ret.result1, ret.result2 } - fakeReturns := fake.nextReturns return fakeReturns.result1, fakeReturns.result2 } diff --git a/common/deliver/mock/block_reader.go b/common/deliver/mock/block_reader.go index 5f71afe7061..070ab1a094b 100644 --- a/common/deliver/mock/block_reader.go +++ b/common/deliver/mock/block_reader.go @@ -10,6 +10,16 @@ import ( ) type BlockReader struct { + GetCurrentBlockHashStub func() []byte + getCurrentBlockHashMutex sync.RWMutex + getCurrentBlockHashArgsForCall []struct { + } + getCurrentBlockHashReturns struct { + result1 []byte + } + getCurrentBlockHashReturnsOnCall map[int]struct { + result1 []byte + } HeightStub func() uint64 heightMutex sync.RWMutex heightArgsForCall []struct { @@ -50,6 +60,59 @@ type BlockReader struct { invocationsMutex sync.RWMutex } +func (fake *BlockReader) GetCurrentBlockHash() []byte { + fake.getCurrentBlockHashMutex.Lock() + ret, specificReturn := fake.getCurrentBlockHashReturnsOnCall[len(fake.getCurrentBlockHashArgsForCall)] + fake.getCurrentBlockHashArgsForCall = append(fake.getCurrentBlockHashArgsForCall, struct { + }{}) + stub := fake.GetCurrentBlockHashStub + fakeReturns := fake.getCurrentBlockHashReturns + fake.recordInvocation("GetCurrentBlockHash", []interface{}{}) + fake.getCurrentBlockHashMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *BlockReader) GetCurrentBlockHashCallCount() int { + fake.getCurrentBlockHashMutex.RLock() + defer fake.getCurrentBlockHashMutex.RUnlock() + return len(fake.getCurrentBlockHashArgsForCall) +} + +func (fake *BlockReader) GetCurrentBlockHashCalls(stub func() []byte) { + fake.getCurrentBlockHashMutex.Lock() + defer fake.getCurrentBlockHashMutex.Unlock() + fake.GetCurrentBlockHashStub = stub +} + +func (fake *BlockReader) GetCurrentBlockHashReturns(result1 []byte) { + fake.getCurrentBlockHashMutex.Lock() + defer fake.getCurrentBlockHashMutex.Unlock() + fake.GetCurrentBlockHashStub = nil + fake.getCurrentBlockHashReturns = struct { + result1 []byte + }{result1} +} + +func (fake *BlockReader) GetCurrentBlockHashReturnsOnCall(i int, result1 []byte) { + fake.getCurrentBlockHashMutex.Lock() + defer fake.getCurrentBlockHashMutex.Unlock() + fake.GetCurrentBlockHashStub = nil + if fake.getCurrentBlockHashReturnsOnCall == nil { + fake.getCurrentBlockHashReturnsOnCall = make(map[int]struct { + result1 []byte + }) + } + fake.getCurrentBlockHashReturnsOnCall[i] = struct { + result1 []byte + }{result1} +} + func (fake *BlockReader) Height() uint64 { fake.heightMutex.Lock() ret, specificReturn := fake.heightReturnsOnCall[len(fake.heightArgsForCall)] @@ -234,6 +297,8 @@ func (fake *BlockReader) RetrieveBlockByNumberReturnsOnCall(i int, result1 *comm func (fake *BlockReader) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.getCurrentBlockHashMutex.RLock() + defer fake.getCurrentBlockHashMutex.RUnlock() fake.heightMutex.RLock() defer fake.heightMutex.RUnlock() fake.iteratorMutex.RLock() diff --git a/common/ledger/blockledger/fileledger/impl.go b/common/ledger/blockledger/fileledger/impl.go index 910104461bf..0dbd7531b79 100644 --- a/common/ledger/blockledger/fileledger/impl.go +++ b/common/ledger/blockledger/fileledger/impl.go @@ -110,6 +110,16 @@ func (fl *FileLedger) Height() uint64 { return info.Height } +// GetCurrentBlockHash returns the hash of the current block's header +func (fl *FileLedger) GetCurrentBlockHash() []byte { + info, err := fl.blockStore.GetBlockchainInfo() + if err != nil { + logger.Panic(err) + } + + return info.CurrentBlockHash +} + // Append a new block to the ledger func (fl *FileLedger) Append(block *cb.Block) error { err := fl.blockStore.AddBlock(block) diff --git a/common/ledger/blockledger/fileledger/impl_test.go b/common/ledger/blockledger/fileledger/impl_test.go index 55ddd814cc3..db5cb9a144f 100644 --- a/common/ledger/blockledger/fileledger/impl_test.go +++ b/common/ledger/blockledger/fileledger/impl_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package fileledger import ( + "bytes" "errors" "fmt" "testing" @@ -129,6 +130,10 @@ func TestInitialization(t *testing.T) { block := blockledger.GetBlock(fl, 0) require.NotNil(t, block, "Error retrieving genesis block") require.Equal(t, protoutil.BlockHeaderHash(genesisBlock.Header), protoutil.BlockHeaderHash(block.Header), "Block hashes did no match") + + h := fl.GetCurrentBlockHash() + require.NotNil(t, h, "block hash is nil") + require.True(t, bytes.Equal(h, protoutil.BlockHeaderHash(block.Header)), "block hash mismatch") } func TestReinitialization(t *testing.T) { @@ -175,16 +180,19 @@ func TestReinitialization(t *testing.T) { func TestAddition(t *testing.T) { tev, fl := initialize(t) defer tev.tearDown() - info, _ := fl.blockStore.GetBlockchainInfo() - prevHash := info.CurrentBlockHash + + prevHash := fl.GetCurrentBlockHash() envelope := getSampleEnvelopeWithSignatureHeader() b1 := blockledger.CreateNextBlock(fl, []*cb.Envelope{envelope}) fl.Append(b1) require.Equal(t, uint64(2), fl.Height(), "Block height should be 2") block := blockledger.GetBlock(fl, 1) - require.NotNil(t, block, "Error retrieving genesis block") + require.NotNil(t, block, "Error retrieving second block") require.Equal(t, prevHash, block.Header.PreviousHash, "Block hashes did no match") + + secondHash := fl.GetCurrentBlockHash() + require.True(t, bytes.Equal(secondHash, protoutil.BlockHeaderHash(block.Header)), "block hash mismatch") } func TestRetrieval(t *testing.T) { diff --git a/common/ledger/blockledger/ledger.go b/common/ledger/blockledger/ledger.go index 1a734aad374..8fc91cba550 100644 --- a/common/ledger/blockledger/ledger.go +++ b/common/ledger/blockledger/ledger.go @@ -38,13 +38,14 @@ type Iterator interface { // Reader allows the caller to inspect the ledger type Reader interface { - // Iterator returns an Iterator, as specified by an ab.SeekInfo message, and - // its starting block number + // Iterator returns an Iterator, as specified by an ab.SeekInfo message, and its starting block number Iterator(startType *ab.SeekPosition) (Iterator, uint64) // Height returns the number of blocks on the ledger Height() uint64 - // retrieve blockByNumber + // RetrieveBlockByNumber retrieve blockByNumber RetrieveBlockByNumber(blockNumber uint64) (*cb.Block, error) + // GetCurrentBlockHash returns the block header hash of the last block in the ledger. + GetCurrentBlockHash() []byte } // Writer allows the caller to modify the ledger diff --git a/core/peer/deliverevents_test.go b/core/peer/deliverevents_test.go index 4674f96c1dc..3bd985ea034 100644 --- a/core/peer/deliverevents_test.go +++ b/core/peer/deliverevents_test.go @@ -90,9 +90,9 @@ func (m *mockReader) Iterator(startType *orderer.SeekPosition) (blockledger.Iter return args.Get(0).(blockledger.Iterator), args.Get(1).(uint64) } -func (m *mockReader) Height() uint64 { +func (m *mockReader) GetCurrentBlockHash() []byte { args := m.Called() - return args.Get(0).(uint64) + return args.Get(0).([]byte) } func (m *mockReader) RetrieveBlockByNumber(blockNum uint64) (*common.Block, error) { @@ -100,6 +100,11 @@ func (m *mockReader) RetrieveBlockByNumber(blockNum uint64) (*common.Block, erro return args.Get(0).(*common.Block), args.Error(1) } +func (m *mockReader) Height() uint64 { + args := m.Called() + return args.Get(0).(uint64) +} + // mockChainSupport type mockChainSupport struct { mock.Mock diff --git a/orderer/common/cluster/deliver.go b/orderer/common/cluster/deliver.go index eee7dce0391..f830b692cf0 100644 --- a/orderer/common/cluster/deliver.go +++ b/orderer/common/cluster/deliver.go @@ -43,7 +43,7 @@ type BlockPuller struct { // A 'stopper' goroutine may signal the go-routine servicing PullBlock & HeightsByEndpoints to stop by closing this // channel. Note: all methods of the BlockPuller must be serviced by a single goroutine, it is not thread safe. - // It is the responsibility of the 'stopper' not to close the channel more then once. + // It is the responsibility of the 'stopper' not to close the channel more than once. StopChannel chan struct{} // Internal state diff --git a/orderer/common/multichannel/mocks/read_writer.go b/orderer/common/multichannel/mocks/read_writer.go index be2768bc0c5..3efc35b27bf 100644 --- a/orderer/common/multichannel/mocks/read_writer.go +++ b/orderer/common/multichannel/mocks/read_writer.go @@ -21,6 +21,16 @@ type ReadWriter struct { appendReturnsOnCall map[int]struct { result1 error } + GetCurrentBlockHashStub func() []byte + getCurrentBlockHashMutex sync.RWMutex + getCurrentBlockHashArgsForCall []struct { + } + getCurrentBlockHashReturns struct { + result1 []byte + } + getCurrentBlockHashReturnsOnCall map[int]struct { + result1 []byte + } HeightStub func() uint64 heightMutex sync.RWMutex heightArgsForCall []struct { @@ -122,6 +132,59 @@ func (fake *ReadWriter) AppendReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *ReadWriter) GetCurrentBlockHash() []byte { + fake.getCurrentBlockHashMutex.Lock() + ret, specificReturn := fake.getCurrentBlockHashReturnsOnCall[len(fake.getCurrentBlockHashArgsForCall)] + fake.getCurrentBlockHashArgsForCall = append(fake.getCurrentBlockHashArgsForCall, struct { + }{}) + stub := fake.GetCurrentBlockHashStub + fakeReturns := fake.getCurrentBlockHashReturns + fake.recordInvocation("GetCurrentBlockHash", []interface{}{}) + fake.getCurrentBlockHashMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *ReadWriter) GetCurrentBlockHashCallCount() int { + fake.getCurrentBlockHashMutex.RLock() + defer fake.getCurrentBlockHashMutex.RUnlock() + return len(fake.getCurrentBlockHashArgsForCall) +} + +func (fake *ReadWriter) GetCurrentBlockHashCalls(stub func() []byte) { + fake.getCurrentBlockHashMutex.Lock() + defer fake.getCurrentBlockHashMutex.Unlock() + fake.GetCurrentBlockHashStub = stub +} + +func (fake *ReadWriter) GetCurrentBlockHashReturns(result1 []byte) { + fake.getCurrentBlockHashMutex.Lock() + defer fake.getCurrentBlockHashMutex.Unlock() + fake.GetCurrentBlockHashStub = nil + fake.getCurrentBlockHashReturns = struct { + result1 []byte + }{result1} +} + +func (fake *ReadWriter) GetCurrentBlockHashReturnsOnCall(i int, result1 []byte) { + fake.getCurrentBlockHashMutex.Lock() + defer fake.getCurrentBlockHashMutex.Unlock() + fake.GetCurrentBlockHashStub = nil + if fake.getCurrentBlockHashReturnsOnCall == nil { + fake.getCurrentBlockHashReturnsOnCall = make(map[int]struct { + result1 []byte + }) + } + fake.getCurrentBlockHashReturnsOnCall[i] = struct { + result1 []byte + }{result1} +} + func (fake *ReadWriter) Height() uint64 { fake.heightMutex.Lock() ret, specificReturn := fake.heightReturnsOnCall[len(fake.heightArgsForCall)] @@ -308,6 +371,8 @@ func (fake *ReadWriter) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.appendMutex.RLock() defer fake.appendMutex.RUnlock() + fake.getCurrentBlockHashMutex.RLock() + defer fake.getCurrentBlockHashMutex.RUnlock() fake.heightMutex.RLock() defer fake.heightMutex.RUnlock() fake.iteratorMutex.RLock() diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index 9f006157258..fe783b57245 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -70,7 +70,7 @@ type BFTChain struct { RuntimeConfig *atomic.Value Channel string Config types.Configuration - BlockPuller BlockPuller + BlockPuller BlockPuller // TODO make bft-synchro Comm cluster.Communicator SignerSerializer signerSerializer PolicyManager policies.Manager @@ -97,7 +97,7 @@ func NewChain( selfID uint64, config types.Configuration, walDir string, - blockPuller BlockPuller, + blockPuller BlockPuller, // TODO make bft-synchro comm cluster.Communicator, signerSerializer signerSerializer, policyManager policies.Manager, @@ -202,7 +202,7 @@ func bftSmartConsensusBuild( // report cluster size c.Metrics.ClusterSize.Set(float64(clusterSize)) - sync := &Synchronizer{ + sync := &Synchronizer{ // TODO make bft-synchro selfID: rtc.id, BlockToDecision: c.blockToDecision, OnCommit: func(block *cb.Block) types.Reconfig { diff --git a/orderer/consensus/smartbft/synchronizer_bft.go b/orderer/consensus/smartbft/synchronizer_bft.go new file mode 100644 index 00000000000..f5200ccc0b5 --- /dev/null +++ b/orderer/consensus/smartbft/synchronizer_bft.go @@ -0,0 +1,152 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft + +import ( + "sort" + "time" + + "github.com/SmartBFT-Go/consensus/pkg/types" + cb "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/hyperledger/fabric/orderer/consensus" + "github.com/pkg/errors" +) + +type BFTSynchronizer struct { + lastReconfig types.Reconfig + selfID uint64 + LatestConfig func() (types.Configuration, []uint64) + BlockToDecision func(*cb.Block) *types.Decision + OnCommit func(*cb.Block) types.Reconfig + Support consensus.ConsenterSupport + BlockPuller BlockPuller // TODO make bft-synchro - this only an endpoint prober + ClusterSize uint64 // TODO this can be taken from the channel/orderer config + Logger *flogging.FabricLogger +} + +func (s *BFTSynchronizer) Sync() types.SyncResponse { + decision, err := s.synchronize() + if err != nil { + s.Logger.Warnf("Could not synchronize with remote peers due to %s, returning state from local ledger", err) + block := s.Support.Block(s.Support.Height() - 1) + config, nodes := s.LatestConfig() + return types.SyncResponse{ + Latest: *s.BlockToDecision(block), + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: false, // If we read from ledger we do not need to reconfigure. + CurrentNodes: nodes, + CurrentConfig: config, + }, + } + } + + // After sync has ended, reset the state of the last reconfig. + defer func() { + s.lastReconfig = types.Reconfig{} + }() + return types.SyncResponse{ + Latest: *decision, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: s.lastReconfig.InLatestDecision, + CurrentConfig: s.lastReconfig.CurrentConfig, + CurrentNodes: s.lastReconfig.CurrentNodes, + }, + } +} + +func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { + defer s.BlockPuller.Close() + + //=== we use the BlockPuller to probe all the endpoints and establish a target height + //TODO in BFT it is important that the channel/orderer-endpoints (for delivery & broadcast) map 1:1 to the + // channel/orderers/consenters (for cluster consensus), that is, every consenter should be represented by a + // delivery endpoint. + heightByEndpoint, err := s.BlockPuller.HeightsByEndpoints() + if err != nil { + return nil, errors.Wrap(err, "cannot get HeightsByEndpoints") + } + + s.Logger.Infof("HeightsByEndpoints: %v", heightByEndpoint) + + if len(heightByEndpoint) == 0 { + return nil, errors.New("no cluster members to synchronize with") + } + + var heights []uint64 + for _, value := range heightByEndpoint { + heights = append(heights, value) + } + + targetHeight := s.computeTargetHeight(heights) + startHeight := s.Support.Height() + if startHeight >= targetHeight { + return nil, errors.Errorf("already at height of %d", targetHeight) + } + + //==== + //TODO create a buffer to accept the blocks delivered from the BFTDeliverer + + //=== + //TODO create the deliverer + bftDeliverer := &blocksprovider.BFTDeliverer{ + ChannelID: s.Support.ChannelID(), + BlockHandler: nil, // TODO handle the block into a buffer + //&GossipBlockHandler{ + // gossip: d.conf.Gossip, + // blockGossipDisabled: true, // Block gossip is deprecated since in v2.2 and is no longer supported in v3.x + // logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),}, + Ledger: nil, // ledgerInfo, + UpdatableBlockVerifier: nil, // ubv, + Dialer: blocksprovider.DialerAdapter{ + //ClientConfig: comm.ClientConfig{ + // DialTimeout: d.conf.DeliverServiceConfig.ConnectionTimeout, + // KaOpts: d.conf.DeliverServiceConfig.KeepaliveOptions, + // SecOpts: d.conf.DeliverServiceConfig.SecOpts, + //}, + }, + OrderersSourceFactory: &orderers.ConnectionSourceFactory{}, // no overrides in the orderer + CryptoProvider: nil, // d.conf.CryptoProvider, + DoneC: make(chan struct{}), + Signer: s.Support, + DeliverStreamer: blocksprovider.DeliverAdapter{}, + CensorshipDetectorFactory: &blocksprovider.BFTCensorshipMonitorFactory{}, + Logger: flogging.MustGetLogger("orderer.blocksprovider").With("channel", s.Support.ChannelID()), + InitialRetryInterval: 10 * time.Millisecond, // TODO get it from config. + MaxRetryInterval: 2 * time.Second, // TODO get it from config. + BlockCensorshipTimeout: 20 * time.Second, // TODO get it from config. + MaxRetryDuration: time.Minute, // TODO get it from config. + MaxRetryDurationExceededHandler: func() (stopRetries bool) { + return true // In the orderer we must limit the time we try to do Synch() + }, + } + + s.Logger.Infof("Created a BFTDeliverer: %+v", bftDeliverer) + + return nil, errors.New("not implemented") +} + +// computeTargetHeight compute the target height to synchronize to. +// +// heights: a slice containing the heights of accessible peers, length must be >0. +// clusterSize: the cluster size, must be >0. +func (s *BFTSynchronizer) computeTargetHeight(heights []uint64) uint64 { + sort.Slice(heights, func(i, j int) bool { return heights[i] > heights[j] }) // Descending + f := uint64(s.ClusterSize-1) / 3 // The number of tolerated byzantine faults + lenH := uint64(len(heights)) + + s.Logger.Debugf("Heights: %v", heights) + + if lenH < f+1 { + s.Logger.Debugf("Returning %d", heights[0]) + return heights[int(lenH)-1] + } + s.Logger.Debugf("Returning %d", heights[f]) + return heights[f] +}