diff --git a/Cargo.lock b/Cargo.lock index 990389b71..bc0289c11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -693,16 +693,9 @@ dependencies = [ name = "containerd-shim-benchmarks" version = "0.4.0" dependencies = [ - "anyhow", - "chrono", "containerd-shim-wasm", "containerd-shim-wasmtime", "criterion", - "libc", - "oci-spec", - "serde", - "serde_json", - "tempfile", "wasmtime", ] @@ -767,7 +760,6 @@ version = "0.4.1" dependencies = [ "anyhow", "lazy_static", - "libc", "wat", ] @@ -776,13 +768,10 @@ name = "containerd-shim-wasmedge" version = "0.5.0" dependencies = [ "anyhow", - "containerd-shim", "containerd-shim-wasm", "libc", "log", - "oci-spec", "serial_test", - "ttrpc", "wasmedge-sdk", ] @@ -791,15 +780,11 @@ name = "containerd-shim-wasmer" version = "0.5.0" dependencies = [ "anyhow", - "containerd-shim", "containerd-shim-wasm", "log", - "oci-spec", "serial_test", "tokio", - "ttrpc", "wasmer", - "wasmer-compiler", "wasmer-wasix", ] @@ -808,13 +793,9 @@ name = "containerd-shim-wasmtime" version = "0.5.0" dependencies = [ "anyhow", - "containerd-shim", "containerd-shim-wasm", "log", - "oci-spec", "serial_test", - "sha256", - "ttrpc", "wasmtime", "wasmtime-wasi", ] diff --git a/benches/containerd-shim-benchmarks/Cargo.toml b/benches/containerd-shim-benchmarks/Cargo.toml index fd704bc59..318097bd5 100644 --- a/benches/containerd-shim-benchmarks/Cargo.toml +++ b/benches/containerd-shim-benchmarks/Cargo.toml @@ -4,15 +4,8 @@ version.workspace = true edition.workspace = true [dependencies] -anyhow = { workspace = true } -chrono = { workspace = true } containerd-shim-wasm = { path = "../../crates/containerd-shim-wasm", features = ["testing"] } containerd-shim-wasmtime = { path = "../../crates/containerd-shim-wasmtime" } -libc = { workspace = true } -oci-spec = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -tempfile = "3.8" wasmtime = { workspace = true } [dev-dependencies] diff --git a/crates/containerd-shim-wasm-test-modules/Cargo.toml b/crates/containerd-shim-wasm-test-modules/Cargo.toml index a5f3572ce..31a629f13 100644 --- a/crates/containerd-shim-wasm-test-modules/Cargo.toml +++ b/crates/containerd-shim-wasm-test-modules/Cargo.toml @@ -6,8 +6,6 @@ edition.workspace = true license.workspace = true [dependencies] -anyhow = { workspace = true } -libc = { workspace = true } [build-dependencies] anyhow = { workspace = true } diff --git a/crates/containerd-shim-wasmedge/Cargo.toml b/crates/containerd-shim-wasmedge/Cargo.toml index c2047dc90..18fc12e09 100644 --- a/crates/containerd-shim-wasmedge/Cargo.toml +++ b/crates/containerd-shim-wasmedge/Cargo.toml @@ -5,11 +5,8 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } -containerd-shim = { workspace = true } containerd-shim-wasm = { workspace = true } log = { workspace = true } -oci-spec = { workspace = true, features = ["runtime"] } -ttrpc = { workspace = true } # may need to bump wasmedge version in scripts/setup-windows.sh wasmedge-sdk = { version = "0.13.2" } diff --git a/crates/containerd-shim-wasmer/Cargo.toml b/crates/containerd-shim-wasmer/Cargo.toml index 5396614e3..a2fd3909d 100644 --- a/crates/containerd-shim-wasmer/Cargo.toml +++ b/crates/containerd-shim-wasmer/Cargo.toml @@ -5,15 +5,11 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } -containerd-shim = { workspace = true } containerd-shim-wasm = { workspace = true } log = { workspace = true } -oci-spec = { workspace = true, features = ["runtime"] } -ttrpc = { workspace = true } tokio = "1.38.1" wasmer = { version = "4.1.2" } -wasmer-compiler = { version = "4.1.2", features = ["compiler"] } wasmer-wasix = { version = "0.12.0" } [dev-dependencies] diff --git a/crates/containerd-shim-wasmtime/Cargo.toml b/crates/containerd-shim-wasmtime/Cargo.toml index c36696855..0ac74520c 100644 --- a/crates/containerd-shim-wasmtime/Cargo.toml +++ b/crates/containerd-shim-wasmtime/Cargo.toml @@ -5,12 +5,8 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } -containerd-shim = { workspace = true } containerd-shim-wasm = { workspace = true, features = ["opentelemetry"] } log = { workspace = true } -oci-spec = { workspace = true, features = ["runtime"] } -ttrpc = { workspace = true } -sha256 = { workspace = true } wasmtime = { workspace = true } wasmtime-wasi = { workspace = true } diff --git a/crates/containerd-shim-wasmtime/src/component.rs b/crates/containerd-shim-wasmtime/src/component.rs new file mode 100644 index 000000000..584eb04c2 --- /dev/null +++ b/crates/containerd-shim-wasmtime/src/component.rs @@ -0,0 +1,43 @@ +use containerd_shim_wasm::container::WasmBinaryType; +use wasmtime::component::Component; + +use crate::instance::{WasiConfig, WasmtimeEngine}; + +pub(crate) enum Worlds { + WasiHTTPProxy, + WasiCLICommand, +} + +impl WasmtimeEngine +where + T: std::clone::Clone + Sync + WasiConfig + Send + 'static, +{ + pub(crate) fn detect_world(&self, component: Component) -> Worlds { + let ty = component.component_type(); + for (name, _) in ty.exports(&self.engine) { + match name { + "wasi:http/incoming-handler@0.2.0" | "wasi:http/incoming-handler@0.2.1" => { + return Worlds::WasiHTTPProxy + } + _ => {} + } + } + Worlds::WasiCLICommand + } + + // best effort to detect if the wasm component is targeting a wasi-http proxy + pub fn is_wasi_http(&self, wasm_bytes: &std::borrow::Cow<'_, [u8]>) -> anyhow::Result { + let res = match WasmBinaryType::from_bytes(wasm_bytes) { + Some(WasmBinaryType::Component) => { + let component = Component::from_binary(&self.engine, wasm_bytes)?; + match self.detect_world(component) { + Worlds::WasiHTTPProxy => true, + _ => false, + } + } + Some(_) => false, + None => false, + }; + Ok(res) + } +} diff --git a/crates/containerd-shim-wasmtime/src/http.rs b/crates/containerd-shim-wasmtime/src/http.rs new file mode 100644 index 000000000..2b20def6c --- /dev/null +++ b/crates/containerd-shim-wasmtime/src/http.rs @@ -0,0 +1,423 @@ +use anyhow::{anyhow, bail, Result}; +use std::net::SocketAddr; +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, +}; +use wasmtime::component::{InstancePre, Linker}; +use wasmtime::{Config, Engine, Memory, MemoryType, Store, StoreLimits}; +use wasmtime_wasi::{StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView}; +use wasmtime_wasi_http::io::TokioIo; +use wasmtime_wasi_http::{ + bindings::http::types as http_types, body::HyperOutgoingBody, hyper_response_error, + WasiHttpCtx, WasiHttpView, +}; +use wasmtime_wasi::{self as wasi_preview2}; +struct Host { + table: wasmtime::component::ResourceTable, + ctx: WasiCtx, + http: WasiHttpCtx, + + limits: StoreLimits, + + #[cfg(feature = "wasi-nn")] + nn: Option, +} + +impl WasiView for Host { + fn table(&mut self) -> &mut wasmtime::component::ResourceTable { + &mut self.table + } + + fn ctx(&mut self) -> &mut WasiCtx { + &mut self.ctx + } +} + +impl WasiHttpView for Host { + fn table(&mut self) -> &mut wasmtime::component::ResourceTable { + &mut self.table + } + + fn ctx(&mut self) -> &mut WasiHttpCtx { + &mut self.http + } +} + +const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), + 8080, +); + +pub struct ServeCommand { +} + +impl ServeCommand { + /// Start a server to run the given wasi-http proxy component + pub fn execute(mut self) -> Result<()> { + self.run.common.init_logging()?; + + // We force cli errors before starting to listen for connections so then we don't + // accidentally delay them to the first request. + if self.run.common.wasi.nn == Some(true) { + #[cfg(not(feature = "wasi-nn"))] + { + bail!("Cannot enable wasi-nn when the binary is not compiled with this feature."); + } + } + + if let Some(Profile::Guest { .. }) = &self.run.profile { + bail!("Cannot use the guest profiler with components"); + } + + if self.run.common.wasi.nn == Some(true) { + #[cfg(not(feature = "wasi-nn"))] + { + bail!("Cannot enable wasi-nn when the binary is not compiled with this feature."); + } + } + + if self.run.common.wasi.threads == Some(true) { + bail!("wasi-threads does not support components yet") + } + + // The serve command requires both wasi-http and the component model, so we enable those by + // default here. + if self.run.common.wasi.http.replace(true) == Some(false) { + bail!("wasi-http is required for the serve command, and must not be disabled"); + } + if self.run.common.wasm.component_model.replace(true) == Some(false) { + bail!("components are required for the serve command, and must not be disabled"); + } + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build()?; + + runtime.block_on(async move { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + Ok::<_, anyhow::Error>(()) + } + + res = self.serve() => { + res + } + } + })?; + + Ok(()) + } + + fn new_store(&self, builder: wasi_preview2::WasiCtxBuilder, engine: &Engine, req_id: u64) -> Result> { + let mut builder = builder; + builder.env("REQUEST_ID", req_id.to_string()); + let host = Host { + table: wasmtime::component::ResourceTable::new(), + ctx: builder.build(), + http: WasiHttpCtx::new(), + + limits: StoreLimits::default(), + }; + + let mut store = Store::new(engine, host); + store.limiter(|t| &mut t.limits); + Ok(store) + } + + fn add_to_linker(&self, linker: &mut Linker) -> Result<()> { + wasmtime_wasi::add_to_linker_async(linker)?; + wasmtime_wasi_http::proxy::add_only_http_to_linker(linker)?; + Ok(()) + } + + async fn serve(mut self) -> Result<()> { + use hyper::server::conn::http1; + + let component = match self.run.load_module(&engine, &self.component)? { + RunTarget::Core(_) => bail!("The serve command currently requires a component"), + RunTarget::Component(c) => c, + }; + + let instance = linker.instantiate_pre(&component)?; + + let socket = match &self.addr { + SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?, + SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?, + }; + // Conditionally enable `SO_REUSEADDR` depending on the current + // platform. On Unix we want this to be able to rebind an address in + // the `TIME_WAIT` state which can happen then a server is killed with + // active TCP connections and then restarted. On Windows though if + // `SO_REUSEADDR` is specified then it enables multiple applications to + // bind the port at the same time which is not something we want. Hence + // this is conditionally set based on the platform (and deviates from + // Tokio's default from always-on). + socket.set_reuseaddr(!cfg!(windows))?; + socket.bind(self.addr)?; + let listener = socket.listen(100)?; + + eprintln!("Serving HTTP on http://{}/", listener.local_addr()?); + + let _epoch_thread = if let Some(timeout) = self.run.common.wasm.timeout { + Some(EpochThread::spawn( + timeout / EPOCH_PRECISION, + engine.clone(), + )) + } else { + None + }; + + log::info!("Listening on {}", self.addr); + + let handler = ProxyHandler::new(self, engine, instance); + + loop { + let (stream, _) = listener.accept().await?; + let stream = TokioIo::new(stream); + let h = handler.clone(); + tokio::task::spawn(async { + if let Err(e) = http1::Builder::new() + .keep_alive(true) + .serve_connection( + stream, + hyper::service::service_fn(move |req| handle_request(h.clone(), req)), + ) + .await + { + eprintln!("error: {e:?}"); + } + }); + } + } +} + +/// This is the number of epochs that we will observe before expiring a request handler. As +/// instances may be started at any point within an epoch, and epochs are counted globally per +/// engine, we expire after `EPOCH_PRECISION + 1` epochs have been observed. This gives a maximum +/// overshoot of `timeout / EPOCH_PRECISION`, which is more desirable than expiring early. +const EPOCH_PRECISION: u32 = 10; + +struct EpochThread { + shutdown: Arc, + handle: Option>, +} + +impl EpochThread { + fn spawn(timeout: std::time::Duration, engine: Engine) -> Self { + let shutdown = Arc::new(AtomicBool::new(false)); + let handle = { + let shutdown = Arc::clone(&shutdown); + let handle = std::thread::spawn(move || { + while !shutdown.load(Ordering::Relaxed) { + std::thread::sleep(timeout); + engine.increment_epoch(); + } + }); + Some(handle) + }; + + EpochThread { shutdown, handle } + } +} + +impl Drop for EpochThread { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + self.shutdown.store(true, Ordering::Relaxed); + handle.join().unwrap(); + } + } +} + +struct ProxyHandlerInner { + cmd: ServeCommand, + engine: Engine, + instance_pre: InstancePre, + next_id: AtomicU64, +} + +impl ProxyHandlerInner { + fn next_req_id(&self) -> u64 { + self.next_id.fetch_add(1, Ordering::Relaxed) + } +} + +#[derive(Clone)] +struct ProxyHandler(Arc); + +impl ProxyHandler { + fn new(cmd: ServeCommand, engine: Engine, instance_pre: InstancePre) -> Self { + Self(Arc::new(ProxyHandlerInner { + cmd, + engine, + instance_pre, + next_id: AtomicU64::from(0), + })) + } +} + +type Request = hyper::Request; + +async fn handle_request( + ProxyHandler(inner): ProxyHandler, + req: Request, +) -> Result> { + use http_body_util::BodyExt; + + let (sender, receiver) = tokio::sync::oneshot::channel(); + + let task = tokio::task::spawn(async move { + let req_id = inner.next_req_id(); + let (mut parts, body) = req.into_parts(); + + parts.uri = { + let uri_parts = parts.uri.into_parts(); + + let scheme = uri_parts.scheme.unwrap_or(http::uri::Scheme::HTTP); + + let host = if let Some(val) = parts.headers.get(hyper::header::HOST) { + std::str::from_utf8(val.as_bytes()) + .map_err(|_| http_types::ErrorCode::HttpRequestUriInvalid)? + } else { + uri_parts + .authority + .as_ref() + .ok_or(http_types::ErrorCode::HttpRequestUriInvalid)? + .host() + }; + + let path_with_query = uri_parts + .path_and_query + .ok_or(http_types::ErrorCode::HttpRequestUriInvalid)?; + + hyper::Uri::builder() + .scheme(scheme) + .authority(host) + .path_and_query(path_with_query) + .build() + .map_err(|_| http_types::ErrorCode::HttpRequestUriInvalid)? + }; + + let req = hyper::Request::from_parts(parts, body.map_err(hyper_response_error).boxed()); + + log::info!( + "Request {req_id} handling {} to {}", + req.method(), + req.uri() + ); + + let mut store = inner.cmd.new_store(&inner.engine, req_id)?; + + let req = store.data_mut().new_incoming_request(req)?; + let out = store.data_mut().new_response_outparam(sender)?; + + let (proxy, _inst) = + wasmtime_wasi_http::proxy::Proxy::instantiate_pre(&mut store, &inner.instance_pre) + .await?; + + if let Err(e) = proxy + .wasi_http_incoming_handler() + .call_handle(store, req, out) + .await + { + log::error!("[{req_id}] :: {:#?}", e); + return Err(e); + } + + Ok(()) + }); + + match receiver.await { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(e)) => Err(e.into()), + Err(_) => { + // An error in the receiver (`RecvError`) only indicates that the + // task exited before a response was sent (i.e., the sender was + // dropped); it does not describe the underlying cause of failure. + // Instead we retrieve and propagate the error from inside the task + // which should more clearly tell the user what went wrong. Note + // that we assume the task has already exited at this point so the + // `await` should resolve immediately. + let e = match task.await { + Ok(r) => r.expect_err("if the receiver has an error, the task must have failed"), + Err(e) => e.into(), + }; + bail!("guest never invoked `response-outparam::set` method: {e:?}") + } + } +} + +#[derive(Clone)] +enum Output { + Stdout, + Stderr, +} + +impl Output { + fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> { + use std::io::Write; + + match self { + Output::Stdout => std::io::stdout().write_all(buf), + Output::Stderr => std::io::stderr().write_all(buf), + } + .map_err(|e| anyhow!(e)) + } +} + +#[derive(Clone)] +struct LogStream { + prefix: String, + output: Output, +} + +impl wasmtime_wasi::StdoutStream for LogStream { + fn stream(&self) -> Box { + Box::new(self.clone()) + } + + fn isatty(&self) -> bool { + use std::io::IsTerminal; + + match &self.output { + Output::Stdout => std::io::stdout().is_terminal(), + Output::Stderr => std::io::stderr().is_terminal(), + } + } +} + +impl wasmtime_wasi::HostOutputStream for LogStream { + fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> { + let mut msg = Vec::new(); + + for line in bytes.split(|c| *c == b'\n') { + if !line.is_empty() { + msg.extend_from_slice(&self.prefix.as_bytes()); + msg.extend_from_slice(line); + msg.push(b'\n'); + } + } + + self.output + .write_all(&msg) + .map_err(StreamError::LastOperationFailed) + } + + fn flush(&mut self) -> StreamResult<()> { + Ok(()) + } + + fn check_write(&mut self) -> StreamResult { + Ok(1024 * 1024) + } +} + +#[async_trait::async_trait] +impl wasmtime_wasi::Subscribe for LogStream { + async fn ready(&mut self) {} +}