diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 86a905b982..27871b129e 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -58,11 +58,11 @@ mock! { content: mpsc::Receiver ) -> oneshot::Receiver; - async fn get_proposal( - &self, - height: BlockNumber, + async fn repropose( + &mut self, id: ProposalContentId, - ) -> mpsc::Receiver; + init: ProposalInit, + ); async fn validators(&self, height: BlockNumber) -> Vec; diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index fa96819fbd..9d2253a9e9 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -384,19 +384,13 @@ impl SingleHeightConsensus { .expect("proposals should have proposal for valid_round") .expect("proposal should not be None"); assert_eq!(id, block_hash, "proposal should match the stored proposal"); - let content_receiver = context.get_proposal(self.height, id).await; let init = ProposalInit { height: self.height, round, proposer: self.id, valid_round: Some(valid_round), }; - let (fin_sender, fin_receiver) = oneshot::channel(); - fin_sender.send(id).expect("Send should succeed"); - context - .propose(init, content_receiver, fin_receiver) - .await - .expect("Failed broadcasting Proposal"); + context.repropose(id, init).await; let old = self.proposals.insert(round, Some(block_hash)); assert!(old.is_none(), "There should be no entry for this round."); } diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index 07669527ca..09e946bc06 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -397,7 +397,7 @@ async fn repropose() { }); let fin_receiver = Arc::new(OnceLock::new()); let fin_receiver_clone = Arc::clone(&fin_receiver); - context.expect_propose().times(2).returning(move |init, _, fin_receiver| { + context.expect_propose().times(1).returning(move |init, _, fin_receiver| { // Ignore content receiver, since this is the context's responsibility. assert_eq!(init.height, BlockNumber(0)); assert_eq!(init.proposer, *PROPOSER_ID); @@ -438,11 +438,9 @@ async fn repropose() { shc.handle_message(&mut context, precommits[0].clone()).await.unwrap(); shc.handle_message(&mut context, precommits[1].clone()).await.unwrap(); // After NIL precommits, the proposer should re-propose. - context.expect_get_proposal().returning(move |height, id| { - assert!(height == BlockNumber(0)); + context.expect_repropose().returning(move |id, init| { + assert_eq!(init.height, BlockNumber(0)); assert_eq!(id, BLOCK.id); - let (_content_sender, content_receiver) = mpsc::channel(1); - content_receiver }); context .expect_broadcast() diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index d6dad7357b..9723eae71a 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -43,11 +43,11 @@ mock! { content: mpsc::Receiver ) -> oneshot::Receiver; - async fn get_proposal( - &self, - height: BlockNumber, + async fn repropose( + &mut self, id: ProposalContentId, - ) -> mpsc::Receiver; + init: ProposalInit, + ); async fn validators(&self, height: BlockNumber) -> Vec; diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index c8298fec09..4a8888c028 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -75,20 +75,12 @@ pub trait ConsensusContext { ) -> oneshot::Receiver; /// This function is called by consensus to retrieve the content of a previously built or - /// validated proposal. It expects that this call will return immediately, allowing - /// consensus to stream the block's content. + /// validated proposal. It broadcasts the proposal to the network. /// /// Params: - /// - `height`: The height of the block that was built or validated. /// - `id`: The `ProposalContentId` associated with the block's content. - /// - /// Returns: - /// - A receiver for the stream of the block's content. - async fn get_proposal( - &self, - height: BlockNumber, - id: ProposalContentId, - ) -> mpsc::Receiver; + /// - `init`: The `ProposalInit` that is broadcast to the network. + async fn repropose(&mut self, id: ProposalContentId, init: ProposalInit); /// Get the set of validators for a given height. These are the nodes that can propose and vote /// on blocks. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 2fb0d93e84..5db3455db8 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -31,7 +31,7 @@ use papyrus_storage::{StorageError, StorageReader}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::ContractAddress; use starknet_api::transaction::Transaction; -use tracing::{debug, debug_span, error, info, warn, Instrument}; +use tracing::{debug, debug_span, info, warn, Instrument}; // TODO: add debug messages and span to the tasks. @@ -115,7 +115,7 @@ impl ConsensusContext for PapyrusConsensusContext { proposals.entry(height).or_default().insert(block_hash, transactions); // Done after inserting the proposal into the map to avoid race conditions between - // insertion and calls to `get_proposal`. + // insertion and calls to `repropose`. fin_sender.send(block_hash).expect("Send should succeed"); } .instrument(debug_span!("consensus_build_proposal")), @@ -177,7 +177,7 @@ impl ConsensusContext for PapyrusConsensusContext { proposals.entry(height).or_default().insert(block_hash, transactions); // Done after inserting the proposal into the map to avoid race conditions between - // insertion and calls to `get_proposal`. + // insertion and calls to `repropose`. // This can happen as a result of sync interrupting `run_height`. fin_sender.send(block_hash).unwrap_or_else(|_| { warn!("Failed to send block to consensus. height={height}"); @@ -189,34 +189,29 @@ impl ConsensusContext for PapyrusConsensusContext { fin_receiver } - async fn get_proposal( - &self, - height: BlockNumber, - id: ProposalContentId, - ) -> mpsc::Receiver { - let (mut sender, receiver) = mpsc::channel(CHANNEL_SIZE); + async fn repropose(&mut self, id: ProposalContentId, init: ProposalInit) { let valid_proposals = Arc::clone(&self.valid_proposals); - tokio::spawn(async move { - let transactions = { - let valid_proposals_lock = valid_proposals - .lock() - .expect("Lock on active proposals was poisoned due to a previous panic"); - let Some(proposals_at_height) = valid_proposals_lock.get(&height) else { - error!("No proposals found for height {height}"); - return; - }; - let Some(transactions) = proposals_at_height.get(&id) else { - error!("No proposal found for height {height} and id {id}"); - return; - }; - transactions.clone() - }; - for tx in transactions.clone() { - sender.try_send(tx).expect("Send should succeed"); - } - sender.close_channel(); - }); - receiver + let transactions = valid_proposals + .lock() + .expect("valid_proposals lock was poisoned") + .get(&init.height) + .unwrap_or_else(|| panic!("No proposals found for height {}", init.height)) + .get(&id) + .unwrap_or_else(|| panic!("No proposal found for height {} and id {}", init.height, id)) + .clone(); + + let proposal = Proposal { + height: init.height.0, + round: init.round, + proposer: init.proposer, + transactions, + block_hash: id, + valid_round: init.valid_round, + }; + self.network_broadcast_client + .broadcast_message(ConsensusMessage::Proposal(proposal)) + .await + .expect("Failed to send proposal"); } async fn validators(&self, _height: BlockNumber) -> Vec { diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index f0b180ca3c..01576edb15 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -134,11 +134,7 @@ impl ConsensusContext for SequencerConsensusContext { todo!() } - async fn get_proposal( - &self, - _height: BlockNumber, - _id: ProposalContentId, - ) -> mpsc::Receiver { + async fn repropose(&mut self, _id: ProposalContentId, _init: ProposalInit) { todo!() } @@ -251,13 +247,12 @@ async fn stream_build_proposal( GetProposalContent::Finished(id) => { let proposal_content_id = BlockHash(id.state_diff_commitment.0.0); // Update valid_proposals before sending fin to avoid a race condition - // with `get_proposal` being called before `valid_proposals` is updated. + // with `repropose` being called before `valid_proposals` is updated. let mut valid_proposals = valid_proposals.lock().expect("Lock was poisoned"); valid_proposals .entry(height) .or_default() .insert(proposal_content_id, (content, proposal_id)); - if fin_sender.send(proposal_content_id).is_err() { // Consensus may exit early (e.g. sync). warn!("Failed to send proposal content id");