Skip to content

Commit

Permalink
fix up
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Yuan <yy19902439@126.com>
  • Loading branch information
SamYuan1990 committed Jul 18, 2021
1 parent 8fa0a8c commit c7aa94e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 83 deletions.
91 changes: 91 additions & 0 deletions pkg/infra/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package infra

import (
"context"
"io"
"tape/pkg/infra/basic"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

type Broadcasters struct {
workers []*Broadcaster
ctx context.Context
envs chan *Elements
errorCh chan error
}

type Broadcaster struct {
c orderer.AtomicBroadcast_BroadcastClient
logger *log.Logger
}

func CreateBroadcasters(ctx context.Context, envs chan *Elements, errorCh chan error, config basic.Config, logger *log.Logger) (*Broadcasters, error) {
var workers []*Broadcaster
for i := 0; i < config.NumOfConn; i++ {
broadcaster, err := CreateBroadcaster(ctx, config.Orderer, logger)
if err != nil {
return nil, err
}
workers = append(workers, broadcaster)
}

return &Broadcasters{
workers: workers,
ctx: ctx,
envs: envs,
errorCh: errorCh,
}, nil
}

func (bs Broadcasters) Start() {
for _, b := range bs.workers {
go b.StartDraining(bs.errorCh)
go b.Start(bs.ctx, bs.envs, bs.errorCh)
}
}

func CreateBroadcaster(ctx context.Context, node basic.Node, logger *log.Logger) (*Broadcaster, error) {
client, err := basic.CreateBroadcastClient(ctx, node, logger)
if err != nil {
return nil, err
}

return &Broadcaster{c: client, logger: logger}, nil
}

func (b *Broadcaster) Start(ctx context.Context, envs <-chan *Elements, errorCh chan error) {
b.logger.Debugf("Start sending broadcast")
for {
select {
case e := <-envs:
err := b.c.Send(e.Envelope)
if err != nil {
errorCh <- err
}
case <-ctx.Done():
return
}
}
}

func (b *Broadcaster) StartDraining(errorCh chan error) {
for {
res, err := b.c.Recv()
if err != nil {
if err == io.EOF {
return
}
b.logger.Errorf("recv broadcast err: %+v, status: %+v\n", err, res)
return
}

if res.Status != common.Status_SUCCESS {
errorCh <- errors.Errorf("recv errouneous status %s", res.Status)
return
}
}
}
83 changes: 0 additions & 83 deletions pkg/infra/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ package infra

import (
"context"
"io"
"tape/pkg/infra/basic"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -91,82 +87,3 @@ func (p *Proposer) Start(ctx context.Context, signed, processed chan *Elements,
}
}
}

type Broadcasters struct {
workers []*Broadcaster
ctx context.Context
envs chan *Elements
errorCh chan error
}

type Broadcaster struct {
c orderer.AtomicBroadcast_BroadcastClient
logger *log.Logger
}

func CreateBroadcasters(ctx context.Context, envs chan *Elements, errorCh chan error, config basic.Config, logger *log.Logger) (*Broadcasters, error) {
var workers []*Broadcaster
for i := 0; i < config.NumOfConn; i++ {
broadcaster, err := CreateBroadcaster(ctx, config.Orderer, logger)
if err != nil {
return nil, err
}
workers = append(workers, broadcaster)
}

return &Broadcasters{
workers: workers,
ctx: ctx,
envs: envs,
errorCh: errorCh,
}, nil
}

func (bs Broadcasters) Start() {
for _, b := range bs.workers {
go b.StartDraining(bs.errorCh)
go b.Start(bs.ctx, bs.envs, bs.errorCh)
}
}

func CreateBroadcaster(ctx context.Context, node basic.Node, logger *log.Logger) (*Broadcaster, error) {
client, err := basic.CreateBroadcastClient(ctx, node, logger)
if err != nil {
return nil, err
}

return &Broadcaster{c: client, logger: logger}, nil
}

func (b *Broadcaster) Start(ctx context.Context, envs <-chan *Elements, errorCh chan error) {
b.logger.Debugf("Start sending broadcast")
for {
select {
case e := <-envs:
err := b.c.Send(e.Envelope)
if err != nil {
errorCh <- err
}
case <-ctx.Done():
return
}
}
}

func (b *Broadcaster) StartDraining(errorCh chan error) {
for {
res, err := b.c.Recv()
if err != nil {
if err == io.EOF {
return
}
b.logger.Errorf("recv broadcast err: %+v, status: %+v\n", err, res)
return
}

if res.Status != common.Status_SUCCESS {
errorCh <- errors.Errorf("recv errouneous status %s", res.Status)
return
}
}
}

0 comments on commit c7aa94e

Please sign in to comment.