From af24bb1bda3c1671cf98bba01fb747b3839a5328 Mon Sep 17 00:00:00 2001 From: Radu Matei Date: Fri, 5 Apr 2024 22:40:24 +0200 Subject: [PATCH 1/3] chore(*): cleanup engine imports Signed-off-by: Radu Matei --- containerd-shim-spin/src/engine.rs | 43 +++++++++++++++--------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/containerd-shim-spin/src/engine.rs b/containerd-shim-spin/src/engine.rs index b35a040d..6d617ed4 100644 --- a/containerd-shim-spin/src/engine.rs +++ b/containerd-shim-spin/src/engine.rs @@ -1,25 +1,28 @@ use anyhow::{anyhow, ensure, Context, Result}; -use containerd_shim_wasm::container::{Engine, RuntimeContext, Stdio}; -use containerd_shim_wasm::sandbox::WasmLayer; +use containerd_shim_wasm::{ + container::{Engine, RuntimeContext, Source, Stdio}, + sandbox::WasmLayer, +}; use log::info; use oci_spec::image::MediaType; use spin_app::locked::LockedApp; -use spin_loader::cache::Cache; -use spin_loader::FilesMountStrategy; +use spin_loader::{cache::Cache, FilesMountStrategy}; use spin_manifest::schema::v2::AppManifest; -use spin_trigger::TriggerHooks; -use spin_trigger::{loader, RuntimeConfig, TriggerExecutor, TriggerExecutorBuilder}; +use spin_oci::OciLoader; +use spin_trigger::{ + loader::TriggerLoader, RuntimeConfig, TriggerExecutor, TriggerExecutorBuilder, TriggerHooks, +}; use spin_trigger_http::HttpTrigger; use spin_trigger_redis::RedisTrigger; -use std::collections::hash_map::DefaultHasher; -use std::collections::HashSet; -use std::env; -use std::fs::File; -use std::hash::{Hash, Hasher}; -use std::io::Write; -use std::net::SocketAddr; -use std::net::ToSocketAddrs; -use std::path::{Path, PathBuf}; +use std::{ + collections::{hash_map::DefaultHasher, HashSet}, + env, + fs::File, + hash::{Hash, Hasher}, + io::Write, + net::{SocketAddr, ToSocketAddrs}, + path::{Path, PathBuf}, +}; use tokio::runtime::Runtime; use trigger_command::CommandTrigger; use trigger_sqs::SqsTrigger; @@ -97,10 +100,8 @@ impl std::fmt::Debug for AppSource { impl SpinEngine { async fn app_source(&self, ctx: &impl RuntimeContext, cache: &Cache) -> Result { match ctx.entrypoint().source { - containerd_shim_wasm::container::Source::File(_) => { - Ok(AppSource::File(SPIN_MANIFEST_FILE_PATH.into())) - } - containerd_shim_wasm::container::Source::Oci(layers) => { + Source::File(_) => Ok(AppSource::File(SPIN_MANIFEST_FILE_PATH.into())), + Source::Oci(layers) => { info!(" >>> configuring spin oci application {}", layers.len()); for layer in layers { @@ -162,7 +163,7 @@ impl SpinEngine { }, AppSource::Oci => { let working_dir = PathBuf::from("/"); - let loader = spin_oci::OciLoader::new(working_dir); + let loader = OciLoader::new(working_dir); // TODO: what is the best way to get this info? It isn't used only saved in the locked file let reference = "docker.io/library/wasmtest_spin:latest"; @@ -295,7 +296,7 @@ impl SpinEngine { let locked_url = self.write_locked_app(&app, &working_dir).await?; // Build trigger config - let mut loader = loader::TriggerLoader::new(working_dir.clone(), true); + let mut loader = TriggerLoader::new(working_dir.clone(), true); match app_source { AppSource::Oci => unsafe { // Configure the loader to support loading AOT compiled components.. From 5c32ecebf99e03b893a8b7031c809611685d94fb Mon Sep 17 00:00:00 2001 From: Radu Matei Date: Mon, 8 Apr 2024 14:58:42 +0200 Subject: [PATCH 2/3] Add new Spin loader and use in engine Signed-off-by: Radu Matei --- containerd-shim-spin/src/engine.rs | 617 +++++++++-------------------- containerd-shim-spin/src/loader.rs | 133 +++++++ containerd-shim-spin/src/main.rs | 5 +- 3 files changed, 333 insertions(+), 422 deletions(-) create mode 100644 containerd-shim-spin/src/loader.rs diff --git a/containerd-shim-spin/src/engine.rs b/containerd-shim-spin/src/engine.rs index 6d617ed4..9840997c 100644 --- a/containerd-shim-spin/src/engine.rs +++ b/containerd-shim-spin/src/engine.rs @@ -1,4 +1,11 @@ -use anyhow::{anyhow, ensure, Context, Result}; +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + net::{Ipv4Addr, SocketAddr, ToSocketAddrs}, + path::{Path, PathBuf}, +}; + +use anyhow::{anyhow, Context, Result}; use containerd_shim_wasm::{ container::{Engine, RuntimeContext, Source, Stdio}, sandbox::WasmLayer, @@ -6,28 +13,19 @@ use containerd_shim_wasm::{ use log::info; use oci_spec::image::MediaType; use spin_app::locked::LockedApp; -use spin_loader::{cache::Cache, FilesMountStrategy}; -use spin_manifest::schema::v2::AppManifest; -use spin_oci::OciLoader; +use spin_loader::FilesMountStrategy; use spin_trigger::{ loader::TriggerLoader, RuntimeConfig, TriggerExecutor, TriggerExecutorBuilder, TriggerHooks, }; use spin_trigger_http::HttpTrigger; use spin_trigger_redis::RedisTrigger; -use std::{ - collections::{hash_map::DefaultHasher, HashSet}, - env, - fs::File, - hash::{Hash, Hasher}, - io::Write, - net::{SocketAddr, ToSocketAddrs}, - path::{Path, PathBuf}, -}; use tokio::runtime::Runtime; -use trigger_command::CommandTrigger; -use trigger_sqs::SqsTrigger; +// use trigger_command::CommandTrigger; +// use trigger_sqs::SqsTrigger; use url::Url; +use crate::loader::ContainerdLoader; + const SPIN_ADDR: &str = "0.0.0.0:80"; /// RUNTIME_CONFIG_PATH specifies the expected location and name of the runtime /// config for a Spin application. The runtime config should be loaded into the @@ -43,210 +41,159 @@ const OCI_LAYER_MEDIA_TYPE_SPIN_CONFIG: &str = "application/vnd.fermyon.spin.app /// an OCI image const SPIN_MANIFEST_FILE_PATH: &str = "/spin.toml"; -#[derive(Clone)] -pub struct SpinEngine { - pub(crate) wasmtime_engine: wasmtime::Engine, -} +impl Engine for SpinEngine { + fn name() -> &'static str { + "spin" + } -impl Default for SpinEngine { - fn default() -> Self { - // the host expects epoch interruption to be enabled, so this has to be - // turned on for the components we compile. - let mut config = wasmtime::Config::default(); - config.epoch_interruption(true); - Self { - wasmtime_engine: wasmtime::Engine::new(&config).unwrap(), + fn run_wasi(&self, ctx: &impl RuntimeContext, stdio: Stdio) -> Result { + stdio.redirect()?; + info!("setting up wasi"); + let rt = Runtime::new().context("failed to create runtime")?; + + let (abortable, abort_handle) = futures::future::abortable(self.wasm_exec_async(ctx)); + ctrlc::set_handler(move || abort_handle.abort())?; + + match rt.block_on(abortable) { + Ok(Ok(())) => { + info!("run_wasi shut down: exiting"); + Ok(0) + } + Ok(Err(err)) => { + log::error!("run_wasi ERROR >>> failed: {:?}", err); + Err(err) + } + Err(aborted) => { + info!("Received signal to abort: {:?}", aborted); + Ok(0) + } } } -} -struct StdioTriggerHook; -impl TriggerHooks for StdioTriggerHook { - fn app_loaded( - &mut self, - _app: &spin_app::App, - _runtime_config: &RuntimeConfig, - _resolver: &std::sync::Arc, - ) -> Result<()> { + fn can_handle(&self, _ctx: &impl RuntimeContext) -> Result<()> { Ok(()) } - fn component_store_builder( - &self, - _component: &spin_app::AppComponent, - builder: &mut spin_core::StoreBuilder, - ) -> Result<()> { - builder.inherit_stdout(); - builder.inherit_stderr(); - Ok(()) + fn supported_layers_types() -> &'static [&'static str] { + &[ + OCI_LAYER_MEDIA_TYPE_WASM, + OCI_LAYER_MEDIA_TYPE_DATA, + OCI_LAYER_MEDIA_TYPE_SPIN_CONFIG, + ] + } + + fn precompile(&self, layers: &[WasmLayer]) -> Result>>> { + // Runwasi expects layers to be returned in the same order, so wrap each layer in an option, setting non Wasm layers to None + let precompiled_layers = layers + .iter() + .map(|layer| match SpinEngine::is_wasm_content(layer) { + Some(wasm_layer) => { + log::info!( + "Precompile called for wasm layer {:?}", + wasm_layer.config.digest() + ); + if self + .wasmtime_engine + .detect_precompiled(&wasm_layer.layer) + .is_some() + { + log::info!("Layer already precompiled {:?}", wasm_layer.config.digest()); + Ok(Some(wasm_layer.layer)) + } else { + let component = + spin_componentize::componentize_if_necessary(&wasm_layer.layer)?; + let precompiled = self.wasmtime_engine.precompile_component(&component)?; + Ok(Some(precompiled)) + } + } + None => Ok(None), + }) + .collect::>()?; + Ok(precompiled_layers) + } + + fn can_precompile(&self) -> Option { + let mut hasher = DefaultHasher::new(); + self.wasmtime_engine + .precompile_compatibility_hash() + .hash(&mut hasher); + Some(hasher.finish().to_string()) } } #[derive(Clone)] -enum AppSource { - File(PathBuf), - Oci, +pub struct SpinEngine { + pub wasmtime_engine: wasmtime::Engine, + pub working_dir: PathBuf, } -impl std::fmt::Debug for AppSource { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - AppSource::File(path) => write!(f, "File({})", path.display()), - AppSource::Oci => write!(f, "Oci"), +impl Default for SpinEngine { + fn default() -> Self { + // the host expects epoch interruption to be enabled, so this has to be + // turned on for the components we compile. + let mut config = wasmtime::Config::default(); + let working_dir = PathBuf::from("/"); + config.epoch_interruption(true); + Self { + wasmtime_engine: wasmtime::Engine::new(&config) + .expect("cannot create new Wasmtime engine for SpinEngine"), + working_dir, } } } impl SpinEngine { - async fn app_source(&self, ctx: &impl RuntimeContext, cache: &Cache) -> Result { - match ctx.entrypoint().source { - Source::File(_) => Ok(AppSource::File(SPIN_MANIFEST_FILE_PATH.into())), - Source::Oci(layers) => { - info!(" >>> configuring spin oci application {}", layers.len()); - - for layer in layers { - log::debug!("<<< layer config: {:?}", layer.config); - } - - for artifact in layers { - match artifact.config.media_type() { - MediaType::Other(name) - if name == spin_oci::client::SPIN_APPLICATION_MEDIA_TYPE => - { - let path = PathBuf::from("/spin.json"); - log::info!("writing spin oci config to {:?}", path); - File::create(&path) - .context("failed to create spin.json")? - .write_all(&artifact.layer) - .context("failed to write spin.json")?; - } - MediaType::Other(name) if name == OCI_LAYER_MEDIA_TYPE_WASM => { - log::info!( - "<<< writing wasm artifact with length {:?} config to cache, near {:?}", - artifact.layer.len(), cache.manifests_dir() - ); - cache - .write_wasm(&artifact.layer, &artifact.config.digest()) - .await?; - } - MediaType::Other(name) if name == OCI_LAYER_MEDIA_TYPE_DATA => { - log::debug!( - "<<< writing data layer to cache, near {:?}", - cache.manifests_dir() - ); - cache - .write_data(&artifact.layer, &artifact.config.digest()) - .await?; - } - _ => { - log::debug!( - "<<< unknown media type {:?}", - artifact.config.media_type() - ); - } - } - } - Ok(AppSource::Oci) - } - } - } - - async fn resolve_app_source( - &self, - app_source: AppSource, - cache: &Cache, - ) -> Result { - let resolve_app_source = match app_source { - AppSource::File(source) => ResolvedAppSource::File { - manifest_path: source.clone(), - manifest: spin_manifest::manifest_from_file(source.clone())?, - }, - AppSource::Oci => { - let working_dir = PathBuf::from("/"); - let loader = OciLoader::new(working_dir); - - // TODO: what is the best way to get this info? It isn't used only saved in the locked file - let reference = "docker.io/library/wasmtest_spin:latest"; - - let locked_app = loader - .load_from_cache(PathBuf::from("/spin.json"), reference, cache) - .await?; - ResolvedAppSource::OciRegistry { locked_app } - } - }; - Ok(resolve_app_source) - } - async fn wasm_exec_async(&self, ctx: &impl RuntimeContext) -> Result<()> { - // create a cache directory at /.cache - // this is needed for the spin LocalLoader to work - // TODO: spin should provide a more flexible `loader::from_file` that - // does not assume the existence of a cache directory - let cache_dir = PathBuf::from("/.cache"); - let cache = Cache::new(Some(cache_dir.clone())) - .await - .context("failed to create cache")?; - env::set_var("XDG_CACHE_HOME", &cache_dir); - let app_source = self.app_source(ctx, &cache).await?; - let resolved_app_source = self.resolve_app_source(app_source.clone(), &cache).await?; - let trigger_cmd = trigger_command_for_resolved_app_source(&resolved_app_source) - .with_context(|| format!("Couldn't find trigger executor for {app_source:?}"))?; - let locked_app = self.load_resolved_app_source(resolved_app_source).await?; - self.run_trigger(ctx, &trigger_cmd, locked_app, app_source) - .await + let app = self.load(ctx.entrypoint().source).await?; + self.run(app, ctx.entrypoint().source).await } - async fn run_trigger( - &self, - ctx: &impl RuntimeContext, - trigger_type: &str, - app: LockedApp, - app_source: AppSource, - ) -> Result<()> { - let working_dir = PathBuf::from("/"); - let f = match trigger_type { + async fn run<'a>(&self, app: LockedApp, source: Source<'a>) -> Result<()> { + let trigger = Self::trigger_command(&app)?; + + let f = match trigger.as_str() { HttpTrigger::TRIGGER_TYPE => { let http_trigger: HttpTrigger = self - .build_spin_trigger(working_dir, app, app_source) + .build_trigger(app, source) .await .context("failed to build spin trigger")?; info!(" >>> running spin trigger"); http_trigger.run(spin_trigger_http::CliArgs { - address: parse_addr(SPIN_ADDR).unwrap(), + address: Self::parse_listen_addr(SPIN_ADDR)?, tls_cert: None, tls_key: None, }) } RedisTrigger::TRIGGER_TYPE => { let redis_trigger: RedisTrigger = self - .build_spin_trigger(working_dir, app, app_source) + .build_trigger(app, source) .await .context("failed to build spin trigger")?; info!(" >>> running spin trigger"); redis_trigger.run(spin_trigger::cli::NoArgs) } - SqsTrigger::TRIGGER_TYPE => { - let sqs_trigger: SqsTrigger = self - .build_spin_trigger(working_dir, app, app_source) - .await - .context("failed to build spin trigger")?; - - info!(" >>> running spin trigger"); - sqs_trigger.run(spin_trigger::cli::NoArgs) - } - CommandTrigger::TRIGGER_TYPE => { - let command_trigger: CommandTrigger = self - .build_spin_trigger(working_dir, app, app_source) - .await - .context("failed to build spin trigger")?; - - info!(" >>> running spin trigger"); - command_trigger.run(trigger_command::CliArgs { - guest_args: ctx.args().to_vec(), - }) - } + // SqsTrigger::TRIGGER_TYPE => { + // let sqs_trigger: SqsTrigger = self + // .build_spin_trigger(working_dir, app, app_source) + // .await + // .context("failed to build spin trigger")?; + // + // info!(" >>> running spin trigger"); + // sqs_trigger.run(spin_trigger::cli::NoArgs) + // } + // CommandTrigger::TRIGGER_TYPE => { + // let command_trigger: CommandTrigger = self + // .build_spin_trigger(working_dir, app, app_source) + // .await + // .context("failed to build spin trigger")?; + // + // info!(" >>> running spin trigger"); + // command_trigger.run(trigger_command::CliArgs { + // guest_args: ctx.args().to_vec(), + // }) + // } _ => { todo!("Only Http, Redis and SQS triggers are currently supported.") } @@ -255,59 +202,57 @@ impl SpinEngine { f.await } - async fn load_resolved_app_source( - &self, - resolved: ResolvedAppSource, - ) -> anyhow::Result { - match resolved { - ResolvedAppSource::File { manifest_path, .. } => { - // TODO: This should be configurable, see https://github.com/deislabs/containerd-wasm-shims/issues/166 - // TODO: ^^ Move aforementioned issue to this repo + async fn load<'a>(&self, source: Source<'a>) -> Result { + match source { + Source::File(_) => { let files_mount_strategy = FilesMountStrategy::Direct; - spin_loader::from_file(&manifest_path, files_mount_strategy, None).await + spin_loader::from_file( + &PathBuf::from(SPIN_MANIFEST_FILE_PATH), + files_mount_strategy, + None, + ) + .await + } + Source::Oci(layers) => { + let loader = ContainerdLoader::new(&self.working_dir); + loader.load_from_layers(layers).await } - ResolvedAppSource::OciRegistry { locked_app } => Ok(locked_app), } - } - async fn write_locked_app(&self, locked_app: &LockedApp, working_dir: &Path) -> Result { - let locked_path = working_dir.join("spin.lock"); - let locked_app_contents = - serde_json::to_vec_pretty(&locked_app).context("failed to serialize locked app")?; - tokio::fs::write(&locked_path, locked_app_contents) - .await - .with_context(|| format!("failed to write {:?}", locked_path))?; - let locked_url = Url::from_file_path(&locked_path) - .map_err(|_| anyhow!("cannot convert to file URL: {locked_path:?}"))? - .to_string(); + // Ok((app, trigger)) + } - Ok(locked_url) + fn trigger_command(app: &LockedApp) -> Result { + // TODO: gracefully handle multiple trigger types + Ok(app + .triggers + .first() + .context("expected app to have one trigger")? + .trigger_type + .clone()) } - async fn build_spin_trigger( + async fn build_trigger<'a, T: spin_trigger::TriggerExecutor>( &self, - working_dir: PathBuf, app: LockedApp, - app_source: AppSource, + source: Source<'a>, ) -> Result where for<'de> ::TriggerConfig: serde::de::Deserialize<'de>, { - let locked_url = self.write_locked_app(&app, &working_dir).await?; + let locked_url = self.write_locked(&app, &self.working_dir).await?; + let mut loader = TriggerLoader::new(&self.working_dir, true); - // Build trigger config - let mut loader = TriggerLoader::new(working_dir.clone(), true); - match app_source { - AppSource::Oci => unsafe { + match source { + Source::Oci(_) => unsafe { // Configure the loader to support loading AOT compiled components.. // Since all components were compiled by the shim (during `precompile`), // this operation can be considered safe. loader.enable_loading_aot_compiled_components(); }, - // Currently, it is only possible to precompile applications distributed using - // `spin registry push` - AppSource::File(_) => {} + Source::File(_) => {} }; + let mut runtime_config = RuntimeConfig::new(PathBuf::from("/").into()); // Load in runtime config if one exists at expected location if Path::new(RUNTIME_CONFIG_PATH).exists() { @@ -324,6 +269,20 @@ impl SpinEngine { Ok(executor) } + async fn write_locked(&self, locked_app: &LockedApp, working_dir: &Path) -> Result { + let locked_path = working_dir.join("spin.lock"); + let locked_app_contents = + serde_json::to_vec_pretty(&locked_app).context("failed to serialize locked app")?; + tokio::fs::write(&locked_path, locked_app_contents) + .await + .with_context(|| format!("failed to write {:?}", locked_path))?; + let locked_url = Url::from_file_path(&locked_path) + .map_err(|_| anyhow!("cannot convert to file URL: {locked_path:?}"))? + .to_string(); + + Ok(locked_url) + } + // Returns Some(WasmLayer) if the layer contains wasm, otherwise None fn is_wasm_content(layer: &WasmLayer) -> Option { if let MediaType::Other(name) = layer.config.media_type() { @@ -333,221 +292,39 @@ impl SpinEngine { } None } -} - -impl Engine for SpinEngine { - fn name() -> &'static str { - "spin" - } - - fn run_wasi(&self, ctx: &impl RuntimeContext, stdio: Stdio) -> Result { - stdio.redirect()?; - info!("setting up wasi"); - let rt = Runtime::new().context("failed to create runtime")?; - - let (abortable, abort_handle) = futures::future::abortable(self.wasm_exec_async(ctx)); - ctrlc::set_handler(move || abort_handle.abort())?; - - match rt.block_on(abortable) { - Ok(Ok(())) => { - info!("run_wasi shut down: exiting"); - Ok(0) - } - Ok(Err(err)) => { - log::error!("run_wasi ERROR >>> failed: {:?}", err); - Err(err) - } - Err(aborted) => { - info!("Received signal to abort: {:?}", aborted); - Ok(0) - } - } - } - - fn can_handle(&self, _ctx: &impl RuntimeContext) -> Result<()> { - Ok(()) - } - - fn supported_layers_types() -> &'static [&'static str] { - &[ - OCI_LAYER_MEDIA_TYPE_WASM, - OCI_LAYER_MEDIA_TYPE_DATA, - OCI_LAYER_MEDIA_TYPE_SPIN_CONFIG, - ] - } - fn precompile(&self, layers: &[WasmLayer]) -> Result>>> { - // Runwasi expects layers to be returned in the same order, so wrap each layer in an option, setting non Wasm layers to None - let precompiled_layers = layers + pub fn parse_listen_addr(addr: &str) -> anyhow::Result { + let addrs: Vec = addr.to_socket_addrs()?.collect(); + // Prefer 127.0.0.1 over e.g. [::1] because CHANGE IS HARD + if let Some(addr) = addrs .iter() - .map(|layer| match SpinEngine::is_wasm_content(layer) { - Some(wasm_layer) => { - log::info!( - "Precompile called for wasm layer {:?}", - wasm_layer.config.digest() - ); - if self - .wasmtime_engine - .detect_precompiled(&wasm_layer.layer) - .is_some() - { - log::info!("Layer already precompiled {:?}", wasm_layer.config.digest()); - Ok(Some(wasm_layer.layer)) - } else { - let component = - spin_componentize::componentize_if_necessary(&wasm_layer.layer)?; - let precompiled = self.wasmtime_engine.precompile_component(&component)?; - Ok(Some(precompiled)) - } - } - None => Ok(None), - }) - .collect::>()?; - Ok(precompiled_layers) - } - - fn can_precompile(&self) -> Option { - let mut hasher = DefaultHasher::new(); - self.wasmtime_engine - .precompile_compatibility_hash() - .hash(&mut hasher); - Some(hasher.finish().to_string()) - } -} - -fn parse_addr(addr: &str) -> Result { - let addrs: SocketAddr = addr - .to_socket_addrs()? - .next() - .ok_or_else(|| anyhow!("could not parse address: {}", addr))?; - Ok(addrs) -} - -// TODO: we should use spin's ResolvedAppSource -pub enum ResolvedAppSource { - File { - manifest_path: PathBuf, - manifest: AppManifest, - }, - OciRegistry { - locked_app: LockedApp, - }, -} - -impl ResolvedAppSource { - pub fn trigger_type(&self) -> anyhow::Result<&str> { - let types = match self { - ResolvedAppSource::File { manifest, .. } => { - manifest.triggers.keys().collect::>() - } - ResolvedAppSource::OciRegistry { locked_app } => locked_app - .triggers - .iter() - .map(|t| &t.trigger_type) - .collect::>(), - }; - - ensure!(!types.is_empty(), "no triggers in app"); - ensure!(types.len() == 1, "multiple trigger types not yet supported"); - Ok(types.into_iter().next().unwrap()) - } -} - -fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result { - let trigger_type = resolved.trigger_type()?; - - match trigger_type { - RedisTrigger::TRIGGER_TYPE - | HttpTrigger::TRIGGER_TYPE - | SqsTrigger::TRIGGER_TYPE - | CommandTrigger::TRIGGER_TYPE => Ok(trigger_type.to_owned()), - _ => { - todo!("Only Http, Redis, SQS, and command triggers are currently supported.") + .find(|addr| addr.is_ipv4() && addr.ip() == Ipv4Addr::LOCALHOST) + { + return Ok(*addr); } + // Otherwise, take the first addr (OS preference) + addrs.into_iter().next().context("couldn't resolve address") } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn can_parse_spin_address() { - let parsed = parse_addr(SPIN_ADDR).unwrap(); - assert_eq!(parsed.clone().port(), 80); - assert_eq!(parsed.ip().to_string(), "0.0.0.0"); - } - - #[test] - fn is_wasm_content() { - let wasm_content = WasmLayer { - layer: vec![], - config: oci_spec::image::Descriptor::new( - MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), - 1024, - "sha256:1234", - ), - }; - // Should be ignored - let data_content = WasmLayer { - layer: vec![], - config: oci_spec::image::Descriptor::new( - MediaType::Other(OCI_LAYER_MEDIA_TYPE_DATA.to_string()), - 1024, - "sha256:1234", - ), - }; - assert!(SpinEngine::is_wasm_content(&wasm_content).is_some()); - assert!(SpinEngine::is_wasm_content(&data_content).is_none()); +struct StdioTriggerHook; +impl TriggerHooks for StdioTriggerHook { + fn app_loaded( + &mut self, + _app: &spin_app::App, + _runtime_config: &RuntimeConfig, + _resolver: &std::sync::Arc, + ) -> Result<()> { + Ok(()) } - #[test] - fn precompile() { - let module = wat::parse_str("(module)").unwrap(); - let wasmtime_engine = wasmtime::Engine::default(); - let component = wasmtime::component::Component::new(&wasmtime_engine, "(component)") - .unwrap() - .serialize() - .unwrap(); - let wasm_layers: Vec = vec![ - // Needs to be precompiled - WasmLayer { - layer: module.clone(), - config: oci_spec::image::Descriptor::new( - MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), - 1024, - "sha256:1234", - ), - }, - // Precompiled - WasmLayer { - layer: component.to_owned(), - config: oci_spec::image::Descriptor::new( - MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), - 1024, - "sha256:1234", - ), - }, - // Content that should be skipped - WasmLayer { - layer: vec![], - config: oci_spec::image::Descriptor::new( - MediaType::Other(OCI_LAYER_MEDIA_TYPE_DATA.to_string()), - 1024, - "sha256:1234", - ), - }, - ]; - let spin_engine = SpinEngine::default(); - let precompiled = spin_engine - .precompile(&wasm_layers) - .expect("precompile failed"); - assert_eq!(precompiled.len(), 3); - assert_ne!(precompiled[0].as_deref().expect("no first entry"), module); - assert_eq!( - precompiled[1].as_deref().expect("no second entry"), - component - ); - assert!(precompiled[2].is_none()); + fn component_store_builder( + &self, + _component: &spin_app::AppComponent, + builder: &mut spin_core::StoreBuilder, + ) -> Result<()> { + builder.inherit_stdout(); + builder.inherit_stderr(); + Ok(()) } } diff --git a/containerd-shim-spin/src/loader.rs b/containerd-shim-spin/src/loader.rs new file mode 100644 index 00000000..0fbe95da --- /dev/null +++ b/containerd-shim-spin/src/loader.rs @@ -0,0 +1,133 @@ +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, ensure, Context, Result}; +use containerd_shim_wasm::sandbox::WasmLayer; +use spin_app::locked::{ContentPath, ContentRef, LockedApp, LockedComponent}; +use url::Url; + +pub struct ContainerdLoader { + pub working_dir: PathBuf, +} + +impl ContainerdLoader { + pub fn new(working_dir: impl Into) -> Self { + let working_dir = working_dir.into(); + Self { working_dir } + } + + /// Load a Spin application given a list of layers that represent + /// a Spin application distributed as an OCI artifact. + pub async fn load_from_layers(&self, layers: &[WasmLayer]) -> Result { + // get the locked application file from the layer list + let locked_layer = layers + .iter() + .find(|l| { + l.config.media_type().to_string() == spin_oci::client::SPIN_APPLICATION_MEDIA_TYPE + }) + .context("cannot find locked application in layers for application")?; + let mut locked_app = LockedApp::from_json(&locked_layer.layer)?; + + for component in &mut locked_app.components { + self.resolve_component_content_refs(component, layers) + .await + .with_context(|| { + format!("failed to resolve content for component {:?}", component.id) + })?; + } + + Ok(locked_app) + } + + async fn resolve_component_content_refs( + &self, + component: &mut LockedComponent, + layers: &[WasmLayer], + ) -> Result<()> { + let wasm_digest = &component + .source + .content + .digest + .as_deref() + .context("component must have source digest")?; + let wasm = layers + .iter() + .find(|l| l.config.digest() == wasm_digest) + .context("cannot find wasm source for component")?; + let wasm_path = self.working_dir.join(wasm_digest); + tokio::fs::write(&wasm_path, &wasm.layer).await?; + component.source.content = Self::content_ref(wasm_path)?; + + if !component.files.is_empty() { + let mount_dir = self.working_dir.join("assets").join(&component.id); + for file in &mut component.files { + ensure!( + Self::is_safe_to_join(&file.path), + "invalid file mount {file:?}" + ); + let mount_path = mount_dir.join(&file.path); + + // Create parent directory + let mount_parent = mount_path + .parent() + .with_context(|| format!("invalid mount path {mount_path:?}"))?; + tokio::fs::create_dir_all(mount_parent) + .await + .with_context(|| { + format!("failed to create temporary mount path {mount_path:?}") + })?; + + if let Some(content_bytes) = file.content.inline.as_deref() { + // Write inline content to disk + tokio::fs::write(&mount_path, content_bytes) + .await + .with_context(|| { + format!("failed to write inline content to {mount_path:?}") + })?; + } else { + // Copy content + let digest = Self::content_digest(&file.content)?; + let content_bytes = layers + .iter() + .find(|l| l.config.digest() == digest) + .context("cannot find static asset")?; + // Write inline content to disk + tokio::fs::write(&mount_path, &content_bytes.layer) + .await + .with_context(|| { + format!("failed to write inline content to {mount_path:?}") + })?; + } + } + + component.files = vec![ContentPath { + content: Self::content_ref(mount_dir)?, + path: "/".into(), + }] + } + + Ok(()) + } + + pub fn content_digest(content_ref: &ContentRef) -> Result<&str> { + content_ref + .digest + .as_deref() + .with_context(|| format!("content missing expected digest: {content_ref:?}")) + } + + fn content_ref(path: impl AsRef) -> Result { + let path = std::fs::canonicalize(path)?; + let url = Url::from_file_path(path).map_err(|_| anyhow!("couldn't build file URL"))?; + Ok(ContentRef { + source: Some(url.to_string()), + ..Default::default() + }) + } + + fn is_safe_to_join(path: impl AsRef) -> bool { + // This could be loosened, but currently should always be true + path.as_ref() + .components() + .all(|c| matches!(c, std::path::Component::Normal(_))) + } +} diff --git a/containerd-shim-spin/src/main.rs b/containerd-shim-spin/src/main.rs index 117d18a6..947637b9 100644 --- a/containerd-shim-spin/src/main.rs +++ b/containerd-shim-spin/src/main.rs @@ -2,12 +2,13 @@ use containerd_shim::Config; use containerd_shim_wasm::container::Instance; use containerd_shim_wasm::sandbox::cli::{revision, shim_main, version}; -mod engine; +pub mod engine; +pub mod loader; fn main() { // Configure the shim to have only error level logging for performance improvements. let shim_config = Config { - default_log_level: "error".to_string(), + // default_log_level: "error".to_string(), ..Default::default() }; shim_main::>( From 65efae377c340bf839432695c056186337f2b9f7 Mon Sep 17 00:00:00 2001 From: Radu Matei Date: Tue, 30 Apr 2024 13:00:51 +0200 Subject: [PATCH 3/3] Re-add other triggers Signed-off-by: Radu Matei --- containerd-shim-spin/src/engine.rs | 135 ++++++++++++++++++++++++----- containerd-shim-spin/src/main.rs | 7 +- 2 files changed, 115 insertions(+), 27 deletions(-) diff --git a/containerd-shim-spin/src/engine.rs b/containerd-shim-spin/src/engine.rs index 9840997c..b6da811e 100644 --- a/containerd-shim-spin/src/engine.rs +++ b/containerd-shim-spin/src/engine.rs @@ -20,8 +20,8 @@ use spin_trigger::{ use spin_trigger_http::HttpTrigger; use spin_trigger_redis::RedisTrigger; use tokio::runtime::Runtime; -// use trigger_command::CommandTrigger; -// use trigger_sqs::SqsTrigger; +use trigger_command::CommandTrigger; +use trigger_sqs::SqsTrigger; use url::Url; use crate::loader::ContainerdLoader; @@ -145,11 +145,12 @@ impl Default for SpinEngine { impl SpinEngine { async fn wasm_exec_async(&self, ctx: &impl RuntimeContext) -> Result<()> { let app = self.load(ctx.entrypoint().source).await?; - self.run(app, ctx.entrypoint().source).await + self.run(app, ctx).await } - async fn run<'a>(&self, app: LockedApp, source: Source<'a>) -> Result<()> { + async fn run<'a>(&self, app: LockedApp, ctx: &impl RuntimeContext) -> Result<()> { let trigger = Self::trigger_command(&app)?; + let source = ctx.entrypoint().source; let f = match trigger.as_str() { HttpTrigger::TRIGGER_TYPE => { @@ -174,26 +175,26 @@ impl SpinEngine { info!(" >>> running spin trigger"); redis_trigger.run(spin_trigger::cli::NoArgs) } - // SqsTrigger::TRIGGER_TYPE => { - // let sqs_trigger: SqsTrigger = self - // .build_spin_trigger(working_dir, app, app_source) - // .await - // .context("failed to build spin trigger")?; - // - // info!(" >>> running spin trigger"); - // sqs_trigger.run(spin_trigger::cli::NoArgs) - // } - // CommandTrigger::TRIGGER_TYPE => { - // let command_trigger: CommandTrigger = self - // .build_spin_trigger(working_dir, app, app_source) - // .await - // .context("failed to build spin trigger")?; - // - // info!(" >>> running spin trigger"); - // command_trigger.run(trigger_command::CliArgs { - // guest_args: ctx.args().to_vec(), - // }) - // } + SqsTrigger::TRIGGER_TYPE => { + let sqs_trigger: SqsTrigger = self + .build_trigger(app, source) + .await + .context("failed to build spin trigger")?; + + info!(" >>> running spin trigger"); + sqs_trigger.run(spin_trigger::cli::NoArgs) + } + CommandTrigger::TRIGGER_TYPE => { + let command_trigger: CommandTrigger = self + .build_trigger(app, source) + .await + .context("failed to build spin trigger")?; + + info!(" >>> running spin trigger"); + command_trigger.run(trigger_command::CliArgs { + guest_args: ctx.args().to_vec(), + }) + } _ => { todo!("Only Http, Redis and SQS triggers are currently supported.") } @@ -328,3 +329,89 @@ impl TriggerHooks for StdioTriggerHook { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn can_parse_spin_address() { + let parsed = SpinEngine::parse_listen_addr(SPIN_ADDR).unwrap(); + + assert_eq!(parsed.clone().port(), 80); + assert_eq!(parsed.ip().to_string(), "0.0.0.0"); + } + + #[test] + fn is_wasm_content() { + let wasm_content = WasmLayer { + layer: vec![], + config: oci_spec::image::Descriptor::new( + MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), + 1024, + "sha256:1234", + ), + }; + // Should be ignored + let data_content = WasmLayer { + layer: vec![], + config: oci_spec::image::Descriptor::new( + MediaType::Other(OCI_LAYER_MEDIA_TYPE_DATA.to_string()), + 1024, + "sha256:1234", + ), + }; + assert!(SpinEngine::is_wasm_content(&wasm_content).is_some()); + assert!(SpinEngine::is_wasm_content(&data_content).is_none()); + } + + #[test] + fn precompile() { + let module = wat::parse_str("(module)").unwrap(); + let wasmtime_engine = wasmtime::Engine::default(); + let component = wasmtime::component::Component::new(&wasmtime_engine, "(component)") + .unwrap() + .serialize() + .unwrap(); + let wasm_layers: Vec = vec![ + // Needs to be precompiled + WasmLayer { + layer: module.clone(), + config: oci_spec::image::Descriptor::new( + MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), + 1024, + "sha256:1234", + ), + }, + // Precompiled + WasmLayer { + layer: component.to_owned(), + config: oci_spec::image::Descriptor::new( + MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), + 1024, + "sha256:1234", + ), + }, + // Content that should be skipped + WasmLayer { + layer: vec![], + config: oci_spec::image::Descriptor::new( + MediaType::Other(OCI_LAYER_MEDIA_TYPE_DATA.to_string()), + 1024, + "sha256:1234", + ), + }, + ]; + let spin_engine = SpinEngine::default(); + let precompiled = spin_engine + .precompile(&wasm_layers) + .expect("precompile failed"); + assert_eq!(precompiled.len(), 3); + assert_ne!(precompiled[0].as_deref().expect("no first entry"), module); + assert_eq!( + precompiled[1].as_deref().expect("no second entry"), + component + ); + assert!(precompiled[2].is_none()); + } +} diff --git a/containerd-shim-spin/src/main.rs b/containerd-shim-spin/src/main.rs index 947637b9..68561f2a 100644 --- a/containerd-shim-spin/src/main.rs +++ b/containerd-shim-spin/src/main.rs @@ -1,6 +1,8 @@ use containerd_shim::Config; -use containerd_shim_wasm::container::Instance; -use containerd_shim_wasm::sandbox::cli::{revision, shim_main, version}; +use containerd_shim_wasm::{ + container::Instance, + sandbox::cli::{revision, shim_main, version}, +}; pub mod engine; pub mod loader; @@ -8,7 +10,6 @@ pub mod loader; fn main() { // Configure the shim to have only error level logging for performance improvements. let shim_config = Config { - // default_log_level: "error".to_string(), ..Default::default() }; shim_main::>(