diff --git a/Cargo.lock b/Cargo.lock index 427a4bfce..0e5c3ac78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -851,6 +851,7 @@ checksum = "7bb11bd1378bf3731b182997b40cefe00aba6a6cc74042c8318c1b271d3badf7" dependencies = [ "nix", "thiserror", + "tokio", ] [[package]] @@ -1655,6 +1656,7 @@ dependencies = [ "anyhow", "clap", "command-fds", + "futures", "kona-common", "kona-mpt", "kona-preimage", diff --git a/bin/host/Cargo.toml b/bin/host/Cargo.toml index 2e7aac860..d9a8a54b4 100644 --- a/bin/host/Cargo.toml +++ b/bin/host/Cargo.toml @@ -29,8 +29,9 @@ alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07" } alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07" } reqwest = "0.12" tokio = { version = "1.37.0", features = ["full"] } +futures = "0.3" clap = { version = "4.5.4", features = ["derive", "env"] } serde = { version = "1.0.198", features = ["derive"] } tracing-subscriber = "0.3.18" -command-fds = "0.3.0" +command-fds = { version = "0.3", features = ["tokio"] } tempfile = "3.10" diff --git a/bin/host/src/cli/mod.rs b/bin/host/src/cli/mod.rs index e844eefaf..dc8d1dcc4 100644 --- a/bin/host/src/cli/mod.rs +++ b/bin/host/src/cli/mod.rs @@ -3,7 +3,8 @@ use alloy_primitives::B256; use clap::{ArgAction, Parser}; use serde::Serialize; -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; +use tokio::sync::RwLock; mod parser; pub(crate) use parser::parse_b256; @@ -14,6 +15,11 @@ pub(crate) use types::Network; mod tracing_util; pub(crate) use tracing_util::init_tracing_subscriber; +use crate::kv::{ + DiskKeyValueStore, LocalKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore, + SplitKeyValueStore, +}; + /// The host binary CLI application arguments. #[derive(Parser, Serialize, Clone)] pub struct HostCli { @@ -66,9 +72,28 @@ pub struct HostCli { } impl HostCli { + /// Returns `true` if the host is running in offline mode. pub fn is_offline(&self) -> bool { self.l1_node_address.is_none() || self.l2_node_address.is_none() || self.l1_beacon_address.is_none() } + + /// Parses the CLI arguments and returns a new instance of a [SharedKeyValueStore], as it is + /// configured to be created. + pub fn construct_kv_store(&self) -> SharedKeyValueStore { + let local_kv_store = LocalKeyValueStore::new(self.clone()); + + let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = self.data_dir { + let disk_kv_store = DiskKeyValueStore::new(data_dir.clone()); + let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store); + Arc::new(RwLock::new(split_kv_store)) + } else { + let mem_kv_store = MemoryKeyValueStore::new(); + let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store); + Arc::new(RwLock::new(split_kv_store)) + }; + + kv_store + } } diff --git a/bin/host/src/main.rs b/bin/host/src/main.rs index 81d1b3475..be5817c02 100644 --- a/bin/host/src/main.rs +++ b/bin/host/src/main.rs @@ -1,30 +1,37 @@ use crate::{ cli::{init_tracing_subscriber, HostCli}, - kv::{ - DiskKeyValueStore, LocalKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore, - SplitKeyValueStore, - }, server::PreimageServer, }; use anyhow::{anyhow, Result}; use clap::Parser; use command_fds::{CommandFdExt, FdMapping}; use fetcher::Fetcher; +use futures::FutureExt; use kona_common::FileDescriptor; use kona_preimage::{HintReader, OracleServer, PipeHandle}; +use kv::KeyValueStore; use std::{ io::{stderr, stdin, stdout}, os::fd::AsFd, - process::Command, + panic::AssertUnwindSafe, sync::Arc, }; -use tokio::sync::RwLock; +use tokio::{ + process::Command, + sync::{ + watch::{Receiver, Sender}, + RwLock, + }, + task, +}; use tracing::{error, info}; +use types::NativePipeFiles; mod cli; mod fetcher; mod kv; mod server; +mod types; mod util; #[tokio::main] @@ -52,16 +59,7 @@ async fn start_server(cfg: HostCli) -> Result<()> { let oracle_server = OracleServer::new(preimage_pipe); let hint_reader = HintReader::new(hint_pipe); - let local_kv_store = LocalKeyValueStore::new(cfg.clone()); - let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = cfg.data_dir { - let disk_kv_store = DiskKeyValueStore::new(data_dir.clone()); - let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store); - Arc::new(RwLock::new(split_kv_store)) - } else { - let mem_kv_store = MemoryKeyValueStore::new(); - let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store); - Arc::new(RwLock::new(split_kv_store)) - }; + let kv_store = cfg.construct_kv_store(); let fetcher = (!cfg.is_offline()).then(|| { let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set")); @@ -81,66 +79,160 @@ async fn start_server(cfg: HostCli) -> Result<()> { /// Starts the [PreimageServer] and the client program in separate threads. The client program is /// ran natively in this mode. async fn start_server_and_native_client(cfg: HostCli) -> Result<()> { - let (preimage_pipe, hint_pipe, mut files) = util::create_native_pipes()?; - let oracle_server = OracleServer::new(preimage_pipe); - let hint_reader = HintReader::new(hint_pipe); - - let local_kv_store = LocalKeyValueStore::new(cfg.clone()); - let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = cfg.data_dir { - let disk_kv_store = DiskKeyValueStore::new(data_dir.clone()); - let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store); - Arc::new(RwLock::new(split_kv_store)) - } else { - let mem_kv_store = MemoryKeyValueStore::new(); - let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store); - Arc::new(RwLock::new(split_kv_store)) - }; + let (preimage_pipe, hint_pipe, files) = util::create_native_pipes()?; + let kv_store = cfg.construct_kv_store(); let fetcher = (!cfg.is_offline()).then(|| { - let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set")); - let l2_provider = util::http_provider(&cfg.l2_node_address.expect("Provider must be set")); + let l1_provider = + util::http_provider(cfg.l1_node_address.as_ref().expect("Provider must be set")); + let l2_provider = + util::http_provider(cfg.l2_node_address.as_ref().expect("Provider must be set")); Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider))) }); + // Create a channel to signal the server and the client program to exit. + let (tx_server, rx_server) = tokio::sync::watch::channel(()); + let (tx_program, rx_program) = (tx_server.clone(), rx_server.clone()); + // Create the server and start it. - let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher); - let server_task = tokio::task::spawn(server.start()); + let server_task = task::spawn(start_native_preimage_server( + kv_store, + fetcher, + preimage_pipe, + hint_pipe, + tx_server, + rx_server, + )); // Start the client program in a separate child process. - let program_task = tokio::task::spawn(async move { - let mut command = Command::new(cfg.exec); - - // Map the file descriptors to the standard streams and the preimage oracle and hint - // reader's special file descriptors. - command - .fd_mappings(vec![ - FdMapping { parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), child_fd: 0 }, - FdMapping { - parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(), - child_fd: 1, - }, - FdMapping { - parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(), - child_fd: 2, - }, - FdMapping { parent_fd: files.remove(3).into(), child_fd: 3 }, - FdMapping { parent_fd: files.remove(2).into(), child_fd: 4 }, - FdMapping { parent_fd: files.remove(1).into(), child_fd: 5 }, - FdMapping { parent_fd: files.remove(0).into(), child_fd: 6 }, - ]) - .expect("No errors may occur when mapping file descriptors."); - - Ok(command.status().map_err(|e| anyhow!(e))?.success()) - }); + let program_task = task::spawn(start_native_client_program(cfg, files, tx_program, rx_program)); + // Executen both tasks and wait for them to complete. info!("Starting preimage server and client program."); - let (server_res, program_res) = - tokio::try_join!(server_task, program_task).map_err(|e| anyhow!(e))?; - server_res?; - if !program_res? { - error!("Client program exited with a non-zero status."); + tokio::try_join!( + util::flatten_join_result(server_task), + util::flatten_join_result(program_task) + ) + .map_err(|e| anyhow!(e))?; + info!("Preimage server and client program have joined."); + + Ok(()) +} + +/// Starts the preimage server in a separate thread. The client program is ran natively in this +/// mode. +async fn start_native_preimage_server( + kv_store: Arc>, + fetcher: Option>>>, + preimage_pipe: PipeHandle, + hint_pipe: PipeHandle, + tx: Sender<()>, + mut rx: Receiver<()>, +) -> Result<()> +where + KV: KeyValueStore + Send + Sync + ?Sized + 'static, +{ + let oracle_server = OracleServer::new(preimage_pipe); + let hint_reader = HintReader::new(hint_pipe); + + let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher); + + let server_pair_task = task::spawn(async move { + AssertUnwindSafe(server.start()) + .catch_unwind() + .await + .map_err(|_| { + error!(target: "preimage_server", "Preimage server panicked"); + anyhow!("Preimage server panicked") + })? + .map_err(|e| { + error!(target: "preimage_server", "Preimage server exited with an error"); + anyhow!("Preimage server exited with an error: {:?}", e) + }) + }); + let rx_server_task = task::spawn(async move { rx.changed().await }); + + // Block the current task until either the client program exits or the server exits. + tokio::select! { + _ = rx_server_task => { + info!(target: "preimage_server", "Received shutdown signal from preimage server task.") + }, + res = util::flatten_join_result(server_pair_task) => { + res?; + } } - info!("Preimage server and client program have exited."); + // Signal to the client program that the server has exited. + let _ = tx.send(()); + + info!("Preimage server has exited."); + Ok(()) +} + +/// Starts the client program in a separate child process. The client program is ran natively in +/// this mode. +/// +/// ## Takes +/// - `cfg`: The host configuration. +/// - `files`: The files that are used to communicate with the native client. +/// - `tx`: The sender to signal the preimage server to exit. +/// - `rx`: The receiver to wait for the preimage server to exit. +/// +/// ## Returns +/// - `Ok(())` if the client program exits successfully. +/// - `Err(_)` if the client program exits with a non-zero status. +async fn start_native_client_program( + cfg: HostCli, + files: NativePipeFiles, + tx: Sender<()>, + mut rx: Receiver<()>, +) -> Result<()> { + // Map the file descriptors to the standard streams and the preimage oracle and hint + // reader's special file descriptors. + let mut command = Command::new(cfg.exec); + command + .fd_mappings(vec![ + FdMapping { parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), child_fd: 0 }, + FdMapping { parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(), child_fd: 1 }, + FdMapping { parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(), child_fd: 2 }, + FdMapping { parent_fd: files.hint_writ.into(), child_fd: 3 }, + FdMapping { parent_fd: files.hint_read.into(), child_fd: 4 }, + FdMapping { parent_fd: files.preimage_writ.into(), child_fd: 5 }, + FdMapping { parent_fd: files.preimage_read.into(), child_fd: 6 }, + ]) + .expect("No errors may occur when mapping file descriptors."); + + let exec_task = task::spawn(async move { + let status = command + .status() + .await + .map_err(|e| { + error!(target: "client_program", "Failed to execute client program: {:?}", e); + anyhow!("Failed to execute client program: {:?}", e) + })? + .success(); + Ok::<_, anyhow::Error>(status) + }); + let rx_program_task = task::spawn(async move { rx.changed().await }); + + // Block the current task until either the client program exits or the server exits. + tokio::select! { + _ = rx_program_task => { + info!(target: "client_program", "Received shutdown signal from preimage server task.") + }, + res = util::flatten_join_result(exec_task) => { + if !(res?) { + // Signal to the preimage server that the client program has exited. + let _ = tx.send(()); + error!(target: "client_program", "Client program exited with a non-zero status."); + return Err(anyhow!("Client program exited with a non-zero status.")); + } + } + } + + // Signal to the preimage server that the client program has exited. + let _ = tx.send(()); + + info!(target: "client_program", "Client program has exited."); Ok(()) } diff --git a/bin/host/src/types.rs b/bin/host/src/types.rs new file mode 100644 index 000000000..4254b403d --- /dev/null +++ b/bin/host/src/types.rs @@ -0,0 +1,15 @@ +//! This module contains the types used in the host program. + +use std::fs::File; + +/// Represents the files that are used to communicate with the native client. +pub(crate) struct NativePipeFiles { + /// The file that the preimage oracle reads from. + pub preimage_read: File, + /// The file that the preimage oracle writes to. + pub preimage_writ: File, + /// The file that the hint reader reads from. + pub hint_read: File, + /// The file that the hint reader writes to. + pub hint_writ: File, +} diff --git a/bin/host/src/util.rs b/bin/host/src/util.rs index 9b2459a4c..d14a00b6a 100644 --- a/bin/host/src/util.rs +++ b/bin/host/src/util.rs @@ -1,6 +1,6 @@ //! Contains utility functions and helpers for the host program. -use crate::fetcher::HintType; +use crate::{fetcher::HintType, types::NativePipeFiles}; use alloy_primitives::{hex, Bytes}; use alloy_provider::ReqwestProvider; use alloy_rpc_client::RpcClient; @@ -11,6 +11,7 @@ use kona_preimage::PipeHandle; use reqwest::Client; use std::{fs::File, os::fd::AsRawFd}; use tempfile::tempfile; +use tokio::task::JoinHandle; /// Parses a hint from a string. /// @@ -38,7 +39,7 @@ pub(crate) fn create_temp_files() -> Result<(File, File)> { /// Create a pair of pipes for the preimage oracle and hint reader. Also returns the files that are /// used to create the pipes, which must be kept alive until the pipes are closed. -pub(crate) fn create_native_pipes() -> Result<(PipeHandle, PipeHandle, Vec)> { +pub(crate) fn create_native_pipes() -> Result<(PipeHandle, PipeHandle, NativePipeFiles)> { let (po_reader, po_writer) = create_temp_files()?; let (hint_reader, hint_writer) = create_temp_files()?; let preimage_pipe = PipeHandle::new( @@ -58,7 +59,14 @@ pub(crate) fn create_native_pipes() -> Result<(PipeHandle, PipeHandle, Vec ), ); - Ok((preimage_pipe, hint_pipe, vec![po_reader, po_writer, hint_reader, hint_writer])) + let files = NativePipeFiles { + preimage_read: po_reader, + preimage_writ: po_writer, + hint_read: hint_reader, + hint_writ: hint_writer, + }; + + Ok((preimage_pipe, hint_pipe, files)) } /// Returns an HTTP provider for the given URL. @@ -67,3 +75,17 @@ pub(crate) fn http_provider(url: &str) -> ReqwestProvider { let http = Http::::new(url); ReqwestProvider::new(RpcClient::new(http, true)) } + +/// Flattens the result of a [JoinHandle] into a single result. +pub(crate) async fn flatten_join_result( + handle: JoinHandle>, +) -> Result +where + E: std::fmt::Display, +{ + match handle.await { + Ok(Ok(result)) => Ok(result), + Ok(Err(err)) => Err(anyhow!("{}", err)), + Err(err) => anyhow::bail!(err), + } +}