Skip to content

Commit

Permalink
feat(preimage): Async server components
Browse files Browse the repository at this point in the history
Makes the server components in `kona-preimage` async to allow for remote
data fetching in the `host` program.
  • Loading branch information
clabby committed May 29, 2024
1 parent 8cf2095 commit bf79769
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ kona-primitives = { path = "../primitives", version = "0.0.1" }
alloy-sol-types = { version = "0.7.1", default-features = false }
op-alloy-consensus = { git = "https://github.com/clabby/op-alloy", branch = "refcell/consensus-port", default-features = false }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", default-features = false }
async-trait = "0.1.77"
async-trait = "0.1.80"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
miniz_oxide = { version = "0.7.2" }
Expand Down
2 changes: 1 addition & 1 deletion crates/plasma/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ kona-derive = { path = "../derive" }
# External
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", default-features = false }
alloy-primitives = { workspace = true, features = ["rlp"] }
async-trait = "0.1.77"
async-trait = "0.1.80"

# `serde` feature dependencies
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/preimage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ alloy-primitives.workspace = true
# local
kona-common = { path = "../common", version = "0.0.1" }

# external
async-trait = "0.1.80"

[dev-dependencies]
tokio = { version = "1.36.0", features = ["full"] }
tempfile = "3.10.0"
48 changes: 32 additions & 16 deletions crates/preimage/src/hint.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{traits::HintWriterClient, HintReaderServer, PipeHandle};
use alloc::{string::String, vec};
use alloc::{boxed::Box, string::String, vec};
use anyhow::Result;
use core::future::Future;
use tracing::{debug, error};

/// A [HintWriter] is a high-level interface to the hint pipe. It provides a way to write hints to
Expand Down Expand Up @@ -58,8 +59,13 @@ impl HintReader {
}
}

#[async_trait::async_trait]
impl HintReaderServer for HintReader {
fn next_hint(&self, mut route_hint: impl FnMut(String) -> Result<()>) -> Result<()> {
async fn next_hint<F, Fut>(&mut self, mut route_hint: F) -> Result<()>
where
F: FnMut(String) -> Fut + Send,
Fut: Future<Output = Result<()>> + Send,
{
// Read the length of the raw hint payload.
let mut len_buf = [0u8; 4];
self.pipe_handle.read_exact(&mut len_buf)?;
Expand All @@ -74,7 +80,7 @@ impl HintReaderServer for HintReader {
debug!(target: "hint_reader", "Successfully read hint: \"{payload}\"");

// Route the hint
if let Err(e) = route_hint(payload) {
if let Err(e) = route_hint(payload).await {
// Write back on error to prevent blocking the client.
self.pipe_handle.write(&[0x00])?;

Expand All @@ -90,15 +96,18 @@ impl HintReaderServer for HintReader {
Ok(())
}
}

#[cfg(test)]
mod test {
extern crate std;

use super::*;
use alloc::vec::Vec;
use alloc::{sync::Arc, vec::Vec};
use core::pin::Pin;
use kona_common::FileDescriptor;
use std::{fs::File, os::fd::AsRawFd};
use tempfile::tempfile;
use tokio::sync::Mutex;

/// Test struct containing the [HintReader] and [HintWriter]. The [File]s are stored in this
/// struct so that they are not dropped until the end of the test.
Expand Down Expand Up @@ -132,20 +141,27 @@ mod test {
const MOCK_DATA: &str = "test-hint 0xfacade";

let sys = client_and_host();
let (hint_writer, hint_reader) = (sys.hint_writer, sys.hint_reader);
let (hint_writer, mut hint_reader) = (sys.hint_writer, sys.hint_reader);
let incoming_hints = Arc::new(Mutex::new(Vec::new()));

let client = tokio::task::spawn(async move { hint_writer.write(MOCK_DATA) });
let host = tokio::task::spawn(async move {
let mut v = Vec::new();
let route_hint = |hint: String| {
v.push(hint.clone());
Ok(())
};
hint_reader.next_hint(route_hint).unwrap();

assert_eq!(v.len(), 1);

v.remove(0)
let host = tokio::task::spawn({
let incoming_hints_ref = Arc::clone(&incoming_hints);
async move {
let route_hint =
move |hint: String| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
let hints = Arc::clone(&incoming_hints_ref);
Box::pin(async move {
hints.lock().await.push(hint.clone());
Ok(())
})
};
hint_reader.next_hint(&route_hint).await.unwrap();

let mut hints = incoming_hints.lock().await;
assert_eq!(hints.len(), 1);
hints.remove(0)
}
});

let (_, h) = tokio::join!(client, host);
Expand Down
47 changes: 34 additions & 13 deletions crates/preimage/src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{PipeHandle, PreimageKey, PreimageOracleClient, PreimageOracleServer};
use alloc::vec::Vec;
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use anyhow::{bail, Result};
use core::future::Future;
use tracing::debug;

/// An [OracleReader] is a high-level interface to the preimage oracle.
Expand Down Expand Up @@ -85,11 +86,13 @@ impl OracleServer {
}
}

#[async_trait::async_trait]
impl PreimageOracleServer for OracleServer {
fn next_preimage_request<'a>(
&self,
mut get_preimage: impl FnMut(PreimageKey) -> Result<&'a Vec<u8>>,
) -> Result<()> {
async fn next_preimage_request<F, Fut>(&mut self, mut get_preimage: F) -> Result<()>
where
F: FnMut(PreimageKey) -> Fut + Send,
Fut: Future<Output = Result<Arc<Vec<u8>>>> + Send,
{
// Read the preimage request from the client, and throw early if there isn't is any.
let mut buf = [0u8; 32];
self.pipe_handle.read_exact(&mut buf)?;
Expand All @@ -98,7 +101,7 @@ impl PreimageOracleServer for OracleServer {
debug!(target: "oracle_server", "Fetching preimage for key {preimage_key}");

// Fetch the preimage value from the preimage getter.
let value = get_preimage(preimage_key)?;
let value = get_preimage(preimage_key).await?;

// Write the length as a big-endian u64 followed by the data.
let data = [(value.len() as u64).to_be_bytes().as_ref(), value.as_ref()]
Expand All @@ -121,9 +124,11 @@ mod test {
use super::*;
use crate::PreimageKeyType;
use alloy_primitives::keccak256;
use core::pin::Pin;
use kona_common::FileDescriptor;
use std::{collections::HashMap, fs::File, os::fd::AsRawFd};
use tempfile::tempfile;
use tokio::sync::Mutex;

/// Test struct containing the [OracleReader] and a [OracleServer] for the host, plus the open
/// [File]s. The [File]s are stored in this struct so that they are not dropped until the
Expand Down Expand Up @@ -167,12 +172,15 @@ mod test {
let key_b: PreimageKey =
PreimageKey::new(*keccak256(MOCK_DATA_B), PreimageKeyType::Keccak256);

let mut preimages = HashMap::new();
preimages.insert(key_a, MOCK_DATA_A.to_vec());
preimages.insert(key_b, MOCK_DATA_B.to_vec());
let preimages = {
let mut preimages = HashMap::new();
preimages.insert(key_a, Arc::new(MOCK_DATA_A.to_vec()));
preimages.insert(key_b, Arc::new(MOCK_DATA_B.to_vec()));
Arc::new(Mutex::new(preimages))
};

let sys = client_and_host();
let (oracle_reader, oracle_server) = (sys.oracle_reader, sys.oracle_server);
let (oracle_reader, mut oracle_server) = (sys.oracle_reader, sys.oracle_server);

let client = tokio::task::spawn(async move {
let contents_a = oracle_reader.get(key_a).unwrap();
Expand All @@ -185,11 +193,24 @@ mod test {
(contents_a, contents_b)
});
let host = tokio::task::spawn(async move {
let get_preimage =
|key| preimages.get(&key).ok_or(anyhow::anyhow!("Preimage not available"));
#[allow(clippy::type_complexity)]
let get_preimage = move |key: PreimageKey| -> Pin<
Box<dyn Future<Output = Result<Arc<Vec<u8>>>> + Send>,
> {
let preimages = Arc::clone(&preimages);
Box::pin(async move {
// Simulate fetching preimage data
preimages
.lock()
.await
.get(&key)
.ok_or(anyhow::anyhow!("Preimage not available"))
.cloned()
})
};

loop {
if oracle_server.next_preimage_request(get_preimage).is_err() {
if oracle_server.next_preimage_request(&get_preimage).await.is_err() {
break;
}
}
Expand Down
18 changes: 12 additions & 6 deletions crates/preimage/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::PreimageKey;
use alloc::{string::String, vec::Vec};
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use anyhow::Result;
use core::future::Future;

/// A [PreimageOracleClient] is a high-level interface to read data from the host, keyed by a
/// [PreimageKey].
Expand Down Expand Up @@ -36,26 +37,31 @@ pub trait HintWriterClient {

/// A [PreimageOracleServer] is a high-level interface to accept read requests from the client and
/// write the preimage data to the client pipe.
#[async_trait::async_trait]
pub trait PreimageOracleServer {
/// Get the next preimage request and return the response to the client.
///
/// # Returns
/// - `Ok(())` if the data was successfully written into the client pipe.
/// - `Err(_)` if the data could not be written to the client.
fn next_preimage_request<'a>(
&self,
get_preimage: impl FnMut(PreimageKey) -> Result<&'a Vec<u8>>,
) -> Result<()>;
async fn next_preimage_request<F, Fut>(&mut self, get_preimage: F) -> Result<()>
where
F: FnMut(PreimageKey) -> Fut + Send,
Fut: Future<Output = Result<Arc<Vec<u8>>>> + Send;
}

/// A [HintReaderServer] is a high-level interface to read preimage hints from the
/// [HintWriterClient] and prepare them for consumption by the client program.
#[async_trait::async_trait]
pub trait HintReaderServer {
/// Get the next hint request and return the acknowledgement to the client.
///
/// # Returns
/// - `Ok(())` if the hint was received and the client was notified of the host's
/// acknowledgement.
/// - `Err(_)` if the hint was not received correctly.
fn next_hint(&self, route_hint: impl FnMut(String) -> Result<()>) -> Result<()>;
async fn next_hint<F, Fut>(&mut self, route_hint: F) -> Result<()>
where
F: FnMut(String) -> Fut + Send,
Fut: Future<Output = Result<()>> + Send;
}

0 comments on commit bf79769

Please sign in to comment.