Skip to content

Commit

Permalink
fix(host): Blocking native client program
Browse files Browse the repository at this point in the history
## Overview

Improves the asynchronous logic in `kona-host` to handle gracefully
exiting the parallel host and client threads if either throw.
  • Loading branch information
clabby committed Jun 2, 2024
1 parent 512031f commit 96bc2aa
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 70 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bin/host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07" }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07" }
reqwest = "0.12"
tokio = { version = "1.37.0", features = ["full"] }
futures = "0.3"
clap = { version = "4.5.4", features = ["derive", "env"] }
serde = { version = "1.0.198", features = ["derive"] }
tracing-subscriber = "0.3.18"
command-fds = "0.3.0"
command-fds = { version = "0.3", features = ["tokio"] }
tempfile = "3.10"
27 changes: 26 additions & 1 deletion bin/host/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use alloy_primitives::B256;
use clap::{ArgAction, Parser};
use serde::Serialize;
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock;

mod parser;
pub(crate) use parser::parse_b256;
Expand All @@ -14,6 +15,11 @@ pub(crate) use types::Network;
mod tracing_util;
pub(crate) use tracing_util::init_tracing_subscriber;

use crate::kv::{
DiskKeyValueStore, LocalKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore,
SplitKeyValueStore,
};

/// The host binary CLI application arguments.
#[derive(Parser, Serialize, Clone)]
pub struct HostCli {
Expand Down Expand Up @@ -66,9 +72,28 @@ pub struct HostCli {
}

impl HostCli {
/// Returns `true` if the host is running in offline mode.
pub fn is_offline(&self) -> bool {
self.l1_node_address.is_none() ||
self.l2_node_address.is_none() ||
self.l1_beacon_address.is_none()
}

/// Parses the CLI arguments and returns a new instance of a [SharedKeyValueStore], as it is
/// configured to be created.
pub fn construct_kv_store(&self) -> SharedKeyValueStore {
let local_kv_store = LocalKeyValueStore::new(self.clone());

let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = self.data_dir {
let disk_kv_store = DiskKeyValueStore::new(data_dir.clone());
let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store);
Arc::new(RwLock::new(split_kv_store))
} else {
let mem_kv_store = MemoryKeyValueStore::new();
let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store);
Arc::new(RwLock::new(split_kv_store))
};

kv_store
}
}
222 changes: 157 additions & 65 deletions bin/host/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
use crate::{
cli::{init_tracing_subscriber, HostCli},
kv::{
DiskKeyValueStore, LocalKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore,
SplitKeyValueStore,
},
server::PreimageServer,
};
use anyhow::{anyhow, Result};
use clap::Parser;
use command_fds::{CommandFdExt, FdMapping};
use fetcher::Fetcher;
use futures::FutureExt;
use kona_common::FileDescriptor;
use kona_preimage::{HintReader, OracleServer, PipeHandle};
use kv::KeyValueStore;
use std::{
io::{stderr, stdin, stdout},
os::fd::AsFd,
process::Command,
panic::AssertUnwindSafe,
sync::Arc,
};
use tokio::sync::RwLock;
use tokio::{
process::Command,
sync::{
watch::{Receiver, Sender},
RwLock,
},
task,
};
use tracing::{error, info};
use types::NativePipeFiles;

mod cli;
mod fetcher;
mod kv;
mod server;
mod types;
mod util;

#[tokio::main]
Expand Down Expand Up @@ -52,16 +59,7 @@ async fn start_server(cfg: HostCli) -> Result<()> {
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);

let local_kv_store = LocalKeyValueStore::new(cfg.clone());
let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = cfg.data_dir {
let disk_kv_store = DiskKeyValueStore::new(data_dir.clone());
let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store);
Arc::new(RwLock::new(split_kv_store))
} else {
let mem_kv_store = MemoryKeyValueStore::new();
let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store);
Arc::new(RwLock::new(split_kv_store))
};
let kv_store = cfg.construct_kv_store();

let fetcher = (!cfg.is_offline()).then(|| {
let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set"));
Expand All @@ -81,66 +79,160 @@ async fn start_server(cfg: HostCli) -> Result<()> {
/// Starts the [PreimageServer] and the client program in separate threads. The client program is
/// ran natively in this mode.
async fn start_server_and_native_client(cfg: HostCli) -> Result<()> {
let (preimage_pipe, hint_pipe, mut files) = util::create_native_pipes()?;
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);

let local_kv_store = LocalKeyValueStore::new(cfg.clone());
let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = cfg.data_dir {
let disk_kv_store = DiskKeyValueStore::new(data_dir.clone());
let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store);
Arc::new(RwLock::new(split_kv_store))
} else {
let mem_kv_store = MemoryKeyValueStore::new();
let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store);
Arc::new(RwLock::new(split_kv_store))
};
let (preimage_pipe, hint_pipe, files) = util::create_native_pipes()?;
let kv_store = cfg.construct_kv_store();

let fetcher = (!cfg.is_offline()).then(|| {
let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set"));
let l2_provider = util::http_provider(&cfg.l2_node_address.expect("Provider must be set"));
let l1_provider =
util::http_provider(cfg.l1_node_address.as_ref().expect("Provider must be set"));
let l2_provider =
util::http_provider(cfg.l2_node_address.as_ref().expect("Provider must be set"));
Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider)))
});

// Create a channel to signal the server and the client program to exit.
let (tx_server, rx_server) = tokio::sync::watch::channel(());
let (tx_program, rx_program) = (tx_server.clone(), rx_server.clone());

// Create the server and start it.
let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher);
let server_task = tokio::task::spawn(server.start());
let server_task = task::spawn(start_native_preimage_server(
kv_store,
fetcher,
preimage_pipe,
hint_pipe,
tx_server,
rx_server,
));

// Start the client program in a separate child process.
let program_task = tokio::task::spawn(async move {
let mut command = Command::new(cfg.exec);

// Map the file descriptors to the standard streams and the preimage oracle and hint
// reader's special file descriptors.
command
.fd_mappings(vec![
FdMapping { parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), child_fd: 0 },
FdMapping {
parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(),
child_fd: 1,
},
FdMapping {
parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(),
child_fd: 2,
},
FdMapping { parent_fd: files.remove(3).into(), child_fd: 3 },
FdMapping { parent_fd: files.remove(2).into(), child_fd: 4 },
FdMapping { parent_fd: files.remove(1).into(), child_fd: 5 },
FdMapping { parent_fd: files.remove(0).into(), child_fd: 6 },
])
.expect("No errors may occur when mapping file descriptors.");

Ok(command.status().map_err(|e| anyhow!(e))?.success())
});
let program_task = task::spawn(start_native_client_program(cfg, files, tx_program, rx_program));

// Executen both tasks and wait for them to complete.
info!("Starting preimage server and client program.");
let (server_res, program_res) =
tokio::try_join!(server_task, program_task).map_err(|e| anyhow!(e))?;
server_res?;
if !program_res? {
error!("Client program exited with a non-zero status.");
tokio::try_join!(
util::flatten_join_result(server_task),
util::flatten_join_result(program_task)
)
.map_err(|e| anyhow!(e))?;
info!("Preimage server and client program have joined.");

Ok(())
}

/// Starts the preimage server in a separate thread. The client program is ran natively in this
/// mode.
async fn start_native_preimage_server<KV>(
kv_store: Arc<RwLock<KV>>,
fetcher: Option<Arc<RwLock<Fetcher<KV>>>>,
preimage_pipe: PipeHandle,
hint_pipe: PipeHandle,
tx: Sender<()>,
mut rx: Receiver<()>,
) -> Result<()>
where
KV: KeyValueStore + Send + Sync + ?Sized + 'static,
{
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);

let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher);

let server_pair_task = task::spawn(async move {
AssertUnwindSafe(server.start())
.catch_unwind()
.await
.map_err(|_| {
error!(target: "preimage_server", "Preimage server panicked");
anyhow!("Preimage server panicked")
})?
.map_err(|e| {
error!(target: "preimage_server", "Preimage server exited with an error");
anyhow!("Preimage server exited with an error: {:?}", e)
})
});
let rx_server_task = task::spawn(async move { rx.changed().await });

// Block the current task until either the client program exits or the server exits.
tokio::select! {
_ = rx_server_task => {
info!(target: "preimage_server", "Received shutdown signal from preimage server task.")
},
res = util::flatten_join_result(server_pair_task) => {
res?;
}
}
info!("Preimage server and client program have exited.");

// Signal to the client program that the server has exited.
let _ = tx.send(());

info!("Preimage server has exited.");
Ok(())
}

/// Starts the client program in a separate child process. The client program is ran natively in
/// this mode.
///
/// ## Takes
/// - `cfg`: The host configuration.
/// - `files`: The files that are used to communicate with the native client.
/// - `tx`: The sender to signal the preimage server to exit.
/// - `rx`: The receiver to wait for the preimage server to exit.
///
/// ## Returns
/// - `Ok(())` if the client program exits successfully.
/// - `Err(_)` if the client program exits with a non-zero status.
async fn start_native_client_program(
cfg: HostCli,
files: NativePipeFiles,
tx: Sender<()>,
mut rx: Receiver<()>,
) -> Result<()> {
// Map the file descriptors to the standard streams and the preimage oracle and hint
// reader's special file descriptors.
let mut command = Command::new(cfg.exec);
command
.fd_mappings(vec![
FdMapping { parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), child_fd: 0 },
FdMapping { parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(), child_fd: 1 },
FdMapping { parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(), child_fd: 2 },
FdMapping { parent_fd: files.hint_writ.into(), child_fd: 3 },
FdMapping { parent_fd: files.hint_read.into(), child_fd: 4 },
FdMapping { parent_fd: files.preimage_writ.into(), child_fd: 5 },
FdMapping { parent_fd: files.preimage_read.into(), child_fd: 6 },
])
.expect("No errors may occur when mapping file descriptors.");

let exec_task = task::spawn(async move {
let status = command
.status()
.await
.map_err(|e| {
error!(target: "client_program", "Failed to execute client program: {:?}", e);
anyhow!("Failed to execute client program: {:?}", e)
})?
.success();
Ok::<_, anyhow::Error>(status)
});
let rx_program_task = task::spawn(async move { rx.changed().await });

// Block the current task until either the client program exits or the server exits.
tokio::select! {
_ = rx_program_task => {
info!(target: "client_program", "Received shutdown signal from preimage server task.")
},
res = util::flatten_join_result(exec_task) => {
if !(res?) {
// Signal to the preimage server that the client program has exited.
let _ = tx.send(());
error!(target: "client_program", "Client program exited with a non-zero status.");
return Err(anyhow!("Client program exited with a non-zero status."));
}
}
}

// Signal to the preimage server that the client program has exited.
let _ = tx.send(());

info!(target: "client_program", "Client program has exited.");
Ok(())
}
15 changes: 15 additions & 0 deletions bin/host/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! This module contains the types used in the host program.

use std::fs::File;

/// Represents the files that are used to communicate with the native client.
pub(crate) struct NativePipeFiles {
/// The file that the preimage oracle reads from.
pub preimage_read: File,
/// The file that the preimage oracle writes to.
pub preimage_writ: File,
/// The file that the hint reader reads from.
pub hint_read: File,
/// The file that the hint reader writes to.
pub hint_writ: File,
}
Loading

0 comments on commit 96bc2aa

Please sign in to comment.