Skip to content

Commit

Permalink
fix: bench
Browse files Browse the repository at this point in the history
  • Loading branch information
vivekpal1 committed Sep 6, 2024
1 parent a18746b commit 0b6b659
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 61 deletions.
221 changes: 163 additions & 58 deletions benches/indexer_benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,179 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use cypher_indexer::{
config::Config,
storage::Storage,
processing::Processor,
};
use solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signer},
system_instruction,
transaction::Transaction,
};
use solana_transaction_status::{
EncodedConfirmedTransaction,
TransactionStatusMeta,
processing::{
account_parser::AccountParser, event_parser::EventParser,
instruction_parser::InstructionParser, Processor,
},
storage::{ipfs::IpfsStorage, Storage},
};
use solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, transaction::Transaction};
use solana_transaction_status::{EncodedConfirmedTransaction, UiConfirmedBlock};
use std::collections::HashMap;
use tokio::runtime::Runtime;

fn create_mock_config() -> Config {
Config {
solana_rpc_url: "https://api.devnet.solana.com".to_string(),
clickhouse_url: "http://localhost:8123".to_string(),
scylla_nodes: vec!["127.0.0.1:9042".to_string()],
redis_url: "redis://localhost:6379".to_string(),
ipfs_api_url: "http://localhost:5001".to_string(),
wasm_module_path: "./wasm/cypher_indexer.wasm".to_string(),
rpc_poll_interval: 1,
websocket_url: "wss://api.devnet.solana.com".to_string(),
geyser_plugin_config: GeyserPluginConfig {
libpath: "/path/to/mock/libsolana_geyser_plugin.so".to_string(),
accounts_selector: AccountsSelector {
owners: vec!["CyphrkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string()],
},
},
wasm_modules: Some(HashMap::from([(
"test_module".to_string(),
"./tests/test_module.wasm".to_string(),
)])),
wasm_memory_limit: Some(1024 * 1024),
wasm_execution_timeout: Some(5),
}
}

fn create_mock_storage() -> Storage {
let config = create_mock_config();
let rt = Runtime::new().unwrap();
rt.block_on(async { Storage::new(&config).await.unwrap() })
}

fn create_mock_ipfs_storage() -> IpfsStorage {
IpfsStorage::new("http://localhost:5001")
}

fn benchmark_processor_new(c: &mut Criterion) {
let storage = create_mock_storage();
let ipfs_storage = create_mock_ipfs_storage();

c.bench_function("Processor::new", |b| {
b.iter(|| Processor::new(black_box(storage.clone()), black_box(ipfs_storage.clone())))
});
}

fn benchmark_process_transaction(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(async {
let config = Config::load().unwrap();
let storage = Storage::new(&config).await.unwrap();
let processor = Processor::new(storage);

let payer = Keypair::new();
let to = Pubkey::new_unique();
let lamports = 1_000_000;
let recent_blockhash = solana_sdk::hash::Hash::default();

let tx = Transaction::new_signed_with_payer(
&[system_instruction::transfer(&payer.pubkey(), &to, lamports)],
Some(&payer.pubkey()),
&[&payer],
recent_blockhash,
);

let encoded_tx = EncodedConfirmedTransaction {
slot: 12345,
transaction: tx.encode(),
meta: Some(TransactionStatusMeta::default()),
};

c.bench_function("process_transaction", |b| {
b.to_async(&rt).iter(|| async {
black_box(processor.process_transaction(encoded_tx.clone(), 12345).await)
let storage = create_mock_storage();
let ipfs_storage = create_mock_ipfs_storage();
let processor = Processor::new(storage, ipfs_storage);

let mock_transaction = EncodedConfirmedTransaction {
slot: 1000,
transaction: Transaction::default(),
meta: None,
};

c.bench_function("Processor::process_transaction", |b| {
b.iter(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
processor
.process_transaction(black_box(mock_transaction.clone()), black_box(1000))
.await
})
});
})
});
}

fn benchmark_process_account_update(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
fn benchmark_process_block(c: &mut Criterion) {
let storage = create_mock_storage();
let ipfs_storage = create_mock_ipfs_storage();
let processor = Processor::new(storage, ipfs_storage);

rt.block_on(async {
let config = Config::load().unwrap();
let storage = Storage::new(&config).await.unwrap();
let processor = Processor::new(storage);
let mock_block = UiConfirmedBlock {
previous_blockhash: "11111111111111111111111111111111".to_string(),
blockhash: "22222222222222222222222222222222".to_string(),
parent_slot: 999,
transactions: vec![Some(EncodedConfirmedTransaction {
slot: 1000,
transaction: Transaction::default(),
meta: None,
})],
rewards: None,
block_time: Some(1623456789),
block_height: Some(1000),
};

let pubkey = Pubkey::new_unique();
let owner = Pubkey::new_unique();
let data = vec![0, 1, 2, 3, 4];
let slot = 12345;

c.bench_function("process_account_update", |b| {
b.to_async(&rt).iter(|| async {
black_box(processor.process_account_update(pubkey, data.clone(), owner, slot).await)
c.bench_function("Processor::process_block", |b| {
b.iter(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
processor
.process_block(black_box(mock_block.clone()), black_box(1000))
.await
})
});
})
});
}

criterion_group!(benches, benchmark_process_transaction, benchmark_process_account_update);
criterion_main!(benches);
fn benchmark_account_parser(c: &mut Criterion) {
let account_parser = AccountParser::new();
let mock_pubkey = Pubkey::new_unique();
let mock_data = vec![0u8; 100];
let mock_owner = Pubkey::new_unique();

c.bench_function("AccountParser::parse_account", |b| {
b.iter(|| {
account_parser.parse_account(
black_box(&mock_pubkey),
black_box(&mock_data),
black_box(&mock_owner),
)
})
});
}

fn benchmark_instruction_parser(c: &mut Criterion) {
let instruction_parser = InstructionParser::new();
let mock_program_id = Pubkey::new_unique();
let mock_instruction = CompiledInstruction {
program_id_index: 0,
accounts: vec![0, 1, 2],
data: vec![0u8; 32],
};

c.bench_function("InstructionParser::parse_instruction", |b| {
b.iter(|| {
instruction_parser
.parse_instruction(black_box(&mock_program_id), black_box(&mock_instruction))
})
});
}

fn benchmark_event_parser(c: &mut Criterion) {
let event_parser = EventParser::new();
let mock_logs = vec![
"Program log: {\"type\":\"cypher_transfer\",\"from\":\"ABC\",\"to\":\"XYZ\",\"amount\":1000}".to_string(),
"Program log: Some other log".to_string(),
];

c.bench_function("EventParser::parse_logs", |b| {
b.iter(|| event_parser.parse_logs(black_box(&mock_logs)))
});
}

criterion_group!(
benches,
benchmark_processor_new,
benchmark_process_transaction,
benchmark_process_block,
benchmark_account_parser,
benchmark_instruction_parser,
benchmark_event_parser
);
criterion_main!(benches);

#[derive(Clone)]
struct GeyserPluginConfig {
libpath: String,
accounts_selector: AccountsSelector,
}

#[derive(Clone)]
struct AccountsSelector {
owners: Vec<String>,
}
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ pub struct Config {
pub geyser_plugin_config: GeyserPluginConfig,
pub rpc_poll_interval: u64,
pub websocket_url: String,
pub(crate) wasm_modules: Option<_>,
pub(crate) wasm_memory_limit: Option<i32>,
pub(crate) wasm_execution_timeout: Option<i32>,
pub wasm_modules: Option<_>,
pub wasm_memory_limit: Option<i32>,
pub wasm_execution_timeout: Option<i32>,
}

#[derive(Debug, Deserialize)]
Expand Down
48 changes: 48 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,54 @@ pub trait StorageBackend: Send + Sync {
) -> Result<Vec<Transaction>, Box<dyn std::error::Error>>;
}

pub type Account = serde_json::Value;
pub type Transaction = serde_json::Value;

#[derive(Clone, Debug)]
pub struct StorageError {
message: String,
}

impl std::fmt::Display for StorageError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}

impl std::error::Error for StorageError {}

impl From<Box<dyn std::error::Error>> for StorageError {
fn from(error: Box<dyn std::error::Error>) -> Self {
StorageError {
message: error.to_string(),
}
}
}

impl From<redis::RedisError> for StorageError {
fn from(error: redis::RedisError) -> Self {
StorageError {
message: error.to_string(),
}
}
}

impl From<clickhouse::ClickhouseError> for StorageError {
fn from(error: clickhouse::ClickhouseError) -> Self {
StorageError {
message: error.to_string(),
}
}
}

impl From<scylla::ScyllaError> for StorageError {
fn from(error: scylla::ScyllaError) -> Self {
StorageError {
message: error.to_string(),
}
}
}

#[derive(Clone)]
pub struct Storage {
clickhouse: clickhouse::ClickhouseStorage,
Expand Down

0 comments on commit 0b6b659

Please sign in to comment.