diff --git a/Cargo.lock b/Cargo.lock index 15c0c286..91cf9a21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1452,6 +1452,7 @@ dependencies = [ "log", "prost", "redb", + "scopeguard", "signal-hook", "tokio", "tokio-stream", diff --git a/crates/ctl/src/cli/launch.rs b/crates/ctl/src/cli/launch.rs index d202d3fb..f91823b7 100644 --- a/crates/ctl/src/cli/launch.rs +++ b/crates/ctl/src/cli/launch.rs @@ -1,13 +1,13 @@ use std::collections::HashMap; use anyhow::Result; -use clap::Parser; +use clap::{Parser, ValueEnum}; use krata::{ events::EventStream, v1::{ common::{ - guest_image_spec::Image, GuestImageSpec, GuestOciImageFormat, GuestOciImageSpec, - GuestSpec, GuestStatus, GuestTaskSpec, GuestTaskSpecEnvVar, + guest_image_spec::Image, GuestImageSpec, GuestOciImageSpec, GuestSpec, GuestStatus, + GuestTaskSpec, GuestTaskSpecEnvVar, OciImageFormat, }, control::{ control_service_client::ControlServiceClient, watch_events_reply::Event, @@ -21,13 +21,17 @@ use tonic::{transport::Channel, Request}; use crate::{console::StdioConsoleStream, pull::pull_interactive_progress}; -use super::pull::PullImageFormat; +#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] +pub enum LaunchImageFormat { + Squashfs, + Erofs, +} #[derive(Parser)] #[command(about = "Launch a new guest")] pub struct LauchCommand { #[arg(short = 'S', long, default_value = "squashfs", help = "Image format")] - image_format: PullImageFormat, + image_format: LaunchImageFormat, #[arg(short, long, help = "Name of the guest")] name: Option, #[arg( @@ -78,8 +82,8 @@ impl LauchCommand { .pull_image(PullImageRequest { image: self.oci.clone(), format: match self.image_format { - PullImageFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), - PullImageFormat::Erofs => GuestOciImageFormat::Erofs.into(), + LaunchImageFormat::Squashfs => OciImageFormat::Squashfs.into(), + LaunchImageFormat::Erofs => OciImageFormat::Erofs.into(), }, }) .await?; diff --git a/crates/ctl/src/cli/pull.rs b/crates/ctl/src/cli/pull.rs index 50ab84c7..9398c1da 100644 --- a/crates/ctl/src/cli/pull.rs +++ b/crates/ctl/src/cli/pull.rs @@ -1,7 +1,7 @@ use anyhow::Result; use clap::{Parser, ValueEnum}; use krata::v1::{ - common::GuestOciImageFormat, + common::OciImageFormat, control::{control_service_client::ControlServiceClient, PullImageRequest}, }; @@ -13,6 +13,7 @@ use crate::pull::pull_interactive_progress; pub enum PullImageFormat { Squashfs, Erofs, + Tar, } #[derive(Parser)] @@ -30,8 +31,9 @@ impl PullCommand { .pull_image(PullImageRequest { image: self.image.clone(), format: match self.image_format { - PullImageFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), - PullImageFormat::Erofs => GuestOciImageFormat::Erofs.into(), + PullImageFormat::Squashfs => OciImageFormat::Squashfs.into(), + PullImageFormat::Erofs => OciImageFormat::Erofs.into(), + PullImageFormat::Tar => OciImageFormat::Tar.into(), }, }) .await?; diff --git a/crates/daemon/Cargo.toml b/crates/daemon/Cargo.toml index 12d72955..2f1c0530 100644 --- a/crates/daemon/Cargo.toml +++ b/crates/daemon/Cargo.toml @@ -23,6 +23,7 @@ krata-runtime = { path = "../runtime", version = "^0.0.9" } log = { workspace = true } prost = { workspace = true } redb = { workspace = true } +scopeguard = { workspace = true } signal-hook = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 2a81c1aa..44ae3b94 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -6,7 +6,7 @@ use krata::{ IdmMetricsRequest, }, v1::{ - common::{Guest, GuestOciImageFormat, GuestState, GuestStatus}, + common::{Guest, GuestState, GuestStatus, OciImageFormat}, control::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, @@ -18,7 +18,7 @@ use krata::{ }; use krataoci::{ name::ImageName, - packer::{service::OciPackerService, OciImagePacked, OciPackedFormat}, + packer::{service::OciPackerService, OciPackedFormat, OciPackedImage}, progress::{OciProgress, OciProgressContext}, }; use std::{pin::Pin, str::FromStr}; @@ -90,8 +90,8 @@ enum ConsoleDataSelect { } enum PullImageSelect { - Progress(usize), - Completed(Result, JoinError>), + Progress(Option), + Completed(Result, JoinError>), } #[tonic::async_trait] @@ -362,36 +362,51 @@ impl ControlService for DaemonControlService { message: err.to_string(), })?; let format = match request.format() { - GuestOciImageFormat::Unknown => OciPackedFormat::Squashfs, - GuestOciImageFormat::Squashfs => OciPackedFormat::Squashfs, - GuestOciImageFormat::Erofs => OciPackedFormat::Erofs, + OciImageFormat::Unknown => OciPackedFormat::Squashfs, + OciImageFormat::Squashfs => OciPackedFormat::Squashfs, + OciImageFormat::Erofs => OciPackedFormat::Erofs, + OciImageFormat::Tar => OciPackedFormat::Tar, }; - let (sender, mut receiver) = channel::(100); - let context = OciProgressContext::new(sender); - + let (context, mut receiver) = OciProgressContext::create(); let our_packer = self.packer.clone(); let output = try_stream! { let mut task = tokio::task::spawn(async move { our_packer.request(name, format, context).await }); + let abort_handle = task.abort_handle(); + let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| { + handle.abort(); + }); + loop { - let mut progresses = Vec::new(); let what = select! { - x = receiver.recv_many(&mut progresses, 10) => PullImageSelect::Progress(x), + x = receiver.recv() => PullImageSelect::Progress(x.ok()), x = &mut task => PullImageSelect::Completed(x), }; match what { - PullImageSelect::Progress(count) => { - if count > 0 { - let progress = progresses.remove(progresses.len() - 1); - let reply = PullImageReply { - progress: Some(convert_oci_progress(progress)), - digest: String::new(), - format: GuestOciImageFormat::Unknown.into(), - }; - yield reply; + PullImageSelect::Progress(Some(mut progress)) => { + let mut drain = 0; + loop { + if drain >= 10 { + break; + } + + if let Ok(latest) = receiver.try_recv() { + progress = latest; + } else { + break; + } + + drain += 1; } + + let reply = PullImageReply { + progress: Some(convert_oci_progress(progress)), + digest: String::new(), + format: OciImageFormat::Unknown.into(), + }; + yield reply; }, PullImageSelect::Completed(result) => { @@ -405,13 +420,18 @@ impl ControlService for DaemonControlService { progress: None, digest: packed.digest, format: match packed.format { - OciPackedFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), - OciPackedFormat::Erofs => GuestOciImageFormat::Erofs.into(), + OciPackedFormat::Squashfs => OciImageFormat::Squashfs.into(), + OciPackedFormat::Erofs => OciImageFormat::Erofs.into(), + _ => OciImageFormat::Unknown.into(), }, }; yield reply; break; }, + + _ => { + continue; + } } } }; diff --git a/crates/daemon/src/reconcile/guest.rs b/crates/daemon/src/reconcile/guest.rs index ef8478e4..5e1876ce 100644 --- a/crates/daemon/src/reconcile/guest.rs +++ b/crates/daemon/src/reconcile/guest.rs @@ -9,7 +9,7 @@ use krata::launchcfg::LaunchPackedFormat; use krata::v1::{ common::{ guest_image_spec::Image, Guest, GuestErrorInfo, GuestExitInfo, GuestNetworkState, - GuestOciImageFormat, GuestState, GuestStatus, + GuestState, GuestStatus, OciImageFormat, }, control::GuestChangedEvent, }; @@ -244,9 +244,12 @@ impl GuestReconciler { .recall( &oci.digest, match oci.format() { - GuestOciImageFormat::Unknown => OciPackedFormat::Squashfs, - GuestOciImageFormat::Squashfs => OciPackedFormat::Squashfs, - GuestOciImageFormat::Erofs => OciPackedFormat::Erofs, + OciImageFormat::Unknown => OciPackedFormat::Squashfs, + OciImageFormat::Squashfs => OciPackedFormat::Squashfs, + OciImageFormat::Erofs => OciPackedFormat::Erofs, + OciImageFormat::Tar => { + return Err(anyhow!("tar image format is not supported for guests")); + } }, ) .await?; diff --git a/crates/krata/proto/krata/v1/common.proto b/crates/krata/proto/krata/v1/common.proto index 78bc7ede..1c208755 100644 --- a/crates/krata/proto/krata/v1/common.proto +++ b/crates/krata/proto/krata/v1/common.proto @@ -29,15 +29,17 @@ message GuestImageSpec { } } -enum GuestOciImageFormat { - GUEST_OCI_IMAGE_FORMAT_UNKNOWN = 0; - GUEST_OCI_IMAGE_FORMAT_SQUASHFS = 1; - GUEST_OCI_IMAGE_FORMAT_EROFS = 2; +enum OciImageFormat { + OCI_IMAGE_FORMAT_UNKNOWN = 0; + OCI_IMAGE_FORMAT_SQUASHFS = 1; + OCI_IMAGE_FORMAT_EROFS = 2; + // Tar format is not launchable, and is intended for kernel images. + OCI_IMAGE_FORMAT_TAR = 3; } message GuestOciImageSpec { string digest = 1; - GuestOciImageFormat format = 2; + OciImageFormat format = 2; } message GuestTaskSpec { diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 61cf91bb..692eba87 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -124,11 +124,11 @@ message PullImageProgress { message PullImageRequest { string image = 1; - krata.v1.common.GuestOciImageFormat format = 2; + krata.v1.common.OciImageFormat format = 2; } message PullImageReply { PullImageProgress progress = 1; string digest = 2; - krata.v1.common.GuestOciImageFormat format = 3; + krata.v1.common.OciImageFormat format = 3; } diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index 8bd29322..777c951a 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -5,10 +5,10 @@ use env_logger::Env; use krataoci::{ name::ImageName, packer::{service::OciPackerService, OciPackedFormat}, - progress::{OciProgress, OciProgressContext}, + progress::OciProgressContext, registry::OciPlatform, }; -use tokio::{fs, sync::mpsc::channel}; +use tokio::fs; #[tokio::main] async fn main() -> Result<()> { @@ -22,14 +22,28 @@ async fn main() -> Result<()> { fs::create_dir(&cache_dir).await?; } - let (sender, mut receiver) = channel::(100); + let (context, mut receiver) = OciProgressContext::create(); tokio::task::spawn(async move { loop { - let mut progresses = Vec::new(); - let _ = receiver.recv_many(&mut progresses, 100).await; - let Some(progress) = progresses.last() else { - continue; + let Ok(mut progress) = receiver.recv().await else { + return; }; + + let mut drain = 0; + loop { + if drain >= 10 { + break; + } + + if let Ok(latest) = receiver.try_recv() { + progress = latest; + } else { + break; + } + + drain += 1; + } + println!("phase {:?}", progress.phase); for (id, layer) in &progress.layers { println!( @@ -39,7 +53,6 @@ async fn main() -> Result<()> { } } }); - let context = OciProgressContext::new(sender); let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; let packed = service .request(image.clone(), OciPackedFormat::Squashfs, context) diff --git a/crates/oci/src/assemble.rs b/crates/oci/src/assemble.rs index 5f13ae31..2931f3cc 100644 --- a/crates/oci/src/assemble.rs +++ b/crates/oci/src/assemble.rs @@ -1,11 +1,14 @@ use crate::fetch::{OciImageFetcher, OciImageLayer, OciResolvedImage}; use crate::progress::OciBoundProgress; +use crate::schema::OciSchema; use crate::vfs::{VfsNode, VfsTree}; use anyhow::{anyhow, Result}; use log::{debug, trace, warn}; use oci_spec::image::{ImageConfiguration, ImageManifest}; + use std::path::{Path, PathBuf}; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::fs; use tokio::io::AsyncRead; @@ -15,8 +18,8 @@ use uuid::Uuid; pub struct OciImageAssembled { pub digest: String, - pub manifest: ImageManifest, - pub config: ImageConfiguration, + pub manifest: OciSchema, + pub config: OciSchema, pub vfs: Arc, pub tmp_dir: Option, } @@ -33,11 +36,12 @@ impl Drop for OciImageAssembled { pub struct OciImageAssembler { downloader: OciImageFetcher, - resolved: OciResolvedImage, + resolved: Option, progress: OciBoundProgress, work_dir: PathBuf, disk_dir: PathBuf, tmp_dir: Option, + success: AtomicBool, } impl OciImageAssembler { @@ -81,11 +85,12 @@ impl OciImageAssembler { Ok(OciImageAssembler { downloader, - resolved, + resolved: Some(resolved), progress, work_dir, disk_dir: target_dir, tmp_dir, + success: AtomicBool::new(false), }) } @@ -97,11 +102,11 @@ impl OciImageAssembler { self.assemble_with(&layer_dir).await } - async fn assemble_with(self, layer_dir: &Path) -> Result { - let local = self - .downloader - .download(self.resolved.clone(), layer_dir) - .await?; + async fn assemble_with(mut self, layer_dir: &Path) -> Result { + let Some(ref resolved) = self.resolved else { + return Err(anyhow!("resolved image was not available when expected")); + }; + let local = self.downloader.download(resolved, layer_dir).await?; let mut vfs = VfsTree::new(); for layer in &local.layers { debug!( @@ -145,13 +150,20 @@ impl OciImageAssembler { fs::remove_file(&layer.path).await?; } } - Ok(OciImageAssembled { + + let Some(resolved) = self.resolved.take() else { + return Err(anyhow!("resolved image was not available when expected")); + }; + + let assembled = OciImageAssembled { vfs: Arc::new(vfs), - digest: self.resolved.digest, - manifest: self.resolved.manifest, + digest: resolved.digest, + manifest: resolved.manifest, config: local.config, - tmp_dir: self.tmp_dir, - }) + tmp_dir: self.tmp_dir.clone(), + }; + self.success.store(true, Ordering::Release); + Ok(assembled) } async fn process_whiteout_entry( @@ -222,6 +234,18 @@ impl OciImageAssembler { } } +impl Drop for OciImageAssembler { + fn drop(&mut self) { + if !self.success.load(Ordering::Acquire) { + if let Some(tmp_dir) = self.tmp_dir.clone() { + tokio::task::spawn(async move { + let _ = fs::remove_dir_all(tmp_dir).await; + }); + } + } + } +} + async fn delete_disk_paths(node: &VfsNode) -> Result<()> { let mut queue = vec![node]; while !queue.is_empty() { diff --git a/crates/oci/src/fetch.rs b/crates/oci/src/fetch.rs index 9260afe5..05817d73 100644 --- a/crates/oci/src/fetch.rs +++ b/crates/oci/src/fetch.rs @@ -1,4 +1,7 @@ -use crate::progress::{OciBoundProgress, OciProgressPhase}; +use crate::{ + progress::{OciBoundProgress, OciProgressPhase}, + schema::OciSchema, +}; use super::{ name::ImageName, @@ -6,6 +9,7 @@ use super::{ }; use std::{ + fmt::Debug, path::{Path, PathBuf}, pin::Pin, }; @@ -66,13 +70,13 @@ impl OciImageLayer { pub struct OciResolvedImage { pub name: ImageName, pub digest: String, - pub manifest: ImageManifest, + pub manifest: OciSchema, } #[derive(Clone, Debug)] pub struct OciLocalImage { pub image: OciResolvedImage, - pub config: ImageConfiguration, + pub config: OciSchema, pub layers: Vec, } @@ -89,10 +93,10 @@ impl OciImageFetcher { } } - async fn load_seed_json_blob( + async fn load_seed_json_blob( &self, descriptor: &Descriptor, - ) -> Result> { + ) -> Result>> { let digest = descriptor.digest(); let Some((digest_type, digest_content)) = digest.split_once(':') else { return Err(anyhow!("digest content was not properly formatted")); @@ -101,7 +105,10 @@ impl OciImageFetcher { self.load_seed_json(&want).await } - async fn load_seed_json(&self, want: &str) -> Result> { + async fn load_seed_json( + &self, + want: &str, + ) -> Result>> { let Some(ref seed) = self.seed else { return Ok(None); }; @@ -113,10 +120,10 @@ impl OciImageFetcher { let mut entry = entry?; let path = String::from_utf8(entry.path_bytes().to_vec())?; if path == want { - let mut content = String::new(); - entry.read_to_string(&mut content).await?; - let data = serde_json::from_str::(&content)?; - return Ok(Some(data)); + let mut content = Vec::new(); + entry.read_to_end(&mut content).await?; + let item = serde_json::from_slice::(&content)?; + return Ok(Some(OciSchema::new(content, item))); } } Ok(None) @@ -154,7 +161,7 @@ impl OciImageFetcher { if let Some(index) = self.load_seed_json::("index.json").await? { let mut found: Option<&Descriptor> = None; - for manifest in index.manifests() { + for manifest in index.item().manifests() { let Some(annotations) = manifest.annotations() else { continue; }; @@ -212,10 +219,10 @@ impl OciImageFetcher { pub async fn download( &self, - image: OciResolvedImage, + image: &OciResolvedImage, layer_dir: &Path, ) -> Result { - let config: ImageConfiguration; + let config: OciSchema; self.progress .update(|progress| { progress.phase = OciProgressPhase::ConfigAcquire; @@ -223,27 +230,30 @@ impl OciImageFetcher { .await; let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?; if let Some(seeded) = self - .load_seed_json_blob::(image.manifest.config()) + .load_seed_json_blob::(image.manifest.item().config()) .await? { config = seeded; } else { let config_bytes = client - .get_blob(&image.name.name, image.manifest.config()) + .get_blob(&image.name.name, image.manifest.item().config()) .await?; - config = serde_json::from_slice(&config_bytes)?; + config = OciSchema::new( + config_bytes.to_vec(), + serde_json::from_slice(&config_bytes)?, + ); } self.progress .update(|progress| { progress.phase = OciProgressPhase::LayerAcquire; - for layer in image.manifest.layers() { + for layer in image.manifest.item().layers() { progress.add_layer(layer.digest(), layer.size() as usize); } }) .await; let mut layers = Vec::new(); - for layer in image.manifest.layers() { + for layer in image.manifest.item().layers() { self.progress .update(|progress| { progress.downloading_layer(layer.digest(), 0, layer.size() as usize); @@ -260,7 +270,7 @@ impl OciImageFetcher { .await; } Ok(OciLocalImage { - image, + image: image.clone(), config, layers, }) diff --git a/crates/oci/src/lib.rs b/crates/oci/src/lib.rs index 4372fd36..de3f17d9 100644 --- a/crates/oci/src/lib.rs +++ b/crates/oci/src/lib.rs @@ -4,4 +4,5 @@ pub mod name; pub mod packer; pub mod progress; pub mod registry; +pub mod schema; pub mod vfs; diff --git a/crates/oci/src/packer/backend.rs b/crates/oci/src/packer/backend.rs index 45b03020..2218bc8c 100644 --- a/crates/oci/src/packer/backend.rs +++ b/crates/oci/src/packer/backend.rs @@ -7,12 +7,18 @@ use crate::{ }; use anyhow::{anyhow, Result}; use log::warn; -use tokio::{pin, process::Command, select}; +use tokio::{ + fs::File, + pin, + process::{Child, Command}, + select, +}; #[derive(Debug, Clone, Copy)] pub enum OciPackerBackendType { MkSquashfs, MkfsErofs, + Tar, } impl OciPackerBackendType { @@ -20,6 +26,7 @@ impl OciPackerBackendType { match self { OciPackerBackendType::MkSquashfs => OciPackedFormat::Squashfs, OciPackerBackendType::MkfsErofs => OciPackedFormat::Erofs, + OciPackerBackendType::Tar => OciPackedFormat::Tar, } } @@ -31,6 +38,7 @@ impl OciPackerBackendType { OciPackerBackendType::MkfsErofs => { Box::new(OciPackerMkfsErofs {}) as Box } + OciPackerBackendType::Tar => Box::new(OciPackerTar {}) as Box, } } } @@ -53,7 +61,7 @@ impl OciPackerBackend for OciPackerMkSquashfs { }) .await; - let mut child = Command::new("mksquashfs") + let child = Command::new("mksquashfs") .arg("-") .arg(file) .arg("-comp") @@ -63,7 +71,9 @@ impl OciPackerBackend for OciPackerMkSquashfs { .stderr(Stdio::null()) .stdout(Stdio::null()) .spawn()?; + let mut child = ChildProcessKillGuard(child); let stdin = child + .0 .stdin .take() .ok_or(anyhow!("unable to acquire stdin stream"))?; @@ -74,7 +84,7 @@ impl OciPackerBackend for OciPackerMkSquashfs { } Ok(()) })); - let wait = child.wait(); + let wait = child.0.wait(); pin!(wait); let status_result = loop { if let Some(inner) = writer.as_mut() { @@ -135,7 +145,7 @@ impl OciPackerBackend for OciPackerMkfsErofs { }) .await; - let mut child = Command::new("mkfs.erofs") + let child = Command::new("mkfs.erofs") .arg("-L") .arg("root") .arg("--tar=-") @@ -144,14 +154,16 @@ impl OciPackerBackend for OciPackerMkfsErofs { .stderr(Stdio::null()) .stdout(Stdio::null()) .spawn()?; + let mut child = ChildProcessKillGuard(child); let stdin = child + .0 .stdin .take() .ok_or(anyhow!("unable to acquire stdin stream"))?; let mut writer = Some(tokio::task::spawn( async move { vfs.write_to_tar(stdin).await }, )); - let wait = child.wait(); + let wait = child.0.wait(); pin!(wait); let status_result = loop { if let Some(inner) = writer.as_mut() { @@ -199,3 +211,38 @@ impl OciPackerBackend for OciPackerMkfsErofs { } } } + +pub struct OciPackerTar {} + +#[async_trait::async_trait] +impl OciPackerBackend for OciPackerTar { + async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()> { + progress + .update(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 0; + }) + .await; + + let file = File::create(file).await?; + vfs.write_to_tar(file).await?; + + progress + .update(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 1; + }) + .await; + Ok(()) + } +} + +struct ChildProcessKillGuard(Child); + +impl Drop for ChildProcessKillGuard { + fn drop(&mut self) { + let _ = self.0.start_kill(); + } +} diff --git a/crates/oci/src/packer/cache.rs b/crates/oci/src/packer/cache.rs index 460520a4..5e4d9c2d 100644 --- a/crates/oci/src/packer/cache.rs +++ b/crates/oci/src/packer/cache.rs @@ -1,4 +1,7 @@ -use crate::packer::{OciImagePacked, OciPackedFormat}; +use crate::{ + packer::{OciPackedFormat, OciPackedImage}, + schema::OciSchema, +}; use anyhow::Result; use log::debug; @@ -22,7 +25,7 @@ impl OciPackerCache { &self, digest: &str, format: OciPackedFormat, - ) -> Result> { + ) -> Result> { let mut fs_path = self.cache_dir.clone(); let mut config_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); @@ -38,17 +41,17 @@ impl OciPackerCache { && manifest_metadata.is_file() && config_metadata.is_file() { - let manifest_text = fs::read_to_string(&manifest_path).await?; - let manifest: ImageManifest = serde_json::from_str(&manifest_text)?; - let config_text = fs::read_to_string(&config_path).await?; - let config: ImageConfiguration = serde_json::from_str(&config_text)?; + let manifest_bytes = fs::read(&manifest_path).await?; + let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?; + let config_bytes = fs::read(&config_path).await?; + let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; debug!("cache hit digest={}", digest); - Some(OciImagePacked::new( + Some(OciPackedImage::new( digest.to_string(), fs_path.clone(), format, - config, - manifest, + OciSchema::new(config_bytes, config), + OciSchema::new(manifest_bytes, manifest), )) } else { None @@ -60,7 +63,7 @@ impl OciPackerCache { ) } - pub async fn store(&self, packed: OciImagePacked) -> Result { + pub async fn store(&self, packed: OciPackedImage) -> Result { debug!("cache store digest={}", packed.digest); let mut fs_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); @@ -68,12 +71,10 @@ impl OciPackerCache { fs_path.push(format!("{}.{}", packed.digest, packed.format.extension())); manifest_path.push(format!("{}.manifest.json", packed.digest)); config_path.push(format!("{}.config.json", packed.digest)); - fs::copy(&packed.path, &fs_path).await?; - let manifest_text = serde_json::to_string_pretty(&packed.manifest)?; - fs::write(&manifest_path, manifest_text).await?; - let config_text = serde_json::to_string_pretty(&packed.config)?; - fs::write(&config_path, config_text).await?; - Ok(OciImagePacked::new( + fs::rename(&packed.path, &fs_path).await?; + fs::write(&config_path, packed.config.raw()).await?; + fs::write(&manifest_path, packed.manifest.raw()).await?; + Ok(OciPackedImage::new( packed.digest, fs_path.clone(), packed.format, diff --git a/crates/oci/src/packer/mod.rs b/crates/oci/src/packer/mod.rs index da1c4b2d..dd510ab1 100644 --- a/crates/oci/src/packer/mod.rs +++ b/crates/oci/src/packer/mod.rs @@ -1,5 +1,7 @@ use std::path::PathBuf; +use crate::schema::OciSchema; + use self::backend::OciPackerBackendType; use oci_spec::image::{ImageConfiguration, ImageManifest}; @@ -7,11 +9,12 @@ pub mod backend; pub mod cache; pub mod service; -#[derive(Debug, Default, Clone, Copy)] +#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, Hash)] pub enum OciPackedFormat { #[default] Squashfs, Erofs, + Tar, } impl OciPackedFormat { @@ -19,6 +22,7 @@ impl OciPackedFormat { match self { OciPackedFormat::Squashfs => "squashfs", OciPackedFormat::Erofs => "erofs", + OciPackedFormat::Tar => "tar", } } @@ -26,28 +30,29 @@ impl OciPackedFormat { match self { OciPackedFormat::Squashfs => OciPackerBackendType::MkSquashfs, OciPackedFormat::Erofs => OciPackerBackendType::MkfsErofs, + OciPackedFormat::Tar => OciPackerBackendType::Tar, } } } #[derive(Clone)] -pub struct OciImagePacked { +pub struct OciPackedImage { pub digest: String, pub path: PathBuf, pub format: OciPackedFormat, - pub config: ImageConfiguration, - pub manifest: ImageManifest, + pub config: OciSchema, + pub manifest: OciSchema, } -impl OciImagePacked { +impl OciPackedImage { pub fn new( digest: String, path: PathBuf, format: OciPackedFormat, - config: ImageConfiguration, - manifest: ImageManifest, - ) -> OciImagePacked { - OciImagePacked { + config: OciSchema, + manifest: OciSchema, + ) -> OciPackedImage { + OciPackedImage { digest, path, format, diff --git a/crates/oci/src/packer/service.rs b/crates/oci/src/packer/service.rs index 54cf4449..837feb0e 100644 --- a/crates/oci/src/packer/service.rs +++ b/crates/oci/src/packer/service.rs @@ -1,22 +1,40 @@ -use std::path::{Path, PathBuf}; +use std::{ + collections::{hash_map::Entry, HashMap}, + fmt::Display, + path::{Path, PathBuf}, + sync::Arc, +}; use anyhow::{anyhow, Result}; +use tokio::{ + sync::{watch, Mutex}, + task::JoinHandle, +}; use crate::{ assemble::OciImageAssembler, - fetch::OciImageFetcher, + fetch::{OciImageFetcher, OciResolvedImage}, name::ImageName, progress::{OciBoundProgress, OciProgress, OciProgressContext}, registry::OciPlatform, }; -use super::{cache::OciPackerCache, OciImagePacked, OciPackedFormat}; +use log::{error, info, warn}; + +use super::{cache::OciPackerCache, OciPackedFormat, OciPackedImage}; + +pub struct OciPackerTask { + progress: OciBoundProgress, + watch: watch::Sender>>, + task: JoinHandle<()>, +} #[derive(Clone)] pub struct OciPackerService { seed: Option, platform: OciPlatform, cache: OciPackerCache, + tasks: Arc>>, } impl OciPackerService { @@ -29,6 +47,7 @@ impl OciPackerService { seed, cache: OciPackerCache::new(cache_dir)?, platform, + tasks: Arc::new(Mutex::new(HashMap::new())), }) } @@ -36,7 +55,7 @@ impl OciPackerService { &self, digest: &str, format: OciPackedFormat, - ) -> Result> { + ) -> Result> { self.cache.recall(digest, format).await } @@ -45,14 +64,98 @@ impl OciPackerService { name: ImageName, format: OciPackedFormat, progress_context: OciProgressContext, - ) -> Result { + ) -> Result { let progress = OciProgress::new(); let progress = OciBoundProgress::new(progress_context.clone(), progress); let fetcher = OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone()); let resolved = fetcher.resolve(name).await?; + let key = OciPackerTaskKey { + digest: resolved.digest.clone(), + format, + }; + let (progress_copy_task, mut receiver) = match self.tasks.lock().await.entry(key.clone()) { + Entry::Occupied(entry) => { + let entry = entry.get(); + ( + Some(entry.progress.also_update(progress_context).await), + entry.watch.subscribe(), + ) + } + + Entry::Vacant(entry) => { + let task = self + .clone() + .launch(key.clone(), format, resolved, fetcher, progress.clone()) + .await; + let (watch, receiver) = watch::channel(None); + + let task = OciPackerTask { + progress: progress.clone(), + task, + watch, + }; + entry.insert(task); + (None, receiver) + } + }; + + let _progress_task_guard = scopeguard::guard(progress_copy_task, |task| { + if let Some(task) = task { + task.abort(); + } + }); + + let _task_cancel_guard = scopeguard::guard(self.clone(), |service| { + service.maybe_cancel_task(key); + }); + + loop { + receiver.changed().await?; + let current = receiver.borrow_and_update(); + if current.is_some() { + return current + .as_ref() + .map(|x| x.as_ref().map_err(|err| anyhow!("{}", err)).cloned()) + .unwrap(); + } + } + } + + async fn launch( + self, + key: OciPackerTaskKey, + format: OciPackedFormat, + resolved: OciResolvedImage, + fetcher: OciImageFetcher, + progress: OciBoundProgress, + ) -> JoinHandle<()> { + info!("packer task {} started", key); + tokio::task::spawn(async move { + let _task_drop_guard = + scopeguard::guard((key.clone(), self.clone()), |(key, service)| { + service.ensure_task_gone(key); + }); + if let Err(error) = self + .task(key.clone(), format, resolved, fetcher, progress) + .await + { + self.finish(&key, Err(error)).await; + } + }) + } + + async fn task( + &self, + key: OciPackerTaskKey, + format: OciPackedFormat, + resolved: OciResolvedImage, + fetcher: OciImageFetcher, + progress: OciBoundProgress, + ) -> Result<()> { if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { - return Ok(cached); + self.finish(&key, Ok(cached)).await; + return Ok(()); } let assembler = OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?; @@ -67,8 +170,7 @@ impl OciPackerService { packer .pack(progress, assembled.vfs.clone(), &target) .await?; - - let packed = OciImagePacked::new( + let packed = OciPackedImage::new( assembled.digest.clone(), file, format, @@ -76,6 +178,59 @@ impl OciPackerService { assembled.manifest.clone(), ); let packed = self.cache.store(packed).await?; - Ok(packed) + self.finish(&key, Ok(packed)).await; + Ok(()) + } + + async fn finish(&self, key: &OciPackerTaskKey, result: Result) { + let Some(task) = self.tasks.lock().await.remove(key) else { + error!("packer task {} was not found when task completed", key); + return; + }; + + match result.as_ref() { + Ok(_) => { + info!("packer task {} completed", key); + } + + Err(err) => { + warn!("packer task {} failed: {}", key, err); + } + } + + task.watch.send_replace(Some(result)); + } + + fn maybe_cancel_task(self, key: OciPackerTaskKey) { + tokio::task::spawn(async move { + let tasks = self.tasks.lock().await; + if let Some(task) = tasks.get(&key) { + if task.watch.is_closed() { + task.task.abort(); + } + } + }); + } + + fn ensure_task_gone(self, key: OciPackerTaskKey) { + tokio::task::spawn(async move { + let mut tasks = self.tasks.lock().await; + if let Some(task) = tasks.remove(&key) { + warn!("packer task {} aborted", key); + task.watch.send_replace(Some(Err(anyhow!("task aborted")))); + } + }); + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +struct OciPackerTaskKey { + digest: String, + format: OciPackedFormat, +} + +impl Display for OciPackerTaskKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}:{}", self.digest, self.format.extension())) } } diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index 43bdaeae..e5e22e3c 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -1,7 +1,11 @@ +use indexmap::IndexMap; use std::sync::Arc; +use tokio::{ + sync::{broadcast, Mutex}, + task::JoinHandle, +}; -use indexmap::IndexMap; -use tokio::sync::{mpsc::Sender, Mutex}; +const OCI_PROGRESS_QUEUE_LEN: usize = 100; #[derive(Clone, Debug)] pub struct OciProgress { @@ -99,16 +103,25 @@ pub enum OciProgressLayerPhase { #[derive(Clone)] pub struct OciProgressContext { - sender: Sender, + sender: broadcast::Sender, } impl OciProgressContext { - pub fn new(sender: Sender) -> OciProgressContext { + pub fn create() -> (OciProgressContext, broadcast::Receiver) { + let (sender, receiver) = broadcast::channel(OCI_PROGRESS_QUEUE_LEN); + (OciProgressContext::new(sender), receiver) + } + + pub fn new(sender: broadcast::Sender) -> OciProgressContext { OciProgressContext { sender } } pub fn update(&self, progress: &OciProgress) { - let _ = self.sender.try_send(progress.clone()); + let _ = self.sender.send(progress.clone()); + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() } } @@ -137,4 +150,20 @@ impl OciBoundProgress { function(&mut progress); self.context.update(&progress); } + + pub async fn also_update(&self, context: OciProgressContext) -> JoinHandle<()> { + let progress = self.instance.lock().await.clone(); + context.update(&progress); + let mut receiver = self.context.subscribe(); + tokio::task::spawn(async move { + while let Ok(progress) = receiver.recv().await { + match context.sender.send(progress) { + Ok(_) => {} + Err(_) => { + break; + } + } + } + }) + } } diff --git a/crates/oci/src/registry.rs b/crates/oci/src/registry.rs index 1b93e51c..86597f64 100644 --- a/crates/oci/src/registry.rs +++ b/crates/oci/src/registry.rs @@ -7,7 +7,7 @@ use reqwest::{Client, RequestBuilder, Response, StatusCode}; use tokio::{fs::File, io::AsyncWriteExt}; use url::Url; -use crate::progress::OciBoundProgress; +use crate::{progress::OciBoundProgress, schema::OciSchema}; #[derive(Clone, Debug)] pub struct OciPlatform { @@ -176,7 +176,7 @@ impl OciRegistryClient { &mut self, name: N, reference: R, - ) -> Result<(ImageManifest, String)> { + ) -> Result<(OciSchema, String)> { let url = self.url.join(&format!( "/v2/{}/manifests/{}", name.as_ref(), @@ -198,15 +198,16 @@ impl OciRegistryClient { .ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))? .to_str()? .to_string(); - let manifest = serde_json::from_str(&response.text().await?)?; - Ok((manifest, digest)) + let bytes = response.bytes().await?; + let manifest = serde_json::from_slice(&bytes)?; + Ok((OciSchema::new(bytes.to_vec(), manifest), digest)) } pub async fn get_manifest_with_digest, R: AsRef>( &mut self, name: N, reference: R, - ) -> Result<(ImageManifest, String)> { + ) -> Result<(OciSchema, String)> { let url = self.url.join(&format!( "/v2/{}/manifests/{}", name.as_ref(), @@ -244,8 +245,9 @@ impl OciRegistryClient { .ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))? .to_str()? .to_string(); - let manifest = serde_json::from_str(&response.text().await?)?; - Ok((manifest, digest)) + let bytes = response.bytes().await?; + let manifest = serde_json::from_slice(&bytes)?; + Ok((OciSchema::new(bytes.to_vec(), manifest), digest)) } fn pick_manifest(&mut self, index: ImageIndex) -> Option { diff --git a/crates/oci/src/schema.rs b/crates/oci/src/schema.rs new file mode 100644 index 00000000..04bc2030 --- /dev/null +++ b/crates/oci/src/schema.rs @@ -0,0 +1,29 @@ +use std::fmt::Debug; + +#[derive(Clone, Debug)] +pub struct OciSchema { + raw: Vec, + item: T, +} + +impl OciSchema { + pub fn new(raw: Vec, item: T) -> OciSchema { + OciSchema { raw, item } + } + + pub fn raw(&self) -> &[u8] { + &self.raw + } + + pub fn item(&self) -> &T { + &self.item + } + + pub fn into_raw(self) -> Vec { + self.raw + } + + pub fn into_item(self) -> T { + self.item + } +} diff --git a/crates/runtime/src/cfgblk.rs b/crates/runtime/src/cfgblk.rs index 0ae491f0..00f38074 100644 --- a/crates/runtime/src/cfgblk.rs +++ b/crates/runtime/src/cfgblk.rs @@ -1,7 +1,7 @@ use anyhow::Result; use backhand::{FilesystemWriter, NodeHeader}; use krata::launchcfg::LaunchInfo; -use krataoci::packer::OciImagePacked; +use krataoci::packer::OciPackedImage; use log::trace; use std::fs; use std::fs::File; @@ -9,13 +9,13 @@ use std::path::PathBuf; use uuid::Uuid; pub struct ConfigBlock<'a> { - pub image: &'a OciImagePacked, + pub image: &'a OciPackedImage, pub file: PathBuf, pub dir: PathBuf, } impl ConfigBlock<'_> { - pub fn new<'a>(uuid: &Uuid, image: &'a OciImagePacked) -> Result> { + pub fn new<'a>(uuid: &Uuid, image: &'a OciPackedImage) -> Result> { let mut dir = std::env::temp_dir().clone(); dir.push(format!("krata-cfg-{}", uuid)); fs::create_dir_all(&dir)?; @@ -26,7 +26,7 @@ impl ConfigBlock<'_> { pub fn build(&self, launch_config: &LaunchInfo) -> Result<()> { trace!("build launch_config={:?}", launch_config); - let manifest = self.image.config.to_string()?; + let config = self.image.config.raw(); let launch = serde_json::to_string(launch_config)?; let mut writer = FilesystemWriter::default(); writer.push_dir( @@ -39,7 +39,7 @@ impl ConfigBlock<'_> { }, )?; writer.push_file( - manifest.as_bytes(), + config, "/image/config.json", NodeHeader { permissions: 384, diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index b0ca22d6..e76f8925 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -10,7 +10,7 @@ use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, LaunchPackedFormat, LaunchRoot, }; -use krataoci::packer::OciImagePacked; +use krataoci::packer::OciPackedImage; use tokio::sync::Semaphore; use uuid::Uuid; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; @@ -30,7 +30,7 @@ pub struct GuestLaunchRequest { pub env: HashMap, pub run: Option>, pub debug: bool, - pub image: OciImagePacked, + pub image: OciPackedImage, } pub struct GuestLauncher {