Skip to content

Commit

Permalink
feat(mempool_p2p): impl handle request for sender
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Oct 13, 2024
1 parent cd93544 commit db8da61
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 5 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions crates/mempool_p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ papyrus_protobuf.workspace = true
starknet_gateway_types.workspace = true
starknet_mempool_infra.workspace = true
starknet_mempool_p2p_types.workspace = true

[dev-dependencies]
futures.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_network_types = { workspace = true, features = ["testing"] }
papyrus_test_utils.workspace = true
rand_chacha.workspace = true
starknet_api.workspace = true
tokio = { workspace = true, features = ["full", "sync", "test-util"] }
28 changes: 24 additions & 4 deletions crates/mempool_p2p/src/sender/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#[cfg(test)]
mod test;

use async_trait::async_trait;
use papyrus_network::network_manager::BroadcastTopicClient;
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::mempool::RpcTransactionWrapper;
use starknet_mempool_infra::component_definitions::ComponentRequestHandler;
use starknet_mempool_p2p_types::communication::{
MempoolP2pSenderRequest,
MempoolP2pSenderResponse,
};
use starknet_mempool_p2p_types::errors::MempoolP2pSenderError;

pub struct MempoolP2pSender {
#[allow(dead_code)]
broadcast_topic_client: BroadcastTopicClient<RpcTransactionWrapper>,
}

Expand All @@ -24,8 +27,25 @@ impl ComponentRequestHandler<MempoolP2pSenderRequest, MempoolP2pSenderResponse>
{
async fn handle_request(
&mut self,
_request: MempoolP2pSenderRequest,
request: MempoolP2pSenderRequest,
) -> MempoolP2pSenderResponse {
unimplemented!()
match request {
MempoolP2pSenderRequest::AddTransaction(transaction) => {
let result = self
.broadcast_topic_client
.broadcast_message(RpcTransactionWrapper(transaction))
.await
.map_err(|_| MempoolP2pSenderError::NetworkSendError);
MempoolP2pSenderResponse::AddTransaction(result)
}
MempoolP2pSenderRequest::ContinuePropagation(propagation_manager) => {
let result = self
.broadcast_topic_client
.continue_propagation(&propagation_manager)
.await
.map_err(|_| MempoolP2pSenderError::NetworkSendError);
MempoolP2pSenderResponse::ContinuePropagation(result)
}
}
}
}
50 changes: 50 additions & 0 deletions crates/mempool_p2p/src/sender/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use futures::stream::StreamExt;
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
TestSubscriberChannels,
};
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_network_types::network_types::BroadcastedMessageManager;
use papyrus_protobuf::mempool::RpcTransactionWrapper;
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_mempool_infra::component_definitions::ComponentRequestHandler;
use starknet_mempool_p2p_types::communication::MempoolP2pSenderRequest;
use tokio::time::timeout;

use crate::sender::MempoolP2pSender;

const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);

#[tokio::test]
async fn process_handle_add_tx() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;
let BroadcastNetworkMock { mut messages_to_broadcast_receiver, .. } = mock_network;
let rpc_transaction = RpcTransaction::get_test_instance(&mut get_rng());
let mut mempool_sender = MempoolP2pSender::new(broadcast_topic_client);
mempool_sender
.handle_request(MempoolP2pSenderRequest::AddTransaction(rpc_transaction.clone()))
.await;
let message = timeout(TIMEOUT, messages_to_broadcast_receiver.next()).await.unwrap().unwrap();
assert_eq!(message, RpcTransactionWrapper(rpc_transaction));
}

#[tokio::test]
async fn process_handle_continue_propagation() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;
let BroadcastNetworkMock { mut continue_propagation_receiver, .. } = mock_network;
let propagation_manager = BroadcastedMessageManager::get_test_instance(&mut get_rng());
let mut mempool_sender = MempoolP2pSender::new(broadcast_topic_client);
mempool_sender
.handle_request(MempoolP2pSenderRequest::ContinuePropagation(propagation_manager.clone()))
.await;
let message = timeout(TIMEOUT, continue_propagation_receiver.next()).await.unwrap().unwrap();
assert_eq!(message, propagation_manager);
}
5 changes: 4 additions & 1 deletion crates/mempool_p2p_types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ use thiserror::Error;

// This error is defined even though it's empty to be compatible with the other components.
#[derive(Debug, Error, Serialize, Deserialize, Clone)]
pub enum MempoolP2pSenderError {}
pub enum MempoolP2pSenderError {
#[error("Sender request error")]
NetworkSendError,
}

0 comments on commit db8da61

Please sign in to comment.