Skip to content

Commit

Permalink
refactor for worker interface
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Yuan <yy19902439@126.com>
  • Loading branch information
SamYuan1990 committed Jul 3, 2021
1 parent 95f608d commit 34d9664
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 132 deletions.
5 changes: 2 additions & 3 deletions pkg/infra/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package infra
import (
"context"
"testing"
"time"

"tape/e2e/mock"

Expand Down Expand Up @@ -58,10 +57,10 @@ func BenchmarkPeerEndorsement4(b *testing.B) { benchmarkNPeer(4, b) }
func BenchmarkPeerEndorsement8(b *testing.B) { benchmarkNPeer(8, b) }

func benchmarkAsyncCollector(concurrent int, b *testing.B) {
instance, _ := NewBlockCollector(concurrent, concurrent)
block := make(chan *AddressedBlock, 100)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, b.N, time.Now(), false)
instance, _ := NewBlockCollector(concurrent, concurrent, context.Background(), block, done, b.N, false)
go instance.Start()

b.ReportAllocs()
b.ResetTimer()
Expand Down
59 changes: 32 additions & 27 deletions pkg/infra/block_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ import (
// on a certain number of peers within network.
type BlockCollector struct {
sync.Mutex
thresholdP, totalP int
totalTx int
registry map[uint64]*bitmap.BitMap
thresholdP, totalP, totalTx int
registry map[uint64]*bitmap.BitMap
ctx context.Context
blockCh <-chan *AddressedBlock
finishCh chan struct{}
printResult bool // controls whether to print block commit message. Tests set this to false to avoid polluting stdout.
}

// AddressedBlock describe the source of block
Expand All @@ -28,34 +31,37 @@ type AddressedBlock struct {
}

// NewBlockCollector creates a BlockCollector
func NewBlockCollector(threshold int, total int) (*BlockCollector, error) {
func NewBlockCollector(threshold int, totalP int,
ctx context.Context,
blockCh <-chan *AddressedBlock,
finishCh chan struct{},
totalTx int,
printResult bool) (*BlockCollector, error) {
registry := make(map[uint64]*bitmap.BitMap)
if threshold <= 0 || total <= 0 {
if threshold <= 0 || totalP <= 0 {
return nil, errors.New("threshold and total must be greater than zero")
}
if threshold > total {
return nil, errors.Errorf("threshold [%d] must be less than or equal to total [%d]", threshold, total)
if threshold > totalP {
return nil, errors.Errorf("threshold [%d] must be less than or equal to total [%d]", threshold, totalP)
}
return &BlockCollector{
thresholdP: threshold,
totalP: total,
registry: registry,
thresholdP: threshold,
totalP: totalP,
totalTx: totalTx,
registry: registry,
ctx: ctx,
blockCh: blockCh,
finishCh: finishCh,
printResult: printResult,
}, nil
}

func (bc *BlockCollector) Start(
ctx context.Context,
blockCh <-chan *AddressedBlock,
finishCh chan struct{},
totalTx int,
now time.Time,
printResult bool, // controls whether to print block commit message. Tests set this to false to avoid polluting stdout.
) {
func (bc *BlockCollector) Start() {
for {
select {
case block := <-blockCh:
bc.commit(block, finishCh, totalTx, now, printResult)
case <-ctx.Done():
case block := <-bc.blockCh:
bc.commit(block, time.Now())
case <-bc.ctx.Done():
return
}
}
Expand All @@ -65,7 +71,7 @@ func (bc *BlockCollector) Start(
// commit commits a block to collector.
// If the number of peers on which this block has been committed has satisfied thresholdP,
// adds the number to the totalTx.
func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, printResult bool) {
func (bc *BlockCollector) commit(block *AddressedBlock, now time.Time) {
bitMap, ok := bc.registry[block.Number]
if !ok {
// The block with Number is received for the first time
Expand All @@ -86,13 +92,12 @@ func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{},

// newly committed block just hits threshold
if cnt == bc.thresholdP {
if printResult {
if bc.printResult {
fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions))
}

bc.totalTx += len(block.FilteredTransactions)
if bc.totalTx >= totalTx {
close(finishCh)
bc.totalTx -= len(block.FilteredTransactions)
if bc.totalTx <= 0 {
close(bc.finishCh)
}
}

Expand Down
74 changes: 38 additions & 36 deletions pkg/infra/block_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"sync"
"tape/pkg/infra"
"time"

"github.com/hyperledger/fabric-protos-go/peer"
. "github.com/onsi/ginkgo"
Expand All @@ -19,12 +18,12 @@ var _ = Describe("BlockCollector", func() {

Context("Async Commit", func() {
It("should work with threshold 1 and observer 1", func() {
instance, err := infra.NewBlockCollector(1, 1)
Expect(err).NotTo(HaveOccurred())

block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, 2, time.Now(), false)
instance, err := infra.NewBlockCollector(1, 1, context.Background(), block, done, 2, false)
Expect(err).NotTo(HaveOccurred())

go instance.Start()

block <- newAddressedBlock(0, 0)
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -33,12 +32,12 @@ var _ = Describe("BlockCollector", func() {
})

It("should work with threshold 1 and observer 2", func() {
instance, err := infra.NewBlockCollector(1, 2)
Expect(err).NotTo(HaveOccurred())

block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, 2, time.Now(), false)
instance, err := infra.NewBlockCollector(1, 2, context.Background(), block, done, 2, false)
Expect(err).NotTo(HaveOccurred())

go instance.Start()

block <- newAddressedBlock(0, 0)
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -55,12 +54,12 @@ var _ = Describe("BlockCollector", func() {
})

It("should work with threshold 4 and observer 4", func() {
instance, err := infra.NewBlockCollector(4, 4)
Expect(err).NotTo(HaveOccurred())

block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, 2, time.Now(), false)
instance, err := infra.NewBlockCollector(4, 4, context.Background(), block, done, 2, false)
Expect(err).NotTo(HaveOccurred())

go instance.Start()

block <- newAddressedBlock(0, 1)
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -82,12 +81,12 @@ var _ = Describe("BlockCollector", func() {
})

It("should work with threshold 2 and observer 4", func() {
instance, err := infra.NewBlockCollector(2, 4)
Expect(err).NotTo(HaveOccurred())

block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, 1, time.Now(), false)
instance, err := infra.NewBlockCollector(2, 4, context.Background(), block, done, 1, false)
Expect(err).NotTo(HaveOccurred())

go instance.Start()

block <- newAddressedBlock(0, 0)
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -96,12 +95,12 @@ var _ = Describe("BlockCollector", func() {
})

PIt("should not count tx for repeated block", func() {
instance, err := infra.NewBlockCollector(1, 1)
Expect(err).NotTo(HaveOccurred())

block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, 2, time.Now(), false)
instance, err := infra.NewBlockCollector(1, 1, context.Background(), block, done, 2, false)
Expect(err).NotTo(HaveOccurred())

go instance.Start()

block <- newAddressedBlock(0, 0)
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -113,28 +112,32 @@ var _ = Describe("BlockCollector", func() {
})

It("should return err when threshold is greater than total", func() {
instance, err := infra.NewBlockCollector(2, 1)
block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
instance, err := infra.NewBlockCollector(2, 1, context.Background(), block, done, 2, false)
Expect(err).Should(MatchError("threshold [2] must be less than or equal to total [1]"))
Expect(instance).Should(BeNil())
})

It("should return err when threshold or total is zero", func() {
instance, err := infra.NewBlockCollector(0, 1)
block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
instance, err := infra.NewBlockCollector(0, 1, context.Background(), block, done, 2, false)
Expect(err).Should(MatchError("threshold and total must be greater than zero"))
Expect(instance).Should(BeNil())

instance, err = infra.NewBlockCollector(1, 0)
instance, err = infra.NewBlockCollector(1, 0, context.Background(), block, done, 2, false)
Expect(err).Should(MatchError("threshold and total must be greater than zero"))
Expect(instance).Should(BeNil())
})

It("Should supports parallel committers", func() {
instance, err := infra.NewBlockCollector(100, 100)
Expect(err).NotTo(HaveOccurred())

block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, 1, time.Now(), false)
instance, err := infra.NewBlockCollector(100, 100, context.Background(), block, done, 1, false)
Expect(err).NotTo(HaveOccurred())

go instance.Start()

var wg sync.WaitGroup
wg.Add(100)
Expand All @@ -149,12 +152,12 @@ var _ = Describe("BlockCollector", func() {
})

It("Should supports threshold 3 and observer 5 as parallel committers", func() {
instance, err := infra.NewBlockCollector(3, 5)
Expect(err).NotTo(HaveOccurred())

block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, 10, time.Now(), false)
instance, err := infra.NewBlockCollector(3, 5, context.Background(), block, done, 10, false)
Expect(err).NotTo(HaveOccurred())

go instance.Start()

for i := 0; i < 3; i++ {
go func(idx int) {
Expand All @@ -167,13 +170,12 @@ var _ = Describe("BlockCollector", func() {
})

It("Should supports threshold 5 and observer 5 as parallel committers", func() {
instance, err := infra.NewBlockCollector(5, 5)
Expect(err).NotTo(HaveOccurred())

block := make(chan *infra.AddressedBlock)
done := make(chan struct{})
instance, err := infra.NewBlockCollector(5, 5, context.Background(), block, done, 10, false)
Expect(err).NotTo(HaveOccurred())

go instance.Start(context.Background(), block, done, 10, time.Now(), false)
go instance.Start()
for i := 0; i < 5; i++ {
go func(idx int) {
for j := 0; j < 10; j++ {
Expand Down
1 change: 0 additions & 1 deletion pkg/infra/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type Crypto interface {
}

/*
type consmuer and producer interface
as Tape major as Producer and Consumer pattern
define an interface here as Worker with start here
as for #56 and #174,in cli imp adjust sequence of P&C impl to control workflow.
Expand Down
22 changes: 15 additions & 7 deletions pkg/infra/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
)

type Observers struct {
workers []*Observer
workers []*Observer
errorCh chan error
blockCh chan *AddressedBlock
StartTime time.Time
}

type Observer struct {
Expand All @@ -20,22 +23,27 @@ type Observer struct {
logger *log.Logger
}

func CreateObservers(ctx context.Context, channel string, nodes []Node, crypto Crypto, logger *log.Logger) (*Observers, error) {
func CreateObservers(ctx context.Context, crypto Crypto, errorCh chan error, blockCh chan *AddressedBlock, config Config, logger *log.Logger) (*Observers, error) {
var workers []*Observer
for i, node := range nodes {
worker, err := CreateObserver(ctx, channel, node, crypto, logger)
for i, node := range config.Committers {
worker, err := CreateObserver(ctx, config.Channel, node, crypto, logger)
if err != nil {
return nil, err
}
worker.index = i
workers = append(workers, worker)
}
return &Observers{workers: workers}, nil
return &Observers{
workers: workers,
errorCh: errorCh,
blockCh: blockCh,
}, nil
}

func (o *Observers) Start(errorCh chan error, blockCh chan<- *AddressedBlock, now time.Time) {
func (o *Observers) Start() {
o.StartTime = time.Now()
for i := 0; i < len(o.workers); i++ {
go o.workers[i].Start(errorCh, blockCh, now)
go o.workers[i].Start(o.errorCh, o.blockCh, o.StartTime)
}
}

Expand Down
Loading

0 comments on commit 34d9664

Please sign in to comment.