Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/worker/compute/executor: Minor fixes #5426

Merged
merged 12 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/5426.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute/executor: Clear channels when not in the committee
21 changes: 17 additions & 4 deletions go/consensus/cometbft/apps/roothash/finalization.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,28 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
switch err {
case commitment.ErrDiscrepancyDetected:
ctx.Logger().Warn("executor discrepancy detected",
"runtime_id", rtState.Runtime.ID,
"round", round,
"rank", rtState.CommitmentPool.HighestRank,
"timeout", timeout,
logging.LogEvent, roothash.LogEventExecutionDiscrepancyDetected,
)

ctx.EmitEvent(
tmapi.NewEventBuilder(app.Name()).
TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{Rank: rtState.CommitmentPool.HighestRank, Timeout: timeout}).
TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{
Round: round,
Rank: rtState.CommitmentPool.HighestRank,
Timeout: timeout,
}).
TypedAttribute(&roothash.RuntimeIDAttribute{ID: rtState.Runtime.ID}),
)

// Re-arm round timeout. Give backup workers enough time to submit commitments.
prevTimeout := rtState.NextTimeout
rtState.NextTimeout = ctx.BlockHeight() + 1 + (rtState.Runtime.Executor.RoundTimeout*backupWorkerTimeoutFactorNumerator)/backupWorkerTimeoutFactorDenominator // Current height is ctx.BlockHeight() + 1

if err = rearmRoundTimeout(ctx, rtState.Runtime.ID, prevTimeout, rtState.NextTimeout); err != nil {
if err = rearmRoundTimeout(ctx, rtState.Runtime.ID, round, prevTimeout, rtState.NextTimeout); err != nil {
return err
}

Expand All @@ -114,6 +120,7 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
case commitment.ErrStillWaiting:
// Need more commits.
ctx.Logger().Debug("insufficient commitments for finality, waiting",
"runtime_id", rtState.Runtime.ID,
"round", round,
)
return nil
Expand All @@ -132,8 +139,11 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo

// The round has been finalized.
ctx.Logger().Debug("finalized round",
"runtime_id", rtState.Runtime.ID,
"round", round,
"priority", pool.HighestRank,
"rank", pool.HighestRank,
"scheduler_id", sc.Commitment.Header.SchedulerID,
"timeout", timeout,
)

livenessStats.TotalRounds++
Expand Down Expand Up @@ -231,6 +241,8 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
switch rtState.CommitmentPool.Discrepancy {
case true:
ctx.Logger().Debug("executor pool discrepancy",
"runtime_id", rtState.Runtime.ID,
"round", round,
"slashing", rtState.Runtime.Staking.Slashing,
)

Expand Down Expand Up @@ -316,7 +328,7 @@ func (app *rootHashApplication) finalizeBlock(ctx *tmapi.Context, rtState *rooth
prevTimeout := rtState.NextTimeout
rtState.NextTimeout = roothash.TimeoutNever

return rearmRoundTimeout(ctx, rtState.Runtime.ID, prevTimeout, rtState.NextTimeout)
return rearmRoundTimeout(ctx, rtState.Runtime.ID, blk.Header.Round, prevTimeout, rtState.NextTimeout)
}

func (app *rootHashApplication) failRound(
Expand All @@ -327,6 +339,7 @@ func (app *rootHashApplication) failRound(
round := rtState.LastBlock.Header.Round + 1

ctx.Logger().Debug("round failed",
"runtime_id", rtState.Runtime.ID,
"round", round,
"err", err,
logging.LogEvent, roothash.LogEventRoundFailed,
Expand Down
1 change: 1 addition & 0 deletions go/consensus/cometbft/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (app *rootHashApplication) onCommitteeChanged(ctx *tmapi.Context, state *ro
ctx.Logger().Debug("updating committee for runtime",
"runtime_id", rt.ID,
"epoch", epoch,
"committee", committee,
)

// Emit an empty block signaling epoch transition. This is required so that
Expand Down
5 changes: 4 additions & 1 deletion go/consensus/cometbft/apps/roothash/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ func (app *rootHashApplication) processRoundTimeouts(ctx *tmapi.Context) error {

func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace) error {
ctx.Logger().Warn("round timeout expired, forcing finalization",
"runtime_id", runtimeID,
logging.LogEvent, roothash.LogEventTimerFired,
)

if err := app.tryFinalizeRound(ctx, runtimeID, true); err != nil {
ctx.Logger().Error("failed to finalize round",
"runtime_id", runtimeID,
"err", err,
)
return fmt.Errorf("failed to finalize round: %w", err)
Expand All @@ -48,14 +50,15 @@ func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, runtimeI
return nil
}

func rearmRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace, prevTimeout int64, nextTimeout int64) error {
func rearmRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace, round uint64, prevTimeout int64, nextTimeout int64) error {
// Re-arm only if the round timeout has changed.
if prevTimeout == nextTimeout {
return nil
}

ctx.Logger().Debug("re-arming round timeout",
"runtime_id", runtimeID,
"round", round,
"prev_timeout", prevTimeout,
"next_timeout", nextTimeout,
"height", ctx.BlockHeight()+1, // Current height is ctx.BlockHeight() + 1
Expand Down
6 changes: 5 additions & 1 deletion go/consensus/cometbft/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func (app *rootHashApplication) executorCommit(

// Check if higher-ranked scheduler submitted a commitment.
if prevRank != rtState.CommitmentPool.HighestRank {
round := rtState.LastBlock.Header.Round + 1

ctx.Logger().Debug("transaction scheduler has changed",
"runtime_id", cc.ID,
"round", round,
"prev_rank", prevRank,
"new_rank", rtState.CommitmentPool.HighestRank,
)
Expand All @@ -142,7 +146,7 @@ func (app *rootHashApplication) executorCommit(
prevTimeout := rtState.NextTimeout
rtState.NextTimeout = ctx.BlockHeight() + 1 + rtState.Runtime.Executor.RoundTimeout // Current height is ctx.BlockHeight() + 1

if err := rearmRoundTimeout(ctx, cc.ID, prevTimeout, rtState.NextTimeout); err != nil {
if err := rearmRoundTimeout(ctx, cc.ID, round, prevTimeout, rtState.NextTimeout); err != nil {
return err
}
}
Expand Down
2 changes: 2 additions & 0 deletions go/roothash/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ func (e *ExecutorCommittedEvent) EventKind() string {

// ExecutionDiscrepancyDetectedEvent is an execute discrepancy detected event.
type ExecutionDiscrepancyDetectedEvent struct {
// Round is the round in which the discrepancy was detected.
Round uint64 `json:"round"`
// Rank is the rank of the transaction scheduler.
Rank uint64 `json:"rank"`
// Timeout signals whether the discrepancy was due to a timeout.
Expand Down
26 changes: 16 additions & 10 deletions go/roothash/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type commitmentEvent struct {
type discrepancyEvent struct {
timeout bool
rank uint64
round uint64
}

type finalizedEvent struct {
Expand Down Expand Up @@ -418,6 +419,7 @@ func (s *runtimeState) verifyEvents(t *testing.T, ctx context.Context, backend a
require.NotNil(ev.ExecutionDiscrepancyDetected, fmt.Sprintf("unexpected event: %+v", ev))
require.Equal(de.timeout, ev.ExecutionDiscrepancyDetected.Timeout, "timeout should match")
require.Equal(de.rank, ev.ExecutionDiscrepancyDetected.Rank, "rank should match")
require.Equal(de.round, ev.ExecutionDiscrepancyDetected.Round, "round should match")
}

if fe != nil {
Expand Down Expand Up @@ -590,7 +592,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
parent, err = nextRuntimeBlock(ch, nil)
require.NoError(err, "nextRuntimeBlock")

require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round")
round := child.Block.Header.Round + 1
require.EqualValues(round, parent.Block.Header.Round, "block round")
require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed")

// Check that round was finalized after 2.5*RoundTimeout blocks.
Expand All @@ -599,7 +602,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse

// Check that discrepancy resolution started after RoundTimeout blocks.
height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil)
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil)

// Check that the liveness statistics were computed correctly.
verifyLivenessStatistics(parent)
Expand All @@ -626,7 +629,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
parent, err = nextRuntimeBlock(ch, nil)
require.NoError(err, "nextRuntimeBlock")

require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round")
round := child.Block.Header.Round + 1
require.EqualValues(round, parent.Block.Header.Round, "block round")
require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed")

// Backup schedulers should wait for a double timeout.
Expand All @@ -635,15 +639,15 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
// Check that round was finalized after 1.5*RoundTimeout blocks and that discrepancy
// resolution started immediately.
height := parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:2]}, &discrepancyEvent{false, rank}, nil)
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:2]}, &discrepancyEvent{false, rank, round}, nil)
default:
// Check that round was finalized after 2.5*RoundTimeout blocks.
height := parent.Height - 25*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:2]}, nil, nil)

// Check that discrepancy resolution started after RoundTimeout blocks.
height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil)
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil)

}

Expand Down Expand Up @@ -672,7 +676,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
parent, err = nextRuntimeBlock(ch, nil)
require.NoError(err, "nextRuntimeBlock")

require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round")
round := child.Block.Header.Round + 1
require.EqualValues(round, parent.Block.Header.Round, "block round")
require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed")

// Check that round was finalized after 2.5*RoundTimeout blocks.
Expand All @@ -681,7 +686,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse

// Check that discrepancy resolution started after RoundTimeout blocks.
height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil)
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil)

// Check that the liveness statistics were computed correctly.
verifyLivenessStatistics(parent)
Expand Down Expand Up @@ -712,7 +717,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
parent, err = nextRuntimeBlock(ch, nil)
require.NoError(err, "nextRuntimeBlock")

require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round")
round := child.Block.Header.Round + 1
require.EqualValues(round, parent.Block.Header.Round, "block round")
require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed")

// Backup schedulers should wait for a double timeout.
Expand All @@ -721,15 +727,15 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
// Check that round was finalized after 1.5*RoundTimeout blocks and that discrepancy
// resolution started immediately.
height := parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:3]}, &discrepancyEvent{false, rank}, nil)
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:3]}, &discrepancyEvent{false, rank, round}, nil)
default:
// Check that round was finalized after 2.5*RoundTimeout blocks.
height := parent.Height - 25*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:3]}, nil, nil)

// Check that discrepancy resolution started after RoundTimeout blocks.
height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil)
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil)

}

Expand Down
2 changes: 1 addition & 1 deletion go/runtime/host/sgx/sgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
// nodes on a single machine, all sharing the same EPC.
runtimeRAKTimeout = 60 * time.Second
// Runtime attest interval.
defaultRuntimeAttestInterval = 1 * time.Hour
defaultRuntimeAttestInterval = 2 * time.Hour
)

// Config contains SGX-specific provisioner configuration options.
Expand Down
31 changes: 12 additions & 19 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ type TransactionPool interface {
GetKnownBatch(batch []hash.Hash) ([]*TxQueueMeta, map[hash.Hash]int)

// ProcessBlock updates the last known runtime block information.
ProcessBlock(bi *runtime.BlockInfo) error
ProcessBlock(bi *runtime.BlockInfo)

// ProcessIncomingMessages loads transactions from incoming messages into the pool.
ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error
ProcessIncomingMessages(inMsgs []*message.IncomingMessage)

// WatchCheckedTransactions subscribes to notifications about new transactions being available
// in the transaction pool for scheduling.
WatchCheckedTransactions() (pubsub.ClosableSubscription, <-chan []*PendingCheckTransaction)
WatchCheckedTransactions() (<-chan []*PendingCheckTransaction, pubsub.ClosableSubscription)

// PendingCheckSize returns the number of transactions currently pending to be checked.
PendingCheckSize() int
Expand Down Expand Up @@ -374,43 +374,36 @@ HASH_LOOP:
return txs, missingTxs
}

func (t *txPool) ProcessBlock(bi *runtime.BlockInfo) error {
func (t *txPool) ProcessBlock(bi *runtime.BlockInfo) {
t.blockInfoLock.Lock()
defer t.blockInfoLock.Unlock()

switch {
case t.blockInfo == nil:
if t.blockInfo == nil {
close(t.initCh)
fallthrough
case bi.RuntimeBlock.Header.HeaderType == block.EpochTransition:
// Force recheck on epoch transitions.
t.recheckTxCh.In() <- struct{}{}
default:
}

t.blockInfo = bi
t.lastBlockProcessed = time.Now()

// Trigger transaction rechecks if needed.
if (bi.RuntimeBlock.Header.Round - t.lastRecheckRound) > t.cfg.RecheckInterval {
// Force transaction rechecks on epoch transitions and if needed.
isEpochTransition := bi.RuntimeBlock.Header.HeaderType == block.EpochTransition
roundDifference := bi.RuntimeBlock.Header.Round - t.lastRecheckRound
if isEpochTransition || roundDifference > t.cfg.RecheckInterval {
t.recheckTxCh.In() <- struct{}{}
t.lastRecheckRound = bi.RuntimeBlock.Header.Round
}

return nil
}

func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error {
func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) {
t.rimQueue.Load(inMsgs)
rimQueueSize.With(t.getMetricLabels()).Set(float64(t.rimQueue.size()))
return nil
}

func (t *txPool) WatchCheckedTransactions() (pubsub.ClosableSubscription, <-chan []*PendingCheckTransaction) {
func (t *txPool) WatchCheckedTransactions() (<-chan []*PendingCheckTransaction, pubsub.ClosableSubscription) {
sub := t.checkTxNotifier.Subscribe()
ch := make(chan []*PendingCheckTransaction)
sub.Unwrap(ch)
return sub, ch
return ch, sub
}

func (t *txPool) PendingCheckSize() int {
Expand Down
11 changes: 0 additions & 11 deletions go/worker/client/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,6 @@ func (n *Node) Initialized() <-chan struct{} {
return n.initCh
}

// HandlePeerTx is guarded by CrossNode.
func (n *Node) HandlePeerTx(context.Context, []byte) error {
// Nothing to do here.
return nil
}

// HandleEpochTransitionLocked is guarded by CrossNode.
func (n *Node) HandleEpochTransitionLocked(*committee.EpochSnapshot) {
// Nothing to do here.
}

// HandleNewBlockEarlyLocked is guarded by CrossNode.
func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) {
// Nothing to do here.
Expand Down
Loading
Loading