Skip to content

Commit

Permalink
Identify self EP to BFT deliverer
Browse files Browse the repository at this point in the history
Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: Ic1a08b1d71ce4566b24f8a4f484f5831d94d3e07
  • Loading branch information
tock-ibm committed Feb 21, 2024
1 parent 78dbf3e commit 41ed9b6
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 37 deletions.
2 changes: 1 addition & 1 deletion core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
dcBFT.TLSCertHash = util.ComputeSHA256(cert.Certificate[0])
}

dcBFT.Initialize(d.conf.ChannelConfig)
dcBFT.Initialize(d.conf.ChannelConfig, "")

return dcBFT, nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/peer/blocksprovider/bft_deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type BFTDeliverer struct {
censorshipMonitor CensorshipDetector
}

func (d *BFTDeliverer) Initialize(channelConfig *common.Config) {
func (d *BFTDeliverer) Initialize(channelConfig *common.Config, selfEndpoint string) {
d.requester = NewDeliveryRequester(
d.ChannelID,
d.Signer,
Expand All @@ -115,7 +115,7 @@ func (d *BFTDeliverer) Initialize(channelConfig *common.Config) {
)

osLogger := flogging.MustGetLogger("peer.orderers")
ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger)
ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, selfEndpoint)
globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider)
if err != nil {
// The bundle was created prior to calling this function, so it should not fail when we recreate it here.
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/peer/blocksprovider/bft_deliverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (s *bftDelivererTestSetup) initialize(t *testing.T) {
MaxRetryDuration: 600 * time.Second,
MaxRetryDurationExceededHandler: s.fakeDurationExceededHandler.DurationExceededHandler,
}
s.d.Initialize(s.channelConfig)
s.d.Initialize(s.channelConfig, "")

s.fakeSleeper = &fake.Sleeper{}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/peer/blocksprovider/deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (d *Deliverer) Initialize(channelConfig *cb.Config) {
)

osLogger := flogging.MustGetLogger("peer.orderers")
ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger)
ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, "")
globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider)
if err != nil {
// The bundle was created prior to calling this function, so it should not fail when we recreate it here.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions internal/pkg/peer/orderers/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type ConnectionSource struct {
orgToEndpointsHash map[string][]byte
logger *flogging.FabricLogger
overrides map[string]*Endpoint
// empty when used by a peer, or self endpoint address when the ConnectionSource is used by an orderer.
selfEndpoint string
}

type Endpoint struct {
Expand Down Expand Up @@ -56,11 +58,12 @@ type OrdererOrg struct {
RootCerts [][]byte
}

func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint) *ConnectionSource {
func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint, selfEndpoint string) *ConnectionSource {
return &ConnectionSource{
orgToEndpointsHash: map[string][]byte{},
logger: logger,
overrides: overrides,
selfEndpoint: selfEndpoint,
}
}

Expand Down Expand Up @@ -202,6 +205,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer

// Note, if !hasOrgEndpoints, this for loop is a no-op
for _, address := range org.Addresses {
if address == cs.selfEndpoint {
cs.logger.Debugf("Skipping self endpoint [%s] from org specific endpoints", address)
continue
}
overrideEndpoint, ok := cs.overrides[address]
if ok {
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
Expand All @@ -228,6 +235,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer
}

for _, address := range globalAddrs {
if address == cs.selfEndpoint {
cs.logger.Debugf("Skipping self endpoint [%s] from global endpoints", address)
continue
}
overrideEndpoint, ok := cs.overrides[address]
if ok {
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
Expand All @@ -245,5 +256,5 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer
})
}

cs.logger.Debugf("Returning an orderer connection pool source with global endpoints only")
cs.logger.Debug("Returning an orderer connection pool source with global endpoints only")
}
6 changes: 3 additions & 3 deletions internal/pkg/peer/orderers/connection_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ type ConnectionSourcer interface {
}

type ConnectionSourceCreator interface {
CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer
CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer
}

type ConnectionSourceFactory struct {
Overrides map[string]*Endpoint
}

func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer {
return NewConnectionSource(logger, f.Overrides)
func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer {
return NewConnectionSource(logger, f.Overrides, selfEndpoint)
}
4 changes: 2 additions & 2 deletions internal/pkg/peer/orderers/connection_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestCreateConnectionSource(t *testing.T) {
require.NotNil(t, factory)
require.Nil(t, factory.Overrides)
lg := flogging.MustGetLogger("test")
connSource := factory.CreateConnectionSource(lg)
connSource := factory.CreateConnectionSource(lg, "")
require.NotNil(t, connSource)

overrides := make(map[string]*orderers.Endpoint)
Expand All @@ -31,6 +31,6 @@ func TestCreateConnectionSource(t *testing.T) {
factory = &orderers.ConnectionSourceFactory{Overrides: overrides}
require.NotNil(t, factory)
require.Len(t, factory.Overrides, 1)
connSource = factory.CreateConnectionSource(lg)
connSource = factory.CreateConnectionSource(lg, "")
require.NotNil(t, connSource)
}
84 changes: 75 additions & 9 deletions internal/pkg/peer/orderers/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (
"sort"
"strings"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/hyperledger/fabric-lib-go/common/flogging"
"github.com/hyperledger/fabric/internal/pkg/peer/orderers"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

type comparableEndpoint struct {
Expand Down Expand Up @@ -83,12 +82,13 @@ var _ = Describe("Connection", func() {
org2Certs = [][]byte{cert3}
overrideCerts = [][]byte{cert2}

cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), map[string]*orderers.Endpoint{
"override-address": {
Address: "re-mapped-address",
RootCerts: overrideCerts,
},
})
cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"),
map[string]*orderers.Endpoint{
"override-address": {
Address: "re-mapped-address",
RootCerts: overrideCerts,
},
}, "") // << no self endpoint
cs.Update(nil, map[string]orderers.OrdererOrg{
"org1": org1,
"org2": org2,
Expand Down Expand Up @@ -567,4 +567,70 @@ var _ = Describe("Connection", func() {
})
})
})

When("self-endpoint exists as in the orderer", func() {
BeforeEach(func() {
cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"),
map[string]*orderers.Endpoint{
"override-address": {
Address: "re-mapped-address",
RootCerts: overrideCerts,
},
},
"org1-address1")
cs.Update(nil, map[string]orderers.OrdererOrg{
"org1": org1,
"org2": org2,
})

endpoints = cs.Endpoints()
})

It("does not include the self-endpoint in endpoints", func() {
Expect(len(endpoints)).To(Equal(3))
Expect(stripEndpoints(endpoints)).To(ConsistOf(
stripEndpoints([]*orderers.Endpoint{
{
Address: "org1-address2",
RootCerts: org1Certs,
},
{
Address: "org2-address1",
RootCerts: org2Certs,
},
{
Address: "org2-address2",
RootCerts: org2Certs,
},
}),
))
})

It("does not include the self endpoint in shuffled endpoints", func() {
shuffledEndpoints := cs.ShuffledEndpoints()
Expect(len(shuffledEndpoints)).To(Equal(3))
Expect(stripEndpoints(endpoints)).To(ConsistOf(
stripEndpoints([]*orderers.Endpoint{
{
Address: "org1-address2",
RootCerts: org1Certs,
},
{
Address: "org2-address1",
RootCerts: org2Certs,
},
{
Address: "org2-address2",
RootCerts: org2Certs,
},
}),
))
})

It("does not mark any of the endpoints as refreshed", func() {
for _, endpoint := range endpoints {
Expect(endpoint.Refreshed).NotTo(BeClosed())
}
})
})
})
18 changes: 17 additions & 1 deletion orderer/consensus/smartbft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,23 @@ type BFTChain struct {
}

// NewChain creates new BFT Smart chain
func NewChain(cv ConfigValidator, selfID uint64, config types.Configuration, walDir string, blockPuller BlockPuller, clusterDialer *cluster.PredicateDialer, localConfigCluster localconfig.Cluster, comm cluster.Communicator, signerSerializer signerSerializer, policyManager policies.Manager, support consensus.ConsenterSupport, metrics *Metrics, metricsBFT *api.Metrics, metricsWalBFT *wal.Metrics, bccsp bccsp.BCCSP) (*BFTChain, error) {
func NewChain(
cv ConfigValidator,
selfID uint64,
config types.Configuration,
walDir string,
blockPuller BlockPuller,
clusterDialer *cluster.PredicateDialer,
localConfigCluster localconfig.Cluster,
comm cluster.Communicator,
signerSerializer signerSerializer,
policyManager policies.Manager,
support consensus.ConsenterSupport,
metrics *Metrics,
metricsBFT *api.Metrics,
metricsWalBFT *wal.Metrics,
bccsp bccsp.BCCSP,
) (*BFTChain, error) {
logger := flogging.MustGetLogger("orderer.consensus.smartbft.chain").With(zap.String("channel", support.ChannelID()))

requestInspector := &RequestInspector{
Expand Down
4 changes: 2 additions & 2 deletions orderer/consensus/smartbft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb
config,
path.Join(c.WALBaseDir, support.ChannelID()),
puller,
c.ClusterDialer, //TODO BFT-sync
c.Conf.General.Cluster, //TODO BFT-sync
c.ClusterDialer, // TODO BFT-sync
c.Conf.General.Cluster, // TODO BFT-sync
c.Comm,
c.SignerSerializer,
c.GetPolicyManager(support.ChannelID()),
Expand Down
9 changes: 5 additions & 4 deletions orderer/consensus/smartbft/synchronizer_bft.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ SPDX-License-Identifier: Apache-2.0
package smartbft

import (
"sort"
"time"

"github.com/SmartBFT-Go/consensus/smartbftprotos"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/protoutil"
"sort"
"time"

"github.com/SmartBFT-Go/consensus/pkg/types"
"github.com/hyperledger/fabric-lib-go/bccsp"
Expand Down Expand Up @@ -81,7 +82,7 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) {
return nil, errors.Wrap(err, "cannot get create BlockPuller")
}
defer blockPuller.Close()

heightByEndpoint, myEndpoint, err := blockPuller.HeightsByEndpoints()
if err != nil {
return nil, errors.Wrap(err, "cannot get HeightsByEndpoints")
Expand Down Expand Up @@ -155,7 +156,7 @@ func (s *BFTSynchronizer) synchronize() (*types.Decision, error) {
}

s.Logger.Infof("Created a BFTDeliverer: %+v", bftDeliverer)
bftDeliverer.Initialize(lastConfigEnv.GetConfig()) //TODO allow input for self endpoint
bftDeliverer.Initialize(lastConfigEnv.GetConfig(), myEndpoint)

go bftDeliverer.DeliverBlocks()
defer bftDeliverer.Stop()
Expand Down
2 changes: 1 addition & 1 deletion orderer/consensus/smartbft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,5 +526,5 @@ func (a *ledgerInfoAdapter) LedgerHeight() (uint64, error) {
}

func (a *ledgerInfoAdapter) GetCurrentBlockHash() ([]byte, error) {
return nil, errors.New("not implemented")
return nil, errors.New("not implemented: never used in orderer")
}

0 comments on commit 41ed9b6

Please sign in to comment.