Skip to content

Commit

Permalink
BFT Block Puller: updatable connection source (#4571)
Browse files Browse the repository at this point in the history
Change-Id: Id0b80df892595cf2b9d7b7b50bfa9069f0eb6905

Signed-off-by: Yoav Tock <tock@il.ibm.com>
  • Loading branch information
tock-ibm authored Jan 8, 2024
1 parent cf07d0e commit 6d6986a
Show file tree
Hide file tree
Showing 22 changed files with 806 additions and 322 deletions.
55 changes: 55 additions & 0 deletions common/deliverclient/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package deliverclient

import (
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)

var ErrNotAConfig = errors.New("not a config block")

// ConfigFromBlock returns a ConfigEnvelope if exists, or a *ErrNotAConfig error.
// It may also return some other error in case parsing failed.
func ConfigFromBlock(block *common.Block) (*common.ConfigEnvelope, error) {
if block == nil || block.Data == nil || len(block.Data.Data) == 0 {
return nil, errors.New("empty block")
}
txn := block.Data.Data[0]
env, err := protoutil.GetEnvelopeFromBlock(txn)
if err != nil {
return nil, errors.WithStack(err)
}
payload, err := protoutil.UnmarshalPayload(env.Payload)
if err != nil {
return nil, errors.WithStack(err)
}
if block.Header.Number == 0 {
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
return nil, errors.Wrap(err, "invalid config envelope")
}
return configEnvelope, nil
}
if payload.Header == nil {
return nil, errors.New("nil header in payload")
}
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, errors.WithStack(err)
}
if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG {
return nil, ErrNotAConfig
}
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
return nil, errors.Wrap(err, "invalid config envelope")
}
return configEnvelope, nil
}
106 changes: 106 additions & 0 deletions common/deliverclient/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package deliverclient_test

import (
"testing"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/deliverclient"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/require"
)

func TestConfigFromBlockBadInput(t *testing.T) {
for _, testCase := range []struct {
name string
block *common.Block
expectedError string
}{
{
name: "nil block",
expectedError: "empty block",
block: nil,
},
{
name: "nil block data",
expectedError: "empty block",
block: &common.Block{},
},
{
name: "no data in block",
expectedError: "empty block",
block: &common.Block{Data: &common.BlockData{}},
},
{
name: "invalid payload",
expectedError: "error unmarshalling Envelope",
block: &common.Block{Data: &common.BlockData{Data: [][]byte{{1, 2, 3}}}},
},
{
name: "bad genesis block",
expectedError: "invalid config envelope",
block: &common.Block{
Header: &common.BlockHeader{}, Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{
Payload: protoutil.MarshalOrPanic(&common.Payload{
Data: []byte{1, 2, 3},
}),
})}},
},
},
{
name: "invalid envelope in block",
expectedError: "error unmarshalling Envelope",
block: &common.Block{Data: &common.BlockData{Data: [][]byte{{1, 2, 3}}}},
},
{
name: "invalid payload in block envelope",
expectedError: "error unmarshalling Payload",
block: &common.Block{Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{
Payload: []byte{1, 2, 3},
})}}},
},
{
name: "invalid channel header",
expectedError: "error unmarshalling ChannelHeader",
block: &common.Block{
Header: &common.BlockHeader{Number: 1},
Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{
Payload: protoutil.MarshalOrPanic(&common.Payload{
Header: &common.Header{
ChannelHeader: []byte{1, 2, 3},
},
}),
})}},
},
},
{
name: "invalid config block",
expectedError: "invalid config envelope",
block: &common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{
Payload: protoutil.MarshalOrPanic(&common.Payload{
Data: []byte{1, 2, 3},
Header: &common.Header{
ChannelHeader: protoutil.MarshalOrPanic(&common.ChannelHeader{
Type: int32(common.HeaderType_CONFIG),
}),
},
}),
})}},
},
},
} {
t.Run(testCase.name, func(t *testing.T) {
conf, err := deliverclient.ConfigFromBlock(testCase.block)
require.Nil(t, conf)
require.Error(t, err)
require.Contains(t, err.Error(), testCase.expectedError)
})
}
}
29 changes: 16 additions & 13 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ type Config struct {
// Gossip enables to enumerate peers in the channel, send a message to peers,
// and add a block to the gossip state transfer layer.
Gossip blocksprovider.GossipServiceAdapter
// OrdererSource provides orderer endpoints, complete with TLS cert pools.
OrdererSource *orderers.ConnectionSource
// OrdererEndpointOverrides provides peer-specific orderer endpoints overrides.
// These are loaded once when the peer starts.
OrdererEndpointOverrides map[string]*orderers.Endpoint
// Signer is the identity used to sign requests.
Signer identity.SignerSerializer
// DeliverServiceConfig is the configuration object.
Expand Down Expand Up @@ -191,14 +192,15 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides},
CryptoProvider: d.conf.CryptoProvider,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
MaxRetryDurationExceededHandler: func() (stopRetries bool) {
return !d.conf.IsStaticLeader
},
Expand All @@ -212,7 +214,7 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo
dc.TLSCertHash = util.ComputeSHA256(cert.Certificate[0])
}

dc.Initialize()
dc.Initialize(d.conf.ChannelConfig)

return dc, nil
}
Expand Down Expand Up @@ -254,7 +256,8 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides},
CryptoProvider: d.conf.CryptoProvider,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Expand All @@ -277,7 +280,7 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
dcBFT.TLSCertHash = util.ComputeSHA256(cert.Certificate[0])
}

dcBFT.Initialize()
dcBFT.Initialize(d.conf.ChannelConfig)

return dcBFT, nil
}
Expand Down
25 changes: 1 addition & 24 deletions core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,36 +288,13 @@ func (p *Peer) createChannel(
mspmgmt.XXXSetMSPManager(cid, bundle.MSPManager())
}

osLogger := flogging.MustGetLogger("peer.orderers")
namedOSLogger := osLogger.With("channel", cid)
ordererSource := orderers.NewConnectionSource(namedOSLogger, p.OrdererEndpointOverrides)

ordererSourceCallback := func(bundle *channelconfig.Bundle) {
globalAddresses := bundle.ChannelConfig().OrdererAddresses()
orgAddresses := map[string]orderers.OrdererOrg{}
if ordererConfig, ok := bundle.OrdererConfig(); ok {
for orgName, org := range ordererConfig.Organizations() {
var certs [][]byte
certs = append(certs, org.MSP().GetTLSRootCerts()...)
certs = append(certs, org.MSP().GetTLSIntermediateCerts()...)

orgAddresses[orgName] = orderers.OrdererOrg{
Addresses: org.Endpoints(),
RootCerts: certs,
}
}
}
ordererSource.Update(globalAddresses, orgAddresses)
}

channel := &Channel{
ledger: l,
resources: bundle,
cryptoProvider: p.CryptoProvider,
}

callbacks := []channelconfig.BundleActor{
ordererSourceCallback,
gossipCallbackWrapper,
trustedRootsCallbackWrapper,
mspCallback,
Expand Down Expand Up @@ -373,7 +350,7 @@ func (p *Peer) createChannel(

p.GossipService.InitializeChannel(
bundle.ConfigtxValidator().ChannelID(),
ordererSource,
p.OrdererEndpointOverrides,
store,
gossipservice.Support{
Validator: validator,
Expand Down
30 changes: 18 additions & 12 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type GossipServiceAdapter interface {
// DeliveryServiceFactory factory to create and initialize delivery service instance
type DeliveryServiceFactory interface {
// Returns an instance of delivery client
Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService
Service(g GossipServiceAdapter, ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService
}

type deliveryFactoryImpl struct {
Expand All @@ -139,21 +139,20 @@ type deliveryFactoryImpl struct {
// Returns an instance of delivery service
func (df *deliveryFactoryImpl) Service(
g GossipServiceAdapter,
ordererSource *orderers.ConnectionSource,
mcs api.MessageCryptoService, // TODO remove
ordererEndpointOverrides map[string]*orderers.Endpoint,
isStaticLead bool,
channelConfig *cb.Config,
cryptoProvider bccsp.BCCSP,
) deliverservice.DeliverService {
return deliverservice.NewDeliverService(
&deliverservice.Config{
IsStaticLeader: isStaticLead,
Gossip: g,
Signer: df.signer,
DeliverServiceConfig: df.deliverServiceConfig,
OrdererSource: ordererSource,
ChannelConfig: channelConfig,
CryptoProvider: cryptoProvider,
IsStaticLeader: isStaticLead,
Gossip: g,
Signer: df.signer,
DeliverServiceConfig: df.deliverServiceConfig,
OrdererEndpointOverrides: ordererEndpointOverrides,
ChannelConfig: channelConfig,
CryptoProvider: cryptoProvider,
})
}

Expand Down Expand Up @@ -334,7 +333,14 @@ type Support struct {
}

// InitializeChannel allocates the state provider and should be invoked once per channel per execution
func (g *GossipService) InitializeChannel(channelID string, ordererSource *orderers.ConnectionSource, store *transientstore.Store, support Support, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) {
func (g *GossipService) InitializeChannel(
channelID string,
ordererEndpointOverrides map[string]*orderers.Endpoint,
store *transientstore.Store,
support Support,
channelConfig *cb.Config,
cryptoProvider bccsp.BCCSP,
) {
g.lock.Lock()
defer g.lock.Unlock()
// Initialize new state provider for given committer
Expand Down Expand Up @@ -393,7 +399,7 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order
blockingMode,
stateConfig)
if g.deliveryService[channelID] == nil {
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererSource, g.mcs, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider)
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererEndpointOverrides, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider)
}

// Delivery service might be nil only if it was not able to get connected
Expand Down
Loading

0 comments on commit 6d6986a

Please sign in to comment.