Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(host): Blocking native client program #201

Merged
merged 3 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bin/host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07" }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07" }
reqwest = "0.12"
tokio = { version = "1.37.0", features = ["full"] }
futures = "0.3"
clap = { version = "4.5.4", features = ["derive", "env"] }
serde = { version = "1.0.198", features = ["derive"] }
tracing-subscriber = "0.3.18"
command-fds = "0.3.0"
command-fds = { version = "0.3", features = ["tokio"] }
tempfile = "3.10"
26 changes: 25 additions & 1 deletion bin/host/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
//! This module contains all CLI-specific code for the host binary.

use crate::kv::{
DiskKeyValueStore, LocalKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore,
SplitKeyValueStore,
};
use alloy_primitives::B256;
use clap::{ArgAction, Parser};
use serde::Serialize;
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock;

mod parser;
pub(crate) use parser::parse_b256;
Expand Down Expand Up @@ -66,9 +71,28 @@ pub struct HostCli {
}

impl HostCli {
/// Returns `true` if the host is running in offline mode.
pub fn is_offline(&self) -> bool {
self.l1_node_address.is_none() ||
self.l2_node_address.is_none() ||
self.l1_beacon_address.is_none()
}

/// Parses the CLI arguments and returns a new instance of a [SharedKeyValueStore], as it is
/// configured to be created.
pub fn construct_kv_store(&self) -> SharedKeyValueStore {
let local_kv_store = LocalKeyValueStore::new(self.clone());

let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = self.data_dir {
let disk_kv_store = DiskKeyValueStore::new(data_dir.clone());
let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store);
Arc::new(RwLock::new(split_kv_store))
} else {
let mem_kv_store = MemoryKeyValueStore::new();
let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store);
Arc::new(RwLock::new(split_kv_store))
};

kv_store
}
}
222 changes: 157 additions & 65 deletions bin/host/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
use crate::{
cli::{init_tracing_subscriber, HostCli},
kv::{
DiskKeyValueStore, LocalKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore,
SplitKeyValueStore,
},
server::PreimageServer,
};
use anyhow::{anyhow, Result};
use clap::Parser;
use command_fds::{CommandFdExt, FdMapping};
use fetcher::Fetcher;
use futures::FutureExt;
use kona_common::FileDescriptor;
use kona_preimage::{HintReader, OracleServer, PipeHandle};
use kv::KeyValueStore;
use std::{
io::{stderr, stdin, stdout},
os::fd::AsFd,
process::Command,
panic::AssertUnwindSafe,
sync::Arc,
};
use tokio::sync::RwLock;
use tokio::{
process::Command,
sync::{
watch::{Receiver, Sender},
RwLock,
},
task,
};
use tracing::{error, info};
use types::NativePipeFiles;

mod cli;
mod fetcher;
mod kv;
mod server;
mod types;
mod util;

#[tokio::main]
Expand Down Expand Up @@ -52,16 +59,7 @@ async fn start_server(cfg: HostCli) -> Result<()> {
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);

let local_kv_store = LocalKeyValueStore::new(cfg.clone());
let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = cfg.data_dir {
let disk_kv_store = DiskKeyValueStore::new(data_dir.clone());
let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store);
Arc::new(RwLock::new(split_kv_store))
} else {
let mem_kv_store = MemoryKeyValueStore::new();
let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store);
Arc::new(RwLock::new(split_kv_store))
};
let kv_store = cfg.construct_kv_store();

let fetcher = (!cfg.is_offline()).then(|| {
let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set"));
Expand All @@ -81,66 +79,160 @@ async fn start_server(cfg: HostCli) -> Result<()> {
/// Starts the [PreimageServer] and the client program in separate threads. The client program is
/// ran natively in this mode.
async fn start_server_and_native_client(cfg: HostCli) -> Result<()> {
let (preimage_pipe, hint_pipe, mut files) = util::create_native_pipes()?;
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);

let local_kv_store = LocalKeyValueStore::new(cfg.clone());
let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = cfg.data_dir {
let disk_kv_store = DiskKeyValueStore::new(data_dir.clone());
let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store);
Arc::new(RwLock::new(split_kv_store))
} else {
let mem_kv_store = MemoryKeyValueStore::new();
let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store);
Arc::new(RwLock::new(split_kv_store))
};
let (preimage_pipe, hint_pipe, files) = util::create_native_pipes()?;
let kv_store = cfg.construct_kv_store();

let fetcher = (!cfg.is_offline()).then(|| {
let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set"));
let l2_provider = util::http_provider(&cfg.l2_node_address.expect("Provider must be set"));
let l1_provider =
util::http_provider(cfg.l1_node_address.as_ref().expect("Provider must be set"));
let l2_provider =
util::http_provider(cfg.l2_node_address.as_ref().expect("Provider must be set"));
Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider)))
});

// Create a channel to signal the server and the client program to exit.
let (tx_server, rx_server) = tokio::sync::watch::channel(());
let (tx_program, rx_program) = (tx_server.clone(), rx_server.clone());

// Create the server and start it.
let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher);
let server_task = tokio::task::spawn(server.start());
let server_task = task::spawn(start_native_preimage_server(
kv_store,
fetcher,
preimage_pipe,
hint_pipe,
tx_server,
rx_server,
));

// Start the client program in a separate child process.
let program_task = tokio::task::spawn(async move {
let mut command = Command::new(cfg.exec);

// Map the file descriptors to the standard streams and the preimage oracle and hint
// reader's special file descriptors.
command
.fd_mappings(vec![
FdMapping { parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), child_fd: 0 },
FdMapping {
parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(),
child_fd: 1,
},
FdMapping {
parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(),
child_fd: 2,
},
FdMapping { parent_fd: files.remove(3).into(), child_fd: 3 },
FdMapping { parent_fd: files.remove(2).into(), child_fd: 4 },
FdMapping { parent_fd: files.remove(1).into(), child_fd: 5 },
FdMapping { parent_fd: files.remove(0).into(), child_fd: 6 },
])
.expect("No errors may occur when mapping file descriptors.");

Ok(command.status().map_err(|e| anyhow!(e))?.success())
});
let program_task = task::spawn(start_native_client_program(cfg, files, tx_program, rx_program));

// Execute both tasks and wait for them to complete.
info!("Starting preimage server and client program.");
let (server_res, program_res) =
tokio::try_join!(server_task, program_task).map_err(|e| anyhow!(e))?;
server_res?;
if !program_res? {
error!("Client program exited with a non-zero status.");
tokio::try_join!(
util::flatten_join_result(server_task),
util::flatten_join_result(program_task)
)
.map_err(|e| anyhow!(e))?;
info!("Preimage server and client program have joined.");

Ok(())
}

/// Starts the preimage server in a separate thread. The client program is ran natively in this
/// mode.
async fn start_native_preimage_server<KV>(
kv_store: Arc<RwLock<KV>>,
fetcher: Option<Arc<RwLock<Fetcher<KV>>>>,
preimage_pipe: PipeHandle,
hint_pipe: PipeHandle,
tx: Sender<()>,
mut rx: Receiver<()>,
) -> Result<()>
where
KV: KeyValueStore + Send + Sync + ?Sized + 'static,
{
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);

let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher);

let server_pair_task = task::spawn(async move {
AssertUnwindSafe(server.start())
.catch_unwind()
.await
.map_err(|_| {
error!(target: "preimage_server", "Preimage server panicked");
anyhow!("Preimage server panicked")
})?
.map_err(|e| {
error!(target: "preimage_server", "Preimage server exited with an error");
anyhow!("Preimage server exited with an error: {:?}", e)
})
});
let rx_server_task = task::spawn(async move { rx.changed().await });

// Block the current task until either the client program exits or the server exits.
tokio::select! {
_ = rx_server_task => {
info!(target: "preimage_server", "Received shutdown signal from preimage server task.")
},
res = util::flatten_join_result(server_pair_task) => {
res?;
}
}
info!("Preimage server and client program have exited.");

// Signal to the client program that the server has exited.
let _ = tx.send(());

info!("Preimage server has exited.");
Ok(())
}

/// Starts the client program in a separate child process. The client program is ran natively in
/// this mode.
///
/// ## Takes
/// - `cfg`: The host configuration.
/// - `files`: The files that are used to communicate with the native client.
/// - `tx`: The sender to signal the preimage server to exit.
/// - `rx`: The receiver to wait for the preimage server to exit.
///
/// ## Returns
/// - `Ok(())` if the client program exits successfully.
/// - `Err(_)` if the client program exits with a non-zero status.
async fn start_native_client_program(
cfg: HostCli,
files: NativePipeFiles,
tx: Sender<()>,
mut rx: Receiver<()>,
) -> Result<()> {
// Map the file descriptors to the standard streams and the preimage oracle and hint
// reader's special file descriptors.
let mut command = Command::new(cfg.exec);
command
.fd_mappings(vec![
FdMapping { parent_fd: stdin().as_fd().try_clone_to_owned().unwrap(), child_fd: 0 },
FdMapping { parent_fd: stdout().as_fd().try_clone_to_owned().unwrap(), child_fd: 1 },
FdMapping { parent_fd: stderr().as_fd().try_clone_to_owned().unwrap(), child_fd: 2 },
FdMapping { parent_fd: files.hint_writ.into(), child_fd: 3 },
FdMapping { parent_fd: files.hint_read.into(), child_fd: 4 },
FdMapping { parent_fd: files.preimage_writ.into(), child_fd: 5 },
FdMapping { parent_fd: files.preimage_read.into(), child_fd: 6 },
])
.expect("No errors may occur when mapping file descriptors.");

let exec_task = task::spawn(async move {
let status = command
.status()
.await
.map_err(|e| {
error!(target: "client_program", "Failed to execute client program: {:?}", e);
anyhow!("Failed to execute client program: {:?}", e)
})?
.success();
Ok::<_, anyhow::Error>(status)
});
let rx_program_task = task::spawn(async move { rx.changed().await });

// Block the current task until either the client program exits or the server exits.
tokio::select! {
_ = rx_program_task => {
info!(target: "client_program", "Received shutdown signal from preimage server task.")
},
res = util::flatten_join_result(exec_task) => {
if !(res?) {
// Signal to the preimage server that the client program has exited.
let _ = tx.send(());
error!(target: "client_program", "Client program exited with a non-zero status.");
return Err(anyhow!("Client program exited with a non-zero status."));
}
}
}

// Signal to the preimage server that the client program has exited.
let _ = tx.send(());

info!(target: "client_program", "Client program has exited.");
Ok(())
}
15 changes: 15 additions & 0 deletions bin/host/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! This module contains the types used in the host program.

use std::fs::File;

/// Represents the files that are used to communicate with the native client.
pub(crate) struct NativePipeFiles {
/// The file that the preimage oracle reads from.
pub preimage_read: File,
/// The file that the preimage oracle writes to.
pub preimage_writ: File,
/// The file that the hint reader reads from.
pub hint_read: File,
/// The file that the hint reader writes to.
pub hint_writ: File,
}
Loading
Loading