From c93202aae5663433b1b083e0a9edb48bcfb274b8 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Tue, 20 Feb 2024 11:19:40 +0200 Subject: [PATCH] Identify self EP to BFT deliverer Signed-off-by: Yoav Tock Change-Id: Ic1a08b1d71ce4566b24f8a4f484f5831d94d3e07 --- core/deliverservice/deliveryclient.go | 2 +- .../pkg/peer/blocksprovider/bft_deliverer.go | 4 +- .../peer/blocksprovider/bft_deliverer_test.go | 2 +- internal/pkg/peer/blocksprovider/deliverer.go | 2 +- .../fake/orderer_connection_source_factory.go | 18 ++-- internal/pkg/peer/orderers/connection.go | 15 +++- .../pkg/peer/orderers/connection_factory.go | 6 +- .../peer/orderers/connection_factory_test.go | 4 +- internal/pkg/peer/orderers/connection_test.go | 84 +++++++++++++++++-- orderer/consensus/smartbft/consenter.go | 4 +- .../consensus/smartbft/synchronizer_bft.go | 9 +- orderer/consensus/smartbft/util.go | 2 +- 12 files changed, 116 insertions(+), 36 deletions(-) diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 75c2260a22e..11ba12b4b66 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -280,7 +280,7 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo dcBFT.TLSCertHash = util.ComputeSHA256(cert.Certificate[0]) } - dcBFT.Initialize(d.conf.ChannelConfig) + dcBFT.Initialize(d.conf.ChannelConfig, "") return dcBFT, nil } diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer.go b/internal/pkg/peer/blocksprovider/bft_deliverer.go index 13b96595357..aa6e1c349a7 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer.go @@ -105,7 +105,7 @@ type BFTDeliverer struct { censorshipMonitor CensorshipDetector } -func (d *BFTDeliverer) Initialize(channelConfig *common.Config) { +func (d *BFTDeliverer) Initialize(channelConfig *common.Config, selfEndpoint string) { d.requester = NewDeliveryRequester( d.ChannelID, d.Signer, @@ -115,7 +115,7 @@ func (d *BFTDeliverer) Initialize(channelConfig *common.Config) { ) osLogger := flogging.MustGetLogger("peer.orderers") - ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger) + ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, selfEndpoint) globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) if err != nil { // The bundle was created prior to calling this function, so it should not fail when we recreate it here. diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go index 7578622ed6c..8356415b72e 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go @@ -228,7 +228,7 @@ func (s *bftDelivererTestSetup) initialize(t *testing.T) { MaxRetryDuration: 600 * time.Second, MaxRetryDurationExceededHandler: s.fakeDurationExceededHandler.DurationExceededHandler, } - s.d.Initialize(s.channelConfig) + s.d.Initialize(s.channelConfig, "") s.fakeSleeper = &fake.Sleeper{} diff --git a/internal/pkg/peer/blocksprovider/deliverer.go b/internal/pkg/peer/blocksprovider/deliverer.go index 78b13dc2e92..8b2e3e57372 100644 --- a/internal/pkg/peer/blocksprovider/deliverer.go +++ b/internal/pkg/peer/blocksprovider/deliverer.go @@ -117,7 +117,7 @@ func (d *Deliverer) Initialize(channelConfig *cb.Config) { ) osLogger := flogging.MustGetLogger("peer.orderers") - ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger) + ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, "") globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider) if err != nil { // The bundle was created prior to calling this function, so it should not fail when we recreate it here. diff --git a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go index 9b904deca6c..4dc7adfbeca 100644 --- a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go +++ b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source_factory.go @@ -10,10 +10,11 @@ import ( ) type OrdererConnectionSourceFactory struct { - CreateConnectionSourceStub func(*flogging.FabricLogger) orderers.ConnectionSourcer + CreateConnectionSourceStub func(*flogging.FabricLogger, string) orderers.ConnectionSourcer createConnectionSourceMutex sync.RWMutex createConnectionSourceArgsForCall []struct { arg1 *flogging.FabricLogger + arg2 string } createConnectionSourceReturns struct { result1 orderers.ConnectionSourcer @@ -25,18 +26,19 @@ type OrdererConnectionSourceFactory struct { invocationsMutex sync.RWMutex } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSource(arg1 *flogging.FabricLogger) orderers.ConnectionSourcer { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSource(arg1 *flogging.FabricLogger, arg2 string) orderers.ConnectionSourcer { fake.createConnectionSourceMutex.Lock() ret, specificReturn := fake.createConnectionSourceReturnsOnCall[len(fake.createConnectionSourceArgsForCall)] fake.createConnectionSourceArgsForCall = append(fake.createConnectionSourceArgsForCall, struct { arg1 *flogging.FabricLogger - }{arg1}) + arg2 string + }{arg1, arg2}) stub := fake.CreateConnectionSourceStub fakeReturns := fake.createConnectionSourceReturns - fake.recordInvocation("CreateConnectionSource", []interface{}{arg1}) + fake.recordInvocation("CreateConnectionSource", []interface{}{arg1, arg2}) fake.createConnectionSourceMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1 @@ -50,17 +52,17 @@ func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCallCount() in return len(fake.createConnectionSourceArgsForCall) } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCalls(stub func(*flogging.FabricLogger) orderers.ConnectionSourcer) { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceCalls(stub func(*flogging.FabricLogger, string) orderers.ConnectionSourcer) { fake.createConnectionSourceMutex.Lock() defer fake.createConnectionSourceMutex.Unlock() fake.CreateConnectionSourceStub = stub } -func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceArgsForCall(i int) *flogging.FabricLogger { +func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceArgsForCall(i int) (*flogging.FabricLogger, string) { fake.createConnectionSourceMutex.RLock() defer fake.createConnectionSourceMutex.RUnlock() argsForCall := fake.createConnectionSourceArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *OrdererConnectionSourceFactory) CreateConnectionSourceReturns(result1 orderers.ConnectionSourcer) { diff --git a/internal/pkg/peer/orderers/connection.go b/internal/pkg/peer/orderers/connection.go index 0a535704fef..ee0fab5b603 100644 --- a/internal/pkg/peer/orderers/connection.go +++ b/internal/pkg/peer/orderers/connection.go @@ -24,6 +24,8 @@ type ConnectionSource struct { orgToEndpointsHash map[string][]byte logger *flogging.FabricLogger overrides map[string]*Endpoint + // empty when used by a peer, or self endpoint address when the ConnectionSource is used by an orderer. + selfEndpoint string } type Endpoint struct { @@ -56,11 +58,12 @@ type OrdererOrg struct { RootCerts [][]byte } -func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint) *ConnectionSource { +func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint, selfEndpoint string) *ConnectionSource { return &ConnectionSource{ orgToEndpointsHash: map[string][]byte{}, logger: logger, overrides: overrides, + selfEndpoint: selfEndpoint, } } @@ -202,6 +205,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer // Note, if !hasOrgEndpoints, this for loop is a no-op for _, address := range org.Addresses { + if address == cs.selfEndpoint { + cs.logger.Debugf("Skipping self endpoint [%s] from org specific endpoints", address) + continue + } overrideEndpoint, ok := cs.overrides[address] if ok { cs.allEndpoints = append(cs.allEndpoints, &Endpoint{ @@ -228,6 +235,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer } for _, address := range globalAddrs { + if address == cs.selfEndpoint { + cs.logger.Debugf("Skipping self endpoint [%s] from global endpoints", address) + continue + } overrideEndpoint, ok := cs.overrides[address] if ok { cs.allEndpoints = append(cs.allEndpoints, &Endpoint{ @@ -245,5 +256,5 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer }) } - cs.logger.Debugf("Returning an orderer connection pool source with global endpoints only") + cs.logger.Debug("Returning an orderer connection pool source with global endpoints only") } diff --git a/internal/pkg/peer/orderers/connection_factory.go b/internal/pkg/peer/orderers/connection_factory.go index eaaa6448526..a6defc4744f 100644 --- a/internal/pkg/peer/orderers/connection_factory.go +++ b/internal/pkg/peer/orderers/connection_factory.go @@ -17,13 +17,13 @@ type ConnectionSourcer interface { } type ConnectionSourceCreator interface { - CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer + CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer } type ConnectionSourceFactory struct { Overrides map[string]*Endpoint } -func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer { - return NewConnectionSource(logger, f.Overrides) +func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer { + return NewConnectionSource(logger, f.Overrides, selfEndpoint) } diff --git a/internal/pkg/peer/orderers/connection_factory_test.go b/internal/pkg/peer/orderers/connection_factory_test.go index e780caaa8fd..882fb345ddd 100644 --- a/internal/pkg/peer/orderers/connection_factory_test.go +++ b/internal/pkg/peer/orderers/connection_factory_test.go @@ -19,7 +19,7 @@ func TestCreateConnectionSource(t *testing.T) { require.NotNil(t, factory) require.Nil(t, factory.Overrides) lg := flogging.MustGetLogger("test") - connSource := factory.CreateConnectionSource(lg) + connSource := factory.CreateConnectionSource(lg, "") require.NotNil(t, connSource) overrides := make(map[string]*orderers.Endpoint) @@ -31,6 +31,6 @@ func TestCreateConnectionSource(t *testing.T) { factory = &orderers.ConnectionSourceFactory{Overrides: overrides} require.NotNil(t, factory) require.Len(t, factory.Overrides, 1) - connSource = factory.CreateConnectionSource(lg) + connSource = factory.CreateConnectionSource(lg, "") require.NotNil(t, connSource) } diff --git a/internal/pkg/peer/orderers/connection_test.go b/internal/pkg/peer/orderers/connection_test.go index a5b0aaa5490..62dc8352900 100644 --- a/internal/pkg/peer/orderers/connection_test.go +++ b/internal/pkg/peer/orderers/connection_test.go @@ -12,11 +12,10 @@ import ( "sort" "strings" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/hyperledger/fabric-lib-go/common/flogging" "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) type comparableEndpoint struct { @@ -83,12 +82,13 @@ var _ = Describe("Connection", func() { org2Certs = [][]byte{cert3} overrideCerts = [][]byte{cert2} - cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), map[string]*orderers.Endpoint{ - "override-address": { - Address: "re-mapped-address", - RootCerts: overrideCerts, - }, - }) + cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), + map[string]*orderers.Endpoint{ + "override-address": { + Address: "re-mapped-address", + RootCerts: overrideCerts, + }, + }, "") // << no self endpoint cs.Update(nil, map[string]orderers.OrdererOrg{ "org1": org1, "org2": org2, @@ -567,4 +567,70 @@ var _ = Describe("Connection", func() { }) }) }) + + When("self-endpoint exists as in the orderer", func() { + BeforeEach(func() { + cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), + map[string]*orderers.Endpoint{ + "override-address": { + Address: "re-mapped-address", + RootCerts: overrideCerts, + }, + }, + "org1-address1") + cs.Update(nil, map[string]orderers.OrdererOrg{ + "org1": org1, + "org2": org2, + }) + + endpoints = cs.Endpoints() + }) + + It("does not include the self-endpoint in endpoints", func() { + Expect(len(endpoints)).To(Equal(3)) + Expect(stripEndpoints(endpoints)).To(ConsistOf( + stripEndpoints([]*orderers.Endpoint{ + { + Address: "org1-address2", + RootCerts: org1Certs, + }, + { + Address: "org2-address1", + RootCerts: org2Certs, + }, + { + Address: "org2-address2", + RootCerts: org2Certs, + }, + }), + )) + }) + + It("does not include the self endpoint in shuffled endpoints", func() { + shuffledEndpoints := cs.ShuffledEndpoints() + Expect(len(shuffledEndpoints)).To(Equal(3)) + Expect(stripEndpoints(endpoints)).To(ConsistOf( + stripEndpoints([]*orderers.Endpoint{ + { + Address: "org1-address2", + RootCerts: org1Certs, + }, + { + Address: "org2-address1", + RootCerts: org2Certs, + }, + { + Address: "org2-address2", + RootCerts: org2Certs, + }, + }), + )) + }) + + It("does not mark any of the endpoints as refreshed", func() { + for _, endpoint := range endpoints { + Expect(endpoint.Refreshed).NotTo(BeClosed()) + } + }) + }) }) diff --git a/orderer/consensus/smartbft/consenter.go b/orderer/consensus/smartbft/consenter.go index f972b9e122d..666044ef6ff 100644 --- a/orderer/consensus/smartbft/consenter.go +++ b/orderer/consensus/smartbft/consenter.go @@ -209,8 +209,8 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb config, path.Join(c.WALBaseDir, support.ChannelID()), puller, - c.ClusterDialer, //TODO BFT-sync - c.Conf.General.Cluster, //TODO BFT-sync + c.ClusterDialer, // TODO BFT-sync + c.Conf.General.Cluster, // TODO BFT-sync c.Comm, c.SignerSerializer, c.GetPolicyManager(support.ChannelID()), diff --git a/orderer/consensus/smartbft/synchronizer_bft.go b/orderer/consensus/smartbft/synchronizer_bft.go index 29f5c68b9b5..9416782fee9 100644 --- a/orderer/consensus/smartbft/synchronizer_bft.go +++ b/orderer/consensus/smartbft/synchronizer_bft.go @@ -7,11 +7,12 @@ SPDX-License-Identifier: Apache-2.0 package smartbft import ( + "sort" + "time" + "github.com/SmartBFT-Go/consensus/smartbftprotos" "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/protoutil" - "sort" - "time" "github.com/SmartBFT-Go/consensus/pkg/types" "github.com/hyperledger/fabric-lib-go/bccsp" @@ -81,7 +82,7 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { return nil, errors.Wrap(err, "cannot get create BlockPuller") } defer blockPuller.Close() - + heightByEndpoint, myEndpoint, err := blockPuller.HeightsByEndpoints() if err != nil { return nil, errors.Wrap(err, "cannot get HeightsByEndpoints") @@ -155,7 +156,7 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { } s.Logger.Infof("Created a BFTDeliverer: %+v", bftDeliverer) - bftDeliverer.Initialize(lastConfigEnv.GetConfig()) //TODO allow input for self endpoint + bftDeliverer.Initialize(lastConfigEnv.GetConfig(), myEndpoint) go bftDeliverer.DeliverBlocks() defer bftDeliverer.Stop() diff --git a/orderer/consensus/smartbft/util.go b/orderer/consensus/smartbft/util.go index bb325a7625b..c8e7c4ab945 100644 --- a/orderer/consensus/smartbft/util.go +++ b/orderer/consensus/smartbft/util.go @@ -526,5 +526,5 @@ func (a *ledgerInfoAdapter) LedgerHeight() (uint64, error) { } func (a *ledgerInfoAdapter) GetCurrentBlockHash() ([]byte, error) { - return nil, errors.New("not implemented") + return nil, errors.New("not implemented: never used in orderer") }