From 995b40b8717dd1cc21b5462b07891b3ad740471d Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Sun, 4 Jun 2023 12:39:20 +0300 Subject: [PATCH] BFT Block Puller: block deliverer, connection source - Introduce an abstraction BlockDeliverer that can CFT/BFT implementations - Expose ConnectionSource method Endpoints Signed-off-by: Yoav Tock Change-Id: I4c2d9b8f74a4004d55e779438e41e7a80b274504 --- core/deliverservice/deliveryclient.go | 12 +++- core/deliverservice/deliveryclient_test.go | 36 +++++----- .../pkg/peer/blocksprovider/blocksprovider.go | 1 + .../fake/orderer_connection_source.go | 65 +++++++++++++++++++ internal/pkg/peer/orderers/connection.go | 7 ++ .../peer/orderers/connection_internal_test.go | 13 ---- 6 files changed, 103 insertions(+), 31 deletions(-) delete mode 100644 internal/pkg/peer/orderers/connection_internal_test.go diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 507023d25cb..05fa06e89c3 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -41,12 +41,20 @@ type DeliverService interface { Stop() } +// BlockDeliverer communicates with orderers to obtain new blocks and send them to the committer service, for a +// specific channel. It can be implemented using different protocols depending on the ordering service consensus type, +// e.g CFT (etcdraft) or BFT (SmartBFT). +type BlockDeliverer interface { + Stop() + DeliverBlocks() +} + // deliverServiceImpl the implementation of the delivery service // maintains connection to the ordering service and maps of // blocks providers type deliverServiceImpl struct { conf *Config - blockProviders map[string]*blocksprovider.Deliverer + blockProviders map[string]BlockDeliverer lock sync.RWMutex stopping bool } @@ -78,7 +86,7 @@ type Config struct { func NewDeliverService(conf *Config) DeliverService { ds := &deliverServiceImpl{ conf: conf, - blockProviders: make(map[string]*blocksprovider.Deliverer), + blockProviders: make(map[string]BlockDeliverer), } return ds } diff --git a/core/deliverservice/deliveryclient_test.go b/core/deliverservice/deliveryclient_test.go index b172c0c59d6..bd34c643688 100644 --- a/core/deliverservice/deliveryclient_test.go +++ b/core/deliverservice/deliveryclient_test.go @@ -77,8 +77,9 @@ eUCutqn1KYDMYh54i6p723cXbdDkmvL2UCciHyHdSWS9lmkKVdyNGIJ6 } bp, ok := ds.blockProviders["channel-id"] + bpd := bp.(*blocksprovider.Deliverer) require.True(t, ok, "map entry must exist") - require.Equal(t, "76f7a03f8dfdb0ef7c4b28b3901fe163c730e906c70e4cdf887054ad5f608bed", fmt.Sprintf("%x", bp.TLSCertHash)) + require.Equal(t, "76f7a03f8dfdb0ef7c4b28b3901fe163c730e906c70e4cdf887054ad5f608bed", fmt.Sprintf("%x", bpd.TLSCertHash)) }) t.Run("Green Path without mutual TLS", func(t *testing.T) { @@ -99,8 +100,9 @@ eUCutqn1KYDMYh54i6p723cXbdDkmvL2UCciHyHdSWS9lmkKVdyNGIJ6 } bp, ok := ds.blockProviders["channel-id"] + bpd := bp.(*blocksprovider.Deliverer) require.True(t, ok, "map entry must exist") - require.Nil(t, bp.TLSCertHash) + require.Nil(t, bpd.TLSCertHash) }) t.Run("Exists", func(t *testing.T) { @@ -131,11 +133,11 @@ func TestStopDeliverForChannel(t *testing.T) { t.Run("Green path", func(t *testing.T) { ds := NewDeliverService(&Config{}).(*deliverServiceImpl) doneA := make(chan struct{}) - ds.blockProviders = map[string]*blocksprovider.Deliverer{ - "a": { + ds.blockProviders = map[string]BlockDeliverer{ + "a": &blocksprovider.Deliverer{ DoneC: doneA, }, - "b": { + "b": &blocksprovider.Deliverer{ DoneC: make(chan struct{}), }, } @@ -153,11 +155,11 @@ func TestStopDeliverForChannel(t *testing.T) { t.Run("Already stopping", func(t *testing.T) { ds := NewDeliverService(&Config{}).(*deliverServiceImpl) - ds.blockProviders = map[string]*blocksprovider.Deliverer{ - "a": { + ds.blockProviders = map[string]BlockDeliverer{ + "a": &blocksprovider.Deliverer{ DoneC: make(chan struct{}), }, - "b": { + "b": &blocksprovider.Deliverer{ DoneC: make(chan struct{}), }, } @@ -169,11 +171,11 @@ func TestStopDeliverForChannel(t *testing.T) { t.Run("Non-existent", func(t *testing.T) { ds := NewDeliverService(&Config{}).(*deliverServiceImpl) - ds.blockProviders = map[string]*blocksprovider.Deliverer{ - "a": { + ds.blockProviders = map[string]BlockDeliverer{ + "a": &blocksprovider.Deliverer{ DoneC: make(chan struct{}), }, - "b": { + "b": &blocksprovider.Deliverer{ DoneC: make(chan struct{}), }, } @@ -185,18 +187,19 @@ func TestStopDeliverForChannel(t *testing.T) { func TestStop(t *testing.T) { ds := NewDeliverService(&Config{}).(*deliverServiceImpl) - ds.blockProviders = map[string]*blocksprovider.Deliverer{ - "a": { + ds.blockProviders = map[string]BlockDeliverer{ + "a": &blocksprovider.Deliverer{ DoneC: make(chan struct{}), }, - "b": { + "b": &blocksprovider.Deliverer{ DoneC: make(chan struct{}), }, } require.False(t, ds.stopping) for _, bp := range ds.blockProviders { + bpd := bp.(*blocksprovider.Deliverer) select { - case <-bp.DoneC: + case <-bpd.DoneC: require.Fail(t, "block providers should not be closed") default: } @@ -206,8 +209,9 @@ func TestStop(t *testing.T) { require.True(t, ds.stopping) require.Len(t, ds.blockProviders, 2) for _, bp := range ds.blockProviders { + bpd := bp.(*blocksprovider.Deliverer) select { - case <-bp.DoneC: + case <-bpd.DoneC: default: require.Fail(t, "block providers should te closed") } diff --git a/internal/pkg/peer/blocksprovider/blocksprovider.go b/internal/pkg/peer/blocksprovider/blocksprovider.go index 45f602042d8..a29f65954d1 100644 --- a/internal/pkg/peer/blocksprovider/blocksprovider.go +++ b/internal/pkg/peer/blocksprovider/blocksprovider.go @@ -76,6 +76,7 @@ type BlockVerifier interface { //go:generate counterfeiter -o fake/orderer_connection_source.go --fake-name OrdererConnectionSource . OrdererConnectionSource type OrdererConnectionSource interface { RandomEndpoint() (*orderers.Endpoint, error) + Endpoints() []*orderers.Endpoint } //go:generate counterfeiter -o fake/dialer.go --fake-name Dialer . Dialer diff --git a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go index 2921d8f2081..97b1f60da14 100644 --- a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go +++ b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go @@ -9,6 +9,16 @@ import ( ) type OrdererConnectionSource struct { + EndpointsStub func() []*orderers.Endpoint + endpointsMutex sync.RWMutex + endpointsArgsForCall []struct { + } + endpointsReturns struct { + result1 []*orderers.Endpoint + } + endpointsReturnsOnCall map[int]struct { + result1 []*orderers.Endpoint + } RandomEndpointStub func() (*orderers.Endpoint, error) randomEndpointMutex sync.RWMutex randomEndpointArgsForCall []struct { @@ -25,6 +35,59 @@ type OrdererConnectionSource struct { invocationsMutex sync.RWMutex } +func (fake *OrdererConnectionSource) Endpoints() []*orderers.Endpoint { + fake.endpointsMutex.Lock() + ret, specificReturn := fake.endpointsReturnsOnCall[len(fake.endpointsArgsForCall)] + fake.endpointsArgsForCall = append(fake.endpointsArgsForCall, struct { + }{}) + stub := fake.EndpointsStub + fakeReturns := fake.endpointsReturns + fake.recordInvocation("Endpoints", []interface{}{}) + fake.endpointsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConnectionSource) EndpointsCallCount() int { + fake.endpointsMutex.RLock() + defer fake.endpointsMutex.RUnlock() + return len(fake.endpointsArgsForCall) +} + +func (fake *OrdererConnectionSource) EndpointsCalls(stub func() []*orderers.Endpoint) { + fake.endpointsMutex.Lock() + defer fake.endpointsMutex.Unlock() + fake.EndpointsStub = stub +} + +func (fake *OrdererConnectionSource) EndpointsReturns(result1 []*orderers.Endpoint) { + fake.endpointsMutex.Lock() + defer fake.endpointsMutex.Unlock() + fake.EndpointsStub = nil + fake.endpointsReturns = struct { + result1 []*orderers.Endpoint + }{result1} +} + +func (fake *OrdererConnectionSource) EndpointsReturnsOnCall(i int, result1 []*orderers.Endpoint) { + fake.endpointsMutex.Lock() + defer fake.endpointsMutex.Unlock() + fake.EndpointsStub = nil + if fake.endpointsReturnsOnCall == nil { + fake.endpointsReturnsOnCall = make(map[int]struct { + result1 []*orderers.Endpoint + }) + } + fake.endpointsReturnsOnCall[i] = struct { + result1 []*orderers.Endpoint + }{result1} +} + func (fake *OrdererConnectionSource) RandomEndpoint() (*orderers.Endpoint, error) { fake.randomEndpointMutex.Lock() ret, specificReturn := fake.randomEndpointReturnsOnCall[len(fake.randomEndpointArgsForCall)] @@ -84,6 +147,8 @@ func (fake *OrdererConnectionSource) RandomEndpointReturnsOnCall(i int, result1 func (fake *OrdererConnectionSource) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.endpointsMutex.RLock() + defer fake.endpointsMutex.RUnlock() fake.randomEndpointMutex.RLock() defer fake.randomEndpointMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/internal/pkg/peer/orderers/connection.go b/internal/pkg/peer/orderers/connection.go index 03a65be36a8..54f544fdf6b 100644 --- a/internal/pkg/peer/orderers/connection.go +++ b/internal/pkg/peer/orderers/connection.go @@ -53,6 +53,13 @@ func (cs *ConnectionSource) RandomEndpoint() (*Endpoint, error) { return cs.allEndpoints[rand.Intn(len(cs.allEndpoints))], nil } +func (cs *ConnectionSource) Endpoints() []*Endpoint { + cs.mutex.RLock() + defer cs.mutex.RUnlock() + + return cs.allEndpoints +} + func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]OrdererOrg) { cs.mutex.Lock() defer cs.mutex.Unlock() diff --git a/internal/pkg/peer/orderers/connection_internal_test.go b/internal/pkg/peer/orderers/connection_internal_test.go deleted file mode 100644 index 0a59482cc42..00000000000 --- a/internal/pkg/peer/orderers/connection_internal_test.go +++ /dev/null @@ -1,13 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package orderers - -func (cs *ConnectionSource) Endpoints() []*Endpoint { - cs.mutex.RLock() - defer cs.mutex.RUnlock() - return cs.allEndpoints -}