Skip to content

Commit

Permalink
event client update
Browse files Browse the repository at this point in the history
  • Loading branch information
o-tsaruk committed Aug 30, 2023
1 parent e500a2b commit 48937b0
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 89 deletions.
45 changes: 42 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[workspace]
members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.1.0"
2 changes: 2 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ frame-metadata = { version = "15.1", default-features = false, features = ["v14"
parity-scale-codec = { version = "3.6.3", optional = true }
pallet-contracts = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.43", default-features = false, optional = true }
pallet-contracts-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.43", default-features = false, optional = true }
scale-decode = { version = "0.9.0", optional = true }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.43", default-features = false, optional = true }
sp-version = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.43", default-features = false, optional = true }
substrate-api-client = { git = "https://github.com/scs/substrate-api-client", branch = "polkadot-v0.9.43", default-features = false, features = ["jsonrpsee-client", "contracts-xt"], optional = true }
Expand All @@ -35,6 +36,7 @@ rpc = [
"parity-scale-codec",
"pallet-contracts",
"pallet-contracts-primitives",
"scale-decode",
"sp-core",
"sp-version",
"substrate-api-client"
Expand Down
162 changes: 100 additions & 62 deletions crates/common/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

use std::{convert::identity, num::NonZeroUsize};

use frame_metadata::RuntimeMetadataPrefixed;
use frame_metadata::{RuntimeMetadataPrefixed, StorageEntryType};
use futures_util::{
stream::{self, try_unfold},
Stream, StreamExt, TryStreamExt,
};
use lru::LruCache;
use pallet_contracts::Determinism;
use pallet_contracts_primitives::ContractExecResult;
use parity_scale_codec::{Compact, Decode, Encode};
use parity_scale_codec::{Decode, Encode};
use scale_decode::DecodeAsType;
use sp_core::crypto::AccountId32;
use sp_version::RuntimeVersion;
use substrate_api_client::{
Expand All @@ -43,27 +43,17 @@ pub use substrate_api_client;
pub const PAGE_SIZE: u32 = 10;

/// WASM blob information received from an RPC node.
#[derive(Decode)]
#[derive(DecodeAsType)]
struct PrefabWasmModule {
_instruction_weights_version: Compact<u32>,
_initial: Compact<u32>,
_maximum: Compact<u32>,
/// WASM bytecode value.
code: Vec<u8>,
_determinism: Determinism,
}

/// Deployed contract information from an RPC node.
#[derive(Decode)]
#[derive(DecodeAsType)]
pub struct ContractInfo {
_trie_id: Vec<u8>,
/// Code hash associated with the current contract.
pub code_hash: H256,
_storage_bytes: u32,
_storage_items: u32,
_storage_byte_deposit: u128,
_storage_item_deposit: u128,
_storage_base_deposit: u128,
}

/// Get a [`Block`] information for the provided block hash.
Expand All @@ -82,15 +72,20 @@ pub async fn block<C: Request>(
///
/// This method returns an asynchronous [`Stream`] of [`StorageKey`] (which can be decoded to receive the code hash value)
/// and WASM blob bytes.
pub async fn pristine_code_root<C: Request>(
api: &Api<PolkadotConfig, C>,
pub async fn pristine_code_root<'a, C: Request>(
api: &'a Api<PolkadotConfig, C>,
at: H256,
) -> Result<impl Stream<Item = Result<Vec<(StorageKey, Vec<u8>)>, Error>> + '_, Error> {
let prefix = api
.get_storage_map_key_prefix("Contracts", "CodeStorage")
.await?;

Ok(paged_key_values::<_, PrefabWasmModule, _, _>(prefix, api, at, |module| module.code).await)
metadata: &'a Metadata,
) -> Result<impl Stream<Item = Result<Vec<(StorageKey, Vec<u8>)>, Error>> + 'a, Error> {
paged_key_values::<_, PrefabWasmModule, _, _>(
api,
"Contracts",
"PristineCode",
at,
|module| module.code,
metadata,
)
.await
}

/// Get WASM blob for the provided code hash at the provided block hash.
Expand All @@ -100,25 +95,30 @@ pub async fn pristine_code<C: Request>(
api: &Api<PolkadotConfig, C>,
at: H256,
code_hash: H256,
metadata: &Metadata,
) -> Result<Option<Vec<u8>>, Error> {
api.get_storage_map::<_, PrefabWasmModule>("Contracts", "CodeStorage", code_hash, Some(at))
.await
.map(|val| val.map(|module| module.code))
get_ty_storage_by_key::<_, _, PrefabWasmModule>(
api,
"Contracts",
"PristineCode",
code_hash,
at,
metadata,
)
.await
.map(|val| val.map(|module| module.code))
}

/// Get information on all available contracts at the provided block hash.
///
/// This method returns an asynchronous [`Stream`] of [`StorageKey`] (which can be decoded to receive the contract address value)
/// and associated contract information.
pub async fn contract_info_of_root<C: Request + Send + Sync>(
api: &Api<PolkadotConfig, C>,
pub async fn contract_info_of_root<'a, C: Request + Send + Sync>(
api: &'a Api<PolkadotConfig, C>,
at: H256,
) -> Result<impl Stream<Item = Result<Vec<(StorageKey, ContractInfo)>, Error>> + '_, Error> {
let prefix = api
.get_storage_map_key_prefix("Contracts", "ContractInfoOf")
.await?;

Ok(paged_key_values(prefix, api, at, identity).await)
metadata: &'a Metadata,
) -> Result<impl Stream<Item = Result<Vec<(StorageKey, ContractInfo)>, Error>> + 'a, Error> {
paged_key_values(api, "Contracts", "ContractInfoOf", at, identity, metadata).await
}

/// Get information about the specific contract at the provided block hash.
Expand All @@ -128,9 +128,9 @@ pub async fn contract_info_of<C: Request>(
api: &Api<PolkadotConfig, C>,
at: H256,
account_id: &AccountId32,
metadata: &Metadata,
) -> Result<Option<ContractInfo>, Error> {
api.get_storage_map("Contracts", "ContractInfoOf", account_id, Some(at))
.await
get_ty_storage_by_key(api, "Contracts", "ContractInfoOf", account_id, at, metadata).await
}

/// Get UNIX timestamp in milliseconds for the provided block hash.
Expand Down Expand Up @@ -207,11 +207,11 @@ impl MetadataCache {
///
/// This method requests node runtime version corresponding to the provided block,
/// and either fetches it from node or retrieves from cache.
pub async fn metadata<C: Request>(
&mut self,
pub async fn metadata<'a, C: Request>(
&'a mut self,
api: &Api<PolkadotConfig, C>,
at: H256,
) -> Result<Metadata, Error> {
) -> Result<&'a Metadata, Error> {
let RuntimeVersion {
authoring_version,
spec_version,
Expand All @@ -222,12 +222,10 @@ impl MetadataCache {
.request("state_getRuntimeVersion", rpc_params![at])
.await?;

if let Some(metadata) = self
if !self
.cache
.get(&(authoring_version, spec_version, impl_version))
.contains(&(authoring_version, spec_version, impl_version))
{
Ok(metadata.clone())
} else {
let metadata_bytes: Bytes = api
.client()
.request("state_getMetadata", rpc_params![Some(at)])
Expand All @@ -241,9 +239,14 @@ impl MetadataCache {
(authoring_version, spec_version, impl_version),
metadata.clone(),
);

Ok(metadata)
}

let metadata = self
.cache
.get(&(authoring_version, spec_version, impl_version))
.unwrap();

Ok(metadata)
}
}

Expand All @@ -258,23 +261,19 @@ impl Default for MetadataCache {
/// Fetch events associated with the provided block hash.
///
/// Since events layout may differ between different runtime upgrades,
/// this method accepts [`MetadataCache`] to correctly query node for the corresponding metadata.
/// this method accepts [`Metadata`] to correctly query node.
pub async fn events<C: Request>(
api: &Api<PolkadotConfig, C>,
at: H256,
metadata_cache: &mut MetadataCache,
metadata: Metadata,
) -> Result<Events<H256>, Error> {
let key = storage_key("System", "Events");
let event_bytes = api
.get_opaque_storage_by_key(key, Some(at))
.await?
.ok_or(Error::BlockNotFound)?;

Ok(Events::new(
metadata_cache.metadata(api, at).await?,
Default::default(),
event_bytes,
))
Ok(Events::new(metadata, Default::default(), event_bytes))
}

/// Contract instantiation event.
Expand Down Expand Up @@ -335,16 +334,36 @@ impl StaticEvent for Terminated {
const EVENT: &'static str = "Terminated";
}

// Get storage keys and values with the provided prefix, mapping values in process.
async fn paged_key_values<C: Request, V: Decode, T, F: FnMut(V) -> T + 'static>(
prefix: StorageKey,
async fn get_ty_storage_by_key<C: Request, K: Encode, V: DecodeAsType>(
api: &Api<PolkadotConfig, C>,
pallet: &'static str,
storage_item: &'static str,
map_key: K,
at: H256,
metadata: &Metadata,
) -> Result<Option<V>, Error> {
let storage_key = metadata.storage_map_key(pallet, storage_item, map_key)?;

api.get_opaque_storage_by_key(storage_key, Some(at))
.await?
.map(|input| resolve_ty(metadata, pallet, storage_item, &mut &*input))
.transpose()
}

// Get storage keys and values with the provided prefix, mapping values in process.
async fn paged_key_values<'a, C: Request, V: DecodeAsType, T, F: FnMut(V) -> T + 'static>(
api: &'a Api<PolkadotConfig, C>,
pallet: &'static str,
storage_item: &'static str,
at: H256,
map: F,
) -> impl Stream<Item = Result<Vec<(StorageKey, T)>, Error>> + '_ {
try_unfold(
(None, prefix, map),
move |(start_key, prefix, mut map)| async move {
metadata: &'a Metadata,
) -> Result<impl Stream<Item = Result<Vec<(StorageKey, T)>, Error>> + 'a, Error> {
let prefix = api.get_storage_map_key_prefix(pallet, storage_item).await?;

Ok(try_unfold(
(None, prefix, map, metadata),
move |(start_key, prefix, mut map, metadata)| async move {
let storage_keys = api
.get_storage_keys_paged(Some(prefix.clone()), PAGE_SIZE, start_key, Some(at))
.await?;
Expand All @@ -358,8 +377,10 @@ async fn paged_key_values<C: Request, V: Decode, T, F: FnMut(V) -> T + 'static>(
let values = stream::iter(storage_keys)
.then(move |storage_key| async move {
let value = api
.get_storage_by_key(storage_key.clone(), Some(at))
.get_opaque_storage_by_key(storage_key.clone(), Some(at))
.await?
.map(|input| resolve_ty(metadata, pallet, storage_item, &mut &*input))
.transpose()?
.expect("unable to find value corresponding to the provided storage key");

Result::<_, Error>::Ok((storage_key, value))
Expand All @@ -368,7 +389,24 @@ async fn paged_key_values<C: Request, V: Decode, T, F: FnMut(V) -> T + 'static>(
.try_collect()
.await?;

Result::<_, Error>::Ok(Some((values, (start_key, prefix, map))))
Result::<_, Error>::Ok(Some((values, (start_key, prefix, map, metadata))))
},
)
))
}

fn resolve_ty<T: DecodeAsType>(
metadata: &Metadata,
pallet_name: &'static str,
storage_key: &'static str,
input: &mut &[u8],
) -> Result<T, Error> {
let type_id = match metadata.pallet(pallet_name)?.storage(storage_key)?.ty {
StorageEntryType::Plain(ty) => ty.id,
StorageEntryType::Map { value, .. } => value.id,
};

let ty = T::decode_as_type(input, type_id, metadata.types())
.expect("unable to parse DecodeAsType type");

Ok(ty)
}
Loading

0 comments on commit 48937b0

Please sign in to comment.