Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce async support #96

Merged
merged 8 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ tidy:
.PHONY: test
test: wasm
cargo test --workspace

.PHONY: doc
doc:
RUSTDOCFLAGS="--cfg docsrs -D warnings" cargo +nightly doc --all-features --no-deps
2 changes: 1 addition & 1 deletion crates/wapc-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ independent = true
status = "actively-developed"

[dependencies]
wapc = { path = "../wapc", version = "1.1.0" }
wapc = { path = "../wapc", version = "2.0.0" }
log = "0.4"
thiserror = "1.0"
rusty_pool = "0.7"
Expand Down
15 changes: 14 additions & 1 deletion crates/wapc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
[package]
name = "wapc"
version = "1.1.0"
version = "2.0.0"
authors = [
"Kevin Hoffman <[email protected]>",
"Jarrod Overson <[email protected]>",
"Phil Kedy <[email protected]>",
"Flavio Castelli <[email protected]>",
]
edition = "2021"
description = "An engine-pluggable WebAssembly Host Runtime implementing the waPC protocol."
license = "Apache-2.0"
homepage = "https://wapc.io"
repository = "https://github.com/wapc/wapc-rs"
documentation = "https://docs.rs/wapc"
readme = "README.md"
keywords = ["sdk", "wapc", "webassembly", "wasm", "wasi"]
categories = ["wasm", "api-bindings"]

[features]
default = ["async"]
async = ["async-trait", "tokio"]

[package.metadata.docs.rs]
all-features = true

[package.metadata.workspaces]
independent = true

[dependencies]
log = "0.4"
parking_lot = "0.12"
thiserror = "1.0"
async-trait = { version = "0.1", optional = true }
tokio = { version = "1", optional = true, default-features = false, features = [
"sync",
] }

[dev-dependencies]
wasmtime-provider = { path = "../wasmtime-provider" }
67 changes: 65 additions & 2 deletions crates/wapc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ To make function calls, ensure that you provided a suitable host callback functi
The following is an example of synchronous, bi-directional procedure calls between a WebAssembly host runtime and the guest module.

```rust
use wasmtime_provider::WasmtimeEngineProviderBuilder; // Or Wasm3EngineProvider
use wapc::WapcHost;
use std::error::Error;

use wapc::WapcHost;
use wasmtime_provider::WasmtimeEngineProviderBuilder; // Or Wasm3EngineProvider

pub fn main() -> Result<(), Box<dyn Error>> {

// Sample host callback that prints the operation a WASM module requested.
Expand Down Expand Up @@ -64,6 +66,67 @@ repositories:

waPC is _reactive_. Hosts make requests and guests respond. During a request, guests can initiate calls back to the host and interact with the environment (via WASI). When a request is done the guest should be considered parked until the next request.

# `async` Support

A waPC-compliant WebAssembly module can be used in an asynchronous context. This can
be done using a [`WapcHostAsync`] and a provider that implements the [`WebAssemblyEngineProviderAsync`] trait.
Currently only the [`wasmtime-provider`](https://crates.io/crates/wasmtime-provider) crate
provides an implementation of this trait.

**Note:** the `async` support relies on the tokio runtime.

## Example

The following is an example of synchronous, bi-directional procedure calls between a WebAssembly host runtime and the guest module,
all done inside an asynchronous context.

```rust
use std::error::Error;

use wapc::{HostCallbackAsync, WapcHostAsync};
use wasmtime_provider::WasmtimeEngineProviderBuilder;

async fn host_callback(
id: u64,
bd: String,
ns: String,
op: String,
payload: Vec<u8>,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
println!(
"Guest {} invoked '{}->{}:{}' on the host with a payload of '{}'",
id,
bd,
ns,
op,
::std::str::from_utf8(&payload).unwrap()
);
Ok(vec![])
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let file = "../../wasm/crates/wasm-basic/build/wasm_basic.wasm";
let module_bytes = std::fs::read(file)?;

let engine = WasmtimeEngineProviderBuilder::new()
.module_bytes(&module_bytes)
.build_async()?;

let callback: Box<HostCallbackAsync> = Box::new(move |id, bd, ns, op, payload| {
let fut = host_callback(id, bd, ns, op, payload);
Box::pin(fut)
});

let host = WapcHostAsync::new(Box::new(engine), Some(callback)).await?;

let res = host.call("ping", b"payload bytes").await?;
assert_eq!(res, b"payload bytes");

Ok(())
}
```

## RPC Exchange Flow

The following is a detailed outline of which functions are invoked and in which order to support
Expand Down
46 changes: 35 additions & 11 deletions crates/wapc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,32 +79,56 @@
)]
#![doc(html_logo_url = "https://avatars0.githubusercontent.com/u/54989751?s=200&v=4")]
#![doc = include_str!("../README.md")]

#[macro_use]
extern crate log;
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod errors;

use std::error::Error;

/// The host module name / namespace that guest modules must use for imports
pub const HOST_NAMESPACE: &str = "wapc";

/// A list of the function names that are part of each waPC conversation
pub mod wapc_functions;

mod wapchost;
mod wasi;

#[cfg(feature = "async")]
use core::future::Future;

use std::error::Error;

// sync exports
pub use wapchost::modulestate::ModuleState;
pub use wapchost::traits::{ModuleHost, WebAssemblyEngineProvider};
pub use wapchost::WapcHost;

// async exports
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use wapchost::modulestate_async::ModuleStateAsync;
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use wapchost::traits::{ModuleHostAsync, WebAssemblyEngineProviderAsync};
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use wapchost::WapcHostAsync;

pub use wasi::WasiParams;

/// The host module name / namespace that guest modules must use for imports
pub const HOST_NAMESPACE: &str = "wapc";

/// The signature of a Host Callback function.
pub type HostCallback =
dyn Fn(u64, &str, &str, &str, &[u8]) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> + Sync + Send + 'static;

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
/// The signature of an async Host Callback function.
pub type HostCallbackAsync = dyn Fn(
u64,
String,
String,
String,
Vec<u8>,
) -> std::pin::Pin<Box<dyn Future<Output = Result<Vec<u8>, Box<dyn Error + Send + Sync>>> + Send>>
+ Send
+ Sync;

#[derive(Debug, Clone)]
/// Represents a waPC invocation, which is a combination of an operation string and the
/// corresponding binary payload
Expand Down
2 changes: 2 additions & 0 deletions crates/wapc/src/wapc_functions.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A list of the function names that are part of each waPC conversation

// -- Functions called by guest, exported by host
/// The waPC protocol function `__console_log`
pub const HOST_CONSOLE_LOG: &str = "__console_log";
Expand Down
143 changes: 13 additions & 130 deletions crates/wapc/src/wapchost.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1,22 @@
mod host;
pub(crate) mod modulestate;
pub(crate) mod traits;

use std::cell::RefCell;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use self::modulestate::ModuleState;
use self::traits::WebAssemblyEngineProvider;
use crate::{errors, HostCallback, Invocation};

static GLOBAL_MODULE_COUNT: AtomicU64 = AtomicU64::new(1);

type Result<T> = std::result::Result<T, errors::Error>;

/// A WebAssembly host runtime for waPC-compliant modules
///
/// Use an instance of this struct to provide a means of invoking procedure calls by
/// specifying an operation name and a set of bytes representing the opaque operation payload.
/// `WapcHost` makes no assumptions about the contents or format of either the payload or the
/// operation name, other than that the operation name is a UTF-8 encoded string.
#[must_use]
pub struct WapcHost {
engine: RefCell<Box<dyn WebAssemblyEngineProvider>>,
state: Arc<ModuleState>,
}

impl std::fmt::Debug for WapcHost {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WapcHost").field("state", &self.state).finish()
}
}
#[cfg(feature = "async")]
mod host_async;
#[cfg(feature = "async")]
pub(crate) mod modulestate_async;

impl WapcHost {
/// Creates a new instance of a waPC-compliant host runtime paired with a given
/// low-level engine provider
pub fn new(engine: Box<dyn WebAssemblyEngineProvider>, host_callback: Option<Box<HostCallback>>) -> Result<Self> {
let id = GLOBAL_MODULE_COUNT.fetch_add(1, Ordering::SeqCst);

let state = Arc::new(ModuleState::new(host_callback, id));

let mh = WapcHost {
engine: RefCell::new(engine),
state: state.clone(),
};

mh.initialize(state)?;

Ok(mh)
}

fn initialize(&self, state: Arc<ModuleState>) -> Result<()> {
match self.engine.borrow_mut().init(state) {
Ok(_) => Ok(()),
Err(e) => Err(errors::Error::InitFailed(e.to_string())),
}
}
pub(crate) mod traits;

/// Returns a reference to the unique identifier of this module. If a parent process
/// has instantiated multiple `WapcHost`s, then the single static host callback function
/// will contain this value to allow disambiguation of modules
pub fn id(&self) -> u64 {
self.state.id
}
use std::sync::atomic::AtomicU64;

/// Invokes the `__guest_call` function within the guest module as per the waPC specification.
/// Provide an operation name and an opaque payload of bytes and the function returns a `Result`
/// containing either an error or an opaque reply of bytes.
///
/// It is worth noting that the _first_ time `call` is invoked, the WebAssembly module
/// might incur a "cold start" penalty, depending on which underlying engine you're using. This
/// might be due to lazy initialization or JIT-compilation.
pub fn call(&self, op: &str, payload: &[u8]) -> Result<Vec<u8>> {
let inv = Invocation::new(op, payload.to_vec());
let op_len = inv.operation.len();
let msg_len = inv.msg.len();
use crate::{errors, HostCallback, Invocation};

{
*self.state.guest_response.write() = None;
*self.state.guest_request.write() = Some(inv);
*self.state.guest_error.write() = None;
*self.state.host_response.write() = None;
*self.state.host_error.write() = None;
}
pub(crate) static GLOBAL_MODULE_COUNT: AtomicU64 = AtomicU64::new(1);

let callresult = match self.engine.borrow_mut().call(op_len as i32, msg_len as i32) {
Ok(c) => c,
Err(e) => {
return Err(errors::Error::GuestCallFailure(e.to_string()));
}
};
pub(crate) type Result<T> = std::result::Result<T, errors::Error>;

if callresult == 0 {
// invocation failed
let lock = self.state.guest_error.read();
lock.as_ref().map_or_else(
|| {
Err(errors::Error::GuestCallFailure(
"No error message set for call failure".to_owned(),
))
},
|s| Err(errors::Error::GuestCallFailure(s.clone())),
)
} else {
// invocation succeeded
self.state.guest_response.read().as_ref().map_or_else(
|| {
let lock = self.state.guest_error.read();
lock.as_ref().map_or_else(
|| {
Err(errors::Error::GuestCallFailure(
"No error message OR response set for call success".to_owned(),
))
},
|s| Err(errors::Error::GuestCallFailure(s.clone())),
)
},
|e| Ok(e.clone()),
)
}
}
pub use host::WapcHost;

/// Performs a live "hot swap" of the WebAssembly module. Since all internal waPC execution is assumed to be
/// single-threaded and non-reentrant, this call is synchronous and so
/// you should never attempt to invoke `call` from another thread while performing this hot swap.
///
/// **Note**: if the underlying engine you've chosen is a JITting engine, then performing a swap
/// will re-introduce a "cold start" delay upon the next function call.
///
/// If you perform a hot swap of a WASI module, you cannot alter the parameters used to create the WASI module
/// like the environment variables, mapped directories, pre-opened files, etc. Not abiding by this could lead
/// to privilege escalation attacks or non-deterministic behavior after the swap.
pub fn replace_module(&self, module: &[u8]) -> Result<()> {
match self.engine.borrow_mut().replace(module) {
Ok(_) => Ok(()),
Err(e) => Err(errors::Error::ReplacementFailed(e.to_string())),
}
}
}
#[cfg(feature = "async")]
pub use host_async::WapcHostAsync;
Loading