Skip to content

Commit

Permalink
refactor(consensus): get_proposal + propose -> repropose
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware committed Oct 14, 2024
1 parent 033ec45 commit 3b2d9f8
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 68 deletions.
8 changes: 4 additions & 4 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ mock! {
content: mpsc::Receiver<Transaction>
) -> oneshot::Receiver<ProposalContentId>;

async fn get_proposal(
&self,
height: BlockNumber,
async fn repropose(
&mut self,
id: ProposalContentId,
) -> mpsc::Receiver<Transaction>;
init: ProposalInit,
);

async fn validators(&self, height: BlockNumber) -> Vec<ValidatorId>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,19 +384,13 @@ impl SingleHeightConsensus {
.expect("proposals should have proposal for valid_round")
.expect("proposal should not be None");
assert_eq!(id, block_hash, "proposal should match the stored proposal");
let content_receiver = context.get_proposal(self.height, id).await;
let init = ProposalInit {
height: self.height,
round,
proposer: self.id,
valid_round: Some(valid_round),
};
let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(id).expect("Send should succeed");
context
.propose(init, content_receiver, fin_receiver)
.await
.expect("Failed broadcasting Proposal");
context.repropose(id, init).await;
let old = self.proposals.insert(round, Some(block_hash));
assert!(old.is_none(), "There should be no entry for this round.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ async fn repropose() {
});
let fin_receiver = Arc::new(OnceLock::new());
let fin_receiver_clone = Arc::clone(&fin_receiver);
context.expect_propose().times(2).returning(move |init, _, fin_receiver| {
context.expect_propose().times(1).returning(move |init, _, fin_receiver| {
// Ignore content receiver, since this is the context's responsibility.
assert_eq!(init.height, BlockNumber(0));
assert_eq!(init.proposer, *PROPOSER_ID);
Expand Down Expand Up @@ -438,11 +438,9 @@ async fn repropose() {
shc.handle_message(&mut context, precommits[0].clone()).await.unwrap();
shc.handle_message(&mut context, precommits[1].clone()).await.unwrap();
// After NIL precommits, the proposer should re-propose.
context.expect_get_proposal().returning(move |height, id| {
assert!(height == BlockNumber(0));
context.expect_repropose().returning(move |id, init| {
assert_eq!(init.height, BlockNumber(0));
assert_eq!(id, BLOCK.id);
let (_content_sender, content_receiver) = mpsc::channel(1);
content_receiver
});
context
.expect_broadcast()
Expand Down
8 changes: 4 additions & 4 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ mock! {
content: mpsc::Receiver<u32>
) -> oneshot::Receiver<ProposalContentId>;

async fn get_proposal(
&self,
height: BlockNumber,
async fn repropose(
&mut self,
id: ProposalContentId,
) -> mpsc::Receiver<u32>;
init: ProposalInit,
);

async fn validators(&self, height: BlockNumber) -> Vec<ValidatorId>;

Expand Down
14 changes: 3 additions & 11 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,12 @@ pub trait ConsensusContext {
) -> oneshot::Receiver<ProposalContentId>;

/// This function is called by consensus to retrieve the content of a previously built or
/// validated proposal. It expects that this call will return immediately, allowing
/// consensus to stream the block's content.
/// validated proposal. It broadcasts the proposal to the network.
///
/// Params:
/// - `height`: The height of the block that was built or validated.
/// - `id`: The `ProposalContentId` associated with the block's content.
///
/// Returns:
/// - A receiver for the stream of the block's content.
async fn get_proposal(
&self,
height: BlockNumber,
id: ProposalContentId,
) -> mpsc::Receiver<Self::ProposalChunk>;
/// - `init`: The `ProposalInit` that is broadcast to the network.
async fn repropose(&mut self, id: ProposalContentId, init: ProposalInit);

/// Get the set of validators for a given height. These are the nodes that can propose and vote
/// on blocks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use tracing::{debug, debug_span, error, info, warn, Instrument};
use tracing::{debug, debug_span, info, warn, Instrument};

// TODO: add debug messages and span to the tasks.

Expand Down Expand Up @@ -115,7 +115,7 @@ impl ConsensusContext for PapyrusConsensusContext {

proposals.entry(height).or_default().insert(block_hash, transactions);
// Done after inserting the proposal into the map to avoid race conditions between
// insertion and calls to `get_proposal`.
// insertion and calls to `repropose`.
fin_sender.send(block_hash).expect("Send should succeed");
}
.instrument(debug_span!("consensus_build_proposal")),
Expand Down Expand Up @@ -177,7 +177,7 @@ impl ConsensusContext for PapyrusConsensusContext {

proposals.entry(height).or_default().insert(block_hash, transactions);
// Done after inserting the proposal into the map to avoid race conditions between
// insertion and calls to `get_proposal`.
// insertion and calls to `repropose`.
// This can happen as a result of sync interrupting `run_height`.
fin_sender.send(block_hash).unwrap_or_else(|_| {
warn!("Failed to send block to consensus. height={height}");
Expand All @@ -189,34 +189,29 @@ impl ConsensusContext for PapyrusConsensusContext {
fin_receiver
}

async fn get_proposal(
&self,
height: BlockNumber,
id: ProposalContentId,
) -> mpsc::Receiver<Transaction> {
let (mut sender, receiver) = mpsc::channel(CHANNEL_SIZE);
async fn repropose(&mut self, id: ProposalContentId, init: ProposalInit) {
let valid_proposals = Arc::clone(&self.valid_proposals);
tokio::spawn(async move {
let transactions = {
let valid_proposals_lock = valid_proposals
.lock()
.expect("Lock on active proposals was poisoned due to a previous panic");
let Some(proposals_at_height) = valid_proposals_lock.get(&height) else {
error!("No proposals found for height {height}");
return;
};
let Some(transactions) = proposals_at_height.get(&id) else {
error!("No proposal found for height {height} and id {id}");
return;
};
transactions.clone()
};
for tx in transactions.clone() {
sender.try_send(tx).expect("Send should succeed");
}
sender.close_channel();
});
receiver
let transactions = valid_proposals
.lock()
.expect("valid_proposals lock was poisoned")
.get(&init.height)
.unwrap_or_else(|| panic!("No proposals found for height {}", init.height))
.get(&id)
.unwrap_or_else(|| panic!("No proposal found for height {} and id {}", init.height, id))
.clone();

let proposal = Proposal {
height: init.height.0,
round: init.round,
proposer: init.proposer,
transactions,
block_hash: id,
valid_round: init.valid_round,
};
self.network_broadcast_client
.broadcast_message(ConsensusMessage::Proposal(proposal))
.await
.expect("Failed to send proposal");
}

async fn validators(&self, _height: BlockNumber) -> Vec<ValidatorId> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ impl ConsensusContext for SequencerConsensusContext {
todo!()
}

async fn get_proposal(
&self,
_height: BlockNumber,
_id: ProposalContentId,
) -> mpsc::Receiver<Self::ProposalChunk> {
async fn repropose(&mut self, _id: ProposalContentId, _init: ProposalInit) {
todo!()
}

Expand Down Expand Up @@ -251,13 +247,12 @@ async fn stream_build_proposal(
GetProposalContent::Finished(id) => {
let proposal_content_id = BlockHash(id.state_diff_commitment.0.0);
// Update valid_proposals before sending fin to avoid a race condition
// with `get_proposal` being called before `valid_proposals` is updated.
// with `repropose` being called before `valid_proposals` is updated.
let mut valid_proposals = valid_proposals.lock().expect("Lock was poisoned");
valid_proposals
.entry(height)
.or_default()
.insert(proposal_content_id, (content, proposal_id));

if fin_sender.send(proposal_content_id).is_err() {
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content id");
Expand Down

0 comments on commit 3b2d9f8

Please sign in to comment.