Skip to content

Commit

Permalink
feat: make storage keys accessible to tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Meshiest committed Mar 24, 2024
1 parent 1fb1cb0 commit 28c70ff
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 79 deletions.
104 changes: 90 additions & 14 deletions crates/snot/src/schema/storage.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use std::{
collections::HashMap,
ops::Deref,
path::PathBuf,
process::Stdio,
sync::atomic::{AtomicUsize, Ordering},
};

use anyhow::ensure;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize};
use anyhow::{anyhow, ensure};
use indexmap::IndexMap;
use serde::{
de::{DeserializeOwned, Visitor},
Deserialize, Deserializer, Serialize,
};
use tokio::process::Command;
use tracing::warn;

use crate::state::GlobalState;

use super::nodes::KeySource;

/// A storage document. Explains how storage for a test should be set up.
#[derive(Deserialize, Debug, Clone)]
pub struct Document {
Expand Down Expand Up @@ -72,6 +79,21 @@ impl Default for GenesisGeneration {
}
}

/// Map from Aleo address to its private key (`IndexMap<addr, private_key>`).
/// `IndexMap` preserves insertion order, so positional lookups via
/// `get_index` (see `LoadedStorage::lookup_keysource`) are stable.
pub type AleoAddrMap = IndexMap<String, String>;

/// A fully-prepared storage instance, registered in `GlobalState::storage`
/// (keyed by ephemeral integer id) once `Document::prepare` has run.
#[derive(Debug, Clone)]
pub struct LoadedStorage {
    /// Storage ID (the human-readable string id from the storage document)
    pub id: String,
    /// Path to storage data on disk (the storage's base directory)
    pub path: PathBuf,
    /// committee lookup: address -> private key, parsed from `committee.json`
    pub committee: AleoAddrMap,
    /// other accounts files lookup: file label (e.g. "accounts") -> addr/key map
    pub accounts: HashMap<String, AleoAddrMap>,
}

#[derive(Deserialize, Debug, Clone)]
pub struct LedgerGeneration {
pub output: PathBuf,
Expand Down Expand Up @@ -134,13 +156,13 @@ impl From<FilenameString> for String {
}

impl Document {
pub async fn prepare(self, state: &GlobalState) -> anyhow::Result<()> {
pub async fn prepare(&self, state: &GlobalState) -> anyhow::Result<usize> {
static STORAGE_ID_INT: AtomicUsize = AtomicUsize::new(0);

let id = String::from(self.id);
let id = String::from(self.id.clone());

// ensure this ID isn't already prepared
if state.storage.read().await.contains_right(&id) {
if state.storage_ids.read().await.contains_right(&id) {
// TODO: we probably don't want to warn here. instead, it would be nice to
// hash/checksum the storage to compare it with the conflicting storage
warn!("a storage with the id {id} has already been prepared");
Expand All @@ -153,7 +175,7 @@ impl Document {

// TODO: respect self.prefer_existing

match self.generate {
match self.generate.clone() {
// generate the block and ledger if we have generation params
Some(mut generation) => 'generate: {
// warn if an existing block/ledger already exists
Expand All @@ -176,12 +198,13 @@ impl Document {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("../../target/release/snarkos-aot"),
);
let output = base.join(&generation.genesis.output);
let res = Command::new(bin)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.arg("genesis")
.arg("--output")
.arg(&generation.genesis.output)
.arg(&output)
.arg("--committee-size")
.arg(generation.genesis.committee.to_string())
.arg("--committee-output")
Expand All @@ -200,11 +223,8 @@ impl Document {
warn!("failed to run genesis generation command...");
}

if tokio::fs::try_exists(&generation.genesis.output)
.await
.is_err()
{
anyhow::bail!("failed to generate {:#?}", generation.genesis.output);
if tokio::fs::try_exists(&output).await.is_err() {
anyhow::bail!("failed to generate {:#?}", output);
}

let res = Command::new("tar")
Expand Down Expand Up @@ -261,11 +281,67 @@ impl Document {
}
}

let mut accounts = HashMap::new();
accounts.insert(
"accounts".to_owned(),
read_to_addrs(pick_additional_addr, base.join("accounts.json")).await?,
);

// todo: maybe update the loaded storage in global state if the hash
// of the storage document is different I guess...
// that might interfere with running tests, so I don't know

// add the prepared storage to the storage map
let mut storage_lock = state.storage.write().await;
let mut storage_lock = state.storage_ids.write().await;
let int_id = STORAGE_ID_INT.fetch_add(1, Ordering::Relaxed);
storage_lock.insert(int_id, id.to_owned());

Ok(())
let mut storage_lock = state.storage.write().await;
storage_lock.insert(
int_id,
LoadedStorage {
id: id.to_owned(),
path: base.clone(),
committee: read_to_addrs(pick_commitee_addr, base.join("committee.json")).await?,
accounts,
},
);

Ok(int_id)
}
}

/// Project an `accounts.json` entry — `(address, balance, optional extra
/// metadata)` — down to just its address.
fn pick_additional_addr(entry: (String, u64, Option<serde_json::Value>)) -> String {
    let (addr, _balance, _meta) = entry;
    addr
}
/// Project a `committee.json` entry — `(address, balance)` — down to just
/// its address. (Name keeps the existing "commitee" spelling used by callers.)
fn pick_commitee_addr(entry: (String, u64)) -> String {
    let (addr, _balance) = entry;
    addr
}

async fn read_to_addrs<T: DeserializeOwned>(
f: impl Fn(T) -> String,
file: PathBuf,
) -> anyhow::Result<AleoAddrMap> {
let data = tokio::fs::read_to_string(&file)
.await
.map_err(|e| anyhow!("error reading balances {file:?}: {e}"))?;
let parsed: IndexMap<String, T> =
serde_json::from_str(&data).map_err(|e| anyhow!("error parsing balances {file:?}: {e}"))?;

Ok(parsed.into_iter().map(|(k, v)| (k, f(v))).collect())
}

impl LoadedStorage {
    /// Resolve a [`KeySource`] to a concrete private key for this storage.
    ///
    /// - `Literal` keys are returned as-is.
    /// - `Committee(Some(i))` indexes into the committee map.
    /// - `Named(name, Some(i))` indexes into the named accounts file.
    /// - Index-less committee/named sources have no single key; returns `None`.
    pub fn lookup_keysource(&self, key: &KeySource) -> Option<String> {
        match key {
            KeySource::Literal(pk) => Some(pk.clone()),
            KeySource::Committee(Some(index)) => {
                let (_addr, pk) = self.committee.get_index(*index)?;
                Some(pk.clone())
            }
            KeySource::Named(name, Some(index)) => {
                let accounts = self.accounts.get(name)?;
                let (_addr, pk) = accounts.get_index(*index)?;
                Some(pk.clone())
            }
            KeySource::Committee(None) | KeySource::Named(_, None) => None,
        }
    }
}
8 changes: 7 additions & 1 deletion crates/snot/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ async fn redirect_storage(
Path((storage_id, ty)): Path<(usize, StorageType)>,
state: State<AppState>,
) -> Response {
let Some(real_id) = state.storage.read().await.get_by_left(&storage_id).cloned() else {
let Some(real_id) = state
.storage_ids
.read()
.await
.get_by_left(&storage_id)
.cloned()
else {
return StatusCode::NOT_FOUND.into_response();
};

Expand Down
1 change: 1 addition & 0 deletions crates/snot/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub async fn start(cli: Cli) -> Result<()> {
cli,
db,
pool: Default::default(),
storage_ids: Default::default(),
storage: Default::default(),
tests_counter: Default::default(),
tests: Default::default(),
Expand Down
4 changes: 3 additions & 1 deletion crates/snot/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::sync::RwLock;

use crate::{
cli::Cli,
schema::storage::LoadedStorage,
server::jwt::{Claims, JWT_NONCE, JWT_SECRET},
testing::Test,
};
Expand All @@ -36,7 +37,8 @@ pub struct GlobalState {
pub db: Surreal<Db>,
pub pool: RwLock<HashMap<AgentId, Agent>>,
/// A map from ephemeral integer storage ID to actual storage ID.
pub storage: RwLock<BiMap<usize, String>>,
pub storage_ids: RwLock<BiMap<usize, String>>,
pub storage: RwLock<HashMap<usize, LoadedStorage>>,

pub tests_counter: AtomicUsize,
pub tests: RwLock<HashMap<usize, Test>>,
Expand Down
69 changes: 32 additions & 37 deletions crates/snot/src/testing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::atomic::Ordering;

use anyhow::{anyhow, bail, ensure};
use bimap::BiMap;
use bimap::{BiHashMap, BiMap};
use futures_util::future::join_all;
use indexmap::{map::Entry, IndexMap};
use serde::Deserialize;
Expand All @@ -10,16 +10,15 @@ use tracing::{info, warn};

use crate::{
schema::{
nodes::{ExternalNode, KeySource, Node},
storage::FilenameString,
nodes::{ExternalNode, Node},
ItemDocument, NodeTargets,
},
state::GlobalState,
};

#[derive(Debug, Clone)]
pub struct Test {
pub storage_id: FilenameString,
pub storage_id: usize,
pub node_map: BiMap<NodeKey, TestPeer>,
pub initial_nodes: IndexMap<NodeKey, TestNode>,
// TODO: GlobalStorage.storage should maybe be here instead
Expand Down Expand Up @@ -62,22 +61,18 @@ impl Test {
) -> anyhow::Result<usize> {
let mut state_lock = state.tests.write().await;

let Some(storage_id) = documents.iter().find_map(|s| match s {
ItemDocument::Storage(storage) => Some(storage.id.clone()),
_ => None,
}) else {
bail!("no storage document found in test")
};

let mut test = Test {
storage_id,
node_map: Default::default(),
initial_nodes: Default::default(),
};
let mut storage_id = None;
let mut node_map = BiHashMap::default();
let mut initial_nodes = IndexMap::default();

for document in documents {
match document {
ItemDocument::Storage(storage) => storage.prepare(state).await?,
ItemDocument::Storage(storage) => {
let int_id = storage.prepare(state).await?;
if storage_id.is_none() {
storage_id = Some(int_id);
}
}
ItemDocument::Nodes(nodes) => {
// flatten replicas
for (doc_node_key, mut doc_node) in nodes.nodes {
Expand All @@ -97,7 +92,7 @@ impl Test {
// nodes in flattened_nodes have replicas unset
doc_node.replicas.take();

match test.initial_nodes.entry(node_key) {
match initial_nodes.entry(node_key) {
Entry::Occupied(ent) => bail!("duplicate node key: {}", ent.key()),
Entry::Vacant(ent) => {
// replace the key with a new one
Expand All @@ -117,7 +112,7 @@ impl Test {
let num_online_agents = online_agents.clone().count();

ensure!(
num_online_agents >= test.initial_nodes.len(),
num_online_agents >= initial_nodes.len(),
"not enough online agents to satisfy node topology"
);

Expand All @@ -126,8 +121,8 @@ impl Test {
// agent best suited to be a node,
// instead of naively picking an agent to fill the needs of
// a node
test.node_map.extend(
test.initial_nodes
node_map.extend(
initial_nodes
.keys()
.cloned()
.zip(online_agents.map(|agent| TestPeer::Internal(agent.id()))),
Expand All @@ -136,12 +131,12 @@ impl Test {
// append external nodes to the node map

for (node_key, node) in &nodes.external {
match test.initial_nodes.entry(node_key.clone()) {
match initial_nodes.entry(node_key.clone()) {
Entry::Occupied(ent) => bail!("duplicate node key: {}", ent.key()),
Entry::Vacant(ent) => ent.insert(TestNode::External(node.to_owned())),
};
}
test.node_map.extend(
node_map.extend(
nodes
.external
.keys()
Expand All @@ -154,7 +149,11 @@ impl Test {
}
}

// set the test on the global state
let test = Test {
storage_id: storage_id.ok_or_else(|| anyhow!("test is missing storage document"))?,
node_map,
initial_nodes,
};

let test_id = state.tests_counter.fetch_add(1, Ordering::Relaxed);
state_lock.insert(test_id, test);
Expand All @@ -166,8 +165,6 @@ impl Test {
Ok(test_id)
}

// TODO: cleanup by test id, rather than cleanup EVERY agent...

pub async fn cleanup(id: &usize, state: &GlobalState) -> anyhow::Result<()> {
// clear the test state
{
Expand Down Expand Up @@ -234,12 +231,11 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul
.ok_or_else(|| anyhow!("test not found"))?;

// get the numeric storage ID from the string storage ID
let storage_id = {
let storage_lock = state.storage.read().await;
match storage_lock.get_by_right(test.storage_id.as_str()) {
Some(id) => *id,
None => bail!("invalid storage ID specified for node"),
}
let storage_id = test.storage_id;

// obtain the actual storage
let Some(storage) = state.storage.read().await.get(&storage_id).cloned() else {
bail!("test {id} storage {storage_id} not found...")
};

let pool_lock = state.pool.read().await;
Expand Down Expand Up @@ -320,11 +316,10 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul

// resolve the peers and validators
let mut node_state = node.into_state(key.ty);
node_state.private_key = node.key.as_ref().map(|key| match key {
KeySource::Literal(pk) => pk.to_owned(),
KeySource::Committee(_i) => todo!(),
KeySource::Named(_, _) => todo!(),
});
node_state.private_key = node
.key
.as_ref()
.and_then(|key| storage.lookup_keysource(key));
node_state.peers = matching_nodes(key, &node.peers, false)?;
node_state.validators = matching_nodes(key, &node.validators, true)?;

Expand Down
Loading

0 comments on commit 28c70ff

Please sign in to comment.