diff --git a/orderer/common/localconfig/config.go b/orderer/common/localconfig/config.go index 649f2e3a7b2..0cd95e80faf 100644 --- a/orderer/common/localconfig/config.go +++ b/orderer/common/localconfig/config.go @@ -68,6 +68,7 @@ type Cluster struct { ReplicationPullTimeout time.Duration ReplicationRetryTimeout time.Duration ReplicationMaxRetries int + ReplicationPolicy string // BFT: "simple" | "consensus" (default); etcdraft: ignored, always "simple" SendBufferSize int CertExpirationWarningThreshold time.Duration TLSHandshakeTimeShift time.Duration @@ -188,6 +189,7 @@ var Defaults = TopLevel{ ReplicationRetryTimeout: time.Second * 5, ReplicationPullTimeout: time.Second * 5, CertExpirationWarningThreshold: time.Hour * 24 * 7, + ReplicationPolicy: "consensus", // BFT default; on etcdraft it is ignored }, LocalMSPDir: "msp", LocalMSPID: "SampleOrg", @@ -332,6 +334,9 @@ func (c *TopLevel) completeInitialization(configDir string) { c.General.Cluster.ReplicationRetryTimeout = Defaults.General.Cluster.ReplicationRetryTimeout case c.General.Cluster.CertExpirationWarningThreshold == 0: c.General.Cluster.CertExpirationWarningThreshold = Defaults.General.Cluster.CertExpirationWarningThreshold + case (c.General.Cluster.ReplicationPolicy != "simple") && (c.General.Cluster.ReplicationPolicy != "consensus"): + logger.Infof("General.Cluster.ReplicationPolicy is `%s`, setting to `%s`", c.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy) + c.General.Cluster.ReplicationPolicy = Defaults.General.Cluster.ReplicationPolicy case c.General.Profile.Enabled && c.General.Profile.Address == "": logger.Infof("Profiling enabled and General.Profile.Address unset, setting to %s", Defaults.General.Profile.Address) diff --git a/orderer/common/localconfig/config_test.go b/orderer/common/localconfig/config_test.go index 62d924dc8f9..1efb226743f 100644 --- a/orderer/common/localconfig/config_test.go +++ b/orderer/common/localconfig/config_test.go @@ -165,6 +165,7 @@ func TestClusterDefaults(t *testing.T) { cfg, err := cc.load() require.NoError(t, err) require.Equal(t, cfg.General.Cluster.ReplicationMaxRetries, Defaults.General.Cluster.ReplicationMaxRetries) + require.Equal(t, cfg.General.Cluster.ReplicationPolicy, Defaults.General.Cluster.ReplicationPolicy) } func TestConsensusConfig(t *testing.T) { diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index 1ef8d4e14ef..b6b34ccc39b 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -27,6 +27,7 @@ import ( "github.com/hyperledger/fabric-protos-go/msp" "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/cluster" + "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/orderer/common/msgprocessor" types2 "github.com/hyperledger/fabric/orderer/common/types" "github.com/hyperledger/fabric/orderer/consensus" @@ -67,24 +68,26 @@ type signerSerializer interface { // BFTChain implements Chain interface to wire with // BFT smart library type BFTChain struct { - RuntimeConfig *atomic.Value - Channel string - Config types.Configuration - BlockPuller BlockPuller - Comm cluster.Communicator - SignerSerializer signerSerializer - PolicyManager policies.Manager - Logger *flogging.FabricLogger - WALDir string - consensus *smartbft.Consensus - support consensus.ConsenterSupport - clusterService *cluster.ClusterService - verifier *Verifier - assembler *Assembler - Metrics *Metrics - MetricsBFT *api.Metrics - MetricsWalBFT *wal.Metrics - bccsp bccsp.BCCSP + RuntimeConfig *atomic.Value + Channel string + Config types.Configuration + BlockPuller BlockPuller + clusterDialer *cluster.PredicateDialer // TODO Required by BFT-synchronizer + localConfigCluster localconfig.Cluster // TODO Required by BFT-synchronizer + Comm cluster.Communicator + SignerSerializer signerSerializer + PolicyManager policies.Manager + Logger *flogging.FabricLogger + WALDir string + consensus *smartbft.Consensus + support consensus.ConsenterSupport + clusterService *cluster.ClusterService + verifier *Verifier + assembler *Assembler + Metrics *Metrics + MetricsBFT *api.Metrics + MetricsWalBFT *wal.Metrics + bccsp bccsp.BCCSP statusReportMutex sync.Mutex consensusRelation types2.ConsensusRelation @@ -98,6 +101,8 @@ func NewChain( config types.Configuration, walDir string, blockPuller BlockPuller, + clusterDialer *cluster.PredicateDialer, + localConfigCluster localconfig.Cluster, comm cluster.Communicator, signerSerializer signerSerializer, policyManager policies.Manager, @@ -117,18 +122,20 @@ func NewChain( } c := &BFTChain{ - RuntimeConfig: &atomic.Value{}, - Channel: support.ChannelID(), - Config: config, - WALDir: walDir, - Comm: comm, - support: support, - SignerSerializer: signerSerializer, - PolicyManager: policyManager, - BlockPuller: blockPuller, - Logger: logger, - consensusRelation: types2.ConsensusRelationConsenter, - status: types2.StatusActive, + RuntimeConfig: &atomic.Value{}, + Channel: support.ChannelID(), + Config: config, + WALDir: walDir, + Comm: comm, + support: support, + SignerSerializer: signerSerializer, + PolicyManager: policyManager, + BlockPuller: blockPuller, // FIXME create internally or with a factory + clusterDialer: clusterDialer, // TODO Required by BFT-synchronizer + localConfigCluster: localConfigCluster, // TODO Required by BFT-synchronizer + Logger: logger, + consensusRelation: types2.ConsensusRelationConsenter, + status: types2.StatusActive, Metrics: &Metrics{ ClusterSize: metrics.ClusterSize.With("channel", support.ChannelID()), CommittedBlockNumber: metrics.CommittedBlockNumber.With("channel", support.ChannelID()), @@ -202,21 +209,31 @@ func bftSmartConsensusBuild( // report cluster size c.Metrics.ClusterSize.Set(float64(clusterSize)) - sync := &Synchronizer{ - selfID: rtc.id, - BlockToDecision: c.blockToDecision, - OnCommit: func(block *cb.Block) types.Reconfig { - c.pruneCommittedRequests(block) - return c.updateRuntimeConfig(block) - }, - Support: c.support, - BlockPuller: c.BlockPuller, - ClusterSize: clusterSize, - Logger: c.Logger, - LatestConfig: func() (types.Configuration, []uint64) { - rtc := c.RuntimeConfig.Load().(RuntimeConfig) - return rtc.BFTConfig, rtc.Nodes - }, + var sync api.Synchronizer + switch c.localConfigCluster.ReplicationPolicy { + case "consensus": + c.Logger.Debug("BFTSynchronizer not yet available") // TODO create BFTSynchronizer when available + fallthrough + case "simple": + c.Logger.Debug("Creating simple Synchronizer") + sync = &Synchronizer{ + selfID: rtc.id, + BlockToDecision: c.blockToDecision, + OnCommit: func(block *cb.Block) types.Reconfig { + c.pruneCommittedRequests(block) + return c.updateRuntimeConfig(block) + }, + Support: c.support, + BlockPuller: c.BlockPuller, // FIXME this must be created dynamically as the cluster may change config + ClusterSize: clusterSize, // FIXME this must be taken dynamically from `support` as the cluster may change in size + Logger: c.Logger, + LatestConfig: func() (types.Configuration, []uint64) { + rtc := c.RuntimeConfig.Load().(RuntimeConfig) + return rtc.BFTConfig, rtc.Nodes + }, + } + default: + c.Logger.Panicf("Unsupported Cluster.ReplicationPolicy: %s", c.localConfigCluster.ReplicationPolicy) } channelDecorator := zap.String("channel", c.support.ChannelID()) diff --git a/orderer/consensus/smartbft/consenter.go b/orderer/consensus/smartbft/consenter.go index cd5a1bd11a0..4b111bde357 100644 --- a/orderer/consensus/smartbft/consenter.go +++ b/orderer/consensus/smartbft/consenter.go @@ -15,8 +15,6 @@ import ( "path" "reflect" - "github.com/hyperledger/fabric/orderer/consensus/smartbft/util" - "github.com/SmartBFT-Go/consensus/pkg/api" "github.com/SmartBFT-Go/consensus/pkg/wal" "github.com/golang/protobuf/proto" @@ -34,6 +32,7 @@ import ( "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/orderer/common/multichannel" "github.com/hyperledger/fabric/orderer/consensus" + "github.com/hyperledger/fabric/orderer/consensus/smartbft/util" "github.com/hyperledger/fabric/protoutil" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" @@ -203,7 +202,23 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb Logger: c.Logger, } - chain, err := NewChain(configValidator, (uint64)(selfID), config, path.Join(c.WALBaseDir, support.ChannelID()), puller, c.Comm, c.SignerSerializer, c.GetPolicyManager(support.ChannelID()), support, c.Metrics, c.MetricsBFT, c.MetricsWalBFT, c.BCCSP) + chain, err := NewChain( + configValidator, + (uint64)(selfID), + config, + path.Join(c.WALBaseDir, support.ChannelID()), + puller, + c.ClusterDialer, // required by the BFT-synchronizer + c.Conf.General.Cluster, // required by the BFT-synchronizer + c.Comm, + c.SignerSerializer, + c.GetPolicyManager(support.ChannelID()), + support, + c.Metrics, + c.MetricsBFT, + c.MetricsWalBFT, + c.BCCSP, + ) if err != nil { return nil, errors.Wrap(err, "failed creating a new BFTChain") } diff --git a/sampleconfig/orderer.yaml b/sampleconfig/orderer.yaml index ee3ae31af41..36f70aa2a80 100644 --- a/sampleconfig/orderer.yaml +++ b/sampleconfig/orderer.yaml @@ -115,6 +115,12 @@ General: # ServerPrivateKey defines the file location of the private key of the TLS certificate. ServerPrivateKey: + # ReplicationPolicy defines how blocks are replicated between orderers. + # Permitted values: + # in BFT: "simple" | "consensus" (default); + # in etcdraft: ignored, (always "simple", regardless of value in config). + ReplicationPolicy: + # Bootstrap method: The method by which to obtain the bootstrap block # system channel is specified. The option can be one of: # "file" - path to a file containing the genesis block or config block of system channel