Skip to content

Commit

Permalink
Add queue for block proposal, remove consensus ready signal
Browse files Browse the repository at this point in the history
  • Loading branch information
GheisMohammadi committed Dec 5, 2024
1 parent 3580e89 commit 872e519
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 122 deletions.
2 changes: 1 addition & 1 deletion api/service/blockproposal/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *Service) Start() error {
}

func (s *Service) run() {
s.c.WaitForConsensusReadyV2(s.stopChan, s.stoppedChan)
s.c.StartCheckingForNewProposals(s.stopChan, s.stoppedChan)
}

// Stop stops block proposal service.
Expand Down
2 changes: 1 addition & 1 deletion p2p/stream/types/safe_map.go → common/types/safe_map.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sttypes
package types

import (
"sync"
Expand Down
72 changes: 32 additions & 40 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,6 @@ const (

var errLeaderPriKeyNotFound = errors.New("leader private key not found locally")

type Proposal struct {
Type ProposalType
Caller string
}

// NewProposal creates a new proposal
func NewProposal(t ProposalType) Proposal {
return Proposal{Type: t, Caller: utils.GetCallStackInfo(2)}
}

// ProposalType is to indicate the type of signal for new block proposal
type ProposalType byte

// Constant of the type of new block proposal
const (
SyncProposal ProposalType = iota
AsyncProposal
)

type DownloadAsync interface {
DownloadAsync()
}
Expand All @@ -74,6 +55,8 @@ type Consensus struct {
prepareBitmap *bls_cosi.Mask
commitBitmap *bls_cosi.Mask

proposalManager *ProposalManager

multiSigBitmap *bls_cosi.Mask // Bitmap for parsing multisig bitmap from validators

pendingCXReceipts map[utils.CXKey]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
Expand Down Expand Up @@ -101,8 +84,6 @@ type Consensus struct {
mutex *sync.RWMutex
// ViewChange struct
vc *viewChange
// Signal channel for proposing a new block and start new consensus
readySignal chan Proposal
// Channel to send full commit signatures to finish new block proposal
commitSigChannel chan []byte
// verified block to state sync broadcast
Expand Down Expand Up @@ -162,16 +143,27 @@ func (consensus *Consensus) ChainReader() engine.ChainReader {
return consensus.Blockchain()
}

func (consensus *Consensus) ReadySignal(p Proposal, signalSource string, signalReason string) {
func (consensus *Consensus) AddProposal(t ProposalType, source string, reason string) error {
bn := consensus.Blockchain().CurrentBlock().NumberU64()
v := consensus.GetViewChangingID()
p := NewProposal(t, v, bn, source, reason)
consensus.proposalManager.AddProposal(p)
return nil
}

func (consensus *Consensus) ReadySignal(t ProposalType, signalSource string, signalReason string) {
if err := consensus.AddProposal(t, signalSource, signalReason); err != nil {
utils.Logger().Debug().Err(err).
Str("signalSource", signalSource).
Str("signalReason", signalReason).
Msg("ReadySignal is failed to add a new proposal")
return
}
utils.Logger().Info().
Str("ProposalType", t.String()).
Str("signalSource", signalSource).
Str("signalReason", signalReason).
Msg("ReadySignal is called to propose new block")
consensus.readySignal <- p
}

func (consensus *Consensus) GetReadySignal() chan Proposal {
return consensus.readySignal
Msg("ReadySignal is called to propose a new block")
}

func (consensus *Consensus) GetCommitSigChannel() chan []byte {
Expand Down Expand Up @@ -287,17 +279,18 @@ func New(
Decider quorum.Decider, minPeers int, aggregateSig bool,
) (*Consensus, error) {
consensus := Consensus{
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: NewState(Normal),
decider: Decider,
registry: registry,
MinPeers: minPeers,
AggregateSig: aggregateSig,
host: host,
msgSender: NewMessageSender(host),
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: NewState(Normal),
decider: Decider,
registry: registry,
proposalManager: NewProposalManager(),
MinPeers: minPeers,
AggregateSig: aggregateSig,
host: host,
msgSender: NewMessageSender(host),
// FBFT timeout
consensusTimeout: createTimeout(),
dHelper: downloadAsync{},
Expand All @@ -318,7 +311,6 @@ func New(
// displayed on explorer as Height right now
consensus.setCurBlockViewID(0)
consensus.SlashChan = make(chan slash.Record)
consensus.readySignal = make(chan Proposal)
consensus.commitSigChannel = make(chan []byte)
// channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
Expand Down
2 changes: 1 addition & 1 deletion consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (consensus *Consensus) updateConsensusInformation(reason string) Mode {
consensus.GetLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] I am the New Leader")
consensus.ReadySignal(NewProposal(SyncProposal), "updateConsensusInformation", "leader changed and I am the new leader")
consensus.ReadySignal(SyncProposal, "updateConsensusInformation", "leader changed and I am the new leader")
}()
}
return Normal
Expand Down
8 changes: 4 additions & 4 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (consensus *Consensus) finalCommit(isLeader bool) {
// No pipelining
go func() {
consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal")
consensus.ReadySignal(NewProposal(SyncProposal), "finalCommit", "I am leader and it's the last block in epoch")
consensus.ReadySignal(SyncProposal, "finalCommit", "I am leader and it's the last block in epoch")
}()
} else {
// pipelining
Expand Down Expand Up @@ -354,7 +354,7 @@ func (consensus *Consensus) StartChannel() {
consensus.start = true
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal")
consensus.mutex.Unlock()
consensus.ReadySignal(NewProposal(SyncProposal), "StartChannel", "consensus channel is started")
consensus.ReadySignal(SyncProposal, "StartChannel", "consensus channel is started")
return
}
consensus.mutex.Unlock()
Expand Down Expand Up @@ -606,7 +606,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
// Send signal to Node to propose the new block for consensus
consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal")
consensus.mutex.Unlock()
consensus.ReadySignal(NewProposal(AsyncProposal), "preCommitAndPropose", "proposing new block which will wait on the full commit signatures to finish")
consensus.ReadySignal(AsyncProposal, "preCommitAndPropose", "proposing new block which will wait on the full commit signatures to finish")
}()

return nil
Expand Down Expand Up @@ -848,7 +848,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
blockPeriod := consensus.BlockPeriod
go func() {
<-time.After(blockPeriod)
consensus.ReadySignal(NewProposal(SyncProposal), "setupForNewConsensus", "I am the new leader")
consensus.ReadySignal(SyncProposal, "setupForNewConsensus", "I am the new leader")
}()
}
}
Expand Down
134 changes: 134 additions & 0 deletions consensus/proposal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package consensus

import (
"sync"
"time"

"github.com/harmony-one/harmony/internal/utils"
)

// ProposalType is to indicate the type of signal for new block proposal
type ProposalType byte

// Constant of the type of new block proposal
const (
SyncProposal ProposalType = iota
AsyncProposal
)

func (pt ProposalType) String() string {
if pt == SyncProposal {
return "SyncProposal"
}
return "AsyncProposal"
}

// Proposal represents a new block proposal with associated metadata
type Proposal struct {
Type ProposalType
Caller string
Height uint64
ViewID uint64
Source string
Reason string
CreatedAt time.Time
lock *sync.RWMutex
}

// NewProposal creates a new proposal
func NewProposal(t ProposalType, viewID uint64, height uint64, source string, reason string) *Proposal {
return &Proposal{
Type: t,
Caller: utils.GetCallStackInfo(2),
ViewID: 0,
Height: 0,
Source: source,
Reason: reason,
CreatedAt: time.Now(),
lock: &sync.RWMutex{},
}
}

// Clone returns a copy of proposal
func (p *Proposal) Clone() *Proposal {
p.lock.RLock()
defer p.lock.RUnlock()
return &Proposal{
Type: p.Type,
Caller: p.Caller,
ViewID: p.ViewID,
Height: p.Height,
CreatedAt: p.CreatedAt,
lock: &sync.RWMutex{},
}
}

// GetType retrieves the Proposal type
func (p *Proposal) GetType() ProposalType {
p.lock.RLock()
defer p.lock.RUnlock()
return p.Type
}

// SetType updates the Proposal type
func (p *Proposal) SetType(t ProposalType) {
p.lock.Lock()
defer p.lock.Unlock()
p.Type = t
}

// GetCaller retrieves the Proposal caller
func (p *Proposal) GetCaller() string {
p.lock.RLock()
defer p.lock.RUnlock()
return p.Caller
}

// SetCaller updates the Proposal caller
func (p *Proposal) SetCaller(caller string) {
p.lock.Lock()
defer p.lock.Unlock()
p.Caller = caller
}

// GetHeight retrieves the Proposal height
func (p *Proposal) GetHeight() uint64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.Height
}

// SetHeight updates the Proposal height
func (p *Proposal) SetHeight(height uint64) {
p.lock.Lock()
defer p.lock.Unlock()
p.Height = height
}

// GetViewID retrieves the Proposal view ID
func (p *Proposal) GetViewID() uint64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.ViewID
}

// SetViewID updates the Proposal view ID
func (p *Proposal) SetViewID(viewID uint64) {
p.lock.Lock()
defer p.lock.Unlock()
p.ViewID = viewID
}

// GetCreatedAt retrieves the Proposal creation time
func (p *Proposal) GetCreatedAt() time.Time {
p.lock.RLock()
defer p.lock.RUnlock()
return p.CreatedAt
}

// SetCreatedAt updates the Proposal creation time
func (p *Proposal) SetCreatedAt(createdAt time.Time) {
p.lock.Lock()
defer p.lock.Unlock()
p.CreatedAt = createdAt
}
Loading

0 comments on commit 872e519

Please sign in to comment.