From 9a88682d711c1b7571a8e2ba9308660925318c07 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Thu, 18 Jan 2024 10:21:24 +0200 Subject: [PATCH 1/5] BFT synchronizer Signed-off-by: Yoav Tock Change-Id: If590c9d4ed727f7c2805142dde36cef3c3f14fb1 --- orderer/common/cluster/deliver.go | 2 +- orderer/consensus/smartbft/chain.go | 6 +- orderer/consensus/smartbft/sync_buffer.go | 10 ++ .../consensus/smartbft/synchronizer_bft.go | 152 ++++++++++++++++++ 4 files changed, 166 insertions(+), 4 deletions(-) create mode 100644 orderer/consensus/smartbft/sync_buffer.go create mode 100644 orderer/consensus/smartbft/synchronizer_bft.go diff --git a/orderer/common/cluster/deliver.go b/orderer/common/cluster/deliver.go index 8f66061866f..444717e4af2 100644 --- a/orderer/common/cluster/deliver.go +++ b/orderer/common/cluster/deliver.go @@ -45,7 +45,7 @@ type BlockPuller struct { // A 'stopper' goroutine may signal the go-routine servicing PullBlock & HeightsByEndpoints to stop by closing this // channel. Note: all methods of the BlockPuller must be serviced by a single goroutine, it is not thread safe. - // It is the responsibility of the 'stopper' not to close the channel more then once. + // It is the responsibility of the 'stopper' not to close the channel more than once. StopChannel chan struct{} // Internal state diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index 1ef8d4e14ef..aad06c96b9c 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -70,7 +70,7 @@ type BFTChain struct { RuntimeConfig *atomic.Value Channel string Config types.Configuration - BlockPuller BlockPuller + BlockPuller BlockPuller // TODO make bft-synchro Comm cluster.Communicator SignerSerializer signerSerializer PolicyManager policies.Manager @@ -97,7 +97,7 @@ func NewChain( selfID uint64, config types.Configuration, walDir string, - blockPuller BlockPuller, + blockPuller BlockPuller, // TODO make bft-synchro comm cluster.Communicator, signerSerializer signerSerializer, policyManager policies.Manager, @@ -202,7 +202,7 @@ func bftSmartConsensusBuild( // report cluster size c.Metrics.ClusterSize.Set(float64(clusterSize)) - sync := &Synchronizer{ + sync := &Synchronizer{ // TODO make bft-synchro selfID: rtc.id, BlockToDecision: c.blockToDecision, OnCommit: func(block *cb.Block) types.Reconfig { diff --git a/orderer/consensus/smartbft/sync_buffer.go b/orderer/consensus/smartbft/sync_buffer.go new file mode 100644 index 00000000000..bba4fca3995 --- /dev/null +++ b/orderer/consensus/smartbft/sync_buffer.go @@ -0,0 +1,10 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft + +type SyncBuffer struct { +} \ No newline at end of file diff --git a/orderer/consensus/smartbft/synchronizer_bft.go b/orderer/consensus/smartbft/synchronizer_bft.go new file mode 100644 index 00000000000..317a6aaa73d --- /dev/null +++ b/orderer/consensus/smartbft/synchronizer_bft.go @@ -0,0 +1,152 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft + +import ( + "sort" + "time" + + "github.com/SmartBFT-Go/consensus/pkg/types" + "github.com/hyperledger/fabric-lib-go/common/flogging" + cb "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/hyperledger/fabric/orderer/consensus" + "github.com/pkg/errors" +) + +type BFTSynchronizer struct { + lastReconfig types.Reconfig + selfID uint64 + LatestConfig func() (types.Configuration, []uint64) + BlockToDecision func(*cb.Block) *types.Decision + OnCommit func(*cb.Block) types.Reconfig + Support consensus.ConsenterSupport + BlockPuller BlockPuller // TODO make bft-synchro - this only an endpoint prober + ClusterSize uint64 // TODO this can be taken from the channel/orderer config + Logger *flogging.FabricLogger +} + +func (s *BFTSynchronizer) Sync() types.SyncResponse { + decision, err := s.synchronize() + if err != nil { + s.Logger.Warnf("Could not synchronize with remote orderers due to %s, returning state from local ledger", err) + block := s.Support.Block(s.Support.Height() - 1) + config, nodes := s.LatestConfig() + return types.SyncResponse{ + Latest: *s.BlockToDecision(block), + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: false, // If we read from ledger we do not need to reconfigure. + CurrentNodes: nodes, + CurrentConfig: config, + }, + } + } + + // After sync has ended, reset the state of the last reconfig. + defer func() { + s.lastReconfig = types.Reconfig{} + }() + return types.SyncResponse{ + Latest: *decision, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: s.lastReconfig.InLatestDecision, + CurrentConfig: s.lastReconfig.CurrentConfig, + CurrentNodes: s.lastReconfig.CurrentNodes, + }, + } +} + +func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { + defer s.BlockPuller.Close() + + //=== we use the BlockPuller to probe all the endpoints and establish a target height + //TODO in BFT it is important that the channel/orderer-endpoints (for delivery & broadcast) map 1:1 to the + // channel/orderers/consenters (for cluster consensus), that is, every consenter should be represented by a + // delivery endpoint. + heightByEndpoint, _, err := s.BlockPuller.HeightsByEndpoints() + if err != nil { + return nil, errors.Wrap(err, "cannot get HeightsByEndpoints") + } + + s.Logger.Infof("HeightsByEndpoints: %v", heightByEndpoint) + + if len(heightByEndpoint) == 0 { + return nil, errors.New("no cluster members to synchronize with") + } + + var heights []uint64 + for _, value := range heightByEndpoint { + heights = append(heights, value) + } + + targetHeight := s.computeTargetHeight(heights) + startHeight := s.Support.Height() + if startHeight >= targetHeight { + return nil, errors.Errorf("already at target height of %d", targetHeight) + } + + //==== + //TODO create a buffer to accept the blocks delivered from the BFTDeliverer + + //=== + //TODO create the deliverer + bftDeliverer := &blocksprovider.BFTDeliverer{ + ChannelID: s.Support.ChannelID(), + BlockHandler: nil, // TODO handle the block into a buffer + //&GossipBlockHandler{ + // gossip: d.conf.Gossip, + // blockGossipDisabled: true, // Block gossip is deprecated since in v2.2 and is no longer supported in v3.x + // logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),}, + Ledger: nil, // ledgerInfo, + UpdatableBlockVerifier: nil, // ubv, + Dialer: blocksprovider.DialerAdapter{ + //ClientConfig: comm.ClientConfig{ + // DialTimeout: d.conf.DeliverServiceConfig.ConnectionTimeout, + // KaOpts: d.conf.DeliverServiceConfig.KeepaliveOptions, + // SecOpts: d.conf.DeliverServiceConfig.SecOpts, + //}, + }, + OrderersSourceFactory: &orderers.ConnectionSourceFactory{}, // no overrides in the orderer + CryptoProvider: nil, // d.conf.CryptoProvider, + DoneC: make(chan struct{}), + Signer: s.Support, + DeliverStreamer: blocksprovider.DeliverAdapter{}, + CensorshipDetectorFactory: &blocksprovider.BFTCensorshipMonitorFactory{}, + Logger: flogging.MustGetLogger("orderer.blocksprovider").With("channel", s.Support.ChannelID()), + InitialRetryInterval: 10 * time.Millisecond, // TODO get it from config. + MaxRetryInterval: 2 * time.Second, // TODO get it from config. + BlockCensorshipTimeout: 20 * time.Second, // TODO get it from config. + MaxRetryDuration: time.Minute, // TODO get it from config. + MaxRetryDurationExceededHandler: func() (stopRetries bool) { + return true // In the orderer we must limit the time we try to do Synch() + }, + } + + s.Logger.Infof("Created a BFTDeliverer: %+v", bftDeliverer) + + return nil, errors.New("not implemented") +} + +// computeTargetHeight compute the target height to synchronize to. +// +// heights: a slice containing the heights of accessible peers, length must be >0. +// clusterSize: the cluster size, must be >0. +func (s *BFTSynchronizer) computeTargetHeight(heights []uint64) uint64 { + sort.Slice(heights, func(i, j int) bool { return heights[i] > heights[j] }) // Descending + f := uint64(s.ClusterSize-1) / 3 // The number of tolerated byzantine faults + lenH := uint64(len(heights)) + + s.Logger.Debugf("Heights: %v", heights) + + if lenH < f+1 { + s.Logger.Debugf("Returning %d", heights[0]) + return heights[int(lenH)-1] + } + s.Logger.Debugf("Returning %d", heights[f]) + return heights[f] +} From 78dbf3e632548e9365080036a281d9619e957d53 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Tue, 20 Feb 2024 11:18:02 +0200 Subject: [PATCH 2/5] commit 2 Signed-off-by: Yoav Tock Change-Id: I1001551cf2f5152b97431ceb0183b56769dd9b8c --- orderer/consensus/smartbft/chain.go | 84 ++++----- orderer/consensus/smartbft/consenter.go | 18 +- orderer/consensus/smartbft/sync_buffer.go | 62 ++++++- .../consensus/smartbft/synchronizer_bft.go | 167 +++++++++++++----- orderer/consensus/smartbft/util.go | 13 ++ 5 files changed, 256 insertions(+), 88 deletions(-) diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index aad06c96b9c..669fd77e740 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -27,6 +27,7 @@ import ( "github.com/hyperledger/fabric-protos-go/msp" "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/cluster" + "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/orderer/common/msgprocessor" types2 "github.com/hyperledger/fabric/orderer/common/types" "github.com/hyperledger/fabric/orderer/consensus" @@ -67,24 +68,26 @@ type signerSerializer interface { // BFTChain implements Chain interface to wire with // BFT smart library type BFTChain struct { - RuntimeConfig *atomic.Value - Channel string - Config types.Configuration - BlockPuller BlockPuller // TODO make bft-synchro - Comm cluster.Communicator - SignerSerializer signerSerializer - PolicyManager policies.Manager - Logger *flogging.FabricLogger - WALDir string - consensus *smartbft.Consensus - support consensus.ConsenterSupport - clusterService *cluster.ClusterService - verifier *Verifier - assembler *Assembler - Metrics *Metrics - MetricsBFT *api.Metrics - MetricsWalBFT *wal.Metrics - bccsp bccsp.BCCSP + RuntimeConfig *atomic.Value + Channel string + Config types.Configuration + BlockPuller BlockPuller // TODO make bft-synchro + clusterDialer *cluster.PredicateDialer // TODO make bft-synchro + localConfigCluster localconfig.Cluster // TODO make bft-synchro + Comm cluster.Communicator + SignerSerializer signerSerializer + PolicyManager policies.Manager + Logger *flogging.FabricLogger + WALDir string + consensus *smartbft.Consensus + support consensus.ConsenterSupport + clusterService *cluster.ClusterService + verifier *Verifier + assembler *Assembler + Metrics *Metrics + MetricsBFT *api.Metrics + MetricsWalBFT *wal.Metrics + bccsp bccsp.BCCSP statusReportMutex sync.Mutex consensusRelation types2.ConsensusRelation @@ -92,21 +95,7 @@ type BFTChain struct { } // NewChain creates new BFT Smart chain -func NewChain( - cv ConfigValidator, - selfID uint64, - config types.Configuration, - walDir string, - blockPuller BlockPuller, // TODO make bft-synchro - comm cluster.Communicator, - signerSerializer signerSerializer, - policyManager policies.Manager, - support consensus.ConsenterSupport, - metrics *Metrics, - metricsBFT *api.Metrics, - metricsWalBFT *wal.Metrics, - bccsp bccsp.BCCSP, -) (*BFTChain, error) { +func NewChain(cv ConfigValidator, selfID uint64, config types.Configuration, walDir string, blockPuller BlockPuller, clusterDialer *cluster.PredicateDialer, localConfigCluster localconfig.Cluster, comm cluster.Communicator, signerSerializer signerSerializer, policyManager policies.Manager, support consensus.ConsenterSupport, metrics *Metrics, metricsBFT *api.Metrics, metricsWalBFT *wal.Metrics, bccsp bccsp.BCCSP) (*BFTChain, error) { logger := flogging.MustGetLogger("orderer.consensus.smartbft.chain").With(zap.String("channel", support.ChannelID())) requestInspector := &RequestInspector{ @@ -117,18 +106,20 @@ func NewChain( } c := &BFTChain{ - RuntimeConfig: &atomic.Value{}, - Channel: support.ChannelID(), - Config: config, - WALDir: walDir, - Comm: comm, - support: support, - SignerSerializer: signerSerializer, - PolicyManager: policyManager, - BlockPuller: blockPuller, - Logger: logger, - consensusRelation: types2.ConsensusRelationConsenter, - status: types2.StatusActive, + RuntimeConfig: &atomic.Value{}, + Channel: support.ChannelID(), + Config: config, + WALDir: walDir, + Comm: comm, + support: support, + SignerSerializer: signerSerializer, + PolicyManager: policyManager, + BlockPuller: blockPuller, // TODO make bft-synchro + clusterDialer: clusterDialer, // TODO make bft-synchro + localConfigCluster: localConfigCluster, // TODO make bft-synchro + Logger: logger, + consensusRelation: types2.ConsensusRelationConsenter, + status: types2.StatusActive, Metrics: &Metrics{ ClusterSize: metrics.ClusterSize.With("channel", support.ChannelID()), CommittedBlockNumber: metrics.CommittedBlockNumber.With("channel", support.ChannelID()), @@ -211,7 +202,8 @@ func bftSmartConsensusBuild( }, Support: c.support, BlockPuller: c.BlockPuller, - ClusterSize: clusterSize, + + ClusterSize: clusterSize, // TODO this must be dynamic as the cluster may change in size Logger: c.Logger, LatestConfig: func() (types.Configuration, []uint64) { rtc := c.RuntimeConfig.Load().(RuntimeConfig) diff --git a/orderer/consensus/smartbft/consenter.go b/orderer/consensus/smartbft/consenter.go index cd5a1bd11a0..f972b9e122d 100644 --- a/orderer/consensus/smartbft/consenter.go +++ b/orderer/consensus/smartbft/consenter.go @@ -203,7 +203,23 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb Logger: c.Logger, } - chain, err := NewChain(configValidator, (uint64)(selfID), config, path.Join(c.WALBaseDir, support.ChannelID()), puller, c.Comm, c.SignerSerializer, c.GetPolicyManager(support.ChannelID()), support, c.Metrics, c.MetricsBFT, c.MetricsWalBFT, c.BCCSP) + chain, err := NewChain( + configValidator, + (uint64)(selfID), + config, + path.Join(c.WALBaseDir, support.ChannelID()), + puller, + c.ClusterDialer, //TODO BFT-sync + c.Conf.General.Cluster, //TODO BFT-sync + c.Comm, + c.SignerSerializer, + c.GetPolicyManager(support.ChannelID()), + support, + c.Metrics, + c.MetricsBFT, + c.MetricsWalBFT, + c.BCCSP, + ) if err != nil { return nil, errors.Wrap(err, "failed creating a new BFTChain") } diff --git a/orderer/consensus/smartbft/sync_buffer.go b/orderer/consensus/smartbft/sync_buffer.go index bba4fca3995..819c8a4877b 100644 --- a/orderer/consensus/smartbft/sync_buffer.go +++ b/orderer/consensus/smartbft/sync_buffer.go @@ -6,5 +6,65 @@ SPDX-License-Identifier: Apache-2.0 package smartbft +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/pkg/errors" +) + type SyncBuffer struct { -} \ No newline at end of file + blockCh chan *common.Block + stopCh chan struct{} + stopOnce sync.Once +} + +func NewSyncBuffer() *SyncBuffer { + return &SyncBuffer{ + blockCh: make(chan *common.Block, 10), + stopCh: make(chan struct{}), + } +} + +// HandleBlock gives the block to the next stage of processing after fetching it from a remote orderer. +func (sb *SyncBuffer) HandleBlock(channelID string, block *common.Block) error { + if block == nil || block.Header == nil { + return errors.New("empty block or block header") + } + + select { + case sb.blockCh <- block: + return nil + case <-sb.stopCh: + return errors.New("SyncBuffer stopping") + } +} + +func (sb *SyncBuffer) PullBlock(seq uint64) *common.Block { + var block *common.Block + for { + select { + case block = <-sb.blockCh: + if block == nil || block.Header == nil { + return nil + } + if block.GetHeader().GetNumber() == seq { + return block + } + if block.GetHeader().GetNumber() < seq { + continue + } + if block.GetHeader().GetNumber() > seq { + return nil + } + case <-sb.stopCh: + return nil + } + } +} + +func (sb *SyncBuffer) Stop() { + sb.stopOnce.Do(func() { + close(sb.stopCh) + }) +} diff --git a/orderer/consensus/smartbft/synchronizer_bft.go b/orderer/consensus/smartbft/synchronizer_bft.go index 317a6aaa73d..29f5c68b9b5 100644 --- a/orderer/consensus/smartbft/synchronizer_bft.go +++ b/orderer/consensus/smartbft/synchronizer_bft.go @@ -7,28 +7,36 @@ SPDX-License-Identifier: Apache-2.0 package smartbft import ( + "github.com/SmartBFT-Go/consensus/smartbftprotos" + "github.com/hyperledger/fabric/orderer/common/localconfig" + "github.com/hyperledger/fabric/protoutil" "sort" "time" "github.com/SmartBFT-Go/consensus/pkg/types" + "github.com/hyperledger/fabric-lib-go/bccsp" "github.com/hyperledger/fabric-lib-go/common/flogging" - cb "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/deliverclient" "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/hyperledger/fabric/orderer/common/cluster" "github.com/hyperledger/fabric/orderer/consensus" "github.com/pkg/errors" ) type BFTSynchronizer struct { - lastReconfig types.Reconfig - selfID uint64 - LatestConfig func() (types.Configuration, []uint64) - BlockToDecision func(*cb.Block) *types.Decision - OnCommit func(*cb.Block) types.Reconfig - Support consensus.ConsenterSupport - BlockPuller BlockPuller // TODO make bft-synchro - this only an endpoint prober - ClusterSize uint64 // TODO this can be taken from the channel/orderer config - Logger *flogging.FabricLogger + lastReconfig types.Reconfig + selfID uint64 + LatestConfig func() (types.Configuration, []uint64) + BlockToDecision func(*common.Block) *types.Decision + OnCommit func(*common.Block) types.Reconfig + Support consensus.ConsenterSupport + CryptoProvider bccsp.BCCSP + BlockPuller BlockPuller // TODO improve - this only an endpoint prober - detect self EP + clusterDialer *cluster.PredicateDialer // TODO make bft-synchro + localConfigCluster localconfig.Cluster // TODO make bft-synchro + Logger *flogging.FabricLogger } func (s *BFTSynchronizer) Sync() types.SyncResponse { @@ -62,28 +70,37 @@ func (s *BFTSynchronizer) Sync() types.SyncResponse { } func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { - defer s.BlockPuller.Close() + //=== We use the BlockPuller to probe all the endpoints and establish a target height, as well as detect + // the self endpoint. - //=== we use the BlockPuller to probe all the endpoints and establish a target height - //TODO in BFT it is important that the channel/orderer-endpoints (for delivery & broadcast) map 1:1 to the + // In BFT it is highly recommended that the channel/orderer-endpoints (for delivery & broadcast) map 1:1 to the // channel/orderers/consenters (for cluster consensus), that is, every consenter should be represented by a // delivery endpoint. - heightByEndpoint, _, err := s.BlockPuller.HeightsByEndpoints() + blockPuller, err := newBlockPuller(s.Support, s.clusterDialer, s.localConfigCluster, s.CryptoProvider) + if err != nil { + return nil, errors.Wrap(err, "cannot get create BlockPuller") + } + defer blockPuller.Close() + + heightByEndpoint, myEndpoint, err := blockPuller.HeightsByEndpoints() if err != nil { return nil, errors.Wrap(err, "cannot get HeightsByEndpoints") } - s.Logger.Infof("HeightsByEndpoints: %v", heightByEndpoint) - - if len(heightByEndpoint) == 0 { - return nil, errors.New("no cluster members to synchronize with") - } + s.Logger.Infof("HeightsByEndpoints: %+v, my endpoint: %s", heightByEndpoint, myEndpoint) var heights []uint64 - for _, value := range heightByEndpoint { + for ep, value := range heightByEndpoint { + if ep == myEndpoint { + continue + } heights = append(heights, value) } + if len(heights) == 0 { + return nil, errors.New("no cluster members to synchronize with") + } + targetHeight := s.computeTargetHeight(heights) startHeight := s.Support.Height() if startHeight >= targetHeight { @@ -91,28 +108,37 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { } //==== - //TODO create a buffer to accept the blocks delivered from the BFTDeliverer + // create a buffer to accept the blocks delivered from the BFTDeliverer + syncBuffer := NewSyncBuffer() //=== - //TODO create the deliverer + // create the deliverer + lastBlock := s.Support.Block(startHeight - 1) + lastConfigBlock, err := cluster.LastConfigBlock(lastBlock, s.Support) + if err != nil { + return nil, errors.Wrapf(err, "failed to retrieve last config block") + } + lastConfigEnv, err := deliverclient.ConfigFromBlock(lastConfigBlock) + if err != nil { + return nil, errors.Wrapf(err, "failed to retrieve last config envelope") + } + verifier, err := deliverclient.NewBlockVerificationAssistant(lastConfigBlock, lastBlock, s.CryptoProvider, s.Logger) + if err != nil { + return nil, errors.Wrapf(err, "failed to create BlockVerificationAssistant") + } + + clientConfig := s.clusterDialer.Config // The cluster and block puller use slightly different options + clientConfig.AsyncConnect = false + clientConfig.SecOpts.VerifyCertificate = nil + bftDeliverer := &blocksprovider.BFTDeliverer{ - ChannelID: s.Support.ChannelID(), - BlockHandler: nil, // TODO handle the block into a buffer - //&GossipBlockHandler{ - // gossip: d.conf.Gossip, - // blockGossipDisabled: true, // Block gossip is deprecated since in v2.2 and is no longer supported in v3.x - // logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),}, - Ledger: nil, // ledgerInfo, - UpdatableBlockVerifier: nil, // ubv, - Dialer: blocksprovider.DialerAdapter{ - //ClientConfig: comm.ClientConfig{ - // DialTimeout: d.conf.DeliverServiceConfig.ConnectionTimeout, - // KaOpts: d.conf.DeliverServiceConfig.KeepaliveOptions, - // SecOpts: d.conf.DeliverServiceConfig.SecOpts, - //}, - }, + ChannelID: s.Support.ChannelID(), + BlockHandler: syncBuffer, + Ledger: &ledgerInfoAdapter{s.Support}, + UpdatableBlockVerifier: verifier, + Dialer: blocksprovider.DialerAdapter{ClientConfig: clientConfig}, OrderersSourceFactory: &orderers.ConnectionSourceFactory{}, // no overrides in the orderer - CryptoProvider: nil, // d.conf.CryptoProvider, + CryptoProvider: s.CryptoProvider, DoneC: make(chan struct{}), Signer: s.Support, DeliverStreamer: blocksprovider.DeliverAdapter{}, @@ -123,13 +149,62 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { BlockCensorshipTimeout: 20 * time.Second, // TODO get it from config. MaxRetryDuration: time.Minute, // TODO get it from config. MaxRetryDurationExceededHandler: func() (stopRetries bool) { + syncBuffer.Stop() return true // In the orderer we must limit the time we try to do Synch() }, } s.Logger.Infof("Created a BFTDeliverer: %+v", bftDeliverer) + bftDeliverer.Initialize(lastConfigEnv.GetConfig()) //TODO allow input for self endpoint + + go bftDeliverer.DeliverBlocks() + defer bftDeliverer.Stop() - return nil, errors.New("not implemented") + //=== + // Loop on sync-buffer + + targetSeq := targetHeight - 1 + seq := startHeight + var blocksFetched int + + s.Logger.Debugf("Will fetch sequences [%d-%d]", seq, targetSeq) + + var lastPulledBlock *common.Block + for seq <= targetSeq { + block := syncBuffer.PullBlock(seq) + if block == nil { + s.Logger.Debugf("Failed to fetch block [%d] from cluster", seq) + break + } + if protoutil.IsConfigBlock(block) { + s.Support.WriteConfigBlock(block, nil) + } else { + s.Support.WriteBlock(block, nil) + } + s.Logger.Debugf("Fetched and committed block [%d] from cluster", seq) + lastPulledBlock = block + + prevInLatestDecision := s.lastReconfig.InLatestDecision + s.lastReconfig = s.OnCommit(lastPulledBlock) + s.lastReconfig.InLatestDecision = s.lastReconfig.InLatestDecision || prevInLatestDecision + seq++ + blocksFetched++ + } + + syncBuffer.Stop() + + if lastPulledBlock == nil { + return nil, errors.Errorf("failed pulling block %d", seq) + } + + startSeq := startHeight + s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]", + blocksFetched, startSeq, lastPulledBlock.Header.Number) + + viewMetadata, lastConfigSqn := s.getViewMetadataLastConfigSqnFromBlock(lastPulledBlock) + + s.Logger.Infof("Returning view metadata of %v, lastConfigSeq %d", viewMetadata, lastConfigSqn) + return s.BlockToDecision(lastPulledBlock), nil } // computeTargetHeight compute the target height to synchronize to. @@ -138,7 +213,8 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { // clusterSize: the cluster size, must be >0. func (s *BFTSynchronizer) computeTargetHeight(heights []uint64) uint64 { sort.Slice(heights, func(i, j int) bool { return heights[i] > heights[j] }) // Descending - f := uint64(s.ClusterSize-1) / 3 // The number of tolerated byzantine faults + clusterSize := len(s.Support.SharedConfig().Consenters()) + f := uint64(clusterSize-1) / 3 // The number of tolerated byzantine faults lenH := uint64(len(heights)) s.Logger.Debugf("Heights: %v", heights) @@ -150,3 +226,14 @@ func (s *BFTSynchronizer) computeTargetHeight(heights []uint64) uint64 { s.Logger.Debugf("Returning %d", heights[f]) return heights[f] } + +func (s *BFTSynchronizer) getViewMetadataLastConfigSqnFromBlock(block *common.Block) (*smartbftprotos.ViewMetadata, uint64) { + viewMetadata, err := getViewMetadataFromBlock(block) + if err != nil { + return nil, 0 + } + + lastConfigSqn := s.Support.Sequence() + + return viewMetadata, lastConfigSqn +} diff --git a/orderer/consensus/smartbft/util.go b/orderer/consensus/smartbft/util.go index a7d03294aa2..bb325a7625b 100644 --- a/orderer/consensus/smartbft/util.go +++ b/orderer/consensus/smartbft/util.go @@ -515,3 +515,16 @@ func createSmartBftConfig(odrdererConfig channelconfig.Orderer) (*smartbft.Optio configOptions.RequestBatchMaxBytes = uint64(batchSize.AbsoluteMaxBytes) return configOptions, nil } + +// ledgerInfoAdapter translates from blocksprovider.LedgerInfo in to calls to consensus.ConsenterSupport. +type ledgerInfoAdapter struct { + support consensus.ConsenterSupport +} + +func (a *ledgerInfoAdapter) LedgerHeight() (uint64, error) { + return a.support.Height(), nil +} + +func (a *ledgerInfoAdapter) GetCurrentBlockHash() ([]byte, error) { + return nil, errors.New("not implemented") +} From 41ed9b6e68dfd5a3f8ee22b89c89dcffe44e47e1 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Tue, 20 Feb 2024 11:19:40 +0200 Subject: [PATCH 3/5] Identify self EP to BFT deliverer Signed-off-by: Yoav Tock Change-Id: Ic1a08b1d71ce4566b24f8a4f484f5831d94d3e07 --- core/deliverservice/deliveryclient.go | 2 +- .../pkg/peer/blocksprovider/bft_deliverer.go | 4 +- .../peer/blocksprovider/bft_deliverer_test.go | 2 +- internal/pkg/peer/blocksprovider/deliverer.go | 2 +- .../fake/orderer_connection_source_factory.go | 18 ++-- internal/pkg/peer/orderers/connection.go | 15 +++- .../pkg/peer/orderers/connection_factory.go | 6 +- .../peer/orderers/connection_factory_test.go | 4 +- internal/pkg/peer/orderers/connection_test.go | 84 +++++++++++++++++-- orderer/consensus/smartbft/chain.go | 18 +++- orderer/consensus/smartbft/consenter.go | 4 +- .../consensus/smartbft/synchronizer_bft.go | 9 +- orderer/consensus/smartbft/util.go | 2 +- 13 files changed, 133 insertions(+), 37 deletions(-) diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 75c2260a22e..11ba12b4b66 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -280,7 +280,7 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo dcBFT.TLSCertHash = util.ComputeSHA256(cert.Certificate[0]) } - dcBFT.Initialize(d.conf.ChannelConfig) + dcBFT.Initialize(d.conf.ChannelConfig, "") return dcBFT, nil } diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer.go b/internal/pkg/peer/blocksprovider/bft_deliverer.go index 13b96595357..aa6e1c349a7 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer.go @@ -105,7 +105,7 @@ type BFTDeliverer struct { censorshipMonitor CensorshipDetector } -func (d *BFTDeliverer) Initialize(channelConfig *common.Config) { +func (d *BFTDeliverer) Initialize(channelConfig *common.Config, selfEndpoint string) { d.requester = NewDeliveryRequester( d.ChannelID, d.Signer, @@ -115,7 +115,7 @@ func (d *BFTDeliverer) Initialize(channelConfig *common.Config) { ) osLogger := flogging.MustGetLogger("peer.orderers") - ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger) + ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, selfEndpoint) globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) if err != nil { // The bundle was created prior to calling this function, so it should not fail when we recreate it here. diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go index 7578622ed6c..8356415b72e 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go @@ -228,7 +228,7 @@ func (s *bftDelivererTestSetup) initialize(t *testing.T) { MaxRetryDuration: 600 * time.Second, MaxRetryDurationExceededHandler: s.fakeDurationExceededHandler.DurationExceededHandler, } - s.d.Initialize(s.channelConfig) + s.d.Initialize(s.channelConfig, "") s.fakeSleeper = &fake.Sleeper{} diff --git a/internal/pkg/peer/blocksprovider/deliverer.go b/internal/pkg/peer/blocksprovider/deliverer.go index 78b13dc2e92..8b2e3e57372 100644 --- a/internal/pkg/peer/blocksprovider/deliverer.go +++ b/internal/pkg/peer/blocksprovider/deliverer.go @@ -117,7 +117,7 @@ func (d *Deliverer) Initialize(channelConfig *cb.Config) { ) osLogger := flogging.MustGetLogger("peer.orderers") - ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger) + ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, "") globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) if err != nil { // The bundle was created prior to calling this function, so it should not fail when we recreate it here. diff --git a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go index 9b904deca6c..4dc7adfbeca 100644 --- a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go +++ b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go @@ -10,10 +10,11 @@ import ( ) type OrdererConnectionSourceFactory struct { - CreateConnectionSourceStub func(*flogging.FabricLogger) orderers.ConnectionSourcer + CreateConnectionSourceStub func(*flogging.FabricLogger, string) orderers.ConnectionSourcer createConnectionSourceMutex sync.RWMutex createConnectionSourceArgsForCall []struct { arg1 *flogging.FabricLogger + arg2 string } createConnectionSourceReturns struct { result1 orderers.ConnectionSourcer @@ -25,18 +26,19 @@ type OrdererConnectionSourceFactory struct { invocationsMutex sync.RWMutex } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSource(arg1 *flogging.FabricLogger) orderers.ConnectionSourcer { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSource(arg1 *flogging.FabricLogger, arg2 string) orderers.ConnectionSourcer { fake.createConnectionSourceMutex.Lock() ret, specificReturn := fake.createConnectionSourceReturnsOnCall[len(fake.createConnectionSourceArgsForCall)] fake.createConnectionSourceArgsForCall = append(fake.createConnectionSourceArgsForCall, struct { arg1 *flogging.FabricLogger - }{arg1}) + arg2 string + }{arg1, arg2}) stub := fake.CreateConnectionSourceStub fakeReturns := fake.createConnectionSourceReturns - fake.recordInvocation("CreateConnectionSource", []interface{}{arg1}) + fake.recordInvocation("CreateConnectionSource", []interface{}{arg1, arg2}) fake.createConnectionSourceMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1 @@ -50,17 +52,17 @@ func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCallCount() in return len(fake.createConnectionSourceArgsForCall) } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCalls(stub func(*flogging.FabricLogger) orderers.ConnectionSourcer) { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCalls(stub func(*flogging.FabricLogger, string) orderers.ConnectionSourcer) { fake.createConnectionSourceMutex.Lock() defer fake.createConnectionSourceMutex.Unlock() fake.CreateConnectionSourceStub = stub } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceArgsForCall(i int) *flogging.FabricLogger { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceArgsForCall(i int) (*flogging.FabricLogger, string) { fake.createConnectionSourceMutex.RLock() defer fake.createConnectionSourceMutex.RUnlock() argsForCall := fake.createConnectionSourceArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceReturns(result1 orderers.ConnectionSourcer) { diff --git a/internal/pkg/peer/orderers/connection.go b/internal/pkg/peer/orderers/connection.go index 0a535704fef..ee0fab5b603 100644 --- a/internal/pkg/peer/orderers/connection.go +++ b/internal/pkg/peer/orderers/connection.go @@ -24,6 +24,8 @@ type ConnectionSource struct { orgToEndpointsHash map[string][]byte logger *flogging.FabricLogger overrides map[string]*Endpoint + // empty when used by a peer, or self endpoint address when the ConnectionSource is used by an orderer. + selfEndpoint string } type Endpoint struct { @@ -56,11 +58,12 @@ type OrdererOrg struct { RootCerts [][]byte } -func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint) *ConnectionSource { +func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint, selfEndpoint string) *ConnectionSource { return &ConnectionSource{ orgToEndpointsHash: map[string][]byte{}, logger: logger, overrides: overrides, + selfEndpoint: selfEndpoint, } } @@ -202,6 +205,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer // Note, if !hasOrgEndpoints, this for loop is a no-op for _, address := range org.Addresses { + if address == cs.selfEndpoint { + cs.logger.Debugf("Skipping self endpoint [%s] from org specific endpoints", address) + continue + } overrideEndpoint, ok := cs.overrides[address] if ok { cs.allEndpoints = append(cs.allEndpoints, &Endpoint{ @@ -228,6 +235,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer } for _, address := range globalAddrs { + if address == cs.selfEndpoint { + cs.logger.Debugf("Skipping self endpoint [%s] from global endpoints", address) + continue + } overrideEndpoint, ok := cs.overrides[address] if ok { cs.allEndpoints = append(cs.allEndpoints, &Endpoint{ @@ -245,5 +256,5 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer }) } - cs.logger.Debugf("Returning an orderer connection pool source with global endpoints only") + cs.logger.Debug("Returning an orderer connection pool source with global endpoints only") } diff --git a/internal/pkg/peer/orderers/connection_factory.go b/internal/pkg/peer/orderers/connection_factory.go index eaaa6448526..a6defc4744f 100644 --- a/internal/pkg/peer/orderers/connection_factory.go +++ b/internal/pkg/peer/orderers/connection_factory.go @@ -17,13 +17,13 @@ type ConnectionSourcer interface { } type ConnectionSourceCreator interface { - CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer + CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer } type ConnectionSourceFactory struct { Overrides map[string]*Endpoint } -func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer { - return NewConnectionSource(logger, f.Overrides) +func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer { + return NewConnectionSource(logger, f.Overrides, selfEndpoint) } diff --git a/internal/pkg/peer/orderers/connection_factory_test.go b/internal/pkg/peer/orderers/connection_factory_test.go index e780caaa8fd..882fb345ddd 100644 --- a/internal/pkg/peer/orderers/connection_factory_test.go +++ b/internal/pkg/peer/orderers/connection_factory_test.go @@ -19,7 +19,7 @@ func TestCreateConnectionSource(t *testing.T) { require.NotNil(t, factory) require.Nil(t, factory.Overrides) lg := flogging.MustGetLogger("test") - connSource := factory.CreateConnectionSource(lg) + connSource := factory.CreateConnectionSource(lg, "") require.NotNil(t, connSource) overrides := make(map[string]*orderers.Endpoint) @@ -31,6 +31,6 @@ func TestCreateConnectionSource(t *testing.T) { factory = &orderers.ConnectionSourceFactory{Overrides: overrides} require.NotNil(t, factory) require.Len(t, factory.Overrides, 1) - connSource = factory.CreateConnectionSource(lg) + connSource = factory.CreateConnectionSource(lg, "") require.NotNil(t, connSource) } diff --git a/internal/pkg/peer/orderers/connection_test.go b/internal/pkg/peer/orderers/connection_test.go index a5b0aaa5490..62dc8352900 100644 --- a/internal/pkg/peer/orderers/connection_test.go +++ b/internal/pkg/peer/orderers/connection_test.go @@ -12,11 +12,10 @@ import ( "sort" "strings" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/hyperledger/fabric-lib-go/common/flogging" "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) type comparableEndpoint struct { @@ -83,12 +82,13 @@ var _ = Describe("Connection", func() { org2Certs = [][]byte{cert3} overrideCerts = [][]byte{cert2} - cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), map[string]*orderers.Endpoint{ - "override-address": { - Address: "re-mapped-address", - RootCerts: overrideCerts, - }, - }) + cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), + map[string]*orderers.Endpoint{ + "override-address": { + Address: "re-mapped-address", + RootCerts: overrideCerts, + }, + }, "") // << no self endpoint cs.Update(nil, map[string]orderers.OrdererOrg{ "org1": org1, "org2": org2, @@ -567,4 +567,70 @@ var _ = Describe("Connection", func() { }) }) }) + + When("self-endpoint exists as in the orderer", func() { + BeforeEach(func() { + cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), + map[string]*orderers.Endpoint{ + "override-address": { + Address: "re-mapped-address", + RootCerts: overrideCerts, + }, + }, + "org1-address1") + cs.Update(nil, map[string]orderers.OrdererOrg{ + "org1": org1, + "org2": org2, + }) + + endpoints = cs.Endpoints() + }) + + It("does not include the self-endpoint in endpoints", func() { + Expect(len(endpoints)).To(Equal(3)) + Expect(stripEndpoints(endpoints)).To(ConsistOf( + stripEndpoints([]*orderers.Endpoint{ + { + Address: "org1-address2", + RootCerts: org1Certs, + }, + { + Address: "org2-address1", + RootCerts: org2Certs, + }, + { + Address: "org2-address2", + RootCerts: org2Certs, + }, + }), + )) + }) + + It("does not include the self endpoint in shuffled endpoints", func() { + shuffledEndpoints := cs.ShuffledEndpoints() + Expect(len(shuffledEndpoints)).To(Equal(3)) + Expect(stripEndpoints(endpoints)).To(ConsistOf( + stripEndpoints([]*orderers.Endpoint{ + { + Address: "org1-address2", + RootCerts: org1Certs, + }, + { + Address: "org2-address1", + RootCerts: org2Certs, + }, + { + Address: "org2-address2", + RootCerts: org2Certs, + }, + }), + )) + }) + + It("does not mark any of the endpoints as refreshed", func() { + for _, endpoint := range endpoints { + Expect(endpoint.Refreshed).NotTo(BeClosed()) + } + }) + }) }) diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index 669fd77e740..72b3ad3a161 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -95,7 +95,23 @@ type BFTChain struct { } // NewChain creates new BFT Smart chain -func NewChain(cv ConfigValidator, selfID uint64, config types.Configuration, walDir string, blockPuller BlockPuller, clusterDialer *cluster.PredicateDialer, localConfigCluster localconfig.Cluster, comm cluster.Communicator, signerSerializer signerSerializer, policyManager policies.Manager, support consensus.ConsenterSupport, metrics *Metrics, metricsBFT *api.Metrics, metricsWalBFT *wal.Metrics, bccsp bccsp.BCCSP) (*BFTChain, error) { +func NewChain( + cv ConfigValidator, + selfID uint64, + config types.Configuration, + walDir string, + blockPuller BlockPuller, + clusterDialer *cluster.PredicateDialer, + localConfigCluster localconfig.Cluster, + comm cluster.Communicator, + signerSerializer signerSerializer, + policyManager policies.Manager, + support consensus.ConsenterSupport, + metrics *Metrics, + metricsBFT *api.Metrics, + metricsWalBFT *wal.Metrics, + bccsp bccsp.BCCSP, +) (*BFTChain, error) { logger := flogging.MustGetLogger("orderer.consensus.smartbft.chain").With(zap.String("channel", support.ChannelID())) requestInspector := &RequestInspector{ diff --git a/orderer/consensus/smartbft/consenter.go b/orderer/consensus/smartbft/consenter.go index f972b9e122d..666044ef6ff 100644 --- a/orderer/consensus/smartbft/consenter.go +++ b/orderer/consensus/smartbft/consenter.go @@ -209,8 +209,8 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb config, path.Join(c.WALBaseDir, support.ChannelID()), puller, - c.ClusterDialer, //TODO BFT-sync - c.Conf.General.Cluster, //TODO BFT-sync + c.ClusterDialer, // TODO BFT-sync + c.Conf.General.Cluster, // TODO BFT-sync c.Comm, c.SignerSerializer, c.GetPolicyManager(support.ChannelID()), diff --git a/orderer/consensus/smartbft/synchronizer_bft.go b/orderer/consensus/smartbft/synchronizer_bft.go index 29f5c68b9b5..9416782fee9 100644 --- a/orderer/consensus/smartbft/synchronizer_bft.go +++ b/orderer/consensus/smartbft/synchronizer_bft.go @@ -7,11 +7,12 @@ SPDX-License-Identifier: Apache-2.0 package smartbft import ( + "sort" + "time" + "github.com/SmartBFT-Go/consensus/smartbftprotos" "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/protoutil" - "sort" - "time" "github.com/SmartBFT-Go/consensus/pkg/types" "github.com/hyperledger/fabric-lib-go/bccsp" @@ -81,7 +82,7 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { return nil, errors.Wrap(err, "cannot get create BlockPuller") } defer blockPuller.Close() - + heightByEndpoint, myEndpoint, err := blockPuller.HeightsByEndpoints() if err != nil { return nil, errors.Wrap(err, "cannot get HeightsByEndpoints") @@ -155,7 +156,7 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { } s.Logger.Infof("Created a BFTDeliverer: %+v", bftDeliverer) - bftDeliverer.Initialize(lastConfigEnv.GetConfig()) //TODO allow input for self endpoint + bftDeliverer.Initialize(lastConfigEnv.GetConfig(), myEndpoint) go bftDeliverer.DeliverBlocks() defer bftDeliverer.Stop() diff --git a/orderer/consensus/smartbft/util.go b/orderer/consensus/smartbft/util.go index bb325a7625b..c8e7c4ab945 100644 --- a/orderer/consensus/smartbft/util.go +++ b/orderer/consensus/smartbft/util.go @@ -526,5 +526,5 @@ func (a *ledgerInfoAdapter) LedgerHeight() (uint64, error) { } func (a *ledgerInfoAdapter) GetCurrentBlockHash() ([]byte, error) { - return nil, errors.New("not implemented") + return nil, errors.New("not implemented: never used in orderer") } From 0fc8c4c077c2cf4ff8de08184af93b2ab4597e28 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Wed, 21 Feb 2024 15:13:30 +0200 Subject: [PATCH 4/5] Switch by replication policy Signed-off-by: Yoav Tock Change-Id: Iaecd27c0295748f94947f5af94a7385ca455f245 --- orderer/common/localconfig/config.go | 4 ++ orderer/common/localconfig/config_test.go | 1 + orderer/consensus/smartbft/chain.go | 55 ++++++++++++++++------- sampleconfig/orderer.yaml | 6 +++ 4 files changed, 50 insertions(+), 16 deletions(-) diff --git a/orderer/common/localconfig/config.go b/orderer/common/localconfig/config.go index 649f2e3a7b2..120ffd0d77b 100644 --- a/orderer/common/localconfig/config.go +++ b/orderer/common/localconfig/config.go @@ -68,6 +68,7 @@ type Cluster struct { ReplicationPullTimeout time.Duration ReplicationRetryTimeout time.Duration ReplicationMaxRetries int + ReplicationPolicy string // BFT: "simple" | "consensus" (default); etcdraft: ignored, always "simple" SendBufferSize int CertExpirationWarningThreshold time.Duration TLSHandshakeTimeShift time.Duration @@ -188,6 +189,7 @@ var Defaults = TopLevel{ ReplicationRetryTimeout: time.Second * 5, ReplicationPullTimeout: time.Second * 5, CertExpirationWarningThreshold: time.Hour * 24 * 7, + ReplicationPolicy: "consensus", // BFT default; on etcdraft it is ignored }, LocalMSPDir: "msp", LocalMSPID: "SampleOrg", @@ -332,6 +334,8 @@ func (c *TopLevel) completeInitialization(configDir string) { c.General.Cluster.ReplicationRetryTimeout = Defaults.General.Cluster.ReplicationRetryTimeout case c.General.Cluster.CertExpirationWarningThreshold == 0: c.General.Cluster.CertExpirationWarningThreshold = Defaults.General.Cluster.CertExpirationWarningThreshold + case c.General.Cluster.ReplicationPolicy == "": + c.General.Cluster.ReplicationPolicy = Defaults.General.Cluster.ReplicationPolicy case c.General.Profile.Enabled && c.General.Profile.Address == "": logger.Infof("Profiling enabled and General.Profile.Address unset, setting to %s", Defaults.General.Profile.Address) diff --git a/orderer/common/localconfig/config_test.go b/orderer/common/localconfig/config_test.go index 62d924dc8f9..1efb226743f 100644 --- a/orderer/common/localconfig/config_test.go +++ b/orderer/common/localconfig/config_test.go @@ -165,6 +165,7 @@ func TestClusterDefaults(t *testing.T) { cfg, err := cc.load() require.NoError(t, err) require.Equal(t, cfg.General.Cluster.ReplicationMaxRetries, Defaults.General.Cluster.ReplicationMaxRetries) + require.Equal(t, cfg.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy) } func TestConsensusConfig(t *testing.T) { diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index 72b3ad3a161..133103fbf11 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -209,22 +209,45 @@ func bftSmartConsensusBuild( // report cluster size c.Metrics.ClusterSize.Set(float64(clusterSize)) - sync := &Synchronizer{ // TODO make bft-synchro - selfID: rtc.id, - BlockToDecision: c.blockToDecision, - OnCommit: func(block *cb.Block) types.Reconfig { - c.pruneCommittedRequests(block) - return c.updateRuntimeConfig(block) - }, - Support: c.support, - BlockPuller: c.BlockPuller, - - ClusterSize: clusterSize, // TODO this must be dynamic as the cluster may change in size - Logger: c.Logger, - LatestConfig: func() (types.Configuration, []uint64) { - rtc := c.RuntimeConfig.Load().(RuntimeConfig) - return rtc.BFTConfig, rtc.Nodes - }, + var sync api.Synchronizer + switch c.localConfigCluster.ReplicationPolicy { + case "consensus": + sync = &Synchronizer{ // TODO make bft-synchronizer + selfID: rtc.id, + BlockToDecision: c.blockToDecision, + OnCommit: func(block *cb.Block) types.Reconfig { + c.pruneCommittedRequests(block) + return c.updateRuntimeConfig(block) + }, + Support: c.support, + BlockPuller: c.BlockPuller, + ClusterSize: clusterSize, + Logger: c.Logger, + LatestConfig: func() (types.Configuration, []uint64) { + rtc := c.RuntimeConfig.Load().(RuntimeConfig) + return rtc.BFTConfig, rtc.Nodes + }, + } + case "simple": + sync = &Synchronizer{ + selfID: rtc.id, + BlockToDecision: c.blockToDecision, + OnCommit: func(block *cb.Block) types.Reconfig { + c.pruneCommittedRequests(block) + return c.updateRuntimeConfig(block) + }, + Support: c.support, + BlockPuller: c.BlockPuller, + + ClusterSize: clusterSize, // TODO this must be dynamic as the cluster may change in size + Logger: c.Logger, + LatestConfig: func() (types.Configuration, []uint64) { + rtc := c.RuntimeConfig.Load().(RuntimeConfig) + return rtc.BFTConfig, rtc.Nodes + }, + } + default: + } channelDecorator := zap.String("channel", c.support.ChannelID()) diff --git a/sampleconfig/orderer.yaml b/sampleconfig/orderer.yaml index ee3ae31af41..36f70aa2a80 100644 --- a/sampleconfig/orderer.yaml +++ b/sampleconfig/orderer.yaml @@ -115,6 +115,12 @@ General: # ServerPrivateKey defines the file location of the private key of the TLS certificate. ServerPrivateKey: + # ReplicationPolicy defines how blocks are replicated between orderers. + # Permitted values: + # in BFT: "simple" | "consensus" (default); + # in etcdraft: ignored, (always "simple", regardless of value in config). + ReplicationPolicy: + # Bootstrap method: The method by which to obtain the bootstrap block # system channel is specified. The option can be one of: # "file" - path to a file containing the genesis block or config block of system channel From 76fbeac6c94e3eb367e6ac060246a6ccf6f80cf9 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Thu, 22 Feb 2024 11:17:25 +0200 Subject: [PATCH 5/5] Integrate BFTSynchronizer Signed-off-by: Yoav Tock Change-Id: I20182b39eafeb5465fb37d1248cdb70b025dc55c --- orderer/common/localconfig/config.go | 3 ++- orderer/consensus/smartbft/chain.go | 24 ++++++++++++------------ orderer/consensus/smartbft/util.go | 2 ++ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/orderer/common/localconfig/config.go b/orderer/common/localconfig/config.go index 120ffd0d77b..0cd95e80faf 100644 --- a/orderer/common/localconfig/config.go +++ b/orderer/common/localconfig/config.go @@ -334,7 +334,8 @@ func (c *TopLevel) completeInitialization(configDir string) { c.General.Cluster.ReplicationRetryTimeout = Defaults.General.Cluster.ReplicationRetryTimeout case c.General.Cluster.CertExpirationWarningThreshold == 0: c.General.Cluster.CertExpirationWarningThreshold = Defaults.General.Cluster.CertExpirationWarningThreshold - case c.General.Cluster.ReplicationPolicy == "": + case (c.General.Cluster.ReplicationPolicy != "simple") && (c.General.Cluster.ReplicationPolicy != "consensus"): + logger.Infof("General.Cluster.ReplicationPolicy is `%s`, setting to `%s`", c.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy) c.General.Cluster.ReplicationPolicy = Defaults.General.Cluster.ReplicationPolicy case c.General.Profile.Enabled && c.General.Profile.Address == "": diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index 133103fbf11..bbd512e3b08 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -212,21 +212,22 @@ func bftSmartConsensusBuild( var sync api.Synchronizer switch c.localConfigCluster.ReplicationPolicy { case "consensus": - sync = &Synchronizer{ // TODO make bft-synchronizer - selfID: rtc.id, + sync = &BFTSynchronizer{ + selfID: rtc.id, + LatestConfig: func() (types.Configuration, []uint64) { + rtc := c.RuntimeConfig.Load().(RuntimeConfig) + return rtc.BFTConfig, rtc.Nodes + }, BlockToDecision: c.blockToDecision, OnCommit: func(block *cb.Block) types.Reconfig { c.pruneCommittedRequests(block) return c.updateRuntimeConfig(block) }, - Support: c.support, - BlockPuller: c.BlockPuller, - ClusterSize: clusterSize, - Logger: c.Logger, - LatestConfig: func() (types.Configuration, []uint64) { - rtc := c.RuntimeConfig.Load().(RuntimeConfig) - return rtc.BFTConfig, rtc.Nodes - }, + Support: c.support, + CryptoProvider: c.bccsp, + clusterDialer: c.clusterDialer, + localConfigCluster: c.localConfigCluster, + Logger: c.Logger, } case "simple": sync = &Synchronizer{ @@ -238,7 +239,6 @@ func bftSmartConsensusBuild( }, Support: c.support, BlockPuller: c.BlockPuller, - ClusterSize: clusterSize, // TODO this must be dynamic as the cluster may change in size Logger: c.Logger, LatestConfig: func() (types.Configuration, []uint64) { @@ -247,7 +247,7 @@ func bftSmartConsensusBuild( }, } default: - + c.Logger.Panicf("unsupported ReplicationPolicy: %s", c.localConfigCluster.ReplicationPolicy) } channelDecorator := zap.String("channel", c.support.ChannelID()) diff --git a/orderer/consensus/smartbft/util.go b/orderer/consensus/smartbft/util.go index c8e7c4ab945..113dcf54f6a 100644 --- a/orderer/consensus/smartbft/util.go +++ b/orderer/consensus/smartbft/util.go @@ -176,6 +176,8 @@ func newBlockPuller( Dialer: stdDialer, } + logger.Infof("Built new block puller with cluster config: %+v, endpoints: %+v", clusterConfig, endpoints) + return bp, nil }