diff --git a/Cargo.lock b/Cargo.lock index 1a2e26ed..1a637785 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1015,6 +1015,13 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "hello-component-server" +version = "0.1.0" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "hello-nats-client" version = "0.1.0" @@ -2547,6 +2554,7 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/crates/runtime-wasmtime/Cargo.toml b/crates/runtime-wasmtime/Cargo.toml index aced1043..022490e0 100644 --- a/crates/runtime-wasmtime/Cargo.toml +++ b/crates/runtime-wasmtime/Cargo.toml @@ -16,7 +16,7 @@ async-trait = { workspace = true } bytes = { workspace = true } futures = { workspace = true, features = ["alloc"] } tokio = { workspace = true, features = ["macros"] } -tokio-util = { workspace = true, features = ["codec"] } +tokio-util = { workspace = true, features = ["codec", "compat"] } tracing = { workspace = true, features = ["attributes"] } wasm-tokio = { workspace = true } wasmtime = { workspace = true } diff --git a/crates/runtime-wasmtime/src/lib.rs b/crates/runtime-wasmtime/src/lib.rs index dce7d361..93b78c98 100644 --- a/crates/runtime-wasmtime/src/lib.rs +++ b/crates/runtime-wasmtime/src/lib.rs @@ -9,29 +9,29 @@ use std::collections::HashSet; use std::sync::Arc; use anyhow::{anyhow, bail, Context as _}; -use bytes::{BufMut as _, BytesMut}; +use bytes::{BufMut as _, Bytes, BytesMut}; use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::TryStreamExt as _; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; use tokio::try_join; -use tokio_util::codec::Encoder; -use tracing::{error, trace}; +use tokio_util::codec::{Encoder, FramedRead}; +use tokio_util::compat::FuturesAsyncReadCompatExt as _; +use tracing::{debug, trace}; use tracing::{instrument, warn}; use wasm_tokio::cm::AsyncReadValue as _; use wasm_tokio::{ - AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreNameEncoder, Leb128Encoder, - Utf8Codec, + AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreNameEncoder, + CoreVecEncoderBytes, Leb128Encoder, Utf8Codec, }; use wasmtime::component::types::{self, Case, Field}; -use wasmtime::component::{Linker, ResourceType, Type, Val}; -use wasmtime::{AsContextMut, StoreContextMut}; +use wasmtime::component::{LinkerInstance, ResourceType, Type, Val}; +use wasmtime::{AsContextMut, Engine, StoreContextMut}; use wasmtime_wasi::pipe::AsyncReadStream; use wasmtime_wasi::{InputStream, StreamError, WasiView}; -use wrpc_introspect::rpc_func_name; -use wrpc_transport::{Index as _, Invocation, Invoke, Session}; +use wrpc_transport::{Index as _, Invocation, Invoke, ListDecoderU8, Session}; -pub struct RemoteResource(pub String); +pub struct RemoteResource(pub Bytes); pub struct ValEncoder<'a, T, W> { pub store: StoreContextMut<'a, T>, @@ -456,8 +456,20 @@ where loop { stream.ready().await; match stream.read(8096) { - Ok(buf) => w.write_all(&buf).await?, - Err(StreamError::Closed) => return Ok(()), + Ok(buf) => { + let mut chunk = BytesMut::with_capacity( + buf.len().saturating_add(5), + ); + CoreVecEncoderBytes + .encode(buf, &mut chunk) + .context( + "failed to encode input stream chunk", + )?; + w.write_all(&chunk).await?; + } + Err(StreamError::Closed) => { + w.write_all(&[0x00]).await? + } Err(err) => return Err(err.into()), } } @@ -470,8 +482,20 @@ where let mut w = pin!(w); loop { match stream.read(8096).await { - Ok(buf) => w.write_all(&buf).await?, - Err(StreamError::Closed) => return Ok(()), + Ok(buf) => { + let mut chunk = BytesMut::with_capacity( + buf.len().saturating_add(5), + ); + CoreVecEncoderBytes + .encode(buf, &mut chunk) + .context( + "failed to encode input stream chunk", + )?; + w.write_all(&chunk).await?; + } + Err(StreamError::Closed) => { + w.write_all(&[0x00]).await? + } Err(err) => return Err(err.into()), } } @@ -497,19 +521,19 @@ where .context("resource type mismatch")?; let table = self.store.data_mut().table(); if resource.owned() { - let RemoteResource(id) = table + let RemoteResource(buf) = table .delete(resource) .context("failed to delete remote resource")?; - CoreNameEncoder - .encode(id, dst) - .context("failed to encode resource ID") + CoreVecEncoderBytes + .encode(buf, dst) + .context("failed to encode resource handle") } else { - let RemoteResource(id) = table + let RemoteResource(buf) = table .get(&resource) .context("failed to get remote resource")?; - CoreNameEncoder - .encode(id.as_str(), dst) - .context("failed to encode resource ID") + CoreVecEncoderBytes + .encode(buf, dst) + .context("failed to encode resource handle") } } else { bail!("encoding host resources not supported yet") @@ -527,7 +551,9 @@ async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Resu Ok(u128::from_le_bytes(buf)) } -pub async fn read_value( +/// Read encoded value of type [`Type`] from an [`AsyncRead`] into a [`Val`] +#[instrument(level = "trace", skip_all, fields(ty, path))] +async fn read_value( store: &mut impl AsContextMut, r: &mut Pin<&mut R>, val: &mut Val, @@ -614,6 +640,7 @@ where for i in 0..n { let mut v = Val::Bool(false); path.push(i); + trace!(i, "reading list element value"); Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; path.pop(); vs.push(v); @@ -628,6 +655,7 @@ where for (i, Field { name, ty }) in fields.enumerate() { let mut v = Val::Bool(false); path.push(i); + trace!(i, "reading struct field value"); Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; path.pop(); vs.push((name.to_string(), v)); @@ -642,6 +670,7 @@ where for (i, ty) in types.enumerate() { let mut v = Val::Bool(false); path.push(i); + trace!(i, "reading tuple element value"); Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; path.pop(); vs.push(v); @@ -663,6 +692,7 @@ where let name = name.to_string(); if let Some(ty) = ty { let mut v = Val::Bool(false); + trace!(variant = name, "reading nested variant value"); Box::pin(read_value(store, r, &mut v, &ty, path)).await?; *val = Val::Variant(name, Some(Box::new(v))); } else { @@ -688,6 +718,7 @@ where let ok = r.read_option_status().await?; if ok { let mut v = Val::Bool(false); + trace!("reading nested `option::some` value"); Box::pin(read_value(store, r, &mut v, &ty.ty(), path)).await?; *val = Val::Option(Some(Box::new(v))); } else { @@ -700,6 +731,7 @@ where if ok { if let Some(ty) = ty.ok() { let mut v = Val::Bool(false); + trace!("reading nested `result::ok` value"); Box::pin(read_value(store, r, &mut v, &ty, path)).await?; *val = Val::Result(Ok(Some(Box::new(v)))); } else { @@ -707,6 +739,7 @@ where } } else if let Some(ty) = ty.err() { let mut v = Val::Bool(false); + trace!("reading nested `result::err` value"); Box::pin(read_value(store, r, &mut v, &ty, path)).await?; *val = Val::Result(Err(Some(Box::new(v)))); } else { @@ -758,7 +791,7 @@ where }; let mut vs = Vec::with_capacity(flags.count_ones().try_into().unwrap_or(usize::MAX)); for (i, name) in zip(0.., names) { - if flags & (1 << i) > 0 { + if flags & (1 << i) != 0 { vs.push(name.to_string()); } } @@ -771,10 +804,16 @@ where let r = r .index(path) .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; + // TODO: Implement a custom reader, this approach ignores the stream end (`\0`), + // which will could potentially break/hang with some transports let res = store .data_mut() .table() - .push(InputStream::Host(Box::new(AsyncReadStream::new(r)))) + .push(InputStream::Host(Box::new(AsyncReadStream::new( + FramedRead::new(r, ListDecoderU8::default()) + .into_async_read() + .compat(), + )))) .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?; let v = res .try_into_resource_any(store) @@ -783,11 +822,14 @@ where Ok(()) } else { let mut store = store.as_context_mut(); - let mut s = String::default(); - r.read_core_name(&mut s).await?; + let n = r.read_u32_leb128().await?; + let n = usize::try_from(n) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; + let mut buf = Vec::with_capacity(n); + r.read_to_end(&mut buf).await?; let table = store.data_mut().table(); let resource = table - .push(RemoteResource(s)) + .push(RemoteResource(buf.into())) .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?; let resource = resource .try_into_resource_any(store) @@ -803,184 +845,150 @@ pub trait WrpcView: Send { fn client(&self) -> &C; } -/// Polyfills all missing imports +/// Polyfill [`types::ComponentItem`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] #[instrument(level = "trace", skip_all)] -pub fn polyfill<'a, T, C, V>( - resolve: &wit_parser::Resolve, - imports: T, - engine: &wasmtime::Engine, - ty: &types::Component, - linker: &mut Linker, +pub fn link_item<'a, C, V>( + engine: &Engine, + linker: &mut LinkerInstance, + ty: types::ComponentItem, + instance: impl Into>, + name: impl Into>, cx: C::Context, -) where - T: IntoIterator, - T::IntoIter: ExactSizeIterator, +) -> wasmtime::Result<()> +where V: WrpcView + WasiView, C: Invoke, C::Context: Clone + 'static, - C::Outgoing: 'static, - C::Incoming: Unpin + Sized + 'static, { - let imports = imports.into_iter(); - for (wk, item) in imports { - let instance_name = resolve.name_world_key(wk); - // Avoid polyfilling instances, for which static bindings are linked - match instance_name.as_ref() { - "wasi:cli/environment@0.2.0" - | "wasi:cli/exit@0.2.0" - | "wasi:cli/stderr@0.2.0" - | "wasi:cli/stdin@0.2.0" - | "wasi:cli/stdout@0.2.0" - | "wasi:cli/terminal-input@0.2.0" - | "wasi:cli/terminal-output@0.2.0" - | "wasi:cli/terminal-stderr@0.2.0" - | "wasi:cli/terminal-stdin@0.2.0" - | "wasi:cli/terminal-stdout@0.2.0" - | "wasi:clocks/monotonic-clock@0.2.0" - | "wasi:clocks/wall-clock@0.2.0" - | "wasi:filesystem/preopens@0.2.0" - | "wasi:filesystem/types@0.2.0" - | "wasi:http/incoming-handler@0.2.0" - | "wasi:http/outgoing-handler@0.2.0" - | "wasi:http/types@0.2.0" - | "wasi:io/error@0.2.0" - | "wasi:io/poll@0.2.0" - | "wasi:io/streams@0.2.0" - | "wasi:keyvalue/store@0.2.0-draft" - | "wasi:random/random@0.2.0" - | "wasi:sockets/instance-network@0.2.0" - | "wasi:sockets/network@0.2.0" - | "wasi:sockets/tcp-create-socket@0.2.0" - | "wasi:sockets/tcp@0.2.0" - | "wasi:sockets/udp-create-socket@0.2.0" - | "wasi:sockets/udp@0.2.0" => continue, - _ => {} + let instance = instance.into(); + match ty { + types::ComponentItem::ComponentFunc(ty) => { + let name = name.into(); + debug!(?instance, ?name, "linking function"); + link_function(linker, ty, instance, name, cx)?; } - let wit_parser::WorldItem::Interface(interface) = item else { - continue; - }; - let Some(wit_parser::Interface { - functions, types, .. - }) = resolve.interfaces.get(*interface) - else { - warn!("component imports a non-existent interface"); - continue; - }; - let Some(types::ComponentItem::ComponentInstance(instance)) = - ty.get_import(engine, &instance_name) - else { - trace!( - instance_name, - "component does not import the parsed instance" - ); - continue; - }; - - let mut linker = linker.root(); - let mut linker = match linker.instance(&instance_name) { - Ok(linker) => linker, - Err(err) => { - error!( - ?err, - ?instance_name, - "failed to instantiate interface from root" - ); - continue; - } - }; - for (name, _) in types { - let Some(types::ComponentItem::Resource(_)) = instance.get_export(engine, name) else { - trace!(?instance_name, name, "skip non-resource type import"); - continue; - }; - if let Err(err) = - linker.resource(name, ResourceType::host::(), |_, _| Ok(())) - { - error!(?err, "failed to polyfill imported resource type"); + types::ComponentItem::CoreFunc(_) => { + bail!("polyfilling core functions not supported yet") + } + types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"), + types::ComponentItem::Component(ty) => { + for (name, ty) in ty.imports(engine) { + debug!(?instance, name, "linking component item"); + link_item(engine, linker, ty, "", name, cx.clone())?; } } + types::ComponentItem::ComponentInstance(ty) => { + let name = name.into(); + let mut linker = linker + .instance(&name) + .with_context(|| format!("failed to instantiate `{name}` in the linker"))?; + debug!(?instance, ?name, "linking instance"); + link_instance(engine, &mut linker, ty, name, cx)?; + } + types::ComponentItem::Type(_) => {} + types::ComponentItem::Resource(_) => { + let name = name.into(); + linker.resource(&name, ResourceType::host::(), |_, _| Ok(()))?; + } + } + Ok(()) +} - let instance_name = Arc::new(instance_name); - for (func_name, ty) in functions { - trace!( - ?instance_name, - func_name, - "polyfill component function import" - ); - let Some(types::ComponentItem::ComponentFunc(func)) = - instance.get_export(engine, func_name) - else { - trace!( - ?instance_name, - func_name, - "instance does not export the parsed function" - ); - continue; - }; - let cx = cx.clone(); - let instance_name = Arc::clone(&instance_name); - let func_name = Arc::new(func_name.to_string()); - let rpc_name = Arc::new(rpc_func_name(ty).to_string()); - if let Err(err) = linker.func_new_async( - Arc::clone(&func_name).as_str(), - move |mut store, params, results| { - let cx = cx.clone(); - let instance_name = Arc::clone(&instance_name); - let func = func.clone(); - let rpc_name = Arc::clone(&rpc_name); - let func_name = Arc::clone(&func_name); - Box::new(async move { - let mut buf = BytesMut::default(); - let mut deferred = vec![]; - for (v, ref ty) in zip(params, func.params()) { - let mut enc = ValEncoder::new(store.as_context_mut(), ty); - enc.encode(v, &mut buf).context("failed to encode parameter")?; - deferred.push(enc.deferred); - } - let Invocation { outgoing, incoming, session } = store - .data() - .client() - .invoke(cx, &instance_name, &rpc_name, buf.freeze(), &[[]; 0]) +/// Polyfill [`types::ComponentInstance`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] +#[instrument(level = "trace", skip_all)] +pub fn link_instance<'a, C, V>( + engine: &Engine, + linker: &mut LinkerInstance, + ty: types::ComponentInstance, + name: impl Into>, + cx: C::Context, +) -> wasmtime::Result<()> +where + V: WrpcView + WasiView, + C: Invoke, + C::Context: Clone + 'static, +{ + let instance = name.into(); + for (name, ty) in ty.exports(engine) { + debug!(name, "linking instance item"); + link_item(engine, linker, ty, Arc::clone(&instance), name, cx.clone())?; + } + Ok(()) +} + +/// Polyfill [`types::ComponentFunc`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] +#[instrument(level = "trace", skip_all)] +pub fn link_function<'a, C, V>( + linker: &mut LinkerInstance, + ty: types::ComponentFunc, + instance: impl Into>, + name: impl Into>, + cx: C::Context, +) -> wasmtime::Result<()> +where + V: WrpcView + WasiView, + C: Invoke, + C::Context: Clone + 'static, +{ + let instance = instance.into(); + let name = name.into(); + linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| { + let cx = cx.clone(); + let ty = ty.clone(); + let instance = Arc::clone(&instance); + let name = Arc::clone(&name); + Box::new(async move { + let mut buf = BytesMut::default(); + let mut deferred = vec![]; + for (v, ref ty) in zip(params, ty.params()) { + let mut enc = ValEncoder::new(store.as_context_mut(), ty); + enc.encode(v, &mut buf) + .context("failed to encode parameter")?; + deferred.push(enc.deferred); + } + let Invocation { + outgoing, + incoming, + session, + } = store + .data() + .client() + .invoke(cx, &instance, &name, buf.freeze(), &[[]]) + .await + .with_context(|| { + format!("failed to invoke `{instance}.{name}` polyfill via wRPC") + })?; + try_join!( + async { + try_join_all( + zip(0.., deferred) + .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f))) + .map(|(w, f)| async move { + let w = w?; + f(w).await + }), + ) + .await + .context("failed to write asynchronous parameters")?; + pin!(outgoing) + .shutdown() + .await + .context("failed to shutdown outgoing stream") + }, + async { + let mut incoming = pin!(incoming); + for (i, (v, ref ty)) in zip(results, ty.results()).enumerate() { + read_value(&mut store, &mut incoming, v, ty, &[i]) .await - .with_context(|| { - format!("failed to invoke `{instance_name}.{func_name}` polyfill via wRPC") - })?; - try_join!( - async { - try_join_all( - zip(0.., deferred) - .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f))) - .map(|(w, f)| async move { - let w = w?; - f(w).await - }), - ) - .await - .context("failed to write asynchronous parameters")?; - pin!(outgoing) - .shutdown() - .await - .context("failed to shutdown outgoing stream") - }, - async { - let mut incoming = pin!(incoming); - for (i, (v, ref ty)) in zip(results, func.results()).enumerate() { - read_value(&mut store, &mut incoming, v, ty, &[i]) - .await - .context("failed to decode result value")?; - } - Ok(()) - }, - )?; - match session.finish(Ok(())).await? { - Ok(()) => Ok(()), - Err(err) => bail!(anyhow!("{err}").context("session failed")) - } - }) + .with_context(|| format!("failed to decode return value {i}"))?; + } + Ok(()) }, - ) { - error!(?err, "failed to polyfill component function import"); + )?; + match session.finish(Ok(())).await? { + Ok(()) => Ok(()), + Err(err) => bail!(anyhow!("{err}").context("session failed")), } - } - } + }) + }) } diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index 6a768896..ed3d470f 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -1,5 +1,14 @@ #![allow(clippy::type_complexity)] +#[cfg(feature = "frame")] +pub mod frame; + +mod value; + +#[cfg(feature = "frame")] +pub use frame::{Decoder as FrameDecoder, Encoder as FrameEncoder, FrameRef}; +pub use value::*; + use core::future::Future; use core::pin::Pin; @@ -11,15 +20,7 @@ use futures::{SinkExt as _, Stream, TryStreamExt as _}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::try_join; use tokio_util::codec::{Encoder as _, FramedRead, FramedWrite}; - -#[cfg(feature = "frame")] -pub mod frame; -#[cfg(feature = "frame")] -pub use frame::{Decoder as FrameDecoder, Encoder as FrameEncoder, FrameRef}; - -mod value; use tracing::{debug, instrument, trace, Instrument as _}; -pub use value::*; /// `Index` implementations are capable of multiplexing underlying connections using a particular /// structural `path` diff --git a/crates/wasmtime-nats-cli/src/lib.rs b/crates/wasmtime-nats-cli/src/lib.rs index 520c008e..c5765ba0 100644 --- a/crates/wasmtime-nats-cli/src/lib.rs +++ b/crates/wasmtime-nats-cli/src/lib.rs @@ -1,16 +1,14 @@ use anyhow::{anyhow, bail, Context as _}; use clap::Parser; use tokio::fs; -use tracing::instrument; +use tracing::{error, instrument, trace}; use url::Url; use wasmcloud_component_adapters::WASI_PREVIEW1_COMMAND_COMPONENT_ADAPTER; -use wasmtime::{ - component::{Component, Linker}, - Store, -}; +use wasmtime::component::{types, Component, Linker}; +use wasmtime::Store; use wasmtime_wasi::{bindings::Command, WasiCtx, WasiView}; use wasmtime_wasi::{ResourceTable, WasiCtxBuilder}; -use wrpc_runtime_wasmtime::{polyfill, WrpcView}; +use wrpc_runtime_wasmtime::{link_instance, WrpcView}; use wrpc_transport::Invoke; #[derive(Parser, Debug)] @@ -137,14 +135,67 @@ pub async fn run() -> anyhow::Result<()> { .find_map(|(id, w)| (id == world).then_some(w)) .context("component world missing")?; - polyfill( - &resolve, - imports, - &engine, - &component.component_type(), - &mut linker, - None, - ); + let ty = component.component_type(); + for (wk, _) in imports { + let instance_name = resolve.name_world_key(wk); + // Avoid polyfilling instances, for which static bindings are linked + match instance_name.as_ref() { + "wasi:cli/environment@0.2.0" + | "wasi:cli/exit@0.2.0" + | "wasi:cli/stderr@0.2.0" + | "wasi:cli/stdin@0.2.0" + | "wasi:cli/stdout@0.2.0" + | "wasi:cli/terminal-input@0.2.0" + | "wasi:cli/terminal-output@0.2.0" + | "wasi:cli/terminal-stderr@0.2.0" + | "wasi:cli/terminal-stdin@0.2.0" + | "wasi:cli/terminal-stdout@0.2.0" + | "wasi:clocks/monotonic-clock@0.2.0" + | "wasi:clocks/wall-clock@0.2.0" + | "wasi:filesystem/preopens@0.2.0" + | "wasi:filesystem/types@0.2.0" + | "wasi:http/incoming-handler@0.2.0" + | "wasi:http/outgoing-handler@0.2.0" + | "wasi:http/types@0.2.0" + | "wasi:io/error@0.2.0" + | "wasi:io/poll@0.2.0" + | "wasi:io/streams@0.2.0" + | "wasi:keyvalue/store@0.2.0-draft" + | "wasi:random/random@0.2.0" + | "wasi:sockets/instance-network@0.2.0" + | "wasi:sockets/network@0.2.0" + | "wasi:sockets/tcp-create-socket@0.2.0" + | "wasi:sockets/tcp@0.2.0" + | "wasi:sockets/udp-create-socket@0.2.0" + | "wasi:sockets/udp@0.2.0" => continue, + _ => {} + } + let Some(types::ComponentItem::ComponentInstance(instance)) = + ty.get_import(&engine, &instance_name) + else { + trace!( + instance_name, + "component does not import the parsed instance" + ); + continue; + }; + + let mut linker = linker.root(); + let mut linker = match linker.instance(&instance_name) { + Ok(linker) => linker, + Err(err) => { + error!( + ?err, + ?instance_name, + "failed to instantiate interface from root" + ); + continue; + } + }; + if let Err(err) = link_instance(&engine, &mut linker, instance, instance_name, None) { + error!(?err, "failed to polyfill instance"); + } + } let pre = linker .instantiate_pre(&component)