Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Leader Quorum Calculations for Commit and Prepare Phases #4813

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/stream/types/safe_map.go → common/types/safe_map.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sttypes
package types

import (
"sync"
Expand Down
52 changes: 7 additions & 45 deletions consensus/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,6 @@ func (consensus *Consensus) announce(block *types.Block) {
consensus.switchPhase("Announce", FBFTPrepare)
}

func (consensus *Consensus) checkFirstReceivedSignature(signerCount int64, phase quorum.Phase) (bool, bool) {
hasMultiBlsKeys := len(consensus.priKey) > 0
if hasMultiBlsKeys {
var myPubkeys []bls.SerializedPublicKey
for _, key := range consensus.priKey {
myPubkeys = append(myPubkeys, key.Pub.Bytes)
}
mySignsCount := consensus.decider.GetBallotsCount(phase, myPubkeys)
return true, signerCount == mySignsCount
}
return false, false
}

// this method is called for each validator sent their vote message
func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
// TODO(audit): make FBFT lookup using map instead of looping through all items.
Expand Down Expand Up @@ -135,20 +122,6 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
}

signerCount := consensus.decider.SignersCount(quorum.Prepare)

// check if it is first received signatures
// it may multi bls key validators can achieve quorum on first signature
hasMultiBlsKeys, isFirstReceivedSignature := consensus.checkFirstReceivedSignature(signerCount, quorum.Prepare)

quorumPreExisting := consensus.decider.IsQuorumAchieved(quorum.Prepare)
//// Read - End

if quorumPreExisting {
// already have enough signatures
consensus.getLogger().Debug().
Interface("validatorPubKeys", recvMsg.SenderPubkeys).
Msg("[OnPrepare] Received Additional Prepare Message")
}
//// Read - End

consensus.UpdateLeaderMetrics(float64(signerCount), float64(consensus.getBlockNum()))
Expand Down Expand Up @@ -203,15 +176,14 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
//// Write - End

//// Read - Start
quorumFromInitialSignature := hasMultiBlsKeys && isFirstReceivedSignature && quorumPreExisting
quorumPostNewSignatures := consensus.decider.IsQuorumAchieved(quorum.Prepare)
quorumFromNewSignatures := !quorumPreExisting && quorumPostNewSignatures

if quorumFromInitialSignature || quorumFromNewSignatures {
quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Prepare)
lastQuorumAchievedBlock := consensus.current.GetLastQuorumAchievedBlock(quorum.Prepare)
if quorumIsMet && recvMsg.BlockNum > lastQuorumAchievedBlock {
// NOTE Let it handle its own logs
if err := consensus.didReachPrepareQuorum(); err != nil {
return
}
consensus.current.SetLastQuorumAchievedBlock(quorum.Prepare, recvMsg.BlockNum)
consensus.switchPhase("onPrepare", FBFTCommit)
}
//// Read - End
Expand All @@ -236,15 +208,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {

commitBitmap := consensus.commitBitmap

// has to be called before verifying signature
quorumWasMet := consensus.decider.IsQuorumAchieved(quorum.Commit)

signerCount := consensus.decider.SignersCount(quorum.Commit)

// check if it is first received commit
// it may multi bls key validators can achieve quorum on first commit
hasMultiBlsKeys, isFirstReceivedSignature := consensus.checkFirstReceivedSignature(signerCount, quorum.Commit)

//// Read - End

// Verify the signature on commitPayload is correct
Expand Down Expand Up @@ -318,13 +282,11 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {

quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Commit)
//// Read - End

quorumAchievedByFirstCommit := hasMultiBlsKeys && isFirstReceivedSignature && quorumWasMet
quorumAchievedByThisCommit := !quorumWasMet && quorumIsMet

if quorumAchievedByFirstCommit || quorumAchievedByThisCommit {
lastQuorumAchievedBlock := consensus.current.GetLastQuorumAchievedBlock(quorum.Commit)
if quorumIsMet && blockObj.NumberU64() > lastQuorumAchievedBlock {
logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
consensus.fBFTLog.MarkBlockVerified(blockObj)
consensus.current.SetLastQuorumAchievedBlock(quorum.Commit, blockObj.NumberU64())

if !blockObj.IsLastBlockInEpoch() {
// only do early commit if it's not epoch block to avoid problems
Expand Down
23 changes: 22 additions & 1 deletion consensus/view_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
types "github.com/harmony-one/harmony/common/types"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
Expand All @@ -32,11 +33,14 @@ type State struct {
// view changing id is used during view change mode
// it is the next view id
viewChangingID uint64

quorumAchievedBlock *types.SafeMap[quorum.Phase, uint64]
}

func NewState(mode Mode) State {
return State{
mode: uint32(mode),
mode: uint32(mode),
quorumAchievedBlock: types.NewSafeMap[quorum.Phase, uint64](),
}
}

Expand Down Expand Up @@ -73,6 +77,23 @@ func (pm *State) SetViewChangingID(id uint64) {
atomic.StoreUint64(&pm.viewChangingID, id)
}

// GetLastQuorumAchievedBlock retrieves the block number of the last block
// that achieved quorum for the specified phase.
// If no quorum has been achieved for the given phase, it returns 0.
func (pm *State) GetLastQuorumAchievedBlock(p quorum.Phase) uint64 {
lqab, exists := pm.quorumAchievedBlock.Get(p)
if !exists {
return 0
}
return lqab
}

// SetLastQuorumAchievedBlock updates the block number of the last block
// that achieved quorum for the specified phase.
func (pm *State) SetLastQuorumAchievedBlock(p quorum.Phase, blockNum uint64) {
pm.quorumAchievedBlock.Set(p, blockNum)
}

// GetViewChangeDuraion return the duration of the current view change
// It increase in the power of difference betweeen view changing ID and current view ID
func (pm *State) GetViewChangeDuraion() time.Duration {
Expand Down
5 changes: 3 additions & 2 deletions p2p/stream/common/requestmanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rlp"
types "github.com/harmony-one/harmony/common/types"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
)
Expand Down Expand Up @@ -138,8 +139,8 @@ func makeDummyTestStreams(indexes []int) []sttypes.Stream {
return sts
}

func makeDummyStreamSets(indexes []int) *sttypes.SafeMap[sttypes.StreamID, *stream] {
m := sttypes.NewSafeMap[sttypes.StreamID, *stream]()
func makeDummyStreamSets(indexes []int) *types.SafeMap[sttypes.StreamID, *stream] {
m := types.NewSafeMap[sttypes.StreamID, *stream]()

for _, index := range indexes {
st := &testStream{
Expand Down
23 changes: 12 additions & 11 deletions p2p/stream/common/requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog"

"github.com/ethereum/go-ethereum/event"
types "github.com/harmony-one/harmony/common/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
Expand All @@ -20,10 +21,10 @@ import (
// TODO: each peer is able to have a queue of requests instead of one request at a time.
// TODO: add QoS evaluation for each stream
type requestManager struct {
streams *sttypes.SafeMap[sttypes.StreamID, *stream] // All streams
available *sttypes.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request
pendings *sttypes.SafeMap[uint64, *request] // requests that are sent but not received response
waitings requestQueues // double linked list of requests that are on the waiting list
streams *types.SafeMap[sttypes.StreamID, *stream] // All streams
available *types.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request
pendings *types.SafeMap[uint64, *request] // requests that are sent but not received response
waitings requestQueues // double linked list of requests that are on the waiting list

// Stream events
sm streammanager.Reader
Expand Down Expand Up @@ -56,9 +57,9 @@ func newRequestManager(sm streammanager.ReaderSubscriber) *requestManager {
logger := utils.Logger().With().Str("module", "request manager").Logger()

return &requestManager{
streams: sttypes.NewSafeMap[sttypes.StreamID, *stream](),
available: sttypes.NewSafeMap[sttypes.StreamID, struct{}](),
pendings: sttypes.NewSafeMap[uint64, *request](),
streams: types.NewSafeMap[sttypes.StreamID, *stream](),
available: types.NewSafeMap[sttypes.StreamID, struct{}](),
pendings: types.NewSafeMap[uint64, *request](),
waitings: newRequestQueues(),

sm: sm,
Expand Down Expand Up @@ -355,7 +356,7 @@ func (rm *requestManager) refreshStreams() {
}
}

func checkStreamUpdates(exists *sttypes.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) {
func checkStreamUpdates(exists *types.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) {
targetM := make(map[sttypes.StreamID]sttypes.Stream)

for _, target := range targets {
Expand Down Expand Up @@ -401,9 +402,9 @@ func (rm *requestManager) close() {
rm.pendings.Iterate(func(key uint64, req *request) {
req.doneWithResponse(responseData{err: ErrClosed})
})
rm.streams = sttypes.NewSafeMap[sttypes.StreamID, *stream]()
rm.available = sttypes.NewSafeMap[sttypes.StreamID, struct{}]()
rm.pendings = sttypes.NewSafeMap[uint64, *request]()
rm.streams = types.NewSafeMap[sttypes.StreamID, *stream]()
rm.available = types.NewSafeMap[sttypes.StreamID, struct{}]()
rm.pendings = types.NewSafeMap[uint64, *request]()
rm.waitings = newRequestQueues()
close(rm.stopC)
}
Expand Down
5 changes: 3 additions & 2 deletions p2p/stream/common/requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

types "github.com/harmony-one/harmony/common/types"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -368,7 +369,7 @@ func TestRequestManager_Concurrency(t *testing.T) {
func TestGenReqID(t *testing.T) {
retry := 100000
rm := &requestManager{
pendings: sttypes.NewSafeMap[uint64, *request](),
pendings: types.NewSafeMap[uint64, *request](),
}

for i := 0; i != retry; i++ {
Expand All @@ -382,7 +383,7 @@ func TestGenReqID(t *testing.T) {

func TestCheckStreamUpdates(t *testing.T) {
tests := []struct {
exists *sttypes.SafeMap[sttypes.StreamID, *stream]
exists *types.SafeMap[sttypes.StreamID, *stream]
targets []sttypes.Stream
expAddedIndexes []int
expRemovedIndexes []int
Expand Down