Skip to content

Commit

Permalink
feat(host): Disk backed KV store
Browse files Browse the repository at this point in the history
## Overview

Adds a disk-backed KV store for the host program, activated when
`--data-dir` is passed.
  • Loading branch information
clabby committed May 29, 2024
1 parent 48ba4e0 commit 05a7716
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 22 deletions.
10 changes: 5 additions & 5 deletions bin/host/src/fetcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod precompiles;
/// The [Fetcher] struct is responsible for fetching preimages from a remote source.
pub struct Fetcher<KV>
where
KV: KeyValueStore,
KV: KeyValueStore + ?Sized,
{
/// Key-value store for preimages.
kv_store: Arc<RwLock<KV>>,
Expand All @@ -37,7 +37,7 @@ where

impl<KV> Fetcher<KV>
where
KV: KeyValueStore,
KV: KeyValueStore + ?Sized,
{
/// Create a new [Fetcher] with the given [KeyValueStore].
pub fn new(
Expand All @@ -60,9 +60,9 @@ where

// Acquire a read lock on the key-value store.
let kv_lock = self.kv_store.read().await;
let mut preimage = kv_lock.get(key).cloned();
let mut preimage = kv_lock.get(key);

// Drop the read lock before beginning the loop.
// Drop the read lock before beginning the retry loop.
drop(kv_lock);

// Use a loop to keep retrying the prefetch as long as the key is not found
Expand All @@ -71,7 +71,7 @@ where
self.prefetch(hint).await?;

let kv_lock = self.kv_store.read().await;
preimage = kv_lock.get(key).cloned();
preimage = kv_lock.get(key);
}

preimage.ok_or_else(|| anyhow!("Preimage not found."))
Expand Down
35 changes: 35 additions & 0 deletions bin/host/src/kv/disk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//! Contains a concrete implementation of the [KeyValueStore] trait that stores data on disk.
//!
//! Data is stored in a directory, with a separate file for each key. The key is the filename, and
//! the value is the raw contents of the file.

use super::KeyValueStore;
use alloy_primitives::hex;
use std::{fs, path::PathBuf};

/// A simple, synchronous key-value store that stores data on disk.
#[derive(Default, Clone, Debug, Eq, PartialEq)]
pub struct DiskKeyValueStore {
data_directory: PathBuf,
}

impl DiskKeyValueStore {
/// Create a new [DiskKeyValueStore] with the given data directory.
pub fn new(data_directory: PathBuf) -> Self {
Self { data_directory }
}
}

impl KeyValueStore for DiskKeyValueStore {
fn get(&self, key: alloy_primitives::B256) -> Option<Vec<u8>> {
let path = self.data_directory.join(format!("{}.bin", hex::encode(key)));
fs::create_dir_all(&self.data_directory).ok()?;
fs::read(path).ok()
}

fn set(&mut self, key: alloy_primitives::B256, value: Vec<u8>) {
let path = self.data_directory.join(format!("{}.bin", hex::encode(key)));
fs::create_dir_all(&self.data_directory).expect("Failed to create directory");
fs::write(path, value.as_slice()).expect("Failed to write data to disk");
}
}
7 changes: 3 additions & 4 deletions bin/host/src/kv/mem.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! Contains a concrete implementation of the [KeyValueStore] trait that stores data in memory.

use alloy_primitives::B256;

use super::KeyValueStore;
use alloy_primitives::B256;
use std::collections::HashMap;

/// A simple, synchronous key-value store that stores data in memory. This is useful for testing and
Expand All @@ -20,8 +19,8 @@ impl MemoryKeyValueStore {
}

impl KeyValueStore for MemoryKeyValueStore {
fn get(&self, key: B256) -> Option<&Vec<u8>> {
self.store.get(&key)
fn get(&self, key: B256) -> Option<Vec<u8>> {
self.store.get(&key).cloned()
}

fn set(&mut self, key: B256, value: Vec<u8>) {
Expand Down
10 changes: 9 additions & 1 deletion bin/host/src/kv/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
//! This module contains the [KeyValueStore] trait and concrete implementations of it.

use alloy_primitives::B256;
use std::sync::Arc;
use tokio::sync::RwLock;

mod mem;
pub use mem::MemoryKeyValueStore;

mod disk;
pub use disk::DiskKeyValueStore;

/// A type alias for a shared key-value store.
pub type SharedKeyValueStore = Arc<RwLock<dyn KeyValueStore + Send + Sync>>;

/// Describes the interface of a simple, synchronous key-value store.
pub trait KeyValueStore {
/// Get the value associated with the given key.
fn get(&self, key: B256) -> Option<&Vec<u8>>;
fn get(&self, key: B256) -> Option<Vec<u8>>;

/// Set the value associated with the given key.
fn set(&mut self, key: B256, value: Vec<u8>);
Expand Down
24 changes: 15 additions & 9 deletions bin/host/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
cli::{init_tracing_subscriber, HostCli},
kv::MemoryKeyValueStore,
kv::{DiskKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore},
server::PreimageServer,
};
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -49,18 +49,21 @@ async fn start_server(cfg: HostCli) -> Result<()> {
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);

// TODO: Optional disk store if `cli.data_dir` is set.
let mem_kv_store = Arc::new(RwLock::new(MemoryKeyValueStore::new()));
let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = cfg.data_dir {
Arc::new(RwLock::new(DiskKeyValueStore::new(data_dir.clone())))
} else {
Arc::new(RwLock::new(MemoryKeyValueStore::new()))
};

let fetcher = (!cfg.is_offline()).then(|| {
let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set"));
let l2_provider = util::http_provider(&cfg.l2_node_address.expect("Provider must be set"));
Arc::new(RwLock::new(Fetcher::new(mem_kv_store.clone(), l1_provider, l2_provider)))
Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider)))
});

// Start the server and wait for it to complete.
info!("Starting preimage server.");
let server = PreimageServer::new(oracle_server, hint_reader, mem_kv_store, fetcher);
let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher);
server.start().await?;
info!("Preimage server has exited.");

Expand All @@ -74,17 +77,20 @@ async fn start_server_and_native_client(cfg: HostCli) -> Result<()> {
let oracle_server = OracleServer::new(preimage_pipe);
let hint_reader = HintReader::new(hint_pipe);

// TODO: Optional disk store if `cli.data_dir` is set.
let mem_kv_store = Arc::new(RwLock::new(MemoryKeyValueStore::new()));
let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = cfg.data_dir {
Arc::new(RwLock::new(DiskKeyValueStore::new(data_dir.clone())))
} else {
Arc::new(RwLock::new(MemoryKeyValueStore::new()))
};

let fetcher = (!cfg.is_offline()).then(|| {
let l1_provider = util::http_provider(&cfg.l1_node_address.expect("Provider must be set"));
let l2_provider = util::http_provider(&cfg.l2_node_address.expect("Provider must be set"));
Arc::new(RwLock::new(Fetcher::new(mem_kv_store.clone(), l1_provider, l2_provider)))
Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider)))
});

// Create the server and start it.
let server = PreimageServer::new(oracle_server, hint_reader, mem_kv_store, fetcher);
let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher);
let server_task = tokio::task::spawn(server.start());

// Start the client program in a separate child process.
Expand Down
5 changes: 2 additions & 3 deletions bin/host/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct PreimageServer<P, H, KV>
where
P: PreimageOracleServer,
H: HintReaderServer,
KV: KeyValueStore,
KV: KeyValueStore + ?Sized,
{
/// The oracle server.
oracle_server: P,
Expand All @@ -30,7 +30,7 @@ impl<P, H, KV> PreimageServer<P, H, KV>
where
P: PreimageOracleServer + Send + Sync + 'static,
H: HintReaderServer + Send + Sync + 'static,
KV: KeyValueStore + Send + Sync + 'static,
KV: KeyValueStore + Send + Sync + ?Sized + 'static,
{
/// Create a new [PreimageServer] with the given [PreimageOracleServer],
/// [HintReaderServer], and [KeyValueStore]. Holds onto the file descriptors for the pipes
Expand Down Expand Up @@ -85,7 +85,6 @@ where
.await
.get(key.into())
.ok_or_else(|| anyhow!("Preimage not found"))
.cloned()
})
}
};
Expand Down

0 comments on commit 05a7716

Please sign in to comment.