Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re-activate txpool API #1390

Merged
merged 6 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
database::Database,
error::{EthApiError, EthereumDataFormatError, KakarotError, SignatureError},
provider::{EthApiResult, EthDataProvider},
TxPoolProvider,
},
sn_provider::StarknetProvider,
},
Expand All @@ -16,12 +17,14 @@ use alloy_rlp::Decodable;
use async_trait::async_trait;
use num_traits::ToPrimitive;
use reth_chainspec::ChainSpec;
use reth_primitives::{Bytes, TransactionSigned, TransactionSignedEcRecovered, B256};
use reth_primitives::{Address, Bytes, TransactionSigned, TransactionSignedEcRecovered, B256};
use reth_rpc_types::{txpool::TxpoolContent, Transaction};
use reth_transaction_pool::{
blobstore::NoopBlobStore, EthPooledTransaction, PoolConfig, TransactionOrigin, TransactionPool,
blobstore::NoopBlobStore, AllPoolTransactions, EthPooledTransaction, PoolConfig, PoolTransaction,
TransactionOrigin, TransactionPool,
};
use starknet::{core::types::Felt, providers::Provider};
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

#[async_trait]
pub trait KakarotTransactions {
Expand Down Expand Up @@ -109,3 +112,35 @@ where
Ok(hash)
}
}

#[async_trait]
impl<SP> TxPoolProvider for EthClient<SP>
where
SP: starknet::providers::Provider + Send + Sync,
{
fn content(&self) -> TxpoolContent {
#[inline]
fn insert<T: PoolTransaction>(tx: &T, content: &mut BTreeMap<Address, BTreeMap<String, Transaction>>) {
content.entry(tx.sender()).or_default().insert(
tx.nonce().to_string(),
reth_rpc_types_compat::transaction::from_recovered(tx.to_recovered_transaction()),
);
}

let AllPoolTransactions { pending, queued } = self.pool.all_transactions();

let mut content = TxpoolContent::default();
for pending in pending {
insert(&pending.transaction, &mut content.pending);
}
for queued in queued {
insert(&queued.transaction, &mut content.queued);
}

content
}

async fn txpool_content(&self) -> EthApiResult<TxpoolContent> {
Ok(self.content())
}
}
2 changes: 1 addition & 1 deletion src/eth_rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ where
let eth_provider = eth_client.eth_provider().clone();

let alchemy_provider = Arc::new(AlchemyDataProvider::new(eth_provider.clone()));
let pool_provider = Arc::new(PoolDataProvider::new(eth_provider.clone()));
let pool_provider = Arc::new(PoolDataProvider::new(eth_client.clone()));
let debug_provider = Arc::new(DebugDataProvider::new(eth_provider.clone()));

let eth_rpc_module = EthRpc::new(eth_client).into_rpc();
Expand Down
17 changes: 4 additions & 13 deletions src/providers/eth_provider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use crate::{
into_via_try_wrapper, into_via_wrapper,
models::block::{EthBlockId, EthBlockNumberOrTag},
providers::{
eth_provider::{
BlockProvider, GasProvider, LogProvider, ReceiptProvider, StateProvider, TransactionProvider,
TxPoolProvider,
},
eth_provider::{BlockProvider, GasProvider, LogProvider, ReceiptProvider, StateProvider, TransactionProvider},
sn_provider::StarknetProvider,
},
};
Expand Down Expand Up @@ -47,18 +44,12 @@ pub type EthApiResult<T> = Result<T, EthApiError>;

/// A trait that defines the interface for an Ethereum Provider.
pub trait EthereumProvider:
GasProvider + StateProvider + TransactionProvider + ReceiptProvider + LogProvider + TxPoolProvider + BlockProvider
GasProvider + StateProvider + TransactionProvider + ReceiptProvider + LogProvider + BlockProvider
{
}

impl<T> EthereumProvider for T where
T: GasProvider
+ StateProvider
+ TransactionProvider
+ ReceiptProvider
+ LogProvider
+ TxPoolProvider
+ BlockProvider
T: GasProvider + StateProvider + TransactionProvider + ReceiptProvider + LogProvider + BlockProvider
{
}

Expand All @@ -69,7 +60,7 @@ impl<T> EthereumProvider for T where
pub struct EthDataProvider<SP: starknet::providers::Provider + Send + Sync> {
database: Database,
starknet_provider: StarknetProvider<SP>,
pub(crate) chain_id: u64,
pub chain_id: u64,
}

impl<SP> EthDataProvider<SP>
Expand Down
25 changes: 3 additions & 22 deletions src/providers/eth_provider/tx_pool.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,16 @@
use crate::providers::eth_provider::provider::{EthApiResult, EthDataProvider};
use crate::providers::eth_provider::provider::EthApiResult;
use async_trait::async_trait;
use auto_impl::auto_impl;
use mongodb::bson::doc;
use reth_rpc_types::{txpool::TxpoolContent, Transaction};
use reth_rpc_types::txpool::TxpoolContent;

/// Ethereum provider trait. Used to abstract away the database and the network.
#[async_trait]
#[auto_impl(Arc, &)]
pub trait TxPoolProvider {
/// Returns a vec of pending pool transactions.
async fn txpool_transactions(&self) -> EthApiResult<Vec<Transaction>>;
fn content(&self) -> TxpoolContent;

/// Returns the content of the pending pool.
async fn txpool_content(&self) -> EthApiResult<TxpoolContent>;
}

#[async_trait]
impl<SP> TxPoolProvider for EthDataProvider<SP>
where
SP: starknet::providers::Provider + Send + Sync,
{
async fn txpool_transactions(&self) -> EthApiResult<Vec<Transaction>> {
// let span = tracing::span!(tracing::Level::INFO, "sn::txpool");
// TODO: we need certainly to move this implementation and rely on the mempool to check this
Ok(vec![])
}

async fn txpool_content(&self) -> EthApiResult<TxpoolContent> {
Ok(self.txpool_transactions().await?.into_iter().fold(TxpoolContent::default(), |mut content, pending| {
content.pending.entry(pending.from).or_default().insert(pending.nonce.to_string(), pending);
content
}))
}
}
51 changes: 35 additions & 16 deletions src/providers/pool_provider.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::providers::eth_provider::provider::{EthApiResult, EthereumProvider};
use super::eth_provider::TxPoolProvider;
use crate::providers::eth_provider::provider::EthApiResult;
use async_trait::async_trait;
use auto_impl::auto_impl;
use reth_primitives::Address;
Expand All @@ -14,18 +15,18 @@ pub trait PoolProvider {
}

#[derive(Debug, Clone)]
pub struct PoolDataProvider<P: EthereumProvider> {
pub struct PoolDataProvider<P: TxPoolProvider> {
eth_provider: P,
}

impl<P: EthereumProvider> PoolDataProvider<P> {
impl<P: TxPoolProvider> PoolDataProvider<P> {
pub const fn new(eth_provider: P) -> Self {
Self { eth_provider }
}
}

#[async_trait]
impl<P: EthereumProvider + Send + Sync + 'static> PoolProvider for PoolDataProvider<P> {
impl<P: TxPoolProvider + Send + Sync + 'static> PoolProvider for PoolDataProvider<P> {
async fn txpool_status(&self) -> EthApiResult<TxpoolStatus> {
let all = self.eth_provider.txpool_content().await?;
Ok(TxpoolStatus { pending: all.pending.len() as u64, queued: all.queued.len() as u64 })
Expand All @@ -34,18 +35,36 @@ impl<P: EthereumProvider + Send + Sync + 'static> PoolProvider for PoolDataProvi
async fn txpool_inspect(&self) -> EthApiResult<TxpoolInspect> {
let mut inspect = TxpoolInspect::default();

let transactions = self.eth_provider.txpool_transactions().await?;

for transaction in transactions {
inspect.pending.entry(transaction.from).or_default().insert(
transaction.nonce.to_string(),
TxpoolInspectSummary {
to: transaction.to,
value: transaction.value,
gas: transaction.gas,
gas_price: transaction.gas_price.unwrap_or_default(),
},
);
let transactions = self.eth_provider.content();

// Organize the pending transactions in the inspect summary struct.
for (sender, nonce_transaction) in transactions.pending {
for (nonce, transaction) in nonce_transaction {
inspect.pending.entry((*sender).into()).or_default().insert(
nonce.clone(),
TxpoolInspectSummary {
to: transaction.to,
value: transaction.value,
gas: transaction.gas,
gas_price: transaction.gas_price.unwrap_or_default(),
},
);
}
}

// Organize the queued transactions in the inspect summary struct.
for (sender, nonce_transaction) in transactions.queued {
for (nonce, transaction) in nonce_transaction {
inspect.queued.entry((*sender).into()).or_default().insert(
nonce.clone(),
TxpoolInspectSummary {
to: transaction.to,
value: transaction.value,
gas: transaction.gas,
gas_price: transaction.gas_price.unwrap_or_default(),
},
);
}
}

Ok(inspect)
Expand Down
8 changes: 8 additions & 0 deletions src/test_utils/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ pub async fn katana() -> Katana {
Katana::new(RANDOM_BYTES_SIZE).await
}

/// This fixture creates a new test environment on Katana.
#[cfg(any(test, feature = "arbitrary", feature = "testing"))]
#[fixture]
pub async fn katana_empty() -> Katana {
// Create a new test environment on Katana
Katana::new_empty().await
}

/// This fixture configures the tests. The following setup
/// is used:
/// - The log level is set to `info`
Expand Down
54 changes: 54 additions & 0 deletions src/test_utils/katana/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,60 @@ impl<'a> Katana {
Self::initialize(sequencer, starknet_provider, rnd_bytes_size).await
}

#[cfg(any(test, feature = "arbitrary", feature = "testing"))]
pub async fn new_empty() -> Self {
use reth_primitives::{constants::EMPTY_ROOT_HASH, B64, U256};

let sequencer = katana_sequencer().await;
let starknet_provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url())));

// Load the private key from the environment variables.
dotenvy::dotenv().expect("Failed to load .env file");
let pk = std::env::var("EVM_PRIVATE_KEY").expect("Failed to get EVM private key");
let pk = B256::from_str(&pk).expect("Failed to parse EVM private key");

// Set the relayer private key in the environment variables.
std::env::set_var("RELAYER_PRIVATE_KEY", format!("0x{:x}", sequencer.raw_account().private_key));

// Initialize a MongoFuzzer instance with the specified random bytes size.
let mut mongo_fuzzer = MongoFuzzer::new(0).await;
mongo_fuzzer.headers.push(StoredHeader {
header: Header {
hash: Some(B256::random()),
total_difficulty: Some(U256::default()),
mix_hash: Some(B256::default()),
nonce: Some(B64::default()),
withdrawals_root: Some(EMPTY_ROOT_HASH),
base_fee_per_gas: Some(0),
blob_gas_used: Some(0),
excess_blob_gas: Some(0),
number: Some(0),
..Default::default()
},
});

// Finalize the empty MongoDB database initialization and get the database instance.
let database = mongo_fuzzer.finalize().await;

// Initialize the EthClient
let eth_client = EthClient::try_new(starknet_provider, database).await.expect("failed to start eth client");

// Create a new Kakarot EOA instance with the private key and EthDataProvider instance.
let eoa = KakarotEOA::new(pk, Arc::new(eth_client.clone()));

// Return a new instance of Katana with initialized fields.
Self {
sequencer,
eoa,
eth_client,
container: Some(mongo_fuzzer.container),
transactions: mongo_fuzzer.transactions,
receipts: mongo_fuzzer.receipts,
logs: mongo_fuzzer.logs,
headers: mongo_fuzzer.headers,
}
}

/// Initializes the Katana test environment.
#[cfg(any(test, feature = "arbitrary", feature = "testing"))]
async fn initialize(
Expand Down
13 changes: 2 additions & 11 deletions src/test_utils/mock_provider.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use crate::providers::eth_provider::{
provider::EthApiResult, BlockProvider, ChainProvider, GasProvider, LogProvider, ReceiptProvider, StateProvider,
TransactionProvider, TxPoolProvider,
TransactionProvider,
};
use async_trait::async_trait;
use mockall::mock;
use reth_primitives::{Address, BlockId, BlockNumberOrTag, Bytes, B256, U256, U64};
use reth_rpc_types::{
txpool::TxpoolContent, Filter, FilterChanges, Header, SyncStatus, TransactionReceipt, TransactionRequest,
};
use reth_rpc_types::{Filter, FilterChanges, Header, SyncStatus, TransactionReceipt, TransactionRequest};

mock! {
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -80,11 +78,4 @@ mock! {

async fn transaction_count(&self, address: Address, block_id: Option<BlockId>) -> EthApiResult<U256>;
}

#[async_trait]
impl TxPoolProvider for EthereumProviderStruct {
async fn txpool_transactions(&self) -> EthApiResult<Vec<reth_rpc_types::Transaction>>;

async fn txpool_content(&self) -> EthApiResult<TxpoolContent>;
}
}
2 changes: 1 addition & 1 deletion tests/tests/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ async fn test_mempool_get_private_transactions(#[future] katana: Katana, _setup:
}

// Helper function to create a sample transaction
async fn create_sample_transactions(
pub async fn create_sample_transactions(
katana: &Katana,
num_transactions: usize,
) -> Result<Vec<(EthPooledTransaction, TransactionSigned)>, SignatureError> {
Expand Down
Loading
Loading