diff --git a/.changelog/5404.bugfix.1.md b/.changelog/5404.bugfix.1.md
new file mode 100644
index 00000000000..2f985105441
--- /dev/null
+++ b/.changelog/5404.bugfix.1.md
@@ -0,0 +1 @@
+go/worker/compute/executor: Use local time for batch scheduling
diff --git a/.changelog/5404.bugfix.2.md b/.changelog/5404.bugfix.2.md
new file mode 100644
index 00000000000..12672546e7e
--- /dev/null
+++ b/.changelog/5404.bugfix.2.md
@@ -0,0 +1 @@
+go/worker/compute/executor: Start processing once all txs are fetched
diff --git a/.changelog/5404.bugfix.3.md b/.changelog/5404.bugfix.3.md
new file mode 100644
index 00000000000..71835d1cc26
--- /dev/null
+++ b/.changelog/5404.bugfix.3.md
@@ -0,0 +1 @@
+go/worker/compute/executor: Schedule only if higher ranks didn't propose
diff --git a/.changelog/5404.bugfix.4.md b/.changelog/5404.bugfix.4.md
new file mode 100644
index 00000000000..b65187533b7
--- /dev/null
+++ b/.changelog/5404.bugfix.4.md
@@ -0,0 +1 @@
+go/worker/compute/executor: Estimate pool rank from observed commitments
diff --git a/docs/oasis-node/metrics.md b/docs/oasis-node/metrics.md
index ad8b0059d18..79a7c0176b7 100644
--- a/docs/oasis-node/metrics.md
+++ b/docs/oasis-node/metrics.md
@@ -127,7 +127,7 @@ oasis_worker_node_status_frozen | Gauge | Is oasis node frozen (binary). | | [w
 oasis_worker_node_status_runtime_faults | Gauge | Number of runtime faults. | runtime | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go)
 oasis_worker_node_status_runtime_suspended | Gauge | Runtime node suspension status (binary). | runtime | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go)
 oasis_worker_processed_block_count | Counter | Number of processed roothash blocks. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
-oasis_worker_processed_event_count | Counter | Number of processed roothash events. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
+oasis_worker_processed_event_count | Counter | Number of processed roothash events. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
 oasis_worker_storage_commit_latency | Summary | Latency of storage commit calls (state + outputs) (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
 oasis_worker_storage_full_round | Gauge | The last round that was fully synced and finalized. | runtime | [worker/storage/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/storage/committee/metrics.go)
 oasis_worker_storage_pending_round | Gauge | The last round that is in-flight for syncing. | runtime | [worker/storage/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/storage/committee/metrics.go)
diff --git a/go/consensus/cometbft/apps/roothash/finalization.go b/go/consensus/cometbft/apps/roothash/finalization.go
index 454be01b753..9bfd316aa1a 100644
--- a/go/consensus/cometbft/apps/roothash/finalization.go
+++ b/go/consensus/cometbft/apps/roothash/finalization.go
@@ -83,7 +83,7 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
     case commitment.ErrDiscrepancyDetected:
         ctx.Logger().Warn("executor discrepancy detected",
             "round", round,
-            "priority", rtState.CommitmentPool.HighestRank,
+            "rank", rtState.CommitmentPool.HighestRank,
             logging.LogEvent, roothash.LogEventExecutionDiscrepancyDetected,
         )
@@ -291,6 +291,7 @@ func (app *rootHashApplication) finalizeBlock(ctx *tmapi.Context, rtState *rooth
     // Emit event.
     ctx.Logger().Debug("new runtime block",
+        "runtime_id", rtState.Runtime.ID,
         "height", ctx.BlockHeight()+1, // Current height is ctx.BlockHeight() + 1
         "round", blk.Header.Round,
         "type", blk.Header.HeaderType,
diff --git a/go/roothash/api/commitment/pool.go b/go/roothash/api/commitment/pool.go
index d5a7123122f..a05e6d49891 100644
--- a/go/roothash/api/commitment/pool.go
+++ b/go/roothash/api/commitment/pool.go
@@ -228,6 +228,7 @@ func (p *Pool) AddVerifiedExecutorCommitment(c *scheduler.Committee, ec *Executo
         // Discrepancy detection accepts commitments arriving in any order, e.g., a backup worker
         // can submit a commitment even before there is a discrepancy.
         logger.Debug("node is not in the committee",
+            "round", ec.Header.Header.Round,
             "node_id", ec.NodeID,
         )
         return ErrNotInCommittee
@@ -235,6 +236,7 @@ func (p *Pool) AddVerifiedExecutorCommitment(c *scheduler.Committee, ec *Executo
         // Discrepancy resolution accepts commitments only from backup workers to prevent workers
         // from improving their liveness statistics.
         logger.Debug("node is not a backup worker",
+            "round", ec.Header.Header.Round,
             "node_id", ec.NodeID,
         )
         return ErrBadExecutorCommitment
@@ -246,7 +248,9 @@ func (p *Pool) AddVerifiedExecutorCommitment(c *scheduler.Committee, ec *Executo
         // Reject commitments with invalid schedulers.
         logger.Debug("executor commitment's scheduler is not in the committee",
             "round", ec.Header.Header.Round,
+            "node_id", ec.NodeID,
             "scheduler_id", ec.Header.SchedulerID,
+            "rank", rank,
         )
         return ErrBadExecutorCommitment
     }
@@ -255,18 +259,22 @@ func (p *Pool) AddVerifiedExecutorCommitment(c *scheduler.Committee, ec *Executo
     switch {
     case rank > p.HighestRank:
         // Reject commitments with higher ranking.
-        logger.Debug("executor commitment's scheduler has too high ranking",
+        logger.Debug("executor commitment's scheduler has worse ranking",
             "round", ec.Header.Header.Round,
-            "commitment_rank", rank,
-            "pool_rank", p.HighestRank,
+            "node_id", ec.NodeID,
+            "scheduler_id", ec.Header.SchedulerID,
+            "rank", rank,
+            "highest_rank", p.HighestRank,
         )
         return ErrBadExecutorCommitment
     case rank != p.HighestRank && p.Discrepancy:
         // Prevent placing commitments with different rank during discrepancy resolution.
         logger.Debug("executor commitment's scheduler rank does not match",
             "round", ec.Header.Header.Round,
-            "commitment_rank", rank,
-            "pool_rank", p.HighestRank,
+            "node_id", ec.NodeID,
+            "scheduler_id", ec.Header.SchedulerID,
+            "rank", rank,
+            "highest_rank", p.HighestRank,
         )
         return ErrBadExecutorCommitment
     case rank < p.HighestRank:
diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go
index 53cd1775b08..87fee767e22 100644
--- a/go/roothash/tests/tester.go
+++ b/go/roothash/tests/tester.go
@@ -32,7 +32,7 @@ import (
 )

 const (
-    recvTimeout = 5 * time.Second
+    recvTimeout = 10 * time.Second
     nrRuntimes  = 3
 )
diff --git a/go/worker/client/committee/node.go b/go/worker/client/committee/node.go
index 1f003ac754b..145fbb842f8 100644
--- a/go/worker/client/committee/node.go
+++ b/go/worker/client/committee/node.go
@@ -98,11 +98,6 @@ func (n *Node) HandleNewBlockLocked(*runtime.BlockInfo) {
     // Nothing to do here.
 }

-// HandleNewEventLocked is guarded by CrossNode.
-func (n *Node) HandleNewEventLocked(*roothash.Event) {
-    // Nothing to do here.
-}
-
 // HandleRuntimeHostEventLocked is guarded by CrossNode.
 func (n *Node) HandleRuntimeHostEventLocked(*host.Event) {
     // Nothing to do here.
diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go
index a599036a0e5..b4f2be18a31 100644
--- a/go/worker/common/committee/node.go
+++ b/go/worker/common/committee/node.go
@@ -43,13 +43,6 @@ var (
         },
         []string{"runtime"},
     )
-    processedEventCount = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Name: "oasis_worker_processed_event_count",
-            Help: "Number of processed roothash events.",
-        },
-        []string{"runtime"},
-    )
     failedRoundCount = prometheus.NewCounterVec(
         prometheus.CounterOpts{
             Name: "oasis_worker_failed_round_count",
@@ -116,7 +109,6 @@ var (
     nodeCollectors = []prometheus.Collector{
         processedBlockCount,
-        processedEventCount,
         failedRoundCount,
         epochTransitionCount,
         epochNumber,
@@ -145,8 +137,6 @@ type NodeHooks interface {
     // Guarded by CrossNode.
     HandleNewBlockLocked(*runtime.BlockInfo)
     // Guarded by CrossNode.
-    HandleNewEventLocked(*roothash.Event)
-    // Guarded by CrossNode.
     HandleRuntimeHostEventLocked(*host.Event)

     // Initialized returns a channel that will be closed when the worker is initialized and ready
@@ -616,15 +606,6 @@ func (n *Node) handleNewBlockLocked(blk *block.Block, height int64) {
     }
 }

-// Guarded by n.CrossNode.
-func (n *Node) handleNewEventLocked(ev *roothash.Event) {
-    processedEventCount.With(n.getMetricLabels()).Inc()
-
-    for _, hooks := range n.hooks {
-        hooks.HandleNewEventLocked(ev)
-    }
-}
-
 // Guarded by n.CrossNode.
 func (n *Node) handleRuntimeHostEventLocked(ev *host.Event) {
     n.logger.Debug("got runtime event", "ev", ev)
@@ -712,16 +693,6 @@ func (n *Node) worker() {
     }
     defer blocksSub.Close()

-    // Start watching roothash events.
-    events, eventsSub, err := n.Consensus.RootHash().WatchEvents(n.ctx, n.Runtime.ID())
-    if err != nil {
-        n.logger.Error("failed to subscribe to roothash events",
-            "err", err,
-        )
-        return
-    }
-    defer eventsSub.Close()
-
     // Provision the hosted runtime.
     hrt, hrtNotifier, err := n.ProvisionHostedRuntime(n.ctx)
     if err != nil {
@@ -798,13 +769,6 @@ func (n *Node) worker() {
             defer n.CrossNode.Unlock()
             n.handleNewBlockLocked(blk.Block, blk.Height)
         }()
-    case ev := <-events:
-        // Received an event.
-        func() {
-            n.CrossNode.Lock()
-            defer n.CrossNode.Unlock()
-            n.handleNewEventLocked(ev)
-        }()
     case ev := <-hrtEventCh:
         // Received a hosted runtime event.
         func() {
diff --git a/go/worker/compute/executor/committee/discrepancy.go b/go/worker/compute/executor/committee/discrepancy.go
index 19b7e5beb91..56eda83bef7 100644
--- a/go/worker/compute/executor/committee/discrepancy.go
+++ b/go/worker/compute/executor/committee/discrepancy.go
@@ -13,17 +13,6 @@ type discrepancyEvent struct {
     authoritative bool
 }

-func (n *Node) NotifyDiscrepancy(info *discrepancyEvent) {
-    // Drop discrepancies if the worker falls behind.
-    select {
-    case <-n.discrepancyCh:
-    default:
-    }
-
-    // Non-blocking send.
-    n.discrepancyCh <- info
-}
-
 func (n *Node) handleDiscrepancy(ctx context.Context, ev *discrepancyEvent) {
     n.logger.Warn("execution discrepancy detected",
         "rank", ev.rank,
@@ -55,17 +44,7 @@ func (n *Node) handleDiscrepancy(ctx context.Context, ev *discrepancyEvent) {
     n.discrepancy = ev
 }

-func (n *Node) handleObservedExecutorCommitment(ctx context.Context, ec *commitment.ExecutorCommitment) {
-    // Don't do anything if we are not a backup worker or we are an executor worker.
-    id := n.commonNode.Identity.NodeSigner.Public()
-    if !n.committee.IsBackupWorker(id) || n.committee.IsWorker(id) {
-        return
-    }
-
-    n.logger.Debug("observed executor commitment",
-        "commitment", ec,
-    )
-
+func (n *Node) predictDiscrepancy(ctx context.Context, ec *commitment.ExecutorCommitment) {
     // TODO: Handle equivocation detection.

     // Don't do anything if the discrepancy has already been detected.
@@ -101,7 +80,7 @@ func (n *Node) handleObservedExecutorCommitment(ctx context.Context, ec *commitm
     n.logger.Warn("observed commitments indicate discrepancy")

-    n.NotifyDiscrepancy(&discrepancyEvent{
+    n.handleDiscrepancy(ctx, &discrepancyEvent{
         rank:          n.commitPool.HighestRank,
         height:        uint64(n.blockInfo.ConsensusBlock.Height),
         authoritative: false,
diff --git a/go/worker/compute/executor/committee/hooks.go b/go/worker/compute/executor/committee/hooks.go
index 36ff7b94e54..017f67e4463 100644
--- a/go/worker/compute/executor/committee/hooks.go
+++ b/go/worker/compute/executor/committee/hooks.go
@@ -4,8 +4,6 @@ import (
     "context"

     "github.com/oasisprotocol/oasis-core/go/common/crash"
-    roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
-    "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
     runtime "github.com/oasisprotocol/oasis-core/go/runtime/api"
     "github.com/oasisprotocol/oasis-core/go/worker/common/committee"
 )
@@ -46,34 +44,3 @@ func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) {
     // Non-blocking send.
     n.blockInfoCh <- bi
 }
-
-// HandleNewEventLocked implements NodeHooks.
-// Guarded by n.commonNode.CrossNode.
-func (n *Node) HandleNewEventLocked(ev *roothash.Event) {
-    switch {
-    case ev.ExecutionDiscrepancyDetected != nil:
-        n.NotifyDiscrepancy(&discrepancyEvent{
-            rank:          ev.ExecutionDiscrepancyDetected.Rank,
-            height:        uint64(ev.Height),
-            authoritative: true,
-        })
-    case ev.ExecutorCommitted != nil:
-        n.NotifySchedulerCommitment(&ev.ExecutorCommitted.Commit)
-    }
-}
-
-func (n *Node) NotifySchedulerCommitment(ec *commitment.ExecutorCommitment) {
-    // Filter scheduler commitments.
-    if ec.NodeID != ec.Header.SchedulerID {
-        return
-    }
-
-    // Drop commitments if the worker falls behind. The pool's rank can only improve.
-    select {
-    case <-n.schedulerCommitmentCh:
-    default:
-    }
-
-    // Non-blocking send.
-    n.schedulerCommitmentCh <- ec
-}
diff --git a/go/worker/compute/executor/committee/metrics.go b/go/worker/compute/executor/committee/metrics.go
index 87b87e66e0e..22c392beaa2 100644
--- a/go/worker/compute/executor/committee/metrics.go
+++ b/go/worker/compute/executor/committee/metrics.go
@@ -9,6 +9,13 @@ import (
 )

 var (
+    processedEventCount = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "oasis_worker_processed_event_count",
+            Help: "Number of processed roothash events.",
+        },
+        []string{"runtime"},
+    )
     discrepancyDetectedCount = prometheus.NewCounterVec(
         prometheus.CounterOpts{
             Name: "oasis_worker_execution_discrepancy_detected_count",
@@ -52,6 +59,7 @@ var (
         []string{"runtime"},
     )
     nodeCollectors = []prometheus.Collector{
+        processedEventCount,
         discrepancyDetectedCount,
         abortedBatchCount,
         storageCommitLatency,
diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go
index 1b24b3cd89e..ca42bdb8363 100644
--- a/go/worker/compute/executor/committee/node.go
+++ b/go/worker/compute/executor/committee/node.go
@@ -76,14 +76,14 @@ type Node struct { // nolint: maligned
     committee  *scheduler.Committee
     commitPool *commitment.Pool

-    blockInfoCh           chan *runtime.BlockInfo
-    processedBatchCh      chan *processedBatch
-    discrepancyCh         chan *discrepancyEvent
-    schedulerCommitmentCh chan *commitment.ExecutorCommitment
-    reselectCh            chan struct{}
+    blockInfoCh      chan *runtime.BlockInfo
+    processedBatchCh chan *processedBatch
+    reselectCh       chan struct{}
+    missingTxCh      chan [][]byte

     txCh <-chan []*txpool.PendingCheckTransaction
     ecCh <-chan *commitment.ExecutorCommitment
+    evCh <-chan *roothash.Event

     // Local, set and used by every round worker.
@@ -95,6 +95,7 @@ type Node struct { // nolint: maligned
     discrepancy *discrepancyEvent
     submitted   map[uint64]struct{}
     rank        uint64
+    poolRank    uint64
     proposedBatch *proposedBatch

     logger *logging.Logger
@@ -1254,6 +1255,102 @@ func (n *Node) handleProcessedBatch(ctx context.Context, batch *processedBatch)
     n.proposeBatch(ctx, &lastHeader, batch)
 }

+func (n *Node) handleEvent(ctx context.Context, ev *roothash.Event) {
+    processedEventCount.With(n.getMetricLabels()).Inc()
+
+    switch {
+    case ev.ExecutionDiscrepancyDetected != nil:
+        n.handleDiscrepancy(ctx, &discrepancyEvent{
+            rank:          ev.ExecutionDiscrepancyDetected.Rank,
+            height:        uint64(ev.Height),
+            authoritative: true,
+        })
+    case ev.ExecutorCommitted != nil:
+        n.handleExecutorCommitment(ctx, &ev.ExecutorCommitted.Commit)
+    }
+}
+
+func (n *Node) handleExecutorCommitment(ctx context.Context, ec *commitment.ExecutorCommitment) {
+    n.logger.Debug("executor commitment",
+        "commitment", ec,
+    )
+
+    id := n.commonNode.Identity.NodeSigner.Public()
+    switch {
+    case n.committee.IsWorker(id):
+        n.estimatePoolRank(ctx, ec, false)
+    }
+}
+
+func (n *Node) handleObservedExecutorCommitment(ctx context.Context, ec *commitment.ExecutorCommitment) {
+    n.logger.Debug("observed executor commitment",
+        "commitment", ec,
+    )
+
+    id := n.commonNode.Identity.NodeSigner.Public()
+    switch {
+    case n.committee.IsWorker(id):
+        n.estimatePoolRank(ctx, ec, true)
+    case n.committee.IsBackupWorker(id):
+        n.predictDiscrepancy(ctx, ec)
+    }
+}
+
+func (n *Node) estimatePoolRank(ctx context.Context, ec *commitment.ExecutorCommitment, observed bool) {
+    // Filter for this round only.
+    round := n.blockInfo.RuntimeBlock.Header.Round + 1
+    if ec.Header.Header.Round != round {
+        n.logger.Debug("ignoring bad executor commitment, not for this round",
+            "round", round,
+            "ec_round", ec.Header.Header.Round,
+            "node_id", ec.NodeID,
+            "observed", observed,
+        )
+        return
+    }
+
+    // Filter scheduler commitments.
+    if ec.NodeID != ec.Header.SchedulerID {
+        n.logger.Debug("ignoring bad executor commitment, not from scheduler",
+            "node_id", ec.NodeID,
+            "observed", observed,
+        )
+        return
+    }
+
+    if observed {
+        // Verify the commitment.
+        rt := n.epoch.GetRuntime()
+        if err := commitment.VerifyExecutorCommitment(ctx, n.blockInfo.RuntimeBlock, rt, n.committee.ValidFor, ec, nil, n.epoch); err != nil {
+            n.logger.Debug("ignoring bad executor commitment, verification failed",
+                "err", err,
+                "node_id", ec.NodeID,
+                "observed", observed,
+            )
+            return
+        }
+    }
+
+    // Update pool rank.
+    rank, ok := n.committee.SchedulerRank(ec.Header.Header.Round, ec.Header.SchedulerID)
+    if !ok {
+        n.logger.Debug("ignoring bad executor commitment, scheduler not in committee",
+            "node_id", ec.NodeID,
+            "observed", observed,
+        )
+        return
+    }
+    if rank >= n.poolRank {
+        return
+    }
+    n.poolRank = rank
+    n.logger.Debug("pool rank has changed",
+        "round", round,
+        "rank", n.poolRank,
+        "observed", observed,
+    )
+}
+
 func (n *Node) handleRoundStarted() {
     n.logger.Debug("starting round worker",
         "round", n.blockInfo.RuntimeBlock.Header.Round+1,
     )
@@ -1335,6 +1432,7 @@ func (n *Node) worker() {
         err   error
         txSub pubsub.ClosableSubscription
         ecSub pubsub.ClosableSubscription
+        evSub pubsub.ClosableSubscription
     )

     // Subscribe to notifications of new transactions being available in the pool.
@@ -1352,6 +1450,16 @@ func (n *Node) worker() {
     }
     defer ecSub.Close()

+    // Start watching roothash events.
+    n.evCh, evSub, err = n.commonNode.Consensus.RootHash().WatchEvents(n.ctx, n.commonNode.Runtime.ID())
+    if err != nil {
+        n.logger.Error("failed to subscribe to roothash events",
+            "err", err,
+        )
+        return
+    }
+    defer evSub.Close()
+
     // We are initialized.
     close(n.initCh)
@@ -1467,18 +1575,16 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) {
         "backup_worker", n.epoch.IsExecutorBackupWorker(),
     )

-    // Track the pool's highest rank to prevent committing to worse-ranked proposals
+    // Estimate the pool's highest rank to prevent committing to worse-ranked proposals
     // that will be rejected by the pool.
-    poolRank := uint64(math.MaxUint64)
+    n.poolRank = uint64(math.MaxUint64)

     // Allow only the highest-ranked scheduler to propose immediately.
     schedulerRank := uint64(0)

     // The ticker determines when we are allowed to commit to proposals from schedulers
     // with lower ranks.
-    schedulerDelay := n.rtState.Runtime.TxnScheduler.ProposerTimeout
-    schedulerRankTicker := NewLinearRankTicker(uint64(bi.RuntimeBlock.Header.Timestamp), schedulerDelay, n.rank)
-    schedulerRankTicker.Start()
+    schedulerRankTicker := time.NewTicker(n.rtState.Runtime.TxnScheduler.ProposerTimeout)
     defer schedulerRankTicker.Stop()

     // Reset discrepancy detection.
@@ -1493,14 +1599,14 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) {
         // Update state, propose or schedule.
         switch n.discrepancy {
         case nil:
-            limit := min(schedulerRank, poolRank, n.rank)
+            limit := min(schedulerRank, n.poolRank, n.rank)
             proposal, rank, ok := n.proposals.Best(round, 0, limit, n.submitted)
             switch {
             case ok && rank < n.rank:
                 // Commit to a proposal with a higher rank.
                 n.updateState(ctx, 0, rank, false)
                 n.processProposal(ctx, proposal, rank, false)
-            case n.rank <= schedulerRank:
+            case n.rank <= limit:
                 // Try to schedule a batch.
                 n.updateState(ctx, 0, n.rank, false)
                 n.scheduleBatch(ctx, round, flush)
@@ -1525,12 +1631,15 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) {
         case <-ctx.Done():
             n.logger.Debug("exiting round, context canceled")
             return
+        case ev := <-n.evCh:
+            // Handle an event.
+            n.handleEvent(ctx, ev)
         case txs := <-n.txCh:
            // Check any queued transactions.
            n.handleNewCheckedTransactions(txs)
-        case discrepancy := <-n.discrepancyCh:
-            // Discrepancy has been detected.
-            n.handleDiscrepancy(ctx, discrepancy)
+        case txs := <-n.missingTxCh:
+            // Missing transactions fetched.
+            n.handleMissingTransactions(txs)
         case ec := <-n.ecCh:
             // Process observed executor commitments.
             n.handleObservedExecutorCommitment(ctx, ec)
         case batch := <-n.processedBatchCh:
             // Batch processing has finished.
             n.handleProcessedBatch(ctx, batch)
-        case schedulerRank = <-schedulerRankTicker.C():
-            // Scheduler rank decreased, try to schedule/process again.
+        case <-schedulerRankTicker.C:
+            // Change scheduler rank and try again.
+            schedulerRank++
             n.logger.Debug("scheduler rank has changed",
                 "rank", schedulerRank,
             )
-        case ec := <-n.schedulerCommitmentCh:
-            // Pool rank increased, no need to schedule/process again.
-            if ec.Header.Header.Round != round {
-                continue
-            }
-            rank, ok := n.committee.SchedulerRank(round, ec.Header.SchedulerID)
-            if !ok {
-                continue
-            }
-            poolRank = rank
-            n.logger.Debug("pool rank has changed",
-                "rank", poolRank,
-            )
-            continue
         case <-flushTimer.C:
             // Force scheduling for primary transaction scheduler.
             n.logger.Debug("scheduling is now forced")
@@ -1583,25 +1679,24 @@ func NewNode(
     ctx, cancel := context.WithCancel(context.Background())

     n := &Node{
-        commonNode:            commonNode,
-        commonCfg:             commonCfg,
-        roleProvider:          roleProvider,
-        committeeTopic:        committeeTopic,
-        proposals:             newPendingProposals(),
-        ctx:                   ctx,
-        cancelCtx:             cancel,
-        stopCh:                make(chan struct{}),
-        quitCh:                make(chan struct{}),
-        initCh:                make(chan struct{}),
-        state:                 StateWaitingForBatch{},
-        txSync:                txsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()),
-        stateTransitions:      pubsub.NewBroker(false),
-        blockInfoCh:           make(chan *runtime.BlockInfo, 1),
-        discrepancyCh:         make(chan *discrepancyEvent, 1),
-        processedBatchCh:      make(chan *processedBatch, 1),
-        schedulerCommitmentCh: make(chan *commitment.ExecutorCommitment, 1),
-        reselectCh:            make(chan struct{}, 1),
-        logger:                logging.GetLogger("worker/executor/committee").With("runtime_id", commonNode.Runtime.ID()),
+        commonNode:       commonNode,
+        commonCfg:        commonCfg,
+        roleProvider:     roleProvider,
+        committeeTopic:   committeeTopic,
+        proposals:        newPendingProposals(),
+        ctx:              ctx,
+        cancelCtx:        cancel,
+        stopCh:           make(chan struct{}),
+        quitCh:           make(chan struct{}),
+        initCh:           make(chan struct{}),
+        state:            StateWaitingForBatch{},
+        txSync:           txsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()),
+        stateTransitions: pubsub.NewBroker(false),
+        blockInfoCh:      make(chan *runtime.BlockInfo, 1),
+        processedBatchCh: make(chan *processedBatch, 1),
+        reselectCh:       make(chan struct{}, 1),
+        missingTxCh:      make(chan [][]byte, 1),
+        logger:           logging.GetLogger("worker/executor/committee").With("runtime_id", commonNode.Runtime.ID()),
     }

     // Register prune handler.
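Note on the scheduling change above: the patch replaces the wall-clock LinearRankTicker (the file deleted below) with a plain time.Ticker driven by local time, so the round worker starts at rank 0 and increments schedulerRank once per proposer timeout instead of deriving the rank from the runtime block's timestamp. A minimal standalone sketch of that escalation loop follows; it is not part of the patch, and the timeout and maxRank values are illustrative stand-ins for TxnScheduler.ProposerTimeout and the node's own rank.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative stand-ins; the real values come from the runtime's
	// TxnScheduler.ProposerTimeout and the node's scheduler rank.
	const proposerTimeout = 200 * time.Millisecond
	const maxRank = uint64(3)

	// Rank 0 may propose immediately; each tick allows the next
	// lower-ranked scheduler to propose as well.
	schedulerRank := uint64(0)
	ticker := time.NewTicker(proposerTimeout)
	defer ticker.Stop()

	for schedulerRank < maxRank {
		<-ticker.C
		schedulerRank++
		fmt.Println("scheduler rank has changed:", schedulerRank)
	}
}

Using local time here sidesteps clock skew between the runtime block timestamp and the worker's host, at the cost of each node escalating on its own schedule.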
diff --git a/go/worker/compute/executor/committee/ticker.go b/go/worker/compute/executor/committee/ticker.go
deleted file mode 100644
index 97ba8729c36..00000000000
--- a/go/worker/compute/executor/committee/ticker.go
+++ /dev/null
@@ -1,110 +0,0 @@
-package committee
-
-import (
-    "time"
-)
-
-// RankTicker represents an interface for rank ticker functionality.
-type RankTicker interface {
-    // Start starts the ticker.
-    Start()
-
-    // Stop stops the ticker.
-    Stop()
-
-    // C returns a channel on which ranks will be sent.
-    C() <-chan uint64
-}
-
-type linearRankTicker struct {
-    c chan uint64
-
-    stop chan struct{}
-    done chan struct{}
-
-    timestamp int64
-    duration  time.Duration
-    maxRank   uint64
-}
-
-// NewLinearRankTicker returns a new linear rank ticker.
-func NewLinearRankTicker(timestamp uint64, d time.Duration, maxRank uint64) RankTicker {
-    if d <= 0 {
-        panic("duration has to be a positive number")
-    }
-
-    return &linearRankTicker{
-        c:         make(chan uint64, 1),
-        stop:      make(chan struct{}),
-        done:      make(chan struct{}),
-        timestamp: int64(timestamp),
-        duration:  d,
-        maxRank:   maxRank,
-    }
-}
-
-func (t *linearRankTicker) C() <-chan uint64 {
-    return t.c
-}
-
-func (t *linearRankTicker) Start() {
-    go t.start()
-}
-
-func (t *linearRankTicker) start() {
-    defer close(t.done)
-
-    var rank uint64
-    sendRank := func() {
-        select {
-        case t.c <- rank:
-        default:
-        }
-        rank++
-    }
-
-    diff := time.Until(time.Unix(t.timestamp, 0))
-    if diff < 0 {
-        rank = uint64(-diff / t.duration)
-        if rank > t.maxRank {
-            rank = t.maxRank
-        }
-
-        sendRank()
-        if rank > t.maxRank {
-            return
-        }
-
-        diff = t.duration + diff%t.duration
-    }
-
-    timer := time.NewTimer(diff)
-    defer timer.Stop()
-
-    select {
-    case <-timer.C:
-    case <-t.stop:
-        return
-    }
-
-    ticker := time.NewTicker(t.duration)
-    defer ticker.Stop()
-
-    for {
-        sendRank()
-        if rank > t.maxRank {
-            return
-        }
-
-        select {
-        case <-ticker.C:
-        case <-t.stop:
-            return
-        }
-    }
-}
-
-func (t *linearRankTicker) Stop() {
-    close(t.stop)
-    <-t.done
-}
diff --git a/go/worker/compute/executor/committee/transactions.go b/go/worker/compute/executor/committee/transactions.go
index 7f5ebd1d88f..c47be8db952 100644
--- a/go/worker/compute/executor/committee/transactions.go
+++ b/go/worker/compute/executor/committee/transactions.go
@@ -31,6 +31,30 @@ func (n *Node) handleNewCheckedTransactions(txs []*txpool.PendingCheckTransactio
         state.bytes += uint64(tx.Size())
     }

+    n.checkWaitingForTxsState(&state)
+}
+
+func (n *Node) handleMissingTransactions(txs [][]byte) {
+    state, ok := n.state.(StateWaitingForTxs)
+    if !ok {
+        return
+    }
+
+    for _, tx := range txs {
+        h := hash.NewFromBytes(tx)
+        idx, ok := state.txs[h]
+        if !ok {
+            continue
+        }
+        delete(state.txs, h)
+        state.batch[idx] = tx
+        state.bytes += uint64(len(tx))
+    }
+
+    n.checkWaitingForTxsState(&state)
+}
+
+func (n *Node) checkWaitingForTxsState(state *StateWaitingForTxs) {
     if len(state.txs) == 0 {
         n.logger.Info("received all transactions needed for batch processing")
     }
@@ -42,6 +66,8 @@ func (n *Node) handleNewCheckedTransactions(txs []*txpool.PendingCheckTransactio
 }

 func (n *Node) requestMissingTransactions(ctx context.Context, txHashes []hash.Hash) {
+    txs := make([][]byte, 0, len(txHashes))
+
     requestOp := func() error {
         if len(txHashes) == 0 {
             return nil
@@ -68,6 +94,8 @@ func (n *Node) requestMissingTransactions(ctx context.Context, txHashes []hash.H
             )
         }

+        txs = append(txs, rsp.Txs...)
+
         // Queue all transactions in the transaction pool.
         n.commonNode.TxPool.SubmitProposedBatch(rsp.Txs)
@@ -93,5 +121,10 @@ func (n *Node) requestMissingTransactions(ctx context.Context, txHashes []hash.H
         return
     }

-    n.logger.Info("received all transactions needed for batch processing")
+    n.logger.Info("fetched all transactions needed for batch processing")
+
+    select {
+    case n.missingTxCh <- txs:
+    case <-ctx.Done():
+    }
 }
diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go
index f1de5131a5c..4734431463e 100644
--- a/go/worker/storage/committee/node.go
+++ b/go/worker/storage/committee/node.go
@@ -358,11 +358,6 @@ func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) {
     n.blockCh.In() <- bi.RuntimeBlock
 }

-// HandleNewEventLocked is guarded by CrossNode.
-func (n *Node) HandleNewEventLocked(*roothashApi.Event) {
-    // Nothing to do here.
-}
-
 // HandleRuntimeHostEventLocked is guarded by CrossNode.
 func (n *Node) HandleRuntimeHostEventLocked(*host.Event) {
     // Nothing to do here.
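Note on the pool-rank estimate introduced in this patch: the executor keeps a local estimate of the commitment pool's highest rank, seeded at MaxUint64 and only ever improved (decreased) as scheduler commitments are observed, and it neither schedules nor commits past min(schedulerRank, poolRank, rank). A minimal sketch of that invariant follows; roundState, observeSchedulerRank, and limit are hypothetical names for illustration only and do not appear in the patch (the min builtin requires Go 1.21+).

package main

import (
	"fmt"
	"math"
)

// roundState is a hypothetical stand-in for the per-round fields kept on
// the executor Node.
type roundState struct {
	schedulerRank uint64 // advanced by the proposer-timeout ticker
	poolRank      uint64 // estimated from observed scheduler commitments
	rank          uint64 // this node's own scheduler rank
}

// observeSchedulerRank mirrors the monotone update in estimatePoolRank:
// the estimate starts at MaxUint64 and can only improve (decrease).
func (s *roundState) observeSchedulerRank(rank uint64) {
	if rank >= s.poolRank {
		return
	}
	s.poolRank = rank
}

// limit mirrors min(schedulerRank, n.poolRank, n.rank) in roundWorker:
// the effective rank past which the node neither schedules nor commits.
func (s *roundState) limit() uint64 {
	return min(s.schedulerRank, s.poolRank, s.rank)
}

func main() {
	s := &roundState{schedulerRank: 2, poolRank: math.MaxUint64, rank: 1}
	s.observeSchedulerRank(0)                // saw a commitment from the rank-0 scheduler
	fmt.Println("commit limit:", s.limit()) // 0: worse-ranked proposals are skipped
}

Because the estimate only decreases, a late or replayed commitment from a worse-ranked scheduler can never loosen the limit, which is what makes it safe to derive from observed (gossiped) commitments rather than from authoritative roothash events alone.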