diff --git a/pkg/infra/benchmark_test.go b/pkg/infra/benchmark_test.go index ddbd82d9..ab775c12 100644 --- a/pkg/infra/benchmark_test.go +++ b/pkg/infra/benchmark_test.go @@ -60,8 +60,10 @@ func BenchmarkPeerEndorsement8(b *testing.B) { benchmarkNPeer(8, b) } func benchmarkAsyncCollector(concurrent int, b *testing.B) { instance, _ := NewBlockCollector(concurrent, concurrent) block := make(chan *AddressedBlock, 100) + successRateBlock := make(chan *AddressedBlock, 100) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, b.N, time.Now(), false) + go instance.Start(context.Background(), block, successRateBlock, done, b.N, time.Now(), false) + go CalSuccessRate(concurrent, b.N, successRateBlock) b.ReportAllocs() b.ResetTimer() @@ -71,9 +73,9 @@ func benchmarkAsyncCollector(concurrent int, b *testing.B) { block <- &AddressedBlock{ FilteredBlock: &peer.FilteredBlock{ Number: uint64(j), - FilteredTransactions: make([]*peer.FilteredTransaction, 1), + FilteredTransactions: []*peer.FilteredTransaction{{TxValidationCode: peer.TxValidationCode_VALID}}, }, - Address: idx, + PeerIdx: idx, } } }(i) diff --git a/pkg/infra/block_collector.go b/pkg/infra/block_collector.go index 87a03d35..5fba4428 100644 --- a/pkg/infra/block_collector.go +++ b/pkg/infra/block_collector.go @@ -24,7 +24,12 @@ type BlockCollector struct { // AddressedBlock describe the source of block type AddressedBlock struct { *peer.FilteredBlock - Address int // source peer's number + PeerIdx int // source peer's number +} + +type collectorBlock struct { + collector int + block *peer.DeliverResponse_FilteredBlock } // NewBlockCollector creates a BlockCollector @@ -46,6 +51,7 @@ func NewBlockCollector(threshold int, total int) (*BlockCollector, error) { func (bc *BlockCollector) Start( ctx context.Context, blockCh <-chan *AddressedBlock, + successRateBlockCh chan<- *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, @@ -54,7 +60,7 @@ func (bc *BlockCollector) Start( for { select { case block := <-blockCh: - bc.commit(block, finishCh, totalTx, now, printResult) + bc.commit(block, successRateBlockCh, finishCh, totalTx, now, printResult) case <-ctx.Done(): return } @@ -65,7 +71,7 @@ func (bc *BlockCollector) Start( // commit commits a block to collector. // If the number of peers on which this block has been committed has satisfied thresholdP, // adds the number to the totalTx. -func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, printResult bool) { +func (bc *BlockCollector) commit(block *AddressedBlock, successRatioBlockCh chan<- *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, printResult bool) { bitMap, ok := bc.registry[block.Number] if !ok { // The block with Number is received for the first time @@ -77,11 +83,11 @@ func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{}, bitMap = &b } // When the block from Address has been received before, return directly. - if bitMap.Has(block.Address) { + if bitMap.Has(block.PeerIdx) { return } - bitMap.Set(block.Address) + bitMap.Set(block.PeerIdx) cnt := bitMap.Count() // newly committed block just hits threshold @@ -101,4 +107,37 @@ func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{}, // committed on all peers, remove from registry delete(bc.registry, block.Number) } + successRatioBlockCh <- block +} + +// CalSuccessRate calculate the success rate of the txs +// First calculate the transaction success rate accepted by each peer, +// and then count all the transaction success rates +func CalSuccessRate(collectNum int, totalTx int, blocks chan *AddressedBlock) { + var totalTxs, totalSuccessTxs int + allTxs := collectNum * totalTx + successTxs := make([]int, collectNum) + + for { + select { + case block := <-blocks: + for _, tx := range block.FilteredTransactions { + if tx.TxValidationCode == peer.TxValidationCode_VALID { + successTxs[block.PeerIdx]++ + } + } + totalTxs += len(block.FilteredTransactions) + } + + if totalTxs >= allTxs { + break + } + } + + fmt.Println("The txs' success rate is as followers:") + for i := 0; i < collectNum; i++ { + totalSuccessTxs += successTxs[i] + fmt.Printf("peer %d received %d txs, containing %d successful txs, and the success rate is %.2f%%\n", i, totalTx, successTxs[i], float64(successTxs[i])/float64(totalTx)*100) + } + fmt.Printf("All peer received %d txs, containing %d successful txs, and the success rate is %.2f%%\n", totalTxs, totalSuccessTxs, float64(totalSuccessTxs)/float64(totalTxs)*100) } diff --git a/pkg/infra/block_collector_test.go b/pkg/infra/block_collector_test.go index f23bd9f2..5a63f8fb 100644 --- a/pkg/infra/block_collector_test.go +++ b/pkg/infra/block_collector_test.go @@ -2,6 +2,8 @@ package infra_test import ( "context" + "io/ioutil" + "os" "sync" "tape/pkg/infra" "time" @@ -11,8 +13,16 @@ import ( . "github.com/onsi/gomega" ) -func newAddressedBlock(addr int, blockNum uint64) *infra.AddressedBlock { - return &infra.AddressedBlock{Address: addr, FilteredBlock: &peer.FilteredBlock{Number: blockNum, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}} +func newAddressedBlock(idx int, blockNum uint64, isValidTx bool) *infra.AddressedBlock { + tx := make([]*peer.FilteredTransaction, 1) + if isValidTx { + tx[0] = &peer.FilteredTransaction{TxValidationCode: peer.TxValidationCode_VALID} + } else { + tx[0] = &peer.FilteredTransaction{TxValidationCode: peer.TxValidationCode_NIL_ENVELOPE} + } + + return &infra.AddressedBlock{PeerIdx: idx, FilteredBlock: &peer.FilteredBlock{Number: blockNum, + FilteredTransactions: tx}} } var _ = Describe("BlockCollector", func() { @@ -23,12 +33,14 @@ var _ = Describe("BlockCollector", func() { Expect(err).NotTo(HaveOccurred()) block := make(chan *infra.AddressedBlock) + successRateBlock := make(chan *infra.AddressedBlock) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, 2, time.Now(), false) + go infra.CalSuccessRate(1, 2, successRateBlock) + go instance.Start(context.Background(), block, successRateBlock, done, 2, time.Now(), false) - block <- newAddressedBlock(0, 0) + block <- newAddressedBlock(0, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(0, 1) + block <- newAddressedBlock(0, 1, true) Eventually(done).Should(BeClosed()) }) @@ -37,18 +49,20 @@ var _ = Describe("BlockCollector", func() { Expect(err).NotTo(HaveOccurred()) block := make(chan *infra.AddressedBlock) + successRateBlock := make(chan *infra.AddressedBlock) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, 2, time.Now(), false) + go infra.CalSuccessRate(2, 2, successRateBlock) + go instance.Start(context.Background(), block, successRateBlock, done, 2, time.Now(), false) - block <- newAddressedBlock(0, 0) + block <- newAddressedBlock(0, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(1, 0) + block <- newAddressedBlock(1, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(0, 1) + block <- newAddressedBlock(0, 1, true) Eventually(done).Should(BeClosed()) select { - case block <- newAddressedBlock(1, 1): + case block <- newAddressedBlock(1, 1, true): default: Fail("Block collector should still be able to consume blocks") } @@ -59,25 +73,27 @@ var _ = Describe("BlockCollector", func() { Expect(err).NotTo(HaveOccurred()) block := make(chan *infra.AddressedBlock) + successRateBlock := make(chan *infra.AddressedBlock) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, 2, time.Now(), false) + go infra.CalSuccessRate(4, 2, successRateBlock) + go instance.Start(context.Background(), block, successRateBlock, done, 2, time.Now(), false) - block <- newAddressedBlock(0, 1) + block <- newAddressedBlock(0, 1, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(1, 1) + block <- newAddressedBlock(1, 1, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(2, 1) + block <- newAddressedBlock(2, 1, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(3, 1) + block <- newAddressedBlock(3, 1, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(0, 0) + block <- newAddressedBlock(0, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(1, 0) + block <- newAddressedBlock(1, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(2, 0) + block <- newAddressedBlock(2, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(3, 0) + block <- newAddressedBlock(3, 0, true) Eventually(done).Should(BeClosed()) }) @@ -86,12 +102,14 @@ var _ = Describe("BlockCollector", func() { Expect(err).NotTo(HaveOccurred()) block := make(chan *infra.AddressedBlock) + successRateBlock := make(chan *infra.AddressedBlock) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, 1, time.Now(), false) + go infra.CalSuccessRate(4, 1, successRateBlock) + go instance.Start(context.Background(), block, successRateBlock, done, 1, time.Now(), false) - block <- newAddressedBlock(0, 0) + block <- newAddressedBlock(0, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(1, 0) + block <- newAddressedBlock(1, 0, true) Eventually(done).Should(BeClosed()) }) @@ -100,15 +118,17 @@ var _ = Describe("BlockCollector", func() { Expect(err).NotTo(HaveOccurred()) block := make(chan *infra.AddressedBlock) + successRateBlock := make(chan *infra.AddressedBlock) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, 2, time.Now(), false) + go infra.CalSuccessRate(1, 2, successRateBlock) + go instance.Start(context.Background(), block, successRateBlock, done, 2, time.Now(), false) - block <- newAddressedBlock(0, 0) + block <- newAddressedBlock(0, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(0, 0) + block <- newAddressedBlock(0, 0, true) Consistently(done).ShouldNot(BeClosed()) - block <- newAddressedBlock(0, 1) + block <- newAddressedBlock(0, 1, true) Eventually(done).Should(BeClosed()) }) @@ -133,15 +153,17 @@ var _ = Describe("BlockCollector", func() { Expect(err).NotTo(HaveOccurred()) block := make(chan *infra.AddressedBlock) + successRateBlock := make(chan *infra.AddressedBlock) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, 1, time.Now(), false) + go infra.CalSuccessRate(100, 1, successRateBlock) + go instance.Start(context.Background(), block, successRateBlock, done, 1, time.Now(), false) var wg sync.WaitGroup wg.Add(100) for i := 0; i < 100; i++ { go func(idx int) { defer wg.Done() - block <- newAddressedBlock(idx, 0) + block <- newAddressedBlock(idx, 0, true) }(i) } wg.Wait() @@ -153,13 +175,15 @@ var _ = Describe("BlockCollector", func() { Expect(err).NotTo(HaveOccurred()) block := make(chan *infra.AddressedBlock) + successRateBlock := make(chan *infra.AddressedBlock) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, 10, time.Now(), false) + go infra.CalSuccessRate(5, 10, successRateBlock) + go instance.Start(context.Background(), block, successRateBlock, done, 10, time.Now(), false) for i := 0; i < 3; i++ { go func(idx int) { for j := 0; j < 10; j++ { - block <- newAddressedBlock(idx, uint64(j)) + block <- newAddressedBlock(idx, uint64(j), true) } }(i) } @@ -171,13 +195,15 @@ var _ = Describe("BlockCollector", func() { Expect(err).NotTo(HaveOccurred()) block := make(chan *infra.AddressedBlock) + successRateBlock := make(chan *infra.AddressedBlock) done := make(chan struct{}) - go instance.Start(context.Background(), block, done, 10, time.Now(), false) + go infra.CalSuccessRate(5, 10, successRateBlock) + go instance.Start(context.Background(), block, successRateBlock, done, 10, time.Now(), false) for i := 0; i < 5; i++ { go func(idx int) { for j := 0; j < 10; j++ { - block <- newAddressedBlock(idx, uint64(j)) + block <- newAddressedBlock(idx, uint64(j), true) } }(i) } @@ -185,3 +211,130 @@ var _ = Describe("BlockCollector", func() { }) }) }) + +var _ = Describe("CalSuccessRate", func() { + + Context("all txs are correct", func() { + It("Should supports observer 1 and tx 1", func() { + rescueStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + blockCh := make(chan *infra.AddressedBlock) + go func() { + blockCh <- newAddressedBlock(0, 0, true) + }() + infra.CalSuccessRate(1, 1, blockCh) + + w.Close() + out, _ := ioutil.ReadAll(r) + os.Stdout = rescueStdout + Expect(string(out)).Should(ContainSubstring("peer 0 received 1 txs, containing 1 successful txs, and the success rate is 100.00%")) + Expect(string(out)).Should(ContainSubstring("All peer received 1 txs, containing 1 successful txs, and the success rate is 100.00%")) + }) + + It("Should supports observer 2 and tx 1", func() { + rescueStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + blockCh := make(chan *infra.AddressedBlock) + go func() { + blockCh <- newAddressedBlock(0, 0, true) + blockCh <- newAddressedBlock(1, 0, true) + }() + infra.CalSuccessRate(2, 1, blockCh) + + w.Close() + out, _ := ioutil.ReadAll(r) + os.Stdout = rescueStdout + Expect(string(out)).Should(ContainSubstring("peer 0 received 1 txs, containing 1 successful txs, and the success rate is 100.00%")) + Expect(string(out)).Should(ContainSubstring("peer 1 received 1 txs, containing 1 successful txs, and the success rate is 100.00%")) + Expect(string(out)).Should(ContainSubstring("All peer received 2 txs, containing 2 successful txs, and the success rate is 100.00%")) + }) + + It("Should supports observer 2 and tx 2", func() { + rescueStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + blockCh := make(chan *infra.AddressedBlock) + go func() { + blockCh <- newAddressedBlock(0, 0, true) + blockCh <- newAddressedBlock(1, 0, true) + blockCh <- newAddressedBlock(0, 1, true) + blockCh <- newAddressedBlock(1, 1, true) + }() + infra.CalSuccessRate(2, 2, blockCh) + + w.Close() + out, _ := ioutil.ReadAll(r) + os.Stdout = rescueStdout + Expect(string(out)).Should(ContainSubstring("peer 0 received 2 txs, containing 2 successful txs, and the success rate is 100.00%")) + Expect(string(out)).Should(ContainSubstring("peer 1 received 2 txs, containing 2 successful txs, and the success rate is 100.00%")) + Expect(string(out)).Should(ContainSubstring("All peer received 4 txs, containing 4 successful txs, and the success rate is 100.00%")) + }) + }) + + Context("Not all txs are correct", func() { + It("Should supports observer 1 and tx 1", func() { + rescueStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + blockCh := make(chan *infra.AddressedBlock) + go func() { + blockCh <- newAddressedBlock(0, 0, false) + }() + infra.CalSuccessRate(1, 1, blockCh) + + w.Close() + out, _ := ioutil.ReadAll(r) + os.Stdout = rescueStdout + Expect(string(out)).Should(ContainSubstring("peer 0 received 1 txs, containing 0 successful txs, and the success rate is 0.00%")) + Expect(string(out)).Should(ContainSubstring("All peer received 1 txs, containing 0 successful txs, and the success rate is 0.00%")) + }) + + It("Should supports observer 2 and tx 1", func() { + rescueStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + blockCh := make(chan *infra.AddressedBlock) + go func() { + blockCh <- newAddressedBlock(0, 0, false) + blockCh <- newAddressedBlock(1, 0, false) + }() + infra.CalSuccessRate(2, 1, blockCh) + + w.Close() + out, _ := ioutil.ReadAll(r) + os.Stdout = rescueStdout + Expect(string(out)).Should(ContainSubstring("peer 0 received 1 txs, containing 0 successful txs, and the success rate is 0.00%")) + Expect(string(out)).Should(ContainSubstring("peer 1 received 1 txs, containing 0 successful txs, and the success rate is 0.00%")) + Expect(string(out)).Should(ContainSubstring("All peer received 2 txs, containing 0 successful txs, and the success rate is 0.00%")) + }) + + It("Should supports observer 2 and tx 2", func() { + rescueStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + blockCh := make(chan *infra.AddressedBlock) + go func() { + blockCh <- newAddressedBlock(0, 0, true) + blockCh <- newAddressedBlock(1, 0, true) + blockCh <- newAddressedBlock(0, 1, false) + blockCh <- newAddressedBlock(1, 1, false) + }() + infra.CalSuccessRate(2, 2, blockCh) + + w.Close() + out, _ := ioutil.ReadAll(r) + os.Stdout = rescueStdout + Expect(string(out)).Should(ContainSubstring("peer 0 received 2 txs, containing 1 successful txs, and the success rate is 50.00%")) + Expect(string(out)).Should(ContainSubstring("peer 1 received 2 txs, containing 1 successful txs, and the success rate is 50.00%")) + Expect(string(out)).Should(ContainSubstring("All peer received 4 txs, containing 2 successful txs, and the success rate is 50.00%")) + }) + }) +}) diff --git a/pkg/infra/observer_test.go b/pkg/infra/observer_test.go index 185eb412..9cf0ff85 100644 --- a/pkg/infra/observer_test.go +++ b/pkg/infra/observer_test.go @@ -81,7 +81,9 @@ var _ = Describe("Observer", func() { blockCollector, err := infra.NewBlockCollector(config.CommitThreshold, len(config.Committers)) Expect(err).NotTo(HaveOccurred()) blockCh := make(chan *infra.AddressedBlock) - go blockCollector.Start(ctx, blockCh, finishCh, mock.MockTxSize, time.Now(), true) + successRateBlockCh := make(chan *infra.AddressedBlock) + go infra.CalSuccessRate(1, mock.MockTxSize, successRateBlockCh) + go blockCollector.Start(ctx, blockCh, successRateBlockCh, finishCh, mock.MockTxSize, time.Now(), true) go observers.Start(errorCh, blockCh, start) go func() { for i := 0; i < mock.MockTxSize; i++ { @@ -141,7 +143,9 @@ var _ = Describe("Observer", func() { blockCollector, err := infra.NewBlockCollector(config.CommitThreshold, len(config.Committers)) Expect(err).NotTo(HaveOccurred()) blockCh := make(chan *infra.AddressedBlock) - go blockCollector.Start(ctx, blockCh, finishCh, mock.MockTxSize, time.Now(), true) + successRateBlockCh := make(chan *infra.AddressedBlock) + go infra.CalSuccessRate(TotalPeers, mock.MockTxSize, successRateBlockCh) + go blockCollector.Start(ctx, blockCh, successRateBlockCh, finishCh, mock.MockTxSize, time.Now(), true) go observers.Start(errorCh, blockCh, start) for i := 0; i < TotalPeers; i++ { go func(k int) { diff --git a/pkg/infra/process.go b/pkg/infra/process.go index bc9dbe80..fb705dc4 100644 --- a/pkg/infra/process.go +++ b/pkg/infra/process.go @@ -23,6 +23,7 @@ func Process(configPath string, num int, burst int, rate float64, logger *log.Lo processed := make(chan *Elements, burst) envs := make(chan *Elements, burst) blockCh := make(chan *AddressedBlock) + successRateBlockCh := make(chan *AddressedBlock) finishCh := make(chan struct{}) errorCh := make(chan error, burst) assembler := &Assembler{Signer: crypto} @@ -61,7 +62,8 @@ func Process(configPath string, num int, burst int, rate float64, logger *log.Lo start := time.Now() - go blockCollector.Start(ctx, blockCh, finishCh, num, time.Now(), true) + go CalSuccessRate(len(observers.workers), num, successRateBlockCh) + go blockCollector.Start(ctx, blockCh, successRateBlockCh, finishCh, num, time.Now(), true) go observers.Start(errorCh, blockCh, start) go StartCreateProposal(num, burst, rate, config, crypto, raw, errorCh)