Skip to content

Commit

Permalink
BFT: configure orderer replication policy (#4692)
Browse files Browse the repository at this point in the history
Add a replication policy to the orderer.yaml that allows a BFT orderer to choose
whethet it uses the "simple" synchronizer which contacts a single target orderer at a time or
the BFT "consensus" synchronizer which contacts all target orderers and is resistant to censorship.


Change-Id: I89596f5d10e636e499c36799472e593582847a9e

Signed-off-by: Yoav Tock <tock@il.ibm.com>
  • Loading branch information
tock-ibm authored Feb 24, 2024
1 parent f21c6e0 commit 9ece858
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 48 deletions.
5 changes: 5 additions & 0 deletions orderer/common/localconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Cluster struct {
ReplicationPullTimeout time.Duration
ReplicationRetryTimeout time.Duration
ReplicationMaxRetries int
ReplicationPolicy string // BFT: "simple" | "consensus" (default); etcdraft: ignored, always "simple"
SendBufferSize int
CertExpirationWarningThreshold time.Duration
TLSHandshakeTimeShift time.Duration
Expand Down Expand Up @@ -188,6 +189,7 @@ var Defaults = TopLevel{
ReplicationRetryTimeout: time.Second * 5,
ReplicationPullTimeout: time.Second * 5,
CertExpirationWarningThreshold: time.Hour * 24 * 7,
ReplicationPolicy: "consensus", // BFT default; on etcdraft it is ignored
},
LocalMSPDir: "msp",
LocalMSPID: "SampleOrg",
Expand Down Expand Up @@ -332,6 +334,9 @@ func (c *TopLevel) completeInitialization(configDir string) {
c.General.Cluster.ReplicationRetryTimeout = Defaults.General.Cluster.ReplicationRetryTimeout
case c.General.Cluster.CertExpirationWarningThreshold == 0:
c.General.Cluster.CertExpirationWarningThreshold = Defaults.General.Cluster.CertExpirationWarningThreshold
case (c.General.Cluster.ReplicationPolicy != "simple") && (c.General.Cluster.ReplicationPolicy != "consensus"):
logger.Infof("General.Cluster.ReplicationPolicy is `%s`, setting to `%s`", c.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy)
c.General.Cluster.ReplicationPolicy = Defaults.General.Cluster.ReplicationPolicy

case c.General.Profile.Enabled && c.General.Profile.Address == "":
logger.Infof("Profiling enabled and General.Profile.Address unset, setting to %s", Defaults.General.Profile.Address)
Expand Down
1 change: 1 addition & 0 deletions orderer/common/localconfig/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func TestClusterDefaults(t *testing.T) {
cfg, err := cc.load()
require.NoError(t, err)
require.Equal(t, cfg.General.Cluster.ReplicationMaxRetries, Defaults.General.Cluster.ReplicationMaxRetries)
require.Equal(t, cfg.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy)
}

func TestConsensusConfig(t *testing.T) {
Expand Down
107 changes: 62 additions & 45 deletions orderer/consensus/smartbft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hyperledger/fabric-protos-go/msp"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
types2 "github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
Expand Down Expand Up @@ -67,24 +68,26 @@ type signerSerializer interface {
// BFTChain implements Chain interface to wire with
// BFT smart library
type BFTChain struct {
RuntimeConfig *atomic.Value
Channel string
Config types.Configuration
BlockPuller BlockPuller
Comm cluster.Communicator
SignerSerializer signerSerializer
PolicyManager policies.Manager
Logger *flogging.FabricLogger
WALDir string
consensus *smartbft.Consensus
support consensus.ConsenterSupport
clusterService *cluster.ClusterService
verifier *Verifier
assembler *Assembler
Metrics *Metrics
MetricsBFT *api.Metrics
MetricsWalBFT *wal.Metrics
bccsp bccsp.BCCSP
RuntimeConfig *atomic.Value
Channel string
Config types.Configuration
BlockPuller BlockPuller
clusterDialer *cluster.PredicateDialer // TODO Required by BFT-synchronizer
localConfigCluster localconfig.Cluster // TODO Required by BFT-synchronizer
Comm cluster.Communicator
SignerSerializer signerSerializer
PolicyManager policies.Manager
Logger *flogging.FabricLogger
WALDir string
consensus *smartbft.Consensus
support consensus.ConsenterSupport
clusterService *cluster.ClusterService
verifier *Verifier
assembler *Assembler
Metrics *Metrics
MetricsBFT *api.Metrics
MetricsWalBFT *wal.Metrics
bccsp bccsp.BCCSP

statusReportMutex sync.Mutex
consensusRelation types2.ConsensusRelation
Expand All @@ -98,6 +101,8 @@ func NewChain(
config types.Configuration,
walDir string,
blockPuller BlockPuller,
clusterDialer *cluster.PredicateDialer,
localConfigCluster localconfig.Cluster,
comm cluster.Communicator,
signerSerializer signerSerializer,
policyManager policies.Manager,
Expand All @@ -117,18 +122,20 @@ func NewChain(
}

c := &BFTChain{
RuntimeConfig: &atomic.Value{},
Channel: support.ChannelID(),
Config: config,
WALDir: walDir,
Comm: comm,
support: support,
SignerSerializer: signerSerializer,
PolicyManager: policyManager,
BlockPuller: blockPuller,
Logger: logger,
consensusRelation: types2.ConsensusRelationConsenter,
status: types2.StatusActive,
RuntimeConfig: &atomic.Value{},
Channel: support.ChannelID(),
Config: config,
WALDir: walDir,
Comm: comm,
support: support,
SignerSerializer: signerSerializer,
PolicyManager: policyManager,
BlockPuller: blockPuller, // FIXME create internally or with a factory
clusterDialer: clusterDialer, // TODO Required by BFT-synchronizer
localConfigCluster: localConfigCluster, // TODO Required by BFT-synchronizer
Logger: logger,
consensusRelation: types2.ConsensusRelationConsenter,
status: types2.StatusActive,
Metrics: &Metrics{
ClusterSize: metrics.ClusterSize.With("channel", support.ChannelID()),
CommittedBlockNumber: metrics.CommittedBlockNumber.With("channel", support.ChannelID()),
Expand Down Expand Up @@ -202,21 +209,31 @@ func bftSmartConsensusBuild(
// report cluster size
c.Metrics.ClusterSize.Set(float64(clusterSize))

sync := &Synchronizer{
selfID: rtc.id,
BlockToDecision: c.blockToDecision,
OnCommit: func(block *cb.Block) types.Reconfig {
c.pruneCommittedRequests(block)
return c.updateRuntimeConfig(block)
},
Support: c.support,
BlockPuller: c.BlockPuller,
ClusterSize: clusterSize,
Logger: c.Logger,
LatestConfig: func() (types.Configuration, []uint64) {
rtc := c.RuntimeConfig.Load().(RuntimeConfig)
return rtc.BFTConfig, rtc.Nodes
},
var sync api.Synchronizer
switch c.localConfigCluster.ReplicationPolicy {
case "consensus":
c.Logger.Debug("BFTSynchronizer not yet available") // TODO create BFTSynchronizer when available
fallthrough
case "simple":
c.Logger.Debug("Creating simple Synchronizer")
sync = &Synchronizer{
selfID: rtc.id,
BlockToDecision: c.blockToDecision,
OnCommit: func(block *cb.Block) types.Reconfig {
c.pruneCommittedRequests(block)
return c.updateRuntimeConfig(block)
},
Support: c.support,
BlockPuller: c.BlockPuller, // FIXME this must be created dynamically as the cluster may change config
ClusterSize: clusterSize, // FIXME this must be taken dynamically from `support` as the cluster may change in size
Logger: c.Logger,
LatestConfig: func() (types.Configuration, []uint64) {
rtc := c.RuntimeConfig.Load().(RuntimeConfig)
return rtc.BFTConfig, rtc.Nodes
},
}
default:
c.Logger.Panicf("Unsupported Cluster.ReplicationPolicy: %s", c.localConfigCluster.ReplicationPolicy)
}

channelDecorator := zap.String("channel", c.support.ChannelID())
Expand Down
21 changes: 18 additions & 3 deletions orderer/consensus/smartbft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"path"
"reflect"

"github.com/hyperledger/fabric/orderer/consensus/smartbft/util"

"github.com/SmartBFT-Go/consensus/pkg/api"
"github.com/SmartBFT-Go/consensus/pkg/wal"
"github.com/golang/protobuf/proto"
Expand All @@ -34,6 +32,7 @@ import (
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/multichannel"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/smartbft/util"
"github.com/hyperledger/fabric/protoutil"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
Expand Down Expand Up @@ -203,7 +202,23 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb
Logger: c.Logger,
}

chain, err := NewChain(configValidator, (uint64)(selfID), config, path.Join(c.WALBaseDir, support.ChannelID()), puller, c.Comm, c.SignerSerializer, c.GetPolicyManager(support.ChannelID()), support, c.Metrics, c.MetricsBFT, c.MetricsWalBFT, c.BCCSP)
chain, err := NewChain(
configValidator,
(uint64)(selfID),
config,
path.Join(c.WALBaseDir, support.ChannelID()),
puller,
c.ClusterDialer, // required by the BFT-synchronizer
c.Conf.General.Cluster, // required by the BFT-synchronizer
c.Comm,
c.SignerSerializer,
c.GetPolicyManager(support.ChannelID()),
support,
c.Metrics,
c.MetricsBFT,
c.MetricsWalBFT,
c.BCCSP,
)
if err != nil {
return nil, errors.Wrap(err, "failed creating a new BFTChain")
}
Expand Down
6 changes: 6 additions & 0 deletions sampleconfig/orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ General:
# ServerPrivateKey defines the file location of the private key of the TLS certificate.
ServerPrivateKey:

# ReplicationPolicy defines how blocks are replicated between orderers.
# Permitted values:
# in BFT: "simple" | "consensus" (default);
# in etcdraft: ignored, (always "simple", regardless of value in config).
ReplicationPolicy:

# Bootstrap method: The method by which to obtain the bootstrap block
# system channel is specified. The option can be one of:
# "file" - path to a file containing the genesis block or config block of system channel
Expand Down

0 comments on commit 9ece858

Please sign in to comment.