Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BFT synchronizer in orderer #4637

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
dcBFT.TLSCertHash = util.ComputeSHA256(cert.Certificate[0])
}

dcBFT.Initialize(d.conf.ChannelConfig)
dcBFT.Initialize(d.conf.ChannelConfig, "")

return dcBFT, nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/peer/blocksprovider/bft_deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type BFTDeliverer struct {
censorshipMonitor CensorshipDetector
}

func (d *BFTDeliverer) Initialize(channelConfig *common.Config) {
func (d *BFTDeliverer) Initialize(channelConfig *common.Config, selfEndpoint string) {
d.requester = NewDeliveryRequester(
d.ChannelID,
d.Signer,
Expand All @@ -115,7 +115,7 @@ func (d *BFTDeliverer) Initialize(channelConfig *common.Config) {
)

osLogger := flogging.MustGetLogger("peer.orderers")
ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger)
ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, selfEndpoint)
globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider)
if err != nil {
// The bundle was created prior to calling this function, so it should not fail when we recreate it here.
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/peer/blocksprovider/bft_deliverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (s *bftDelivererTestSetup) initialize(t *testing.T) {
MaxRetryDuration: 600 * time.Second,
MaxRetryDurationExceededHandler: s.fakeDurationExceededHandler.DurationExceededHandler,
}
s.d.Initialize(s.channelConfig)
s.d.Initialize(s.channelConfig, "")

s.fakeSleeper = &fake.Sleeper{}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/peer/blocksprovider/deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (d *Deliverer) Initialize(channelConfig *cb.Config) {
)

osLogger := flogging.MustGetLogger("peer.orderers")
ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger)
ordererSource := d.OrderersSourceFactory.CreateConnectionSource(osLogger, "")
globalAddresses, orgAddresses, err := extractAddresses(d.ChannelID, channelConfig, d.CryptoProvider)
if err != nil {
// The bundle was created prior to calling this function, so it should not fail when we recreate it here.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions internal/pkg/peer/orderers/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type ConnectionSource struct {
orgToEndpointsHash map[string][]byte
logger *flogging.FabricLogger
overrides map[string]*Endpoint
// empty when used by a peer, or self endpoint address when the ConnectionSource is used by an orderer.
selfEndpoint string
}

type Endpoint struct {
Expand Down Expand Up @@ -56,11 +58,12 @@ type OrdererOrg struct {
RootCerts [][]byte
}

func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint) *ConnectionSource {
func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint, selfEndpoint string) *ConnectionSource {
return &ConnectionSource{
orgToEndpointsHash: map[string][]byte{},
logger: logger,
overrides: overrides,
selfEndpoint: selfEndpoint,
}
}

Expand Down Expand Up @@ -202,6 +205,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer

// Note, if !hasOrgEndpoints, this for loop is a no-op
for _, address := range org.Addresses {
if address == cs.selfEndpoint {
cs.logger.Debugf("Skipping self endpoint [%s] from org specific endpoints", address)
continue
}
overrideEndpoint, ok := cs.overrides[address]
if ok {
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
Expand All @@ -228,6 +235,10 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer
}

for _, address := range globalAddrs {
if address == cs.selfEndpoint {
cs.logger.Debugf("Skipping self endpoint [%s] from global endpoints", address)
continue
}
overrideEndpoint, ok := cs.overrides[address]
if ok {
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
Expand All @@ -245,5 +256,5 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer
})
}

cs.logger.Debugf("Returning an orderer connection pool source with global endpoints only")
cs.logger.Debug("Returning an orderer connection pool source with global endpoints only")
}
6 changes: 3 additions & 3 deletions internal/pkg/peer/orderers/connection_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ type ConnectionSourcer interface {
}

type ConnectionSourceCreator interface {
CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer
CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer
}

type ConnectionSourceFactory struct {
Overrides map[string]*Endpoint
}

func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger) ConnectionSourcer {
return NewConnectionSource(logger, f.Overrides)
func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger, selfEndpoint string) ConnectionSourcer {
return NewConnectionSource(logger, f.Overrides, selfEndpoint)
}
4 changes: 2 additions & 2 deletions internal/pkg/peer/orderers/connection_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestCreateConnectionSource(t *testing.T) {
require.NotNil(t, factory)
require.Nil(t, factory.Overrides)
lg := flogging.MustGetLogger("test")
connSource := factory.CreateConnectionSource(lg)
connSource := factory.CreateConnectionSource(lg, "")
require.NotNil(t, connSource)

overrides := make(map[string]*orderers.Endpoint)
Expand All @@ -31,6 +31,6 @@ func TestCreateConnectionSource(t *testing.T) {
factory = &orderers.ConnectionSourceFactory{Overrides: overrides}
require.NotNil(t, factory)
require.Len(t, factory.Overrides, 1)
connSource = factory.CreateConnectionSource(lg)
connSource = factory.CreateConnectionSource(lg, "")
require.NotNil(t, connSource)
}
84 changes: 75 additions & 9 deletions internal/pkg/peer/orderers/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (
"sort"
"strings"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/hyperledger/fabric-lib-go/common/flogging"
"github.com/hyperledger/fabric/internal/pkg/peer/orderers"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

type comparableEndpoint struct {
Expand Down Expand Up @@ -83,12 +82,13 @@ var _ = Describe("Connection", func() {
org2Certs = [][]byte{cert3}
overrideCerts = [][]byte{cert2}

cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), map[string]*orderers.Endpoint{
"override-address": {
Address: "re-mapped-address",
RootCerts: overrideCerts,
},
})
cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"),
map[string]*orderers.Endpoint{
"override-address": {
Address: "re-mapped-address",
RootCerts: overrideCerts,
},
}, "") // << no self endpoint
cs.Update(nil, map[string]orderers.OrdererOrg{
"org1": org1,
"org2": org2,
Expand Down Expand Up @@ -567,4 +567,70 @@ var _ = Describe("Connection", func() {
})
})
})

When("self-endpoint exists as in the orderer", func() {
BeforeEach(func() {
cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"),
map[string]*orderers.Endpoint{
"override-address": {
Address: "re-mapped-address",
RootCerts: overrideCerts,
},
},
"org1-address1")
cs.Update(nil, map[string]orderers.OrdererOrg{
"org1": org1,
"org2": org2,
})

endpoints = cs.Endpoints()
})

It("does not include the self-endpoint in endpoints", func() {
Expect(len(endpoints)).To(Equal(3))
Expect(stripEndpoints(endpoints)).To(ConsistOf(
stripEndpoints([]*orderers.Endpoint{
{
Address: "org1-address2",
RootCerts: org1Certs,
},
{
Address: "org2-address1",
RootCerts: org2Certs,
},
{
Address: "org2-address2",
RootCerts: org2Certs,
},
}),
))
})

It("does not include the self endpoint in shuffled endpoints", func() {
shuffledEndpoints := cs.ShuffledEndpoints()
Expect(len(shuffledEndpoints)).To(Equal(3))
Expect(stripEndpoints(endpoints)).To(ConsistOf(
stripEndpoints([]*orderers.Endpoint{
{
Address: "org1-address2",
RootCerts: org1Certs,
},
{
Address: "org2-address1",
RootCerts: org2Certs,
},
{
Address: "org2-address2",
RootCerts: org2Certs,
},
}),
))
})

It("does not mark any of the endpoints as refreshed", func() {
for _, endpoint := range endpoints {
Expect(endpoint.Refreshed).NotTo(BeClosed())
}
})
})
})
2 changes: 1 addition & 1 deletion orderer/common/cluster/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type BlockPuller struct {

// A 'stopper' goroutine may signal the go-routine servicing PullBlock & HeightsByEndpoints to stop by closing this
// channel. Note: all methods of the BlockPuller must be serviced by a single goroutine, it is not thread safe.
// It is the responsibility of the 'stopper' not to close the channel more then once.
// It is the responsibility of the 'stopper' not to close the channel more than once.
StopChannel chan struct{}

// Internal state
Expand Down
5 changes: 5 additions & 0 deletions orderer/common/localconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Cluster struct {
ReplicationPullTimeout time.Duration
ReplicationRetryTimeout time.Duration
ReplicationMaxRetries int
ReplicationPolicy string // BFT: "simple" | "consensus" (default); etcdraft: ignored, always "simple"
SendBufferSize int
CertExpirationWarningThreshold time.Duration
TLSHandshakeTimeShift time.Duration
Expand Down Expand Up @@ -188,6 +189,7 @@ var Defaults = TopLevel{
ReplicationRetryTimeout: time.Second * 5,
ReplicationPullTimeout: time.Second * 5,
CertExpirationWarningThreshold: time.Hour * 24 * 7,
ReplicationPolicy: "consensus", // BFT default; on etcdraft it is ignored
},
LocalMSPDir: "msp",
LocalMSPID: "SampleOrg",
Expand Down Expand Up @@ -332,6 +334,9 @@ func (c *TopLevel) completeInitialization(configDir string) {
c.General.Cluster.ReplicationRetryTimeout = Defaults.General.Cluster.ReplicationRetryTimeout
case c.General.Cluster.CertExpirationWarningThreshold == 0:
c.General.Cluster.CertExpirationWarningThreshold = Defaults.General.Cluster.CertExpirationWarningThreshold
case (c.General.Cluster.ReplicationPolicy != "simple") && (c.General.Cluster.ReplicationPolicy != "consensus"):
logger.Infof("General.Cluster.ReplicationPolicy is `%s`, setting to `%s`", c.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy)
c.General.Cluster.ReplicationPolicy = Defaults.General.Cluster.ReplicationPolicy

case c.General.Profile.Enabled && c.General.Profile.Address == "":
logger.Infof("Profiling enabled and General.Profile.Address unset, setting to %s", Defaults.General.Profile.Address)
Expand Down
1 change: 1 addition & 0 deletions orderer/common/localconfig/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func TestClusterDefaults(t *testing.T) {
cfg, err := cc.load()
require.NoError(t, err)
require.Equal(t, cfg.General.Cluster.ReplicationMaxRetries, Defaults.General.Cluster.ReplicationMaxRetries)
require.Equal(t, cfg.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy)
}

func TestConsensusConfig(t *testing.T) {
Expand Down
Loading