diff --git a/Cargo.lock b/Cargo.lock index e9b9f6e29154..9c498730feea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1719,6 +1719,17 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" +[[package]] +name = "leb128-tokio" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ed5aeb781f5bc022a2b9a0df2c113721eadcc5d2a4cd2c139faa2957b2582cf" +dependencies = [ + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "libc" version = "0.2.153" @@ -2060,9 +2071,9 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -2872,16 +2883,15 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.4" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -3058,6 +3068,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf8-tokio" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7792e7406e4fbad1f7c298d27ece7f25aab855c8270e5a4b428ebd2b0b95313" +dependencies = [ + "tokio", + "tokio-util", +] + [[package]] name = "utf8parse" version = "0.2.1" @@ -3306,6 +3326,19 @@ dependencies = [ "wat", ] +[[package]] +name = "wasm-tokio" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "695c68282a8ffda65b24379d40e00cdab4737363af9e61463093b3f0d58f5bfc" +dependencies = [ + "leb128-tokio", + "tokio", + "tokio-util", + "tracing", + "utf8-tokio", +] + [[package]] name = "wasmi" version = "0.31.1" @@ -3777,6 +3810,25 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "wasmtime-rpc" +version = "22.0.0" +dependencies = [ + "anyhow", + "bytes", + "futures", + "test-log", + "test-programs-artifacts", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "wasm-tokio", + "wasmtime", + "wasmtime-wasi", + "wrpc-transport", +] + [[package]] name = "wasmtime-slab" version = "22.0.0" @@ -4368,6 +4420,22 @@ dependencies = [ "wast 35.0.2", ] +[[package]] +name = "wrpc-transport" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "853282f4cad30d0d62b1817d8bf5f42838ce20bd0fcced6aff727306d7de7640" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "futures", + "leb128", + "tokio", + "tracing", + "wasm-tokio", +] + [[package]] name = "xattr" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index ac53b922391f..34e0142d4d90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,6 +136,7 @@ members = [ "crates/bench-api", "crates/c-api/artifact", "crates/environ/fuzz", + "crates/rpc", "crates/test-programs", "crates/wasi-preview1-component-adapter", "crates/wasi-preview1-component-adapter/verify", @@ -320,6 +321,9 @@ humantime = "2.0.0" postcard = { version = "1.0.8", default-features = false, features = ['alloc'] } criterion = { version = "0.5.0", default-features = false, features = ["html_reports", "rayon"] } rustc-hash = "1.1.0" +tokio-util = "0.7.9" +wasm-tokio = "0.4.0" +wrpc-transport = "0.25.0" # ============================================================================= # diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml new file mode 100644 index 000000000000..0b1e2e0697fd --- /dev/null +++ b/crates/rpc/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "wasmtime-rpc" +description = "RPC-based Wasmtime functionality extensions" + +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true, features = ["alloc"] } +tracing = { workspace = true } +tokio = { workspace = true, features = ["macros"] } +tokio-util = { workspace = true, features = ["codec"] } +wasm-tokio = { workspace = true } +wasmtime = { workspace = true } +wasmtime-wasi = { workspace = true } +wrpc-transport = { workspace = true } + +[dev-dependencies] +test-log = { workspace = true } +test-programs-artifacts = { workspace = true } +tracing-subscriber = { workspace = true } + +[lints] +workspace = true diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs new file mode 100644 index 000000000000..548eb3f64af1 --- /dev/null +++ b/crates/rpc/src/lib.rs @@ -0,0 +1,877 @@ +use core::future::Future; +use core::iter::zip; +use core::ops::{BitOrAssign, Shl}; +use core::pin::{pin, Pin}; + +use std::collections::HashSet; +use std::sync::Arc; + +use anyhow::{anyhow, bail, Context as _}; +use bytes::{BufMut as _, BytesMut}; +use futures::future::try_join_all; +use futures::stream::FuturesUnordered; +use futures::TryStreamExt as _; +use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; +use tokio::try_join; +use tokio_util::codec::Encoder; +use tracing::{debug, instrument, trace, warn}; +use wasm_tokio::cm::AsyncReadValue as _; +use wasm_tokio::{ + AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreStringEncoder, Leb128Encoder, + Utf8Encoder, +}; +use wasmtime::component::types::{self, Case, Field}; +use wasmtime::component::{LinkerInstance, Type, Val}; +use wasmtime::{AsContextMut, Engine, StoreContextMut}; +use wasmtime_wasi::WasiView; +use wrpc_transport::{Index as _, Invocation, Invoke, Session}; + +pub struct ValEncoder<'a, T, W> { + pub store: StoreContextMut<'a, T>, + pub ty: &'a Type, + pub deferred: Option< + Box Pin> + Send>> + Send>, + >, +} + +impl ValEncoder<'_, T, W> { + #[must_use] + pub fn new<'a>(store: StoreContextMut<'a, T>, ty: &'a Type) -> ValEncoder<'a, T, W> { + ValEncoder { + store, + ty, + deferred: None, + } + } + + #[must_use] + pub fn with_type<'a>(&'a mut self, ty: &'a Type) -> ValEncoder<'a, T, W> { + ValEncoder { + store: self.store.as_context_mut(), + ty, + deferred: None, + } + } +} + +fn find_enum_discriminant<'a, T>( + iter: impl IntoIterator, + names: impl IntoIterator, + discriminant: &str, +) -> wasmtime::Result { + zip(iter, names) + .find_map(|(i, name)| (name == discriminant).then_some(i)) + .context("unknown enum discriminant") +} + +fn find_variant_discriminant<'a, T>( + iter: impl IntoIterator, + cases: impl IntoIterator>, + discriminant: &str, +) -> wasmtime::Result<(T, Option)> { + zip(iter, cases) + .find_map(|(i, Case { name, ty })| (name == discriminant).then_some((i, ty))) + .context("unknown variant discriminant") +} + +#[inline] +fn flag_bits<'a, T: BitOrAssign + Shl + From>( + names: impl IntoIterator, + flags: impl IntoIterator, +) -> T { + let mut v = T::from(0); + let flags: HashSet<&str> = flags.into_iter().collect(); + for (i, name) in zip(0u8.., names) { + if flags.contains(name) { + v |= T::from(1) << i; + } + } + v +} + +async fn write_deferred(w: W, deferred: I) -> wasmtime::Result<()> +where + W: wrpc_transport::Index + Sync + Send + 'static, + W::Error: Into, + I: IntoIterator, + I::IntoIter: ExactSizeIterator< + Item = Option< + Box Pin> + Send>> + Send>, + >, + >, +{ + let futs: FuturesUnordered<_> = zip(0.., deferred) + .filter_map(|(i, f)| f.map(|f| (w.index(&[i]), f))) + .map(|(w, f)| async move { + let w = w.map_err(Into::into)?; + f(w).await + }) + .collect(); + futs.try_collect().await?; + Ok(()) +} + +impl Encoder<&Val> for ValEncoder<'_, T, W> +where + T: WasiView, + W: AsyncWrite + wrpc_transport::Index + Sync + Send + 'static, + W::Error: Into, +{ + type Error = wasmtime::Error; + + fn encode(&mut self, v: &Val, dst: &mut BytesMut) -> Result<(), Self::Error> { + match (v, self.ty) { + (Val::Bool(v), Type::Bool) => { + dst.reserve(1); + dst.put_u8((*v).into()); + Ok(()) + } + (Val::S8(v), Type::S8) => { + dst.reserve(1); + dst.put_i8(*v); + Ok(()) + } + (Val::U8(v), Type::U8) => { + dst.reserve(1); + dst.put_u8(*v); + Ok(()) + } + (Val::S16(v), Type::S16) => Leb128Encoder + .encode(*v, dst) + .context("failed to encode s16"), + (Val::U16(v), Type::U16) => Leb128Encoder + .encode(*v, dst) + .context("failed to encode u16"), + (Val::S32(v), Type::S32) => Leb128Encoder + .encode(*v, dst) + .context("failed to encode s32"), + (Val::U32(v), Type::U32) => Leb128Encoder + .encode(*v, dst) + .context("failed to encode u32"), + (Val::S64(v), Type::S64) => Leb128Encoder + .encode(*v, dst) + .context("failed to encode s64"), + (Val::U64(v), Type::U64) => Leb128Encoder + .encode(*v, dst) + .context("failed to encode u64"), + (Val::Float32(v), Type::Float32) => { + dst.reserve(4); + dst.put_f32_le(*v); + Ok(()) + } + (Val::Float64(v), Type::Float64) => { + dst.reserve(8); + dst.put_f64_le(*v); + Ok(()) + } + (Val::Char(v), Type::Char) => { + Utf8Encoder.encode(*v, dst).context("failed to encode char") + } + (Val::String(v), Type::String) => CoreStringEncoder + .encode(v.as_str(), dst) + .context("failed to encode string"), + (Val::List(vs), Type::List(ty)) => { + let ty = ty.ty(); + let n = u32::try_from(vs.len()).context("list length does not fit in u32")?; + dst.reserve(5 + vs.len()); + Leb128Encoder + .encode(n, dst) + .context("failed to encode list length")?; + let mut deferred = Vec::with_capacity(vs.len()); + for v in vs { + let mut enc = self.with_type(&ty); + enc.encode(v, dst) + .context("failed to encode list element")?; + deferred.push(enc.deferred); + } + if deferred.iter().any(Option::is_some) { + self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred)))); + } + Ok(()) + } + (Val::Record(vs), Type::Record(ty)) => { + dst.reserve(vs.len()); + let mut deferred = Vec::with_capacity(vs.len()); + for ((name, v), Field { ref ty, .. }) in zip(vs, ty.fields()) { + let mut enc = self.with_type(ty); + enc.encode(v, dst) + .with_context(|| format!("failed to encode `{name}` field"))?; + deferred.push(enc.deferred); + } + if deferred.iter().any(Option::is_some) { + self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred)))); + } + Ok(()) + } + (Val::Tuple(vs), Type::Tuple(ty)) => { + dst.reserve(vs.len()); + let mut deferred = Vec::with_capacity(vs.len()); + for (v, ref ty) in zip(vs, ty.types()) { + let mut enc = self.with_type(ty); + enc.encode(v, dst) + .context("failed to encode tuple element")?; + deferred.push(enc.deferred); + } + if deferred.iter().any(Option::is_some) { + self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred)))); + } + Ok(()) + } + (Val::Variant(discriminant, v), Type::Variant(ty)) => { + let cases = ty.cases(); + let ty = match cases.len() { + ..=0x0000_00ff => { + let (discriminant, ty) = + find_variant_discriminant(0u8.., cases, discriminant)?; + dst.reserve(2 + usize::from(v.is_some())); + Leb128Encoder.encode(discriminant, dst)?; + ty + } + 0x0000_0100..=0x0000_ffff => { + let (discriminant, ty) = + find_variant_discriminant(0u16.., cases, discriminant)?; + dst.reserve(3 + usize::from(v.is_some())); + Leb128Encoder.encode(discriminant, dst)?; + ty + } + 0x0001_0000..=0x00ff_ffff => { + let (discriminant, ty) = + find_variant_discriminant(0u32.., cases, discriminant)?; + dst.reserve(4 + usize::from(v.is_some())); + Leb128Encoder.encode(discriminant, dst)?; + ty + } + 0x0100_0000..=0xffff_ffff => { + let (discriminant, ty) = + find_variant_discriminant(0u32.., cases, discriminant)?; + dst.reserve(5 + usize::from(v.is_some())); + Leb128Encoder.encode(discriminant, dst)?; + ty + } + 0x1_0000_0000.. => bail!("case count does not fit in u32"), + }; + if let Some(v) = v { + let ty = ty.context("type missing for variant")?; + let mut enc = self.with_type(&ty); + enc.encode(v, dst) + .context("failed to encode variant value")?; + if let Some(f) = enc.deferred { + self.deferred = Some(f); + } + } + Ok(()) + } + (Val::Enum(discriminant), Type::Enum(ty)) => { + let names = ty.names(); + match names.len() { + ..=0x0000_00ff => { + let discriminant = find_enum_discriminant(0u8.., names, discriminant)?; + dst.reserve(2); + Leb128Encoder.encode(discriminant, dst)? + } + 0x0000_0100..=0x0000_ffff => { + let discriminant = find_enum_discriminant(0u16.., names, discriminant)?; + dst.reserve(3); + Leb128Encoder.encode(discriminant, dst)? + } + 0x0001_0000..=0x00ff_ffff => { + let discriminant = find_enum_discriminant(0u32.., names, discriminant)?; + dst.reserve(4); + Leb128Encoder.encode(discriminant, dst)? + } + 0x0100_0000..=0xffff_ffff => { + let discriminant = find_enum_discriminant(0u32.., names, discriminant)?; + dst.reserve(5); + Leb128Encoder.encode(discriminant, dst)? + } + 0x1_0000_0000.. => bail!("name count does not fit in u32"), + } + Ok(()) + } + (Val::Option(None), Type::Option(_)) => { + dst.reserve(1); + dst.put_u8(0); + Ok(()) + } + (Val::Option(Some(v)), Type::Option(ty)) => { + dst.reserve(2); + dst.put_u8(1); + let ty = ty.ty(); + let mut enc = self.with_type(&ty); + enc.encode(v, dst) + .context("failed to encode `option::some` value")?; + if let Some(f) = enc.deferred { + self.deferred = Some(f); + } + Ok(()) + } + (Val::Result(v), Type::Result(ty)) => match v { + Ok(v) => match (v, ty.ok()) { + (Some(v), Some(ty)) => { + dst.reserve(2); + dst.put_u8(0); + let mut enc = self.with_type(&ty); + enc.encode(v, dst) + .context("failed to encode `result::ok` value")?; + if let Some(f) = enc.deferred { + self.deferred = Some(f); + } + Ok(()) + } + (Some(_v), None) => bail!("`result::ok` value of unknown type"), + (None, Some(_ty)) => bail!("`result::ok` value missing"), + (None, None) => { + dst.reserve(1); + dst.put_u8(0); + Ok(()) + } + }, + Err(v) => match (v, ty.err()) { + (Some(v), Some(ty)) => { + dst.reserve(2); + dst.put_u8(1); + let mut enc = self.with_type(&ty); + enc.encode(v, dst) + .context("failed to encode `result::err` value")?; + if let Some(f) = enc.deferred { + self.deferred = Some(f); + } + Ok(()) + } + (Some(_v), None) => bail!("`result::err` value of unknown type"), + (None, Some(_ty)) => bail!("`result::err` value missing"), + (None, None) => { + dst.reserve(1); + dst.put_u8(1); + Ok(()) + } + }, + }, + (Val::Flags(vs), Type::Flags(ty)) => { + let names = ty.names(); + let vs = vs.iter().map(String::as_str); + match names.len() { + ..=8 => { + dst.reserve(1); + dst.put_u8(flag_bits(names, vs)); + } + 9..=16 => { + dst.reserve(2); + dst.put_u16_le(flag_bits(names, vs)); + } + 17..=24 => { + dst.reserve(3); + dst.put_slice(&u32::to_le_bytes(flag_bits(names, vs))[..3]); + } + 25..=32 => { + dst.reserve(4); + dst.put_u32_le(flag_bits(names, vs)); + } + 33..=40 => { + dst.reserve(5); + dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..5]); + } + 41..=48 => { + dst.reserve(6); + dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..6]); + } + 49..=56 => { + dst.reserve(7); + dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..7]); + } + 57..=64 => { + dst.reserve(8); + dst.put_u64_le(flag_bits(names, vs)); + } + 65..=72 => { + dst.reserve(9); + dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..9]); + } + 73..=80 => { + dst.reserve(10); + dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..10]); + } + 81..=88 => { + dst.reserve(11); + dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..11]); + } + 89..=96 => { + dst.reserve(12); + dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..12]); + } + 97..=104 => { + dst.reserve(13); + dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..13]); + } + 105..=112 => { + dst.reserve(14); + dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..14]); + } + 113..=120 => { + dst.reserve(15); + dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..15]); + } + 121..=128 => { + dst.reserve(16); + dst.put_u128_le(flag_bits(names, vs)); + } + bits @ 129.. => { + let mut cap = bits / 8; + if bits % 8 != 0 { + cap = cap.saturating_add(1); + } + let mut buf = vec![0; cap]; + let flags: HashSet<&str> = vs.into_iter().collect(); + for (i, name) in names.enumerate() { + if flags.contains(name) { + buf[i / 8] |= 1 << (i % 8) + } + } + dst.extend_from_slice(&buf); + } + } + Ok(()) + } + (Val::Resource(_resource), Type::Own(_ty) | Type::Borrow(_ty)) => { + bail!("encoding resources not supported yet") + } + _ => bail!("value type mismatch"), + } + } +} + +#[inline] +async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Result { + let mut buf = 0u128.to_le_bytes(); + r.read_exact(&mut buf[..n]).await?; + Ok(u128::from_le_bytes(buf)) +} + +/// Read encoded value of type [`Type`] from an [`AsyncRead`] into a [`Val`] +#[instrument(level = "trace", skip_all, fields(ty, path))] +async fn read_value( + store: &mut impl AsContextMut, + r: &mut Pin<&mut R>, + val: &mut Val, + ty: &Type, + path: &[usize], +) -> std::io::Result<()> +where + T: WasiView, + R: AsyncRead + wrpc_transport::Index + Send + Unpin + 'static, + R::Error: Into>, +{ + match ty { + Type::Bool => { + let v = r.read_bool().await?; + *val = Val::Bool(v); + Ok(()) + } + Type::S8 => { + let v = r.read_i8().await?; + *val = Val::S8(v); + Ok(()) + } + Type::U8 => { + let v = r.read_u8().await?; + *val = Val::U8(v); + Ok(()) + } + Type::S16 => { + let v = r.read_i16_leb128().await?; + *val = Val::S16(v); + Ok(()) + } + Type::U16 => { + let v = r.read_u16_leb128().await?; + *val = Val::U16(v); + Ok(()) + } + Type::S32 => { + let v = r.read_i32_leb128().await?; + *val = Val::S32(v); + Ok(()) + } + Type::U32 => { + let v = r.read_u32_leb128().await?; + *val = Val::U32(v); + Ok(()) + } + Type::S64 => { + let v = r.read_i64_leb128().await?; + *val = Val::S64(v); + Ok(()) + } + Type::U64 => { + let v = r.read_u64_leb128().await?; + *val = Val::U64(v); + Ok(()) + } + Type::Float32 => { + let v = r.read_f32_le().await?; + *val = Val::Float32(v); + Ok(()) + } + Type::Float64 => { + let v = r.read_f64_le().await?; + *val = Val::Float64(v); + Ok(()) + } + Type::Char => { + let v = r.read_char_utf8().await?; + *val = Val::Char(v); + Ok(()) + } + Type::String => { + let mut s = String::default(); + r.read_core_string(&mut s).await?; + *val = Val::String(s); + Ok(()) + } + Type::List(ty) => { + let n = r.read_u32_leb128().await?; + let n = n.try_into().unwrap_or(usize::MAX); + let mut vs = Vec::with_capacity(n); + let ty = ty.ty(); + let mut path = path.to_vec(); + for i in 0..n { + let mut v = Val::Bool(false); + path.push(i); + trace!(i, "reading list element value"); + Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; + path.pop(); + vs.push(v); + } + *val = Val::List(vs); + Ok(()) + } + Type::Record(ty) => { + let fields = ty.fields(); + let mut vs = Vec::with_capacity(fields.len()); + let mut path = path.to_vec(); + for (i, Field { name, ty }) in fields.enumerate() { + let mut v = Val::Bool(false); + path.push(i); + trace!(i, "reading struct field value"); + Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; + path.pop(); + vs.push((name.to_string(), v)); + } + *val = Val::Record(vs); + Ok(()) + } + Type::Tuple(ty) => { + let types = ty.types(); + let mut vs = Vec::with_capacity(types.len()); + let mut path = path.to_vec(); + for (i, ty) in types.enumerate() { + let mut v = Val::Bool(false); + path.push(i); + trace!(i, "reading tuple element value"); + Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; + path.pop(); + vs.push(v); + } + *val = Val::Tuple(vs); + Ok(()) + } + Type::Variant(ty) => { + let discriminant = r.read_u32_leb128().await?; + let discriminant = discriminant + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; + let Case { name, ty } = ty.cases().nth(discriminant).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("unknown variant discriminant `{discriminant}`"), + ) + })?; + let name = name.to_string(); + if let Some(ty) = ty { + let mut v = Val::Bool(false); + trace!(variant = name, "reading nested variant value"); + Box::pin(read_value(store, r, &mut v, &ty, path)).await?; + *val = Val::Variant(name, Some(Box::new(v))); + } else { + *val = Val::Variant(name, None); + } + Ok(()) + } + Type::Enum(ty) => { + let discriminant = r.read_u32_leb128().await?; + let discriminant = discriminant + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; + let name = ty.names().nth(discriminant).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("unknown enum discriminant `{discriminant}`"), + ) + })?; + *val = Val::Enum(name.to_string()); + Ok(()) + } + Type::Option(ty) => { + let ok = r.read_option_status().await?; + if ok { + let mut v = Val::Bool(false); + trace!("reading nested `option::some` value"); + Box::pin(read_value(store, r, &mut v, &ty.ty(), path)).await?; + *val = Val::Option(Some(Box::new(v))); + } else { + *val = Val::Option(None); + } + Ok(()) + } + Type::Result(ty) => { + let ok = r.read_result_status().await?; + if ok { + if let Some(ty) = ty.ok() { + let mut v = Val::Bool(false); + trace!("reading nested `result::ok` value"); + Box::pin(read_value(store, r, &mut v, &ty, path)).await?; + *val = Val::Result(Ok(Some(Box::new(v)))); + } else { + *val = Val::Result(Ok(None)); + } + } else if let Some(ty) = ty.err() { + let mut v = Val::Bool(false); + trace!("reading nested `result::err` value"); + Box::pin(read_value(store, r, &mut v, &ty, path)).await?; + *val = Val::Result(Err(Some(Box::new(v)))); + } else { + *val = Val::Result(Err(None)); + } + Ok(()) + } + Type::Flags(ty) => { + let names = ty.names(); + let flags = match names.len() { + ..=8 => read_flags(1, r).await?, + 9..=16 => read_flags(2, r).await?, + 17..=24 => read_flags(3, r).await?, + 25..=32 => read_flags(4, r).await?, + 33..=40 => read_flags(5, r).await?, + 41..=48 => read_flags(6, r).await?, + 49..=56 => read_flags(7, r).await?, + 57..=64 => read_flags(8, r).await?, + 65..=72 => read_flags(9, r).await?, + 73..=80 => read_flags(10, r).await?, + 81..=88 => read_flags(11, r).await?, + 89..=96 => read_flags(12, r).await?, + 97..=104 => read_flags(13, r).await?, + 105..=112 => read_flags(14, r).await?, + 113..=120 => read_flags(15, r).await?, + 121..=128 => r.read_u128_le().await?, + bits @ 129.. => { + let mut cap = bits / 8; + if bits % 8 != 0 { + cap = cap.saturating_add(1); + } + let mut buf = vec![0; cap]; + r.read_exact(&mut buf).await?; + let mut vs = Vec::with_capacity( + buf.iter() + .map(|b| b.count_ones()) + .sum::() + .try_into() + .unwrap_or(usize::MAX), + ); + for (i, name) in names.enumerate() { + if buf[i / 8] & (1 << (i % 8)) != 0 { + vs.push(name.to_string()); + } + } + *val = Val::Flags(vs); + return Ok(()); + } + }; + let mut vs = Vec::with_capacity(flags.count_ones().try_into().unwrap_or(usize::MAX)); + for (i, name) in zip(0.., names) { + if flags & (1 << i) != 0 { + vs.push(name.to_string()); + } + } + *val = Val::Flags(vs); + Ok(()) + } + Type::Own(_ty) | Type::Borrow(_ty) => Err(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "reading resources not supported yet", + )), + } +} + +pub trait WrpcView: Send { + fn client(&self) -> &C; +} + +/// Polyfill [`types::ComponentItem`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] +#[instrument(level = "trace", skip_all)] +pub fn link_item<'a, C, V>( + engine: &Engine, + linker: &mut LinkerInstance, + ty: types::ComponentItem, + instance: impl Into>, + name: impl Into>, + cx: C::Context, +) -> wasmtime::Result<()> +where + V: WrpcView + WasiView, + C: Invoke, + C::Error: Into, + C::Context: Clone + 'static, + ::TransportError: Into, + >::Error: Into, + C::NestedOutgoing: 'static, + >::Error: Into, + C::Incoming: Unpin + Sized + 'static, + >::Error: + Into>, +{ + let instance = instance.into(); + match ty { + types::ComponentItem::ComponentFunc(ty) => { + let name = name.into(); + debug!(?instance, ?name, "linking function"); + link_function(linker, ty, instance, name, cx)? + } + types::ComponentItem::CoreFunc(_) => { + bail!("polyfilling core functions not supported yet") + } + types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"), + types::ComponentItem::Component(ty) => { + for (name, ty) in ty.imports(&engine) { + debug!(?instance, name, "linking component item"); + link_item(engine, linker, ty, "", name, cx.clone())?; + } + } + types::ComponentItem::ComponentInstance(ty) => { + let name = name.into(); + let mut linker = linker + .instance(&name) + .with_context(|| format!("failed to instantiate `{name}` in the linker"))?; + debug!(?instance, ?name, "linking instance"); + link_instance(engine, &mut linker, ty, name, cx)? + } + types::ComponentItem::Type(_) => {} + types::ComponentItem::Resource(_) => bail!("polyfilling resources not supported yet"), + } + Ok(()) +} + +/// Polyfill [`types::ComponentInstance`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] +#[instrument(level = "trace", skip_all)] +pub fn link_instance<'a, C, V>( + engine: &Engine, + linker: &mut LinkerInstance, + ty: types::ComponentInstance, + name: impl Into>, + cx: C::Context, +) -> wasmtime::Result<()> +where + V: WrpcView + WasiView, + C: Invoke, + C::Error: Into, + C::Context: Clone + 'static, + ::TransportError: Into, + >::Error: Into, + C::NestedOutgoing: 'static, + >::Error: Into, + C::Incoming: Unpin + Sized + 'static, + >::Error: + Into>, +{ + let instance = name.into(); + for (name, ty) in ty.exports(&engine) { + debug!(name, "linking instance item"); + link_item(engine, linker, ty, Arc::clone(&instance), name, cx.clone())? + } + Ok(()) +} + +/// Polyfill [`types::ComponentFunc`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] +#[instrument(level = "trace", skip_all)] +pub fn link_function<'a, C, V>( + linker: &mut LinkerInstance, + ty: types::ComponentFunc, + instance: impl Into>, + name: impl Into>, + cx: C::Context, +) -> wasmtime::Result<()> +where + V: WrpcView + WasiView, + C: Invoke, + C::Error: Into, + C::Context: Clone + 'static, + ::TransportError: Into, + >::Error: Into, + C::NestedOutgoing: 'static, + >::Error: Into, + C::Incoming: Unpin + Sized + 'static, + >::Error: + Into>, +{ + let instance = instance.into(); + let name = name.into(); + linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| { + let cx = cx.clone(); + let ty = ty.clone(); + let instance = Arc::clone(&instance); + let name = Arc::clone(&name); + Box::new(async move { + let mut buf = BytesMut::default(); + let mut deferred = vec![]; + for (v, ref ty) in zip(params, ty.params()) { + let mut enc = ValEncoder::new(store.as_context_mut(), ty); + enc.encode(v, &mut buf) + .context("failed to encode parameter")?; + deferred.push(enc.deferred); + } + let Invocation { + outgoing, + incoming, + session, + } = store + .data() + .client() + .invoke(cx, &instance, &name, buf.freeze(), &[]) + .await + .map_err(Into::into) + .with_context(|| { + format!("failed to invoke `{instance}.{name}` polyfill via wRPC") + })?; + try_join!( + async { + try_join_all( + zip(0.., deferred) + .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f))) + .map(|(w, f)| async move { + let w = w.map_err(Into::into)?; + f(w).await + }), + ) + .await + .context("failed to write asynchronous parameters")?; + pin!(outgoing) + .shutdown() + .await + .context("failed to shutdown outgoing stream") + }, + async { + let mut incoming = pin!(incoming); + for (i, (v, ref ty)) in zip(results, ty.results()).enumerate() { + read_value(&mut store, &mut incoming, v, ty, &[i]) + .await + .with_context(|| format!("failed to decode return value {i}"))?; + } + Ok(()) + }, + )?; + match session.finish(Ok(())).await.map_err(Into::into)? { + Ok(()) => Ok(()), + Err(err) => bail!(anyhow!("{err}").context("session failed")), + } + }) + }) +} diff --git a/crates/rpc/tests/all.rs b/crates/rpc/tests/all.rs new file mode 100644 index 000000000000..3fe2702125ff --- /dev/null +++ b/crates/rpc/tests/all.rs @@ -0,0 +1,576 @@ +use core::convert::Infallible; +use core::ops::Deref as _; +use core::pin::{pin, Pin}; +use core::task::{Context, Poll}; + +use std::collections::HashMap; +use std::io; +use std::sync::{Arc, Mutex}; + +use anyhow::bail; +use test_programs_artifacts::{foreach_rpc, RPC_HELLO_CLIENT_COMPONENT, RPC_SYNC_CLIENT_COMPONENT}; +use tokio::io::{AsyncRead, AsyncWrite}; +use wasmtime::component::{types, Component, Linker}; +use wasmtime::Store; +use wasmtime_rpc::{link_instance, WrpcView}; +use wasmtime_wasi::bindings::Command; +use wasmtime_wasi::pipe::MemoryOutputPipe; +use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxBuilder, WasiView}; +use wrpc_transport::{Index, Invoke}; + +// Assert that we are testing everything through assertion +// of the existence of the test function itself. +macro_rules! assert_test_exists { + ($name:ident) => { + #[allow(unused_imports)] + use self::$name as _; + }; +} + +foreach_rpc!(assert_test_exists); + +pub struct Ctx { + pub table: ResourceTable, + pub wasi: WasiCtx, + pub wrpc: C, + pub stderr: MemoryOutputPipe, +} + +impl> WrpcView for Ctx { + fn client(&self) -> &C { + &self.wrpc + } +} + +impl WasiView for Ctx { + fn ctx(&mut self) -> &mut WasiCtx { + &mut self.wasi + } + fn table(&mut self) -> &mut ResourceTable { + &mut self.table + } +} + +impl Drop for Ctx { + fn drop(&mut self) { + let stderr = self.stderr.contents(); + if !stderr.is_empty() { + println!("[guest] stderr:\n{}\n===", String::from_utf8_lossy(&stderr)); + } + } +} + +#[derive(Clone)] +struct Session { + outgoing: Arc>>>, + incoming: Result<(), String>, +} + +impl Default for Session { + fn default() -> Self { + Self { + outgoing: Arc::default(), + incoming: Ok(()), + } + } +} + +impl wrpc_transport::Session for Session { + type Error = String; + type TransportError = Infallible; + + async fn finish( + self, + res: Result<(), Self::Error>, + ) -> Result, Self::TransportError> { + let mut lock = self.outgoing.lock().unwrap(); + assert_eq!(lock.as_ref(), None); + *lock = Some(res); + Ok(self.incoming) + } +} + +#[derive(Default, Clone)] +struct Incoming { + indexes: Arc, io::Cursor<&'static [u8]>>>>, + path: Vec, +} + +impl Index for Incoming { + type Error = Infallible; + + fn index(&self, path: &[usize]) -> Result { + let mut target = self.path.to_vec(); + target.extend(path); + Ok(Self { + indexes: self.indexes.clone(), + path: target, + }) + } +} + +impl AsyncRead for Incoming { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let mut lock = self.indexes.lock().unwrap(); + let r = lock + .get_mut(&self.path) + .expect(&format!("unknown path {:?}", self.path)); + pin!(r).poll_read(cx, buf) + } +} + +#[derive(Default, Clone)] +struct Outgoing { + indexes: Arc, Vec>>>, + path: Vec, +} + +impl Index for Outgoing { + type Error = Infallible; + + fn index(&self, path: &[usize]) -> Result { + let mut target = self.path.to_vec(); + target.extend(path); + Ok(Self { + indexes: self.indexes.clone(), + path: target, + }) + } +} + +impl AsyncWrite for Outgoing { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut lock = self.indexes.lock().unwrap(); + let w = lock.entry(self.path.clone()).or_default(); + pin!(w).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut lock = self.indexes.lock().unwrap(); + if let Some(w) = lock.get_mut(&self.path) { + pin!(w).poll_flush(cx) + } else { + Poll::Ready(Ok(())) + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut lock = self.indexes.lock().unwrap(); + if let Some(w) = lock.get_mut(&self.path) { + pin!(w).poll_shutdown(cx) + } else { + Poll::Ready(Ok(())) + } + } +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn rpc_hello_client() -> anyhow::Result<()> { + struct Transport { + outgoing: Outgoing, + session: Session, + } + impl Invoke for Transport { + type Error = wasmtime::Error; + type Context = &'static str; + type Session = Session; + type Outgoing = Outgoing; + type NestedOutgoing = Outgoing; + type Incoming = Incoming; + + async fn invoke( + &self, + cx: Self::Context, + instance: &str, + func: &str, + params: bytes::Bytes, + paths: &[&[Option]], + ) -> Result< + wrpc_transport::Invocation, + Self::Error, + > { + assert_eq!(cx, "test"); + assert_eq!(instance, "rpc-test:hello/handler"); + assert_eq!(func, "hello"); + assert_eq!(params, "\x08wasmtime".as_bytes()); + assert!(paths.is_empty()); + let indexes = + HashMap::from([(vec![], io::Cursor::new("\x0fHello, wasmtime".as_bytes()))]); + Ok(wrpc_transport::Invocation { + outgoing: self.outgoing.clone(), + incoming: Incoming { + indexes: Arc::new(Mutex::new(indexes)), + path: vec![], + }, + session: self.session.clone(), + }) + } + } + + let engine = test_programs_artifacts::engine(|config| { + config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); + config.async_support(true); + }); + let component = Component::from_file(&engine, RPC_HELLO_CLIENT_COMPONENT)?; + + let stdout = MemoryOutputPipe::new(4096); + let stderr = MemoryOutputPipe::new(4096); + + let wasi = WasiCtxBuilder::new() + .stdout(stdout.clone()) + .stderr(stderr.clone()) + .build(); + let outgoing = Arc::default(); + let error = Arc::default(); + let ctx = Ctx { + table: ResourceTable::new(), + wasi, + wrpc: Transport { + outgoing: Outgoing { + indexes: Arc::clone(&outgoing), + path: vec![], + }, + session: Session { + outgoing: Arc::clone(&error), + incoming: Ok(()), + }, + }, + stderr, + }; + let mut store = Store::new(&engine, ctx); + let mut linker = Linker::new(&engine); + wasmtime_wasi::add_to_linker_async(&mut linker)?; + + { + let Some(types::ComponentItem::ComponentInstance(ty)) = component + .component_type() + .get_import(&engine, "rpc-test:hello/handler") + else { + bail!("`rpc-test:hello/handler` instance import not found") + }; + let mut linker = linker.instance("rpc-test:hello/handler")?; + link_instance(&engine, &mut linker, ty, "rpc-test:hello/handler", "test")?; + } + + let (command, _instance) = Command::instantiate_async(&mut store, &component, &linker).await?; + let result = command.wasi_cli_run().call_run(&mut store).await?; + result.map_err(|()| anyhow::anyhow!("run failed"))?; + assert_eq!(stdout.contents(), "Hello, wasmtime"); + assert_eq!(outgoing.lock().unwrap().deref(), &HashMap::default()); + assert_eq!(error.lock().unwrap().deref(), &Some(Ok(()))); + Ok(()) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn rpc_sync_client() -> anyhow::Result<()> { + struct Transport( + Arc>>, + ); + + impl Invoke for Transport { + type Error = wasmtime::Error; + type Context = &'static str; + type Session = Session; + type Outgoing = Outgoing; + type NestedOutgoing = Outgoing; + type Incoming = Incoming; + + async fn invoke( + &self, + cx: Self::Context, + instance: &str, + func: &str, + params: bytes::Bytes, + paths: &[&[Option]], + ) -> Result< + wrpc_transport::Invocation, + Self::Error, + > { + let mut lock = self.0.lock().unwrap(); + let (outgoing, session, indexes) = match (instance, func) { + ("foo", "foo") => { + assert_eq!(cx, "foo"); + assert!(paths.is_empty()); + assert_eq!(params, "\x03foo".as_bytes()); + let (outgoing, session) = lock.get_mut(&("foo", "foo", b"\x03foo")).unwrap(); + (outgoing, session, HashMap::default()) + } + ("foo", "f") => { + assert_eq!(cx, "foo"); + assert!(paths.is_empty()); + assert_eq!(params, "\x03foo".as_bytes()); + let (outgoing, session) = lock.get_mut(&("foo", "f", b"\x03foo")).unwrap(); + ( + outgoing, + session, + HashMap::from([(vec![], io::Cursor::new([42].as_slice()))]), + ) + } + ("rpc-test:sync/sync", "fallible") => { + assert_eq!(cx, "sync"); + assert!(paths.is_empty()); + let (ret, params) = match params.as_ref() { + [0x00] => ("\x01\x04test".as_bytes(), "\x00".as_bytes()), // (error "test") + [0x01] => ("\x00\x01".as_bytes(), "\x01".as_bytes()), // (ok true) + _ => panic!("invalid `fallible` parameter payload: {params:x?}"), + }; + let (outgoing, session) = lock + .get_mut(&("rpc-test:sync/sync", "fallible", params)) + .unwrap(); + ( + outgoing, + session, + HashMap::from([(vec![], io::Cursor::new(ret))]), + ) + } + ("rpc-test:sync/sync", "numbers") => { + assert_eq!(cx, "sync"); + assert!(paths.is_empty()); + assert_eq!(params, "".as_bytes()); + let (outgoing, session) = lock + .get_mut(&("rpc-test:sync/sync", "numbers", b"")) + .unwrap(); + debug_assert_eq!(9.0f32.to_le_bytes(), b"\x00\x00\x10\x41".as_slice()); + debug_assert_eq!( + 10.0f64.to_le_bytes(), + b"\x00\x00\x00\x00\x00\x00\x24\x40".as_slice() + ); + ( + outgoing, + session, + HashMap::from([( + vec![], + io::Cursor::new( + concat!( + "\x01\x02\x03\x04\x05\x06\x07\x08", // 1 2 3 4 5 6 7 8 + "\x00\x00\x10\x41", // 9.0 + "\x00\x00\x00\x00\x00\x00\x24\x40" // 10.0 + ) + .as_bytes(), + ), + )]), + ) + } + ("rpc-test:sync/sync", "with-flags") => { + assert_eq!(cx, "sync"); + assert!(paths.is_empty()); + assert_eq!(params, "\x01\x00\x01".as_bytes()); + let (outgoing, session) = lock + .get_mut(&("rpc-test:sync/sync", "with-flags", b"\x01\x00\x01")) + .unwrap(); + ( + outgoing, + session, + HashMap::from([(vec![], io::Cursor::new([0b101].as_slice()))]), + ) + } + ("rpc-test:sync/sync", "with-variant-option") => { + assert_eq!(cx, "sync"); + assert!(paths.is_empty()); + let (ret, params) = match params.as_ref() { + [0x00] => ("\x00".as_bytes(), "\x00".as_bytes()), // none + [0x01] => ("\x01\x00\x03bar".as_bytes(), "\x01".as_bytes()), // (some (variant "var" (record (record "bar")))) + _ => panic!("invalid `with-variant-option` parameter payload: {params:x?}"), + }; + let (outgoing, session) = lock + .get_mut(&("rpc-test:sync/sync", "with-variant-option", params)) + .unwrap(); + ( + outgoing, + session, + HashMap::from([(vec![], io::Cursor::new(ret))]), + ) + } + ("rpc-test:sync/sync", "with-record") => { + assert_eq!(cx, "sync"); + assert!(paths.is_empty()); + assert_eq!(params, "".as_bytes()); + let (outgoing, session) = lock + .get_mut(&("rpc-test:sync/sync", "with-record", b"")) + .unwrap(); + ( + outgoing, + session, + HashMap::from([(vec![], io::Cursor::new("\x03foo".as_bytes()))]), + ) + } + ("rpc-test:sync/sync", "with-record-list") => { + assert_eq!(cx, "sync"); + assert!(paths.is_empty()); + let (ret, params) = match params.as_ref() { + [0x00] => ("\x00".as_bytes(), "\x00".as_bytes()), // (list) + [0x03] => ( + concat!("\x03", "\x010", "\x011", "\x012").as_bytes(), // (list (record (record "0")) (record (record "1")) (record (record "2"))) + "\x03".as_bytes(), + ), + _ => panic!("invalid `with-record-list` parameter payload: {params:x?}"), + }; + let (outgoing, session) = lock + .get_mut(&("rpc-test:sync/sync", "with-record-list", params)) + .unwrap(); + ( + outgoing, + session, + HashMap::from([(vec![], io::Cursor::new(ret))]), + ) + } + ("rpc-test:sync/sync", "with-record-tuple") => { + assert_eq!(cx, "sync"); + assert!(paths.is_empty()); + assert_eq!(params, "".as_bytes()); + let (outgoing, session) = lock + .get_mut(&("rpc-test:sync/sync", "with-record-tuple", b"")) + .unwrap(); + ( + outgoing, + session, + HashMap::from([( + vec![], + io::Cursor::new(concat!("\x010", "\x011", "\x012").as_bytes()), // (tuple (record (record "0")) (record (record "1")) (record (record "2"))) + )]), + ) + } + ("rpc-test:sync/sync", "with-enum") => { + assert_eq!(cx, "sync"); + assert!(paths.is_empty()); + assert_eq!(params, "".as_bytes()); + let (outgoing, session) = lock + .get_mut(&("rpc-test:sync/sync", "with-enum", b"")) + .unwrap(); + ( + outgoing, + session, + HashMap::from([(vec![], io::Cursor::new("\x01".as_bytes()))]), + ) + } + _ => panic!("unexpected function call `{func}` from instance `{instance}`"), + }; + Ok(wrpc_transport::Invocation { + outgoing: outgoing.clone(), + incoming: Incoming { + indexes: Arc::new(Mutex::new(indexes)), + path: vec![], + }, + session: session.clone(), + }) + } + } + + let engine = test_programs_artifacts::engine(|config| { + config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); + config.async_support(true); + }); + let component = Component::from_file(&engine, RPC_SYNC_CLIENT_COMPONENT)?; + + let stdout = MemoryOutputPipe::new(4096); + let stderr = MemoryOutputPipe::new(4096); + + let wasi = WasiCtxBuilder::new() + .stdout(stdout.clone()) + .stderr(stderr.clone()) + .build(); + let invocations = Arc::new(Mutex::new(HashMap::from([ + ( + ("foo", "foo", "\x03foo".as_bytes()), + (Outgoing::default(), Session::default()), + ), + ( + ("foo", "f", b"\x03foo"), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "fallible", b"\x00"), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "fallible", b"\x01"), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "numbers", &[]), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "with-flags", b"\x01\x00\x01"), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "with-variant-option", b"\x00"), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "with-variant-option", b"\x01"), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "with-record", b""), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "with-record-list", b"\x00"), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "with-record-list", b"\x03"), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "with-record-tuple", b""), + (Outgoing::default(), Session::default()), + ), + ( + ("rpc-test:sync/sync", "with-enum", b""), + (Outgoing::default(), Session::default()), + ), + ]))); + let ctx = Ctx { + table: ResourceTable::new(), + wasi, + wrpc: Transport(Arc::clone(&invocations)), + stderr, + }; + let mut store = Store::new(&engine, ctx); + let mut linker = Linker::new(&engine); + wasmtime_wasi::add_to_linker_async(&mut linker)?; + + { + let Some(types::ComponentItem::ComponentInstance(ty)) = component + .component_type() + .get_import(&engine, "rpc-test:sync/sync") + else { + bail!("`rpc-test:sync/sync` instance import not found") + }; + let mut linker = linker.instance("rpc-test:sync/sync")?; + link_instance(&engine, &mut linker, ty, "rpc-test:sync/sync", "sync")?; + } + { + let Some(types::ComponentItem::ComponentInstance(ty)) = + component.component_type().get_import(&engine, "foo") + else { + bail!("`foo` instance import not found") + }; + let mut linker = linker.instance("foo")?; + link_instance(&engine, &mut linker, ty, "foo", "foo")?; + } + + let (command, _instance) = Command::instantiate_async(&mut store, &component, &linker).await?; + let result = command.wasi_cli_run().call_run(&mut store).await?; + result.map_err(|()| anyhow::anyhow!("run failed"))?; + assert_eq!(stdout.contents(), ""); + for (_, (outgoing, session)) in invocations.lock().unwrap().iter() { + assert_eq!( + outgoing.indexes.lock().unwrap().deref(), + &HashMap::default() + ); + assert_eq!(session.outgoing.lock().unwrap().deref(), &Some(Ok(()))); + } + Ok(()) +} diff --git a/crates/rpc/tests/wit/hello/hello.wit b/crates/rpc/tests/wit/hello/hello.wit new file mode 100644 index 000000000000..1c5669a27760 --- /dev/null +++ b/crates/rpc/tests/wit/hello/hello.wit @@ -0,0 +1,13 @@ +package rpc-test:hello; + +interface handler { + hello: func(name: string) -> string; +} + +world hello-client { + import handler; +} + +world hello-server { + export handler; +} diff --git a/crates/rpc/tests/wit/sync/sync.wit b/crates/rpc/tests/wit/sync/sync.wit new file mode 100644 index 000000000000..1814eb23fb69 --- /dev/null +++ b/crates/rpc/tests/wit/sync/sync.wit @@ -0,0 +1,54 @@ +package rpc-test:sync; + +interface sync { + flags abc { + a, + b, + c, + } + + record rec-nested { + foo: string, + } + + record rec { + nested: rec-nested, + } + + variant var { + var(rec), + empty, + } + + enum foobar { + foo, + bar, + } + + fallible: func(ok: bool) -> result; + numbers: func() -> tuple; + with-flags: func(a: bool, b: bool, c: bool) -> abc; + with-variant-option: func(ok: bool) -> option; + with-record: func() -> rec; + with-record-list: func(n: u8) -> list; + with-record-tuple: func() -> tuple; + with-enum: func() -> foobar; +} + +world sync-server { + export sync; + + export foo: interface { + f: func(x: string) -> u32; + foo: func(x: string); + } +} + +world sync-client { + import sync; + + import foo: interface { + f: func(x: string) -> u32; + foo: func(x: string); + } +} diff --git a/crates/test-programs/artifacts/build.rs b/crates/test-programs/artifacts/build.rs index a515415b604c..ba7c91f664ff 100644 --- a/crates/test-programs/artifacts/build.rs +++ b/crates/test-programs/artifacts/build.rs @@ -75,6 +75,7 @@ fn build_and_generate_tests() { s if s.starts_with("api_") => "api", s if s.starts_with("nn_") => "nn", s if s.starts_with("piped_") => "piped", + s if s.starts_with("rpc_") => "rpc", // If you're reading this because you hit this panic, either add it // to a test suite above or add a new "suite". The purpose of the // categorization above is to have a static assertion that tests diff --git a/crates/test-programs/src/bin/rpc_hello_client.rs b/crates/test-programs/src/bin/rpc_hello_client.rs new file mode 100644 index 000000000000..d57e9cfee61e --- /dev/null +++ b/crates/test-programs/src/bin/rpc_hello_client.rs @@ -0,0 +1,4 @@ +fn main() { + let greeting = test_programs::rpc_hello::rpc_test::hello::handler::hello("wasmtime"); + print!("{greeting}") +} diff --git a/crates/test-programs/src/bin/rpc_sync_client.rs b/crates/test-programs/src/bin/rpc_sync_client.rs new file mode 100644 index 000000000000..05586cc5e5f7 --- /dev/null +++ b/crates/test-programs/src/bin/rpc_sync_client.rs @@ -0,0 +1,91 @@ +use test_programs::rpc_sync::{foo, rpc_test}; + +fn main() { + foo::foo("foo"); + + let v = foo::f("foo"); + assert_eq!(v, 42); + + let v = rpc_test::sync::sync::fallible(true); + assert_eq!(v, Ok(true)); + + let v = rpc_test::sync::sync::fallible(false); + assert_eq!(v, Err("test".to_string())); + + let v = rpc_test::sync::sync::numbers(); + assert_eq!(v, (1, 2, 3, 4, 5, 6, 7, 8, 9., 10.,)); + + let v = rpc_test::sync::sync::with_flags(true, false, true); + assert_eq!( + v, + rpc_test::sync::sync::Abc::A | rpc_test::sync::sync::Abc::C + ); + + let v = rpc_test::sync::sync::with_variant_option(false); + assert_eq!(v, None); + + let v = rpc_test::sync::sync::with_variant_option(true); + assert_eq!( + v, + Some(rpc_test::sync::sync::Var::Var(rpc_test::sync::sync::Rec { + nested: rpc_test::sync::sync::RecNested { + foo: "bar".to_string() + } + })) + ); + + let v = rpc_test::sync::sync::with_record(); + assert_eq!( + v, + rpc_test::sync::sync::Rec { + nested: rpc_test::sync::sync::RecNested { + foo: "foo".to_string() + } + }, + ); + + let v = rpc_test::sync::sync::with_record_list(0); + assert_eq!(v, []); + + let v = rpc_test::sync::sync::with_record_list(3); + assert_eq!( + v, + [ + rpc_test::sync::sync::Rec { + nested: rpc_test::sync::sync::RecNested { + foo: "0".to_string() + } + }, + rpc_test::sync::sync::Rec { + nested: rpc_test::sync::sync::RecNested { + foo: "1".to_string() + } + }, + rpc_test::sync::sync::Rec { + nested: rpc_test::sync::sync::RecNested { + foo: "2".to_string() + } + }, + ] + ); + + let v = rpc_test::sync::sync::with_record_tuple(); + assert_eq!( + v, + ( + rpc_test::sync::sync::Rec { + nested: rpc_test::sync::sync::RecNested { + foo: "0".to_string() + } + }, + rpc_test::sync::sync::Rec { + nested: rpc_test::sync::sync::RecNested { + foo: "1".to_string() + } + }, + ) + ); + + let v = rpc_test::sync::sync::with_enum(); + assert_eq!(v, rpc_test::sync::sync::Foobar::Bar); +} diff --git a/crates/test-programs/src/lib.rs b/crates/test-programs/src/lib.rs index dfa4541a8fc2..9a62949797f0 100644 --- a/crates/test-programs/src/lib.rs +++ b/crates/test-programs/src/lib.rs @@ -17,3 +17,20 @@ pub mod proxy { }, }); } + +pub mod rpc_hello { + wit_bindgen::generate!({ + path: "../rpc/tests/wit/hello", + world: "hello-client", + default_bindings_module: "test_programs::rpc_hello", + }); +} + +pub mod rpc_sync { + wit_bindgen::generate!({ + path: "../rpc/tests/wit/sync", + world: "sync-client", + default_bindings_module: "test_programs::rpc_sync", + additional_derives: [PartialEq, Eq], + }); +} diff --git a/supply-chain/audits.toml b/supply-chain/audits.toml index e29b1cfc4fd0..dd5075fae5d0 100644 --- a/supply-chain/audits.toml +++ b/supply-chain/audits.toml @@ -1829,6 +1829,16 @@ criteria = "safe-to-deploy" version = "0.2.5" notes = "I am the author of this crate." +[[audits.leb128-tokio]] +who = "Roman Volosatovs " +criteria = "safe-to-deploy" +version = "0.1.1" + +[[audits.leb128-tokio]] +who = "Roman Volosatovs " +criteria = "safe-to-deploy" +version = "0.1.1" + [[audits.libc]] who = "Dan Gohman " criteria = "safe-to-deploy" @@ -2173,6 +2183,11 @@ a few `unsafe` blocks related to utf-8 validation which are locally verifiable as correct and otherwise this crate is good to go. """ +[[audits.pin-project-lite]] +who = "Roman Volosatovs " +criteria = "safe-to-deploy" +delta = "0.2.13 -> 0.2.14" + [[audits.pin-utils]] who = "Pat Hickey " criteria = "safe-to-deploy" @@ -2557,6 +2572,11 @@ criteria = "safe-to-deploy" version = "0.7.4" notes = "Alex Crichton audited the safety of src/sync/reusable_box.rs, I audited the remainder of the crate." +[[audits.tokio-util]] +who = "Roman Volosatovs " +criteria = "safe-to-deploy" +delta = "0.7.4 -> 0.7.11" + [[audits.tracing]] who = "Alex Crichton " criteria = "safe-to-deploy" @@ -2666,6 +2686,11 @@ is similar to what it once was back then. Skimming over the crate there is nothing suspicious and it's everything you'd expect a Rust URL parser to be. """ +[[audits.utf8-tokio]] +who = "Roman Volosatovs " +criteria = "safe-to-deploy" +version = "0.1.1" + [[audits.vcpkg]] who = "Pat Hickey " criteria = "safe-to-deploy" @@ -2970,6 +2995,11 @@ criteria = "safe-to-run" version = "0.12.5" notes = "The Bytecode Alliance is the author of this crate." +[[audits.wasm-tokio]] +who = "Roman Volosatovs " +criteria = "safe-to-deploy" +version = "0.4.1" + [[audits.wasmi]] who = "Robin Freyler " criteria = "safe-to-run" @@ -3411,6 +3441,11 @@ criteria = "safe-to-deploy" version = "0.6.4" notes = "The Bytecode Alliance is the author of this crate." +[[audits.wrpc-transport]] +who = "Roman Volosatovs " +criteria = "safe-to-deploy" +version = "0.25.0" + [[audits.xattr]] who = "Andrew Brown " criteria = "safe-to-deploy" diff --git a/supply-chain/imports.lock b/supply-chain/imports.lock index ad0f41ae2103..c6b5ea796802 100644 --- a/supply-chain/imports.lock +++ b/supply-chain/imports.lock @@ -2883,6 +2883,13 @@ version = "0.2.9" notes = "Reviewed on https://fxrev.dev/824504" aggregated-from = "https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/third_party/rust_crates/supply-chain/audits.toml?format=TEXT" +[[audits.google.audits.pin-project-lite]] +who = "David Koloski " +criteria = "safe-to-deploy" +delta = "0.2.9 -> 0.2.13" +notes = "Audited at https://fxrev.dev/946396" +aggregated-from = "https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/third_party/rust_crates/supply-chain/audits.toml?format=TEXT" + [[audits.google.audits.threadpool]] who = "Dennis Kempin " criteria = "safe-to-run"