Skip to content

Commit

Permalink
Merge batch size in configtx for both Raft and BFT
Browse files Browse the repository at this point in the history
Signed-off-by: Emil Elizarov <emil.elizarov@ibm.com>
  • Loading branch information
Emil Elizarov committed Aug 1, 2023
1 parent 6990301 commit 9deb9a5
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 59 deletions.
38 changes: 38 additions & 0 deletions integration/ordererclient/envelope_creator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package ordererclient

import (
"github.com/hyperledger/fabric/protoutil"
. "github.com/onsi/gomega"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/integration/nwo"
)

func CreateBroadcastEnvelope(n *nwo.Network, entity interface{}, channel string, data []byte) *common.Envelope {
var signer *nwo.SigningIdentity
switch creator := entity.(type) {
case *nwo.Peer:
signer = n.PeerUserSigner(creator, "Admin")
case *nwo.Orderer:
signer = n.OrdererUserSigner(creator, "Admin")
}
Expect(signer).NotTo(BeNil())

env, err := protoutil.CreateSignedEnvelope(
common.HeaderType_MESSAGE,
channel,
signer,
&common.Envelope{Payload: data},
0,
0,
)
Expect(err).NotTo(HaveOccurred())

return env
}
10 changes: 5 additions & 5 deletions integration/raft/cft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
FindLeader([]*ginkgomon.Runner{o1Runner, o2Runner, o3Runner})

By("performing operation with orderer1")
env := CreateBroadcastEnvelope(network, o1, "testchannel", []byte("foo"))
env := ordererclient.CreateBroadcastEnvelope(network, o1, "testchannel", []byte("foo"))
resp, err := ordererclient.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
Expand Down Expand Up @@ -171,7 +171,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
By("Submitting several transactions to trigger snapshot")
o2SnapDir := path.Join(network.RootDir, "orderers", o2.ID(), "etcdraft", "snapshot")

env := CreateBroadcastEnvelope(network, o2, channelID, make([]byte, 2000))
env := ordererclient.CreateBroadcastEnvelope(network, o2, channelID, make([]byte, 2000))
for i := 1; i <= 4; i++ { // 4 < MaxSnapshotFiles(5), so that no snapshot is pruned
// Note that MaxMessageCount is 1 be default, so every tx results in a new block
resp, err := ordererclient.Broadcast(network, o2, env)
Expand Down Expand Up @@ -221,7 +221,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
}, network.EventuallyTimeout).Should(Equal(1))

By("Asserting cluster is still functional")
env = CreateBroadcastEnvelope(network, o1, channelID, make([]byte, 1000))
env = ordererclient.CreateBroadcastEnvelope(network, o1, channelID, make([]byte, 1000))
resp, err := ordererclient.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Eventually(resp.Status, network.EventuallyTimeout).Should(Equal(common.Status_SUCCESS))
Expand Down Expand Up @@ -366,7 +366,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
FindLeader([]*ginkgomon.Runner{r1, r2})

By("Submitting several transactions to trigger snapshot")
env := CreateBroadcastEnvelope(network, remainedOrderers[1], "testchannel", make([]byte, 2000))
env := ordererclient.CreateBroadcastEnvelope(network, remainedOrderers[1], "testchannel", make([]byte, 2000))
for i := 3; i <= 10; i++ {
// Note that MaxMessageCount is 1 be default, so every tx results in a new block
resp, err := ordererclient.Broadcast(network, remainedOrderers[1], env)
Expand Down Expand Up @@ -551,7 +551,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
By("Submitting tx to leader")
// This should fail because current leader steps down
// and there is no leader at this point of time
env := CreateBroadcastEnvelope(network, leader, "testchannel", []byte("foo"))
env := ordererclient.CreateBroadcastEnvelope(network, leader, "testchannel", []byte("foo"))
resp, err := ordererclient.Broadcast(network, leader, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SERVICE_UNAVAILABLE))
Expand Down
6 changes: 3 additions & 3 deletions integration/raft/channel_participation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ var _ = Describe("ChannelParticipation", func() {
}))

By("submitting transaction to orderer1")
env := CreateBroadcastEnvelope(network, peer, "participation-trophy", []byte("hello"))
env := ordererclient.CreateBroadcastEnvelope(network, peer, "participation-trophy", []byte("hello"))
resp, err := ordererclient.Broadcast(network, orderer1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_BAD_REQUEST))
Expand Down Expand Up @@ -702,14 +702,14 @@ var _ = Describe("ChannelParticipation", func() {
// submit a transaction signed by the peer and ensure it was
// committed to the ledger
func submitPeerTxn(o *nwo.Orderer, peer *nwo.Peer, n *nwo.Network, expectedChannelInfo channelparticipation.ChannelInfo) {
env := CreateBroadcastEnvelope(n, peer, expectedChannelInfo.Name, []byte("hello"))
env := ordererclient.CreateBroadcastEnvelope(n, peer, expectedChannelInfo.Name, []byte("hello"))
submitTxn(o, env, n, expectedChannelInfo)
}

// submit a transaction signed by the orderer and ensure it is
// committed to the ledger
func submitOrdererTxn(o *nwo.Orderer, n *nwo.Network, expectedChannelInfo channelparticipation.ChannelInfo) {
env := CreateBroadcastEnvelope(n, o, expectedChannelInfo.Name, []byte("hello"))
env := ordererclient.CreateBroadcastEnvelope(n, o, expectedChannelInfo.Name, []byte("hello"))
submitTxn(o, env, n, expectedChannelInfo)
}

Expand Down
23 changes: 0 additions & 23 deletions integration/raft/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,6 @@ func FetchBlock(n *nwo.Network, o *nwo.Orderer, seq uint64, channel string) *com
return blk
}

func CreateBroadcastEnvelope(n *nwo.Network, entity interface{}, channel string, data []byte) *common.Envelope {
var signer *nwo.SigningIdentity
switch creator := entity.(type) {
case *nwo.Peer:
signer = n.PeerUserSigner(creator, "Admin")
case *nwo.Orderer:
signer = n.OrdererUserSigner(creator, "Admin")
}
Expect(signer).NotTo(BeNil())

env, err := protoutil.CreateSignedEnvelope(
common.HeaderType_MESSAGE,
channel,
signer,
&common.Envelope{Payload: data},
0,
0,
)
Expect(err).NotTo(HaveOccurred())

return env
}

// CreateDeliverEnvelope creates a deliver env to seek for specified block.
func CreateDeliverEnvelope(n *nwo.Network, o *nwo.Orderer, blkNum uint64, channel string) *common.Envelope {
specified := &orderer.SeekPosition{
Expand Down
16 changes: 8 additions & 8 deletions integration/raft/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
}, orderers, peer, network)

By("Broadcasting envelope to testchannel")
env := CreateBroadcastEnvelope(network, peer, "testchannel", []byte("hello"))
env := ordererclient.CreateBroadcastEnvelope(network, peer, "testchannel", []byte("hello"))
resp, err := ordererclient.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
Expand Down Expand Up @@ -852,13 +852,13 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
}, orderers, peer, network)

By("Ensuring orderer4 doesn't serve testchannel2 and testchannel3")
env = CreateBroadcastEnvelope(network, peer, "testchannel2", []byte("hello"))
env = ordererclient.CreateBroadcastEnvelope(network, peer, "testchannel2", []byte("hello"))
resp, err = ordererclient.Broadcast(network, o4, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_BAD_REQUEST))
Expect(orderer4Runner.Err()).To(gbytes.Say("channel does not exist"))

env = CreateBroadcastEnvelope(network, peer, "testchannel3", []byte("hello"))
env = ordererclient.CreateBroadcastEnvelope(network, peer, "testchannel3", []byte("hello"))
resp, err = ordererclient.Broadcast(network, o4, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_BAD_REQUEST))
Expand Down Expand Up @@ -892,7 +892,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
Consistently(orderer4Runner.Err()).ShouldNot(gbytes.Say("ERRO"))

By("Submitting a transaction through orderer4")
env = CreateBroadcastEnvelope(network, peer, "testchannel2", []byte("hello"))
env = ordererclient.CreateBroadcastEnvelope(network, peer, "testchannel2", []byte("hello"))
resp, err = ordererclient.Broadcast(network, o4, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
Expand Down Expand Up @@ -977,7 +977,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
}, []*nwo.Orderer{o1}, peer, network)

By("Ensuring only orderer1 services the channel")
env := CreateBroadcastEnvelope(network, peer, "mychannel", []byte("hello"))
env := ordererclient.CreateBroadcastEnvelope(network, peer, "mychannel", []byte("hello"))
resp, err := ordererclient.Broadcast(network, o2, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_BAD_REQUEST))
Expand Down Expand Up @@ -1245,7 +1245,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
}, []*nwo.Orderer{orderers[firstEvictedNode]}, peer, network)

By("Submitting tx")
env := CreateBroadcastEnvelope(network, orderers[survivor], "testchannel", []byte("foo"))
env := ordererclient.CreateBroadcastEnvelope(network, orderers[survivor], "testchannel", []byte("foo"))
resp, err := ordererclient.Broadcast(network, orderers[survivor], env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
Expand Down Expand Up @@ -1353,7 +1353,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
channelparticipation.Join(network, o1, "testchannel", configBlock, expectedInfo)

By("Submitting tx")
env := CreateBroadcastEnvelope(network, o2, "testchannel", []byte("foo"))
env := ordererclient.CreateBroadcastEnvelope(network, o2, "testchannel", []byte("foo"))
resp, err := ordererclient.Broadcast(network, o2, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
Expand Down Expand Up @@ -1595,7 +1595,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
FindLeader([]*ginkgomon.Runner{ordererRunners[4]})
}

env := CreateBroadcastEnvelope(network, orderers[4], "testchannel", []byte("hello"))
env := ordererclient.CreateBroadcastEnvelope(network, orderers[4], "testchannel", []byte("hello"))
resp, err := ordererclient.Broadcast(network, orderers[4], env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
Expand Down
85 changes: 85 additions & 0 deletions integration/smartbft/smartbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"syscall"
"time"

protosOrderer "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/integration/ordererclient"
"github.com/hyperledger/fabric/protoutil"

"github.com/golang/protobuf/proto"

"github.com/hyperledger/fabric/integration/channelparticipation"
Expand Down Expand Up @@ -905,6 +909,66 @@ var _ = Describe("EndToEnd Smart BFT configuration test", func() {
}
})

It("smartbft batch size max bytes config change", func() {
channel := "testchannel1"
By("Create network")
networkConfig := nwo.MultiNodeSmartBFT()
networkConfig.Channels = nil
network = nwo.New(networkConfig, testDir, client, StartPort(), components)
network.GenerateConfigTree()
network.Bootstrap()
network.EventuallyTimeout *= 2

By("Start orderers")
var ordererRunners []*ginkgomon.Runner
for _, orderer := range network.Orderers {
runner := network.OrdererRunner(orderer)
runner.Command.Env = append(runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")
ordererRunners = append(ordererRunners, runner)
proc := ifrit.Invoke(runner)
ordererProcesses = append(ordererProcesses, proc)
Eventually(proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
}

By("Start peer")
peerRunner := network.PeerGroupRunner()
peerProcesses = ifrit.Invoke(peerRunner)
Eventually(peerProcesses.Ready(), network.EventuallyTimeout).Should(BeClosed())
peer := network.Peer("Org1", "peer0")

By("Join network to channel")
joinChannel(network, channel)

By("Waiting for followers to see the leader")
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))

By("Joining peers to channel")
orderer1 := network.Orderers[0]
network.JoinChannel(channel, orderer1, network.PeersWithChannel(channel)...)

By("Sending TX with batch size >1MB (the default batch max bytes is >10MB)")
// Old batch size max bytes is 10MB
newAbsoluteMaxBytes := 1_000_000
env := ordererclient.CreateBroadcastEnvelope(network, orderer1, channel, make([]byte, newAbsoluteMaxBytes+1))
resp, err := ordererclient.Broadcast(network, orderer1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

By("Changing the batch max bytes to 1MB")
updateBatchSize(network, peer, orderer1, channel,
func(batchSize *protosOrderer.BatchSize) {
batchSize.AbsoluteMaxBytes = uint32(newAbsoluteMaxBytes)
})

By("Sending TX with batch size >1MB")
env = ordererclient.CreateBroadcastEnvelope(network, orderer1, channel, make([]byte, newAbsoluteMaxBytes+1))
resp, err = ordererclient.Broadcast(network, orderer1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_BAD_REQUEST))
})

It("smartbft reconfiguration prevents blacklisting", func() {
channel := "testchannel1"
networkConfig := nwo.MultiNodeSmartBFT()
Expand Down Expand Up @@ -1443,3 +1507,24 @@ func deployChaincode(network *nwo.Network, channel string, testDir string) {
Label: "my_prebuilt_chaincode",
})
}

// updateBatchSize executes a config update that updates the orderer batch size
// according to the given batchSizeMutator.
func updateBatchSize(
network *nwo.Network,
peer *nwo.Peer,
orderer *nwo.Orderer,
channel string,
batchSizeMutator func(batchSize *protosOrderer.BatchSize)) {
config := nwo.GetConfig(network, peer, orderer, channel)
updatedConfig := proto.Clone(config).(*common.Config)
batchSizeConfigValue := updatedConfig.ChannelGroup.Groups["Orderer"].Values["BatchSize"]
batchSizeValue := &protosOrderer.BatchSize{}
Expect(proto.Unmarshal(batchSizeConfigValue.Value, batchSizeValue)).To(Succeed())
batchSizeMutator(batchSizeValue)
updatedConfig.ChannelGroup.Groups["Orderer"].Values["BatchSize"] = &common.ConfigValue{
ModPolicy: "Admins",
Value: protoutil.MarshalOrPanic(batchSizeValue),
}
nwo.UpdateOrdererConfig(network, orderer, channel, config, updatedConfig, peer, orderer)
}
10 changes: 5 additions & 5 deletions internal/configtxgen/genesisconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,18 @@ var genesisDefaults = TopLevel{
},
},
SmartBFT: &smartbft.Options{
RequestBatchMaxCount: uint64(types.DefaultConfig.RequestBatchMaxCount),
RequestBatchMaxBytes: uint64(types.DefaultConfig.RequestBatchMaxBytes),
RequestBatchMaxCount: types.DefaultConfig.RequestBatchMaxCount,
RequestBatchMaxBytes: types.DefaultConfig.RequestBatchMaxBytes,
RequestBatchMaxInterval: types.DefaultConfig.RequestBatchMaxInterval.String(),
IncomingMessageBufferSize: uint64(types.DefaultConfig.IncomingMessageBufferSize),
RequestPoolSize: uint64(types.DefaultConfig.RequestPoolSize),
IncomingMessageBufferSize: types.DefaultConfig.IncomingMessageBufferSize,
RequestPoolSize: types.DefaultConfig.RequestPoolSize,
RequestForwardTimeout: types.DefaultConfig.RequestForwardTimeout.String(),
RequestComplainTimeout: types.DefaultConfig.RequestComplainTimeout.String(),
RequestAutoRemoveTimeout: types.DefaultConfig.RequestAutoRemoveTimeout.String(),
ViewChangeResendInterval: types.DefaultConfig.ViewChangeResendInterval.String(),
ViewChangeTimeout: types.DefaultConfig.ViewChangeTimeout.String(),
LeaderHeartbeatTimeout: types.DefaultConfig.LeaderHeartbeatTimeout.String(),
LeaderHeartbeatCount: uint64(types.DefaultConfig.LeaderHeartbeatCount),
LeaderHeartbeatCount: types.DefaultConfig.LeaderHeartbeatCount,
CollectTimeout: types.DefaultConfig.CollectTimeout.String(),
SyncOnStart: types.DefaultConfig.SyncOnStart,
SpeedUpViewChange: types.DefaultConfig.SpeedUpViewChange,
Expand Down
7 changes: 3 additions & 4 deletions orderer/consensus/smartbft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/msp"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/orderer/smartbft"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
Expand Down Expand Up @@ -162,10 +161,10 @@ func (c *Consenter) ReceiverByChain(channelID string) MessageReceiver {

// HandleChain returns a new Chain instance or an error upon failure
func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {
configOptions := &smartbft.Options{}
consenters := support.SharedConfig().Consenters()
if err := proto.Unmarshal(support.SharedConfig().ConsensusMetadata(), configOptions); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal consensus metadata")
configOptions, err := createSmartBftConfig(support.SharedConfig())
if err != nil {
return nil, err
}

selfID, err := c.detectSelfID(consenters)
Expand Down
5 changes: 3 additions & 2 deletions orderer/consensus/smartbft/testdata/configtx.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,10 @@ Profiles:
- *OrdererOrg
Capabilities: *OrdererCapabilities
OrdererType: BFT
BatchSize:
MaxMessageCount: 100
AbsoluteMaxBytes: 10 MB
SmartBFT:
RequestBatchMaxCount: 100
RequestBatchMaxInterval: 50ms
RequestForwardTimeout: 2s
RequestComplainTimeout: 20s
Expand All @@ -383,7 +385,6 @@ Profiles:
ViewChangeTimeout: 20s
LeaderHeartbeatTimeout: 1m0s
CollectTimeout: 1s
RequestBatchMaxBytes: 10485760
IncomingMessageBufferSize: 200
RequestPoolSize: 400
LeaderHeartbeatCount: 10
Expand Down
Loading

0 comments on commit 9deb9a5

Please sign in to comment.