Skip to content

Commit

Permalink
BFT Block Puller: block deliverer, connection source (#4258)
Browse files Browse the repository at this point in the history
- Introduce an abstraction BlockDeliverer that can CFT/BFT implementations
- Expose ConnectionSource method Endpoints


Change-Id: I4c2d9b8f74a4004d55e779438e41e7a80b274504

Signed-off-by: Yoav Tock <tock@il.ibm.com>
  • Loading branch information
tock-ibm authored Jun 4, 2023
1 parent 288e093 commit 4a179d3
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 31 deletions.
12 changes: 10 additions & 2 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,20 @@ type DeliverService interface {
Stop()
}

// BlockDeliverer communicates with orderers to obtain new blocks and send them to the committer service, for a
// specific channel. It can be implemented using different protocols depending on the ordering service consensus type,
// e.g CFT (etcdraft) or BFT (SmartBFT).
type BlockDeliverer interface {
Stop()
DeliverBlocks()
}

// deliverServiceImpl the implementation of the delivery service
// maintains connection to the ordering service and maps of
// blocks providers
type deliverServiceImpl struct {
conf *Config
blockProviders map[string]*blocksprovider.Deliverer
blockProviders map[string]BlockDeliverer
lock sync.RWMutex
stopping bool
}
Expand Down Expand Up @@ -78,7 +86,7 @@ type Config struct {
func NewDeliverService(conf *Config) DeliverService {
ds := &deliverServiceImpl{
conf: conf,
blockProviders: make(map[string]*blocksprovider.Deliverer),
blockProviders: make(map[string]BlockDeliverer),
}
return ds
}
Expand Down
36 changes: 20 additions & 16 deletions core/deliverservice/deliveryclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ eUCutqn1KYDMYh54i6p723cXbdDkmvL2UCciHyHdSWS9lmkKVdyNGIJ6
}

bp, ok := ds.blockProviders["channel-id"]
bpd := bp.(*blocksprovider.Deliverer)
require.True(t, ok, "map entry must exist")
require.Equal(t, "76f7a03f8dfdb0ef7c4b28b3901fe163c730e906c70e4cdf887054ad5f608bed", fmt.Sprintf("%x", bp.TLSCertHash))
require.Equal(t, "76f7a03f8dfdb0ef7c4b28b3901fe163c730e906c70e4cdf887054ad5f608bed", fmt.Sprintf("%x", bpd.TLSCertHash))
})

t.Run("Green Path without mutual TLS", func(t *testing.T) {
Expand All @@ -99,8 +100,9 @@ eUCutqn1KYDMYh54i6p723cXbdDkmvL2UCciHyHdSWS9lmkKVdyNGIJ6
}

bp, ok := ds.blockProviders["channel-id"]
bpd := bp.(*blocksprovider.Deliverer)
require.True(t, ok, "map entry must exist")
require.Nil(t, bp.TLSCertHash)
require.Nil(t, bpd.TLSCertHash)
})

t.Run("Exists", func(t *testing.T) {
Expand Down Expand Up @@ -131,11 +133,11 @@ func TestStopDeliverForChannel(t *testing.T) {
t.Run("Green path", func(t *testing.T) {
ds := NewDeliverService(&Config{}).(*deliverServiceImpl)
doneA := make(chan struct{})
ds.blockProviders = map[string]*blocksprovider.Deliverer{
"a": {
ds.blockProviders = map[string]BlockDeliverer{
"a": &blocksprovider.Deliverer{
DoneC: doneA,
},
"b": {
"b": &blocksprovider.Deliverer{
DoneC: make(chan struct{}),
},
}
Expand All @@ -153,11 +155,11 @@ func TestStopDeliverForChannel(t *testing.T) {

t.Run("Already stopping", func(t *testing.T) {
ds := NewDeliverService(&Config{}).(*deliverServiceImpl)
ds.blockProviders = map[string]*blocksprovider.Deliverer{
"a": {
ds.blockProviders = map[string]BlockDeliverer{
"a": &blocksprovider.Deliverer{
DoneC: make(chan struct{}),
},
"b": {
"b": &blocksprovider.Deliverer{
DoneC: make(chan struct{}),
},
}
Expand All @@ -169,11 +171,11 @@ func TestStopDeliverForChannel(t *testing.T) {

t.Run("Non-existent", func(t *testing.T) {
ds := NewDeliverService(&Config{}).(*deliverServiceImpl)
ds.blockProviders = map[string]*blocksprovider.Deliverer{
"a": {
ds.blockProviders = map[string]BlockDeliverer{
"a": &blocksprovider.Deliverer{
DoneC: make(chan struct{}),
},
"b": {
"b": &blocksprovider.Deliverer{
DoneC: make(chan struct{}),
},
}
Expand All @@ -185,18 +187,19 @@ func TestStopDeliverForChannel(t *testing.T) {

func TestStop(t *testing.T) {
ds := NewDeliverService(&Config{}).(*deliverServiceImpl)
ds.blockProviders = map[string]*blocksprovider.Deliverer{
"a": {
ds.blockProviders = map[string]BlockDeliverer{
"a": &blocksprovider.Deliverer{
DoneC: make(chan struct{}),
},
"b": {
"b": &blocksprovider.Deliverer{
DoneC: make(chan struct{}),
},
}
require.False(t, ds.stopping)
for _, bp := range ds.blockProviders {
bpd := bp.(*blocksprovider.Deliverer)
select {
case <-bp.DoneC:
case <-bpd.DoneC:
require.Fail(t, "block providers should not be closed")
default:
}
Expand All @@ -206,8 +209,9 @@ func TestStop(t *testing.T) {
require.True(t, ds.stopping)
require.Len(t, ds.blockProviders, 2)
for _, bp := range ds.blockProviders {
bpd := bp.(*blocksprovider.Deliverer)
select {
case <-bp.DoneC:
case <-bpd.DoneC:
default:
require.Fail(t, "block providers should te closed")
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/peer/blocksprovider/blocksprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type BlockVerifier interface {
//go:generate counterfeiter -o fake/orderer_connection_source.go --fake-name OrdererConnectionSource . OrdererConnectionSource
type OrdererConnectionSource interface {
RandomEndpoint() (*orderers.Endpoint, error)
Endpoints() []*orderers.Endpoint
}

//go:generate counterfeiter -o fake/dialer.go --fake-name Dialer . Dialer
Expand Down
65 changes: 65 additions & 0 deletions internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions internal/pkg/peer/orderers/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func (cs *ConnectionSource) RandomEndpoint() (*Endpoint, error) {
return cs.allEndpoints[rand.Intn(len(cs.allEndpoints))], nil
}

func (cs *ConnectionSource) Endpoints() []*Endpoint {
cs.mutex.RLock()
defer cs.mutex.RUnlock()

return cs.allEndpoints
}

func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]OrdererOrg) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
Expand Down
13 changes: 0 additions & 13 deletions internal/pkg/peer/orderers/connection_internal_test.go

This file was deleted.

0 comments on commit 4a179d3

Please sign in to comment.