diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 75c2260a22e..11ba12b4b66 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -280,7 +280,7 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo dcBFT.TLSCertHash = util.ComputeSHA256(cert.Certificate[0]) } - dcBFT.Initialize(d.conf.ChannelConfig) + dcBFT.Initialize(d.conf.ChannelConfig, "") return dcBFT, nil } diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer.go b/internal/pkg/peer/blocksprovider/bft_deliverer.go index 13b96595357..aa6e1c349a7 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer.go @@ -105,7 +105,7 @@ type BFTDeliverer struct { censorshipMonitor CensorshipDetector } -func (d *BFTDeliverer) Initialize(channelConfig *common.Config) { +func (d *BFTDeliverer) Initialize(channelConfig *common.Config, selfEndpoint string) { d.requester = NewDeliveryRequester( d.ChannelID, d.Signer, @@ -115,7 +115,7 @@ func (d *BFTDeliverer) Initialize(channelConfig *common.Config) { ) osLogger := flogging.MustGetLogger("peer.orderers") - ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger) + ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, selfEndpoint) globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) if err != nil { // The bundle was created prior to calling this function, so it should not fail when we recreate it here. diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go index 7578622ed6c..8356415b72e 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go @@ -228,7 +228,7 @@ func (s *bftDelivererTestSetup) initialize(t *testing.T) { MaxRetryDuration: 600 * time.Second, MaxRetryDurationExceededHandler: s.fakeDurationExceededHandler.DurationExceededHandler, } - s.d.Initialize(s.channelConfig) + s.d.Initialize(s.channelConfig, "") s.fakeSleeper = &fake.Sleeper{} diff --git a/internal/pkg/peer/blocksprovider/deliverer.go b/internal/pkg/peer/blocksprovider/deliverer.go index 78b13dc2e92..8b2e3e57372 100644 --- a/internal/pkg/peer/blocksprovider/deliverer.go +++ b/internal/pkg/peer/blocksprovider/deliverer.go @@ -117,7 +117,7 @@ func (d *Deliverer) Initialize(channelConfig *cb.Config) { ) osLogger := flogging.MustGetLogger("peer.orderers") - ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger) + ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, "") globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) if err != nil { // The bundle was created prior to calling this function, so it should not fail when we recreate it here. diff --git a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go index 9b904deca6c..4dc7adfbeca 100644 --- a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go +++ b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go @@ -10,10 +10,11 @@ import ( ) type OrdererConnectionSourceFactory struct { - CreateConnectionSourceStub func(*flogging.FabricLogger) orderers.ConnectionSourcer + CreateConnectionSourceStub func(*flogging.FabricLogger, string) orderers.ConnectionSourcer createConnectionSourceMutex sync.RWMutex createConnectionSourceArgsForCall []struct { arg1 *flogging.FabricLogger + arg2 string } createConnectionSourceReturns struct { result1 orderers.ConnectionSourcer @@ -25,18 +26,19 @@ type OrdererConnectionSourceFactory struct { invocationsMutex sync.RWMutex } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSource(arg1 *flogging.FabricLogger) orderers.ConnectionSourcer { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSource(arg1 *flogging.FabricLogger, arg2 string) orderers.ConnectionSourcer { fake.createConnectionSourceMutex.Lock() ret, specificReturn := fake.createConnectionSourceReturnsOnCall[len(fake.createConnectionSourceArgsForCall)] fake.createConnectionSourceArgsForCall = append(fake.createConnectionSourceArgsForCall, struct { arg1 *flogging.FabricLogger - }{arg1}) + arg2 string + }{arg1, arg2}) stub := fake.CreateConnectionSourceStub fakeReturns := fake.createConnectionSourceReturns - fake.recordInvocation("CreateConnectionSource", []interface{}{arg1}) + fake.recordInvocation("CreateConnectionSource", []interface{}{arg1, arg2}) fake.createConnectionSourceMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1 @@ -50,17 +52,17 @@ func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCallCount() in return len(fake.createConnectionSourceArgsForCall) } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCalls(stub func(*flogging.FabricLogger) orderers.ConnectionSourcer) { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCalls(stub func(*flogging.FabricLogger, string) orderers.ConnectionSourcer) { fake.createConnectionSourceMutex.Lock() defer fake.createConnectionSourceMutex.Unlock() fake.CreateConnectionSourceStub = stub } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceArgsForCall(i int) *flogging.FabricLogger { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceArgsForCall(i int) (*flogging.FabricLogger, string) { fake.createConnectionSourceMutex.RLock() defer fake.createConnectionSourceMutex.RUnlock() argsForCall := fake.createConnectionSourceArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceReturns(result1 orderers.ConnectionSourcer) { diff --git a/internal/pkg/peer/orderers/connection.go b/internal/pkg/peer/orderers/connection.go index 0a535704fef..ee0fab5b603 100644 --- a/internal/pkg/peer/orderers/connection.go +++ b/internal/pkg/peer/orderers/connection.go @@ -24,6 +24,8 @@ type ConnectionSource struct { orgToEndpointsHash map[string][]byte logger *flogging.FabricLogger overrides map[string]*Endpoint + // empty when used by a peer, or self endpoint address when the ConnectionSource is used by an orderer. + selfEndpoint string } type Endpoint struct { @@ -56,11 +58,12 @@ type OrdererOrg struct { RootCerts [][]byte } -func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint) *ConnectionSource { +func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint, selfEndpoint string) *ConnectionSource { return &ConnectionSource{ orgToEndpointsHash: map[string][]byte{}, logger: logger, overrides: overrides, + selfEndpoint: selfEndpoint, } } @@ -202,6 +205,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer // Note, if !hasOrgEndpoints, this for loop is a no-op for _, address := range org.Addresses { + if address == cs.selfEndpoint { + cs.logger.Debugf("Skipping self endpoint [%s] from org specific endpoints", address) + continue + } overrideEndpoint, ok := cs.overrides[address] if ok { cs.allEndpoints = append(cs.allEndpoints, &Endpoint{ @@ -228,6 +235,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer } for _, address := range globalAddrs { + if address == cs.selfEndpoint { + cs.logger.Debugf("Skipping self endpoint [%s] from global endpoints", address) + continue + } overrideEndpoint, ok := cs.overrides[address] if ok { cs.allEndpoints = append(cs.allEndpoints, &Endpoint{ @@ -245,5 +256,5 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer }) } - cs.logger.Debugf("Returning an orderer connection pool source with global endpoints only") + cs.logger.Debug("Returning an orderer connection pool source with global endpoints only") } diff --git a/internal/pkg/peer/orderers/connection_factory.go b/internal/pkg/peer/orderers/connection_factory.go index eaaa6448526..a6defc4744f 100644 --- a/internal/pkg/peer/orderers/connection_factory.go +++ b/internal/pkg/peer/orderers/connection_factory.go @@ -17,13 +17,13 @@ type ConnectionSourcer interface { } type ConnectionSourceCreator interface { - CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer + CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer } type ConnectionSourceFactory struct { Overrides map[string]*Endpoint } -func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer { - return NewConnectionSource(logger, f.Overrides) +func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer { + return NewConnectionSource(logger, f.Overrides, selfEndpoint) } diff --git a/internal/pkg/peer/orderers/connection_factory_test.go b/internal/pkg/peer/orderers/connection_factory_test.go index e780caaa8fd..882fb345ddd 100644 --- a/internal/pkg/peer/orderers/connection_factory_test.go +++ b/internal/pkg/peer/orderers/connection_factory_test.go @@ -19,7 +19,7 @@ func TestCreateConnectionSource(t *testing.T) { require.NotNil(t, factory) require.Nil(t, factory.Overrides) lg := flogging.MustGetLogger("test") - connSource := factory.CreateConnectionSource(lg) + connSource := factory.CreateConnectionSource(lg, "") require.NotNil(t, connSource) overrides := make(map[string]*orderers.Endpoint) @@ -31,6 +31,6 @@ func TestCreateConnectionSource(t *testing.T) { factory = &orderers.ConnectionSourceFactory{Overrides: overrides} require.NotNil(t, factory) require.Len(t, factory.Overrides, 1) - connSource = factory.CreateConnectionSource(lg) + connSource = factory.CreateConnectionSource(lg, "") require.NotNil(t, connSource) } diff --git a/internal/pkg/peer/orderers/connection_test.go b/internal/pkg/peer/orderers/connection_test.go index a5b0aaa5490..62dc8352900 100644 --- a/internal/pkg/peer/orderers/connection_test.go +++ b/internal/pkg/peer/orderers/connection_test.go @@ -12,11 +12,10 @@ import ( "sort" "strings" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/hyperledger/fabric-lib-go/common/flogging" "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) type comparableEndpoint struct { @@ -83,12 +82,13 @@ var _ = Describe("Connection", func() { org2Certs = [][]byte{cert3} overrideCerts = [][]byte{cert2} - cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), map[string]*orderers.Endpoint{ - "override-address": { - Address: "re-mapped-address", - RootCerts: overrideCerts, - }, - }) + cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), + map[string]*orderers.Endpoint{ + "override-address": { + Address: "re-mapped-address", + RootCerts: overrideCerts, + }, + }, "") // << no self endpoint cs.Update(nil, map[string]orderers.OrdererOrg{ "org1": org1, "org2": org2, @@ -567,4 +567,70 @@ var _ = Describe("Connection", func() { }) }) }) + + When("self-endpoint exists as in the orderer", func() { + BeforeEach(func() { + cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), + map[string]*orderers.Endpoint{ + "override-address": { + Address: "re-mapped-address", + RootCerts: overrideCerts, + }, + }, + "org1-address1") + cs.Update(nil, map[string]orderers.OrdererOrg{ + "org1": org1, + "org2": org2, + }) + + endpoints = cs.Endpoints() + }) + + It("does not include the self-endpoint in endpoints", func() { + Expect(len(endpoints)).To(Equal(3)) + Expect(stripEndpoints(endpoints)).To(ConsistOf( + stripEndpoints([]*orderers.Endpoint{ + { + Address: "org1-address2", + RootCerts: org1Certs, + }, + { + Address: "org2-address1", + RootCerts: org2Certs, + }, + { + Address: "org2-address2", + RootCerts: org2Certs, + }, + }), + )) + }) + + It("does not include the self endpoint in shuffled endpoints", func() { + shuffledEndpoints := cs.ShuffledEndpoints() + Expect(len(shuffledEndpoints)).To(Equal(3)) + Expect(stripEndpoints(endpoints)).To(ConsistOf( + stripEndpoints([]*orderers.Endpoint{ + { + Address: "org1-address2", + RootCerts: org1Certs, + }, + { + Address: "org2-address1", + RootCerts: org2Certs, + }, + { + Address: "org2-address2", + RootCerts: org2Certs, + }, + }), + )) + }) + + It("does not mark any of the endpoints as refreshed", func() { + for _, endpoint := range endpoints { + Expect(endpoint.Refreshed).NotTo(BeClosed()) + } + }) + }) }) diff --git a/orderer/common/cluster/deliver.go b/orderer/common/cluster/deliver.go index 8f66061866f..444717e4af2 100644 --- a/orderer/common/cluster/deliver.go +++ b/orderer/common/cluster/deliver.go @@ -45,7 +45,7 @@ type BlockPuller struct { // A 'stopper' goroutine may signal the go-routine servicing PullBlock & HeightsByEndpoints to stop by closing this // channel. Note: all methods of the BlockPuller must be serviced by a single goroutine, it is not thread safe. - // It is the responsibility of the 'stopper' not to close the channel more then once. + // It is the responsibility of the 'stopper' not to close the channel more than once. StopChannel chan struct{} // Internal state diff --git a/orderer/common/localconfig/config.go b/orderer/common/localconfig/config.go index 649f2e3a7b2..0cd95e80faf 100644 --- a/orderer/common/localconfig/config.go +++ b/orderer/common/localconfig/config.go @@ -68,6 +68,7 @@ type Cluster struct { ReplicationPullTimeout time.Duration ReplicationRetryTimeout time.Duration ReplicationMaxRetries int + ReplicationPolicy string // BFT: "simple" | "consensus" (default); etcdraft: ignored, always "simple" SendBufferSize int CertExpirationWarningThreshold time.Duration TLSHandshakeTimeShift time.Duration @@ -188,6 +189,7 @@ var Defaults = TopLevel{ ReplicationRetryTimeout: time.Second * 5, ReplicationPullTimeout: time.Second * 5, CertExpirationWarningThreshold: time.Hour * 24 * 7, + ReplicationPolicy: "consensus", // BFT default; on etcdraft it is ignored }, LocalMSPDir: "msp", LocalMSPID: "SampleOrg", @@ -332,6 +334,9 @@ func (c *TopLevel) completeInitialization(configDir string) { c.General.Cluster.ReplicationRetryTimeout = Defaults.General.Cluster.ReplicationRetryTimeout case c.General.Cluster.CertExpirationWarningThreshold == 0: c.General.Cluster.CertExpirationWarningThreshold = Defaults.General.Cluster.CertExpirationWarningThreshold + case (c.General.Cluster.ReplicationPolicy != "simple") && (c.General.Cluster.ReplicationPolicy != "consensus"): + logger.Infof("General.Cluster.ReplicationPolicy is `%s`, setting to `%s`", c.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy) + c.General.Cluster.ReplicationPolicy = Defaults.General.Cluster.ReplicationPolicy case c.General.Profile.Enabled && c.General.Profile.Address == "": logger.Infof("Profiling enabled and General.Profile.Address unset, setting to %s", Defaults.General.Profile.Address) diff --git a/orderer/common/localconfig/config_test.go b/orderer/common/localconfig/config_test.go index 62d924dc8f9..1efb226743f 100644 --- a/orderer/common/localconfig/config_test.go +++ b/orderer/common/localconfig/config_test.go @@ -165,6 +165,7 @@ func TestClusterDefaults(t *testing.T) { cfg, err := cc.load() require.NoError(t, err) require.Equal(t, cfg.General.Cluster.ReplicationMaxRetries, Defaults.General.Cluster.ReplicationMaxRetries) + require.Equal(t, cfg.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy) } func TestConsensusConfig(t *testing.T) { diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index 1ef8d4e14ef..bbd512e3b08 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -27,6 +27,7 @@ import ( "github.com/hyperledger/fabric-protos-go/msp" "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/cluster" + "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/orderer/common/msgprocessor" types2 "github.com/hyperledger/fabric/orderer/common/types" "github.com/hyperledger/fabric/orderer/consensus" @@ -67,24 +68,26 @@ type signerSerializer interface { // BFTChain implements Chain interface to wire with // BFT smart library type BFTChain struct { - RuntimeConfig *atomic.Value - Channel string - Config types.Configuration - BlockPuller BlockPuller - Comm cluster.Communicator - SignerSerializer signerSerializer - PolicyManager policies.Manager - Logger *flogging.FabricLogger - WALDir string - consensus *smartbft.Consensus - support consensus.ConsenterSupport - clusterService *cluster.ClusterService - verifier *Verifier - assembler *Assembler - Metrics *Metrics - MetricsBFT *api.Metrics - MetricsWalBFT *wal.Metrics - bccsp bccsp.BCCSP + RuntimeConfig *atomic.Value + Channel string + Config types.Configuration + BlockPuller BlockPuller // TODO make bft-synchro + clusterDialer *cluster.PredicateDialer // TODO make bft-synchro + localConfigCluster localconfig.Cluster // TODO make bft-synchro + Comm cluster.Communicator + SignerSerializer signerSerializer + PolicyManager policies.Manager + Logger *flogging.FabricLogger + WALDir string + consensus *smartbft.Consensus + support consensus.ConsenterSupport + clusterService *cluster.ClusterService + verifier *Verifier + assembler *Assembler + Metrics *Metrics + MetricsBFT *api.Metrics + MetricsWalBFT *wal.Metrics + bccsp bccsp.BCCSP statusReportMutex sync.Mutex consensusRelation types2.ConsensusRelation @@ -98,6 +101,8 @@ func NewChain( config types.Configuration, walDir string, blockPuller BlockPuller, + clusterDialer *cluster.PredicateDialer, + localConfigCluster localconfig.Cluster, comm cluster.Communicator, signerSerializer signerSerializer, policyManager policies.Manager, @@ -117,18 +122,20 @@ func NewChain( } c := &BFTChain{ - RuntimeConfig: &atomic.Value{}, - Channel: support.ChannelID(), - Config: config, - WALDir: walDir, - Comm: comm, - support: support, - SignerSerializer: signerSerializer, - PolicyManager: policyManager, - BlockPuller: blockPuller, - Logger: logger, - consensusRelation: types2.ConsensusRelationConsenter, - status: types2.StatusActive, + RuntimeConfig: &atomic.Value{}, + Channel: support.ChannelID(), + Config: config, + WALDir: walDir, + Comm: comm, + support: support, + SignerSerializer: signerSerializer, + PolicyManager: policyManager, + BlockPuller: blockPuller, // TODO make bft-synchro + clusterDialer: clusterDialer, // TODO make bft-synchro + localConfigCluster: localConfigCluster, // TODO make bft-synchro + Logger: logger, + consensusRelation: types2.ConsensusRelationConsenter, + status: types2.StatusActive, Metrics: &Metrics{ ClusterSize: metrics.ClusterSize.With("channel", support.ChannelID()), CommittedBlockNumber: metrics.CommittedBlockNumber.With("channel", support.ChannelID()), @@ -202,21 +209,45 @@ func bftSmartConsensusBuild( // report cluster size c.Metrics.ClusterSize.Set(float64(clusterSize)) - sync := &Synchronizer{ - selfID: rtc.id, - BlockToDecision: c.blockToDecision, - OnCommit: func(block *cb.Block) types.Reconfig { - c.pruneCommittedRequests(block) - return c.updateRuntimeConfig(block) - }, - Support: c.support, - BlockPuller: c.BlockPuller, - ClusterSize: clusterSize, - Logger: c.Logger, - LatestConfig: func() (types.Configuration, []uint64) { - rtc := c.RuntimeConfig.Load().(RuntimeConfig) - return rtc.BFTConfig, rtc.Nodes - }, + var sync api.Synchronizer + switch c.localConfigCluster.ReplicationPolicy { + case "consensus": + sync = &BFTSynchronizer{ + selfID: rtc.id, + LatestConfig: func() (types.Configuration, []uint64) { + rtc := c.RuntimeConfig.Load().(RuntimeConfig) + return rtc.BFTConfig, rtc.Nodes + }, + BlockToDecision: c.blockToDecision, + OnCommit: func(block *cb.Block) types.Reconfig { + c.pruneCommittedRequests(block) + return c.updateRuntimeConfig(block) + }, + Support: c.support, + CryptoProvider: c.bccsp, + clusterDialer: c.clusterDialer, + localConfigCluster: c.localConfigCluster, + Logger: c.Logger, + } + case "simple": + sync = &Synchronizer{ + selfID: rtc.id, + BlockToDecision: c.blockToDecision, + OnCommit: func(block *cb.Block) types.Reconfig { + c.pruneCommittedRequests(block) + return c.updateRuntimeConfig(block) + }, + Support: c.support, + BlockPuller: c.BlockPuller, + ClusterSize: clusterSize, // TODO this must be dynamic as the cluster may change in size + Logger: c.Logger, + LatestConfig: func() (types.Configuration, []uint64) { + rtc := c.RuntimeConfig.Load().(RuntimeConfig) + return rtc.BFTConfig, rtc.Nodes + }, + } + default: + c.Logger.Panicf("unsupported ReplicationPolicy: %s", c.localConfigCluster.ReplicationPolicy) } channelDecorator := zap.String("channel", c.support.ChannelID()) diff --git a/orderer/consensus/smartbft/consenter.go b/orderer/consensus/smartbft/consenter.go index cd5a1bd11a0..666044ef6ff 100644 --- a/orderer/consensus/smartbft/consenter.go +++ b/orderer/consensus/smartbft/consenter.go @@ -203,7 +203,23 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb Logger: c.Logger, } - chain, err := NewChain(configValidator, (uint64)(selfID), config, path.Join(c.WALBaseDir, support.ChannelID()), puller, c.Comm, c.SignerSerializer, c.GetPolicyManager(support.ChannelID()), support, c.Metrics, c.MetricsBFT, c.MetricsWalBFT, c.BCCSP) + chain, err := NewChain( + configValidator, + (uint64)(selfID), + config, + path.Join(c.WALBaseDir, support.ChannelID()), + puller, + c.ClusterDialer, // TODO BFT-sync + c.Conf.General.Cluster, // TODO BFT-sync + c.Comm, + c.SignerSerializer, + c.GetPolicyManager(support.ChannelID()), + support, + c.Metrics, + c.MetricsBFT, + c.MetricsWalBFT, + c.BCCSP, + ) if err != nil { return nil, errors.Wrap(err, "failed creating a new BFTChain") } diff --git a/orderer/consensus/smartbft/sync_buffer.go b/orderer/consensus/smartbft/sync_buffer.go new file mode 100644 index 00000000000..819c8a4877b --- /dev/null +++ b/orderer/consensus/smartbft/sync_buffer.go @@ -0,0 +1,70 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/pkg/errors" +) + +type SyncBuffer struct { + blockCh chan *common.Block + stopCh chan struct{} + stopOnce sync.Once +} + +func NewSyncBuffer() *SyncBuffer { + return &SyncBuffer{ + blockCh: make(chan *common.Block, 10), + stopCh: make(chan struct{}), + } +} + +// HandleBlock gives the block to the next stage of processing after fetching it from a remote orderer. +func (sb *SyncBuffer) HandleBlock(channelID string, block *common.Block) error { + if block == nil || block.Header == nil { + return errors.New("empty block or block header") + } + + select { + case sb.blockCh <- block: + return nil + case <-sb.stopCh: + return errors.New("SyncBuffer stopping") + } +} + +func (sb *SyncBuffer) PullBlock(seq uint64) *common.Block { + var block *common.Block + for { + select { + case block = <-sb.blockCh: + if block == nil || block.Header == nil { + return nil + } + if block.GetHeader().GetNumber() == seq { + return block + } + if block.GetHeader().GetNumber() < seq { + continue + } + if block.GetHeader().GetNumber() > seq { + return nil + } + case <-sb.stopCh: + return nil + } + } +} + +func (sb *SyncBuffer) Stop() { + sb.stopOnce.Do(func() { + close(sb.stopCh) + }) +} diff --git a/orderer/consensus/smartbft/synchronizer_bft.go b/orderer/consensus/smartbft/synchronizer_bft.go new file mode 100644 index 00000000000..9416782fee9 --- /dev/null +++ b/orderer/consensus/smartbft/synchronizer_bft.go @@ -0,0 +1,240 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft + +import ( + "sort" + "time" + + "github.com/SmartBFT-Go/consensus/smartbftprotos" + "github.com/hyperledger/fabric/orderer/common/localconfig" + "github.com/hyperledger/fabric/protoutil" + + "github.com/SmartBFT-Go/consensus/pkg/types" + "github.com/hyperledger/fabric-lib-go/bccsp" + "github.com/hyperledger/fabric-lib-go/common/flogging" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/deliverclient" + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/hyperledger/fabric/orderer/common/cluster" + "github.com/hyperledger/fabric/orderer/consensus" + "github.com/pkg/errors" +) + +type BFTSynchronizer struct { + lastReconfig types.Reconfig + selfID uint64 + LatestConfig func() (types.Configuration, []uint64) + BlockToDecision func(*common.Block) *types.Decision + OnCommit func(*common.Block) types.Reconfig + Support consensus.ConsenterSupport + CryptoProvider bccsp.BCCSP + BlockPuller BlockPuller // TODO improve - this only an endpoint prober - detect self EP + clusterDialer *cluster.PredicateDialer // TODO make bft-synchro + localConfigCluster localconfig.Cluster // TODO make bft-synchro + Logger *flogging.FabricLogger +} + +func (s *BFTSynchronizer) Sync() types.SyncResponse { + decision, err := s.synchronize() + if err != nil { + s.Logger.Warnf("Could not synchronize with remote orderers due to %s, returning state from local ledger", err) + block := s.Support.Block(s.Support.Height() - 1) + config, nodes := s.LatestConfig() + return types.SyncResponse{ + Latest: *s.BlockToDecision(block), + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: false, // If we read from ledger we do not need to reconfigure. + CurrentNodes: nodes, + CurrentConfig: config, + }, + } + } + + // After sync has ended, reset the state of the last reconfig. + defer func() { + s.lastReconfig = types.Reconfig{} + }() + return types.SyncResponse{ + Latest: *decision, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: s.lastReconfig.InLatestDecision, + CurrentConfig: s.lastReconfig.CurrentConfig, + CurrentNodes: s.lastReconfig.CurrentNodes, + }, + } +} + +func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { + //=== We use the BlockPuller to probe all the endpoints and establish a target height, as well as detect + // the self endpoint. + + // In BFT it is highly recommended that the channel/orderer-endpoints (for delivery & broadcast) map 1:1 to the + // channel/orderers/consenters (for cluster consensus), that is, every consenter should be represented by a + // delivery endpoint. + blockPuller, err := newBlockPuller(s.Support, s.clusterDialer, s.localConfigCluster, s.CryptoProvider) + if err != nil { + return nil, errors.Wrap(err, "cannot get create BlockPuller") + } + defer blockPuller.Close() + + heightByEndpoint, myEndpoint, err := blockPuller.HeightsByEndpoints() + if err != nil { + return nil, errors.Wrap(err, "cannot get HeightsByEndpoints") + } + + s.Logger.Infof("HeightsByEndpoints: %+v, my endpoint: %s", heightByEndpoint, myEndpoint) + + var heights []uint64 + for ep, value := range heightByEndpoint { + if ep == myEndpoint { + continue + } + heights = append(heights, value) + } + + if len(heights) == 0 { + return nil, errors.New("no cluster members to synchronize with") + } + + targetHeight := s.computeTargetHeight(heights) + startHeight := s.Support.Height() + if startHeight >= targetHeight { + return nil, errors.Errorf("already at target height of %d", targetHeight) + } + + //==== + // create a buffer to accept the blocks delivered from the BFTDeliverer + syncBuffer := NewSyncBuffer() + + //=== + // create the deliverer + lastBlock := s.Support.Block(startHeight - 1) + lastConfigBlock, err := cluster.LastConfigBlock(lastBlock, s.Support) + if err != nil { + return nil, errors.Wrapf(err, "failed to retrieve last config block") + } + lastConfigEnv, err := deliverclient.ConfigFromBlock(lastConfigBlock) + if err != nil { + return nil, errors.Wrapf(err, "failed to retrieve last config envelope") + } + verifier, err := deliverclient.NewBlockVerificationAssistant(lastConfigBlock, lastBlock, s.CryptoProvider, s.Logger) + if err != nil { + return nil, errors.Wrapf(err, "failed to create BlockVerificationAssistant") + } + + clientConfig := s.clusterDialer.Config // The cluster and block puller use slightly different options + clientConfig.AsyncConnect = false + clientConfig.SecOpts.VerifyCertificate = nil + + bftDeliverer := &blocksprovider.BFTDeliverer{ + ChannelID: s.Support.ChannelID(), + BlockHandler: syncBuffer, + Ledger: &ledgerInfoAdapter{s.Support}, + UpdatableBlockVerifier: verifier, + Dialer: blocksprovider.DialerAdapter{ClientConfig: clientConfig}, + OrderersSourceFactory: &orderers.ConnectionSourceFactory{}, // no overrides in the orderer + CryptoProvider: s.CryptoProvider, + DoneC: make(chan struct{}), + Signer: s.Support, + DeliverStreamer: blocksprovider.DeliverAdapter{}, + CensorshipDetectorFactory: &blocksprovider.BFTCensorshipMonitorFactory{}, + Logger: flogging.MustGetLogger("orderer.blocksprovider").With("channel", s.Support.ChannelID()), + InitialRetryInterval: 10 * time.Millisecond, // TODO get it from config. + MaxRetryInterval: 2 * time.Second, // TODO get it from config. + BlockCensorshipTimeout: 20 * time.Second, // TODO get it from config. + MaxRetryDuration: time.Minute, // TODO get it from config. + MaxRetryDurationExceededHandler: func() (stopRetries bool) { + syncBuffer.Stop() + return true // In the orderer we must limit the time we try to do Synch() + }, + } + + s.Logger.Infof("Created a BFTDeliverer: %+v", bftDeliverer) + bftDeliverer.Initialize(lastConfigEnv.GetConfig(), myEndpoint) + + go bftDeliverer.DeliverBlocks() + defer bftDeliverer.Stop() + + //=== + // Loop on sync-buffer + + targetSeq := targetHeight - 1 + seq := startHeight + var blocksFetched int + + s.Logger.Debugf("Will fetch sequences [%d-%d]", seq, targetSeq) + + var lastPulledBlock *common.Block + for seq <= targetSeq { + block := syncBuffer.PullBlock(seq) + if block == nil { + s.Logger.Debugf("Failed to fetch block [%d] from cluster", seq) + break + } + if protoutil.IsConfigBlock(block) { + s.Support.WriteConfigBlock(block, nil) + } else { + s.Support.WriteBlock(block, nil) + } + s.Logger.Debugf("Fetched and committed block [%d] from cluster", seq) + lastPulledBlock = block + + prevInLatestDecision := s.lastReconfig.InLatestDecision + s.lastReconfig = s.OnCommit(lastPulledBlock) + s.lastReconfig.InLatestDecision = s.lastReconfig.InLatestDecision || prevInLatestDecision + seq++ + blocksFetched++ + } + + syncBuffer.Stop() + + if lastPulledBlock == nil { + return nil, errors.Errorf("failed pulling block %d", seq) + } + + startSeq := startHeight + s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]", + blocksFetched, startSeq, lastPulledBlock.Header.Number) + + viewMetadata, lastConfigSqn := s.getViewMetadataLastConfigSqnFromBlock(lastPulledBlock) + + s.Logger.Infof("Returning view metadata of %v, lastConfigSeq %d", viewMetadata, lastConfigSqn) + return s.BlockToDecision(lastPulledBlock), nil +} + +// computeTargetHeight compute the target height to synchronize to. +// +// heights: a slice containing the heights of accessible peers, length must be >0. +// clusterSize: the cluster size, must be >0. +func (s *BFTSynchronizer) computeTargetHeight(heights []uint64) uint64 { + sort.Slice(heights, func(i, j int) bool { return heights[i] > heights[j] }) // Descending + clusterSize := len(s.Support.SharedConfig().Consenters()) + f := uint64(clusterSize-1) / 3 // The number of tolerated byzantine faults + lenH := uint64(len(heights)) + + s.Logger.Debugf("Heights: %v", heights) + + if lenH < f+1 { + s.Logger.Debugf("Returning %d", heights[0]) + return heights[int(lenH)-1] + } + s.Logger.Debugf("Returning %d", heights[f]) + return heights[f] +} + +func (s *BFTSynchronizer) getViewMetadataLastConfigSqnFromBlock(block *common.Block) (*smartbftprotos.ViewMetadata, uint64) { + viewMetadata, err := getViewMetadataFromBlock(block) + if err != nil { + return nil, 0 + } + + lastConfigSqn := s.Support.Sequence() + + return viewMetadata, lastConfigSqn +} diff --git a/orderer/consensus/smartbft/util.go b/orderer/consensus/smartbft/util.go index a7d03294aa2..113dcf54f6a 100644 --- a/orderer/consensus/smartbft/util.go +++ b/orderer/consensus/smartbft/util.go @@ -176,6 +176,8 @@ func newBlockPuller( Dialer: stdDialer, } + logger.Infof("Built new block puller with cluster config: %+v, endpoints: %+v", clusterConfig, endpoints) + return bp, nil } @@ -515,3 +517,16 @@ func createSmartBftConfig(odrdererConfig channelconfig.Orderer) (*smartbft.Optio configOptions.RequestBatchMaxBytes = uint64(batchSize.AbsoluteMaxBytes) return configOptions, nil } + +// ledgerInfoAdapter translates from blocksprovider.LedgerInfo in to calls to consensus.ConsenterSupport. +type ledgerInfoAdapter struct { + support consensus.ConsenterSupport +} + +func (a *ledgerInfoAdapter) LedgerHeight() (uint64, error) { + return a.support.Height(), nil +} + +func (a *ledgerInfoAdapter) GetCurrentBlockHash() ([]byte, error) { + return nil, errors.New("not implemented: never used in orderer") +} diff --git a/sampleconfig/orderer.yaml b/sampleconfig/orderer.yaml index ee3ae31af41..36f70aa2a80 100644 --- a/sampleconfig/orderer.yaml +++ b/sampleconfig/orderer.yaml @@ -115,6 +115,12 @@ General: # ServerPrivateKey defines the file location of the private key of the TLS certificate. ServerPrivateKey: + # ReplicationPolicy defines how blocks are replicated between orderers. + # Permitted values: + # in BFT: "simple" | "consensus" (default); + # in etcdraft: ignored, (always "simple", regardless of value in config). + ReplicationPolicy: + # Bootstrap method: The method by which to obtain the bootstrap block # system channel is specified. The option can be one of: # "file" - path to a file containing the genesis block or config block of system channel