From 9ee7e10a955b7de0d5fdf9e49a59b944d374b94a Mon Sep 17 00:00:00 2001 From: Meshiest Date: Sat, 23 Mar 2024 22:08:17 -0500 Subject: [PATCH 01/11] chore: tx cannon todos --- crates/aot/src/ledger/query.rs | 2 + crates/snot/src/cannon/mod.rs | 145 +++++++++++++++++++++++++++++++++ crates/snot/src/main.rs | 1 + crates/snot/src/server/api.rs | 2 - 4 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 crates/snot/src/cannon/mod.rs diff --git a/crates/aot/src/ledger/query.rs b/crates/aot/src/ledger/query.rs index 81076f5f..24550ff7 100644 --- a/crates/aot/src/ledger/query.rs +++ b/crates/aot/src/ledger/query.rs @@ -81,6 +81,8 @@ impl LedgerQuery { .route("/mainnet/block/hash/latest", get(Self::latest_hash)) .route("/mainnet/transaction/broadcast", post(Self::broadcast_tx)) .route("/block", post(Self::add_block)) + // TODO: for ahead of time ledger generation, support a /beacon_block endpoint to write beacon block + // TODO: api to get and decrypt records for a private key .with_state(Arc::new(state)); let listener = tokio::net::TcpListener::bind(SocketAddr::new(self.bind, self.port)).await?; diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs new file mode 100644 index 00000000..ad461b59 --- /dev/null +++ b/crates/snot/src/cannon/mod.rs @@ -0,0 +1,145 @@ +use std::collections::{HashSet, VecDeque}; + +use snot_common::state::NodeKey; +use tokio::process::Child; + +use crate::schema::NodeTargets; + +/* + +STEP ONE +cannon transaction source: (GEN OR PLAYBACK) +- AOT: storage file +- REALTIME: generate executions from available agents?? via rpc + + +STEP 2 +cannon query source: +/cannon//mainnet/latest/stateRoot forwards to one of the following: +- REALTIME-(GEN|PLAYBACK): (test_id, node-key) with a rest ports Client/Validator only +- AOT-GEN: ledger service locally (file mode) +- AOT-PLAYBACK: n/a + +STEP 3 +cannon broadcast ALWAYS HITS control plane at +/cannon//mainnet/transaction/broadcast +cannon TX OUTPUT pointing at +- REALTIME: (test_id, node-key) +- AOT: file + + +cannon rate +cannon buffer size +burst mode?? + +*/ + +/// Represents an instance of a local query service. +#[derive(Debug)] +struct LocalQueryService { + /// child process running the ledger query service + child: Child, + /// Ledger & genesis block to use + pub storage_id: usize, + /// port to host the service on (needs to be unused by other cannons and services) + /// this port will be use when forwarding requests to the local query service + pub port: u16, + + // TODO debate this + /// An optional node to sync blocks from... + /// necessary for private tx mode in realtime mode as this will have to + /// sync from a node that has a valid ledger + /// + /// When present, the cannon will update the ledger service from this node + /// if the node is out of sync, it will corrupt the ledger... + pub sync_from: Option<(NodeKey, usize)>, +} + +/// Used to determine the redirection for the following paths: +/// /cannon//mainnet/latest/stateRoot +/// /cannon//mainnet/transaction/broadcast +#[derive(Debug)] +enum LedgerQueryService { + /// Use the local ledger query service + Local(LocalQueryService), + /// Target a specific node (probably over rpc instead of reqwest lol...) 
+ Node { target: NodeKey, test_id: usize }, +} + +/// Which service is providing the compute power for executing transactions +#[derive(Debug)] +enum ComputeTarget { + /// Use the agent pool to generate executions + AgentPool, + /// Use demox' API to generate executions + Demox, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +pub enum CreditsTxMode { + BondPublic, + UnbondPublic, + TransferPublic, + TransferPublicToPrivate, + // cannot run these in aot mode + TransferPrivate, + TransferPrivateToPublic, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +pub enum TxMode { + Credits(CreditsTxMode), + // TODO: Program(program, func, input types??) +} + +#[derive(Debug)] +enum TxSource { + /// Read transactions from a file + AoT { + storage_id: usize, + // filename for the tx list + name: String, + }, + /// Generate transactions in real time + RealTime { + query: LedgerQueryService, + compute: ComputeTarget, + + tx_modes: HashSet, + + /// buffer of transactions to send + tx_buffer: VecDeque, + + /// how many transactions to buffer before firing a burst + min_buffer_size: usize, + }, +} + +#[derive(Debug)] +enum TxSink { + /// Write transactions to a file + AoT { + storage_id: usize, + /// filename for the recording txs list + name: String, + }, + /// Send transactions to nodes in a test + RealTime { + target: NodeTargets, + test_id: usize, + + /// How long between each burst of transactions + burst_delay_ms: u32, + /// How many transactions to fire off in each burst + tx_per_burst: u32, + /// How long between each transaction in a burst + tx_delay_ms: u32, + }, +} + +/// Transaction cannon +#[derive(Debug)] +pub struct TestCannon { + source: TxSource, + sink: TxSink, +} diff --git a/crates/snot/src/main.rs b/crates/snot/src/main.rs index 3ea56a2a..b27461e9 100644 --- a/crates/snot/src/main.rs +++ b/crates/snot/src/main.rs @@ -5,6 +5,7 @@ use cli::Cli; use tracing::level_filters::LevelFilter; use tracing_subscriber::prelude::*; +pub mod cannon; pub mod cli; pub mod schema; pub mod server; diff --git a/crates/snot/src/server/api.rs b/crates/snot/src/server/api.rs index eec27195..28c1daff 100644 --- a/crates/snot/src/server/api.rs +++ b/crates/snot/src/server/api.rs @@ -70,8 +70,6 @@ async fn post_test_prepare(state: State, body: String) -> Response { // TODO: clean up existing test - // TODO: support concurrent tests + return test id - match Test::prepare(documents, &state).await { Ok(test_id) => (StatusCode::OK, Json(json!({ "id": test_id }))).into_response(), Err(e) => ( From 003e97d2421bc8e6ec01b6e9b0ede6fc1c232e2c Mon Sep 17 00:00:00 2001 From: Meshiest Date: Sun, 24 Mar 2024 12:36:00 -0500 Subject: [PATCH 02/11] feat(cannon): state root reverse proxy to agent/ledger --- Cargo.lock | 1 + crates/snot-agent/Cargo.toml | 2 +- crates/snot-agent/src/rpc.rs | 25 +++- crates/snot-common/src/rpc/agent.rs | 13 ++ crates/snot/Cargo.toml | 1 + crates/snot/src/cannon/mod.rs | 211 ++++++++++++++-------------- crates/snot/src/cannon/router.rs | 57 ++++++++ crates/snot/src/cannon/sink.rs | 44 ++++++ crates/snot/src/cannon/source.rs | 120 ++++++++++++++++ crates/snot/src/server/mod.rs | 3 + crates/snot/src/state.rs | 34 ++++- 11 files changed, 402 insertions(+), 109 deletions(-) create mode 100644 crates/snot/src/cannon/router.rs create mode 100644 crates/snot/src/cannon/sink.rs create mode 100644 crates/snot/src/cannon/source.rs diff --git a/Cargo.lock b/Cargo.lock index 37cfe712..18d3a962 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5274,6 +5274,7 @@ dependencies = [ "rand", "rand_chacha", "regex", + "reqwest 
0.12.0", "serde", "serde_json", "serde_yaml", diff --git a/crates/snot-agent/Cargo.toml b/crates/snot-agent/Cargo.toml index dfdb142e..e3e37c83 100644 --- a/crates/snot-agent/Cargo.toml +++ b/crates/snot-agent/Cargo.toml @@ -15,7 +15,7 @@ futures-util.workspace = true http.workspace = true httpdate = "1.0.3" local-ip-address = "0.6.1" -reqwest = { version = "0.12.0", features = ["stream"] } +reqwest = { workspace = true, features = ["stream"] } snot-common = { path = "../snot-common" } tarpc.workspace = true tokio.workspace = true diff --git a/crates/snot-agent/src/rpc.rs b/crates/snot-agent/src/rpc.rs index a95fad25..87b2a1e7 100644 --- a/crates/snot-agent/src/rpc.rs +++ b/crates/snot-agent/src/rpc.rs @@ -2,7 +2,9 @@ use std::{collections::HashSet, net::IpAddr, ops::Deref, process::Stdio, sync::A use snot_common::{ rpc::{ - agent::{AgentService, AgentServiceRequest, AgentServiceResponse, ReconcileError}, + agent::{ + AgentError, AgentService, AgentServiceRequest, AgentServiceResponse, ReconcileError, + }, control::{ControlServiceRequest, ControlServiceResponse}, MuxMessage, }, @@ -364,4 +366,25 @@ impl AgentService for AgentRpcServer { self.state.internal_addrs.clone(), ) } + + async fn get_state_root(self, _: context::Context) -> Result { + if !matches!( + self.state.agent_state.read().await.deref(), + AgentState::Node(_, _) + ) { + return Err(AgentError::InvalidState); + } + + let url = format!( + "http://127.0.0.1:{}/mainnet/latest/stateRoot", + self.state.cli.rest + ); + let response = reqwest::get(&url) + .await + .map_err(|_| AgentError::FailedToMakeRequest)?; + response + .json() + .await + .map_err(|_| AgentError::FailedToParseJson) + } } diff --git a/crates/snot-common/src/rpc/agent.rs b/crates/snot-common/src/rpc/agent.rs index 08a48343..dbc5306d 100644 --- a/crates/snot-common/src/rpc/agent.rs +++ b/crates/snot-common/src/rpc/agent.rs @@ -19,6 +19,9 @@ pub trait AgentService { /// Control plane instructs the agent to reconcile towards a particular /// state. 
async fn reconcile(to: AgentState) -> Result<(), ReconcileError>; + + /// Get the state root from the running node + async fn get_state_root() -> Result; } #[derive(Debug, Error, Serialize, Deserialize)] @@ -32,3 +35,13 @@ pub enum ReconcileError { #[error("unknown error")] Unknown, } + +#[derive(Debug, Error, Serialize, Deserialize)] +pub enum AgentError { + #[error("invalid agent state")] + InvalidState, + #[error("failed to parse json")] + FailedToParseJson, + #[error("failed to make a request")] + FailedToMakeRequest, +} diff --git a/crates/snot/Cargo.toml b/crates/snot/Cargo.toml index 85026380..6eeac400 100644 --- a/crates/snot/Cargo.toml +++ b/crates/snot/Cargo.toml @@ -18,6 +18,7 @@ lazy_static.workspace = true rand.workspace = true rand_chacha.workspace = true regex.workspace = true +reqwest = { workspace = true, features = ["stream"] } serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index ad461b59..8829ca1f 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -1,9 +1,20 @@ -use std::collections::{HashSet, VecDeque}; +pub mod router; +pub mod sink; +pub mod source; -use snot_common::state::NodeKey; -use tokio::process::Child; +use std::sync::Arc; -use crate::schema::NodeTargets; +use anyhow::{bail, ensure, Result}; + +use tokio::{ + sync::{mpsc::UnboundedSender, Mutex as AsyncMutex}, + task::AbortHandle, +}; +use tracing::warn; + +use crate::{cannon::source::LedgerQueryService, state::GlobalState}; + +use self::{sink::TxSink, source::TxSource}; /* @@ -34,112 +45,102 @@ burst mode?? */ -/// Represents an instance of a local query service. -#[derive(Debug)] -struct LocalQueryService { - /// child process running the ledger query service - child: Child, - /// Ledger & genesis block to use - pub storage_id: usize, - /// port to host the service on (needs to be unused by other cannons and services) - /// this port will be use when forwarding requests to the local query service - pub port: u16, - - // TODO debate this - /// An optional node to sync blocks from... - /// necessary for private tx mode in realtime mode as this will have to - /// sync from a node that has a valid ledger - /// - /// When present, the cannon will update the ledger service from this node - /// if the node is out of sync, it will corrupt the ledger... - pub sync_from: Option<(NodeKey, usize)>, -} - -/// Used to determine the redirection for the following paths: -/// /cannon//mainnet/latest/stateRoot -/// /cannon//mainnet/transaction/broadcast -#[derive(Debug)] -enum LedgerQueryService { - /// Use the local ledger query service - Local(LocalQueryService), - /// Target a specific node (probably over rpc instead of reqwest lol...) 
- Node { target: NodeKey, test_id: usize }, -} - -/// Which service is providing the compute power for executing transactions +/// Transaction cannon #[derive(Debug)] -enum ComputeTarget { - /// Use the agent pool to generate executions - AgentPool, - /// Use demox' API to generate executions - Demox, -} +pub struct TestCannon { + // a copy of the global state + global_state: Arc, -#[derive(Debug, Hash, PartialEq, Eq)] -pub enum CreditsTxMode { - BondPublic, - UnbondPublic, - TransferPublic, - TransferPublicToPrivate, - // cannot run these in aot mode - TransferPrivate, - TransferPrivateToPublic, -} + source: TxSource, + sink: TxSink, -#[derive(Debug, Hash, PartialEq, Eq)] -pub enum TxMode { - Credits(CreditsTxMode), - // TODO: Program(program, func, input types??) -} + /// channel to send transactions to the the task + tx_sender: UnboundedSender, -#[derive(Debug)] -enum TxSource { - /// Read transactions from a file - AoT { - storage_id: usize, - // filename for the tx list - name: String, - }, - /// Generate transactions in real time - RealTime { - query: LedgerQueryService, - compute: ComputeTarget, - - tx_modes: HashSet, - - /// buffer of transactions to send - tx_buffer: VecDeque, - - /// how many transactions to buffer before firing a burst - min_buffer_size: usize, - }, -} + /// The test_id associated with this cannon. + /// To point at an external node, create a topology with external node + test_id: Option, -#[derive(Debug)] -enum TxSink { - /// Write transactions to a file - AoT { - storage_id: usize, - /// filename for the recording txs list - name: String, - }, - /// Send transactions to nodes in a test - RealTime { - target: NodeTargets, - test_id: usize, - - /// How long between each burst of transactions - burst_delay_ms: u32, - /// How many transactions to fire off in each burst - tx_per_burst: u32, - /// How long between each transaction in a burst - tx_delay_ms: u32, - }, + // TODO: run the actual cannon in this task + task: AsyncMutex, } -/// Transaction cannon -#[derive(Debug)] -pub struct TestCannon { - source: TxSource, - sink: TxSink, +impl TestCannon { + pub fn new( + global_state: Arc, + source: TxSource, + sink: TxSink, + test_id: Option, + ) -> Result { + ensure!( + (source.needs_test_id() || sink.needs_test_id()) != test_id.is_some(), + "Test ID must be provided if either source or sink requires it" + ); + + // TODO: maybe Arc, then pass it to this task + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let tx_sender = tx.clone(); + + let handle = tokio::spawn(async move { + // TODO: write tx to sink at desired rate + let _tx = rx.recv().await; + + std::future::pending::<()>().await + }); + + Ok(Self { + global_state, + source, + sink, + test_id, + tx_sender, + task: AsyncMutex::new(handle.abort_handle()), + }) + } + + /// Called by axum to forward /cannon//mainnet/latest/stateRoot + /// to the ledger query service's /mainnet/latest/stateRoot + pub async fn proxy_state_root(&self) -> Result { + match &self.source { + TxSource::RealTime { query, .. 
} => match query { + LedgerQueryService::Local(qs) => qs.get_state_root().await, + LedgerQueryService::Node(key) => { + // test_id must be Some because LedgerQueryService::Node requires it + let Some(agent_id) = self + .global_state + .get_test_agent(self.test_id.unwrap(), key) + .await + else { + bail!("cannon target agent not found") + }; + + let Some(client) = self.global_state.get_client(agent_id).await else { + bail!("cannon target agent is offline") + }; + + // call client's rpc method to get the state root + // this will fail if the client is not running a node + client.get_state_root().await + } + }, + TxSource::AoTPlayback { .. } => { + bail!("cannon is configured to playback from file.") + } + } + } + + /// Called by axum to forward /cannon//mainnet/transaction/broadcast + /// to the desired sink + pub fn proxy_broadcast(&self, body: String) -> Result<()> { + match &self.source { + TxSource::RealTime { .. } => { + self.tx_sender.send(body)?; + } + TxSource::AoTPlayback { .. } => { + warn!("cannon received broadcasted transaction in playback mode. ignoring.") + } + } + Ok(()) + } } diff --git a/crates/snot/src/cannon/router.rs b/crates/snot/src/cannon/router.rs new file mode 100644 index 00000000..489f9c76 --- /dev/null +++ b/crates/snot/src/cannon/router.rs @@ -0,0 +1,57 @@ +use axum::{ + extract::{Path, State}, + response::{IntoResponse, Response}, + routing::{get, post}, + Json, Router, +}; +use reqwest::StatusCode; +use serde_json::json; + +use crate::state::AppState; + +pub(crate) fn redirect_cannon_routes() -> Router { + Router::new() + .route("/:id/mainnet/latest/stateRoot", get(state_root)) + .route("/:id/mainnet/transaction/broadcast", post(transaction)) +} + +async fn state_root(Path(cannon_id): Path, state: State) -> Response { + let Some(cannon) = ({ + let cannons = state.cannons.read().await; + cannons.get(&cannon_id).cloned() + }) else { + return StatusCode::NOT_FOUND.into_response(); + }; + + match cannon.proxy_state_root().await { + // the nodes expect this state root to be string escaped json + Ok(root) => Json(json!(root)).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("{e}")})), + ) + .into_response(), + } +} + +async fn transaction( + Path(cannon_id): Path, + state: State, + body: String, +) -> Response { + let Some(cannon) = ({ + let cannons = state.cannons.read().await; + cannons.get(&cannon_id).cloned() + }) else { + return StatusCode::NOT_FOUND.into_response(); + }; + + match cannon.proxy_broadcast(body) { + Ok(_) => StatusCode::OK.into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("{e}")})), + ) + .into_response(), + } +} diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs new file mode 100644 index 00000000..67b2eac8 --- /dev/null +++ b/crates/snot/src/cannon/sink.rs @@ -0,0 +1,44 @@ +use tokio::process::Child; + +use crate::schema::NodeTargets; + +#[derive(Debug)] +pub enum TxSink { + /// Write transactions to a file + AoTRecord { + storage_id: usize, + /// filename for the recording txs list + name: String, + }, + /// Write transactions to a ledger query service + AoTAppend { + // information for running .. another ledger service + // solely for appending blocks to a ledger... 
+ storage_id: usize, + child: Child, + port: u16, + + /// Number of transactions per block + tx_per_block: u32, + }, + /// Send transactions to nodes in a test + RealTime { + /// The nodes to send transactions to + /// + /// Requires cannon to have an associated test_id + target: NodeTargets, + + /// How long between each burst of transactions + burst_delay_ms: u32, + /// How many transactions to fire off in each burst + tx_per_burst: u32, + /// How long between each transaction in a burst + tx_delay_ms: u32, + }, +} + +impl TxSink { + pub fn needs_test_id(&self) -> bool { + matches!(self, Self::RealTime { .. }) + } +} diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs new file mode 100644 index 00000000..2850b9f1 --- /dev/null +++ b/crates/snot/src/cannon/source.rs @@ -0,0 +1,120 @@ +use std::collections::HashSet; + +use anyhow::Result; +use snot_common::state::NodeKey; +use tokio::process::Child; + +/// Represents an instance of a local query service. +#[derive(Debug)] +pub struct LocalQueryService { + /// child process running the ledger query service + child: Child, + /// Ledger & genesis block to use + pub storage_id: usize, + /// port to host the service on (needs to be unused by other cannons and services) + /// this port will be use when forwarding requests to the local query service + pub port: u16, + + // TODO debate this + /// An optional node to sync blocks from... + /// necessary for private tx mode in realtime mode as this will have to + /// sync from a node that has a valid ledger + /// + /// When present, the cannon will update the ledger service from this node + /// if the node is out of sync, it will corrupt the ledger... + /// + /// requires cannon to have an associated test_id + pub sync_from: Option, +} + +impl LocalQueryService { + // TODO: cache this when sync_from is false + /// Fetch the state root from the local query service + /// (non-cached) + pub async fn get_state_root(&self) -> Result { + let url = format!("http://127.0.0.1:{}/mainnet/latest/stateRoot", self.port); + let response = reqwest::get(&url).await?; + Ok(response.text().await?) + } +} + +/// Used to determine the redirection for the following paths: +/// /cannon//mainnet/latest/stateRoot +/// /cannon//mainnet/transaction/broadcast +#[derive(Debug)] +pub enum LedgerQueryService { + /// Use the local ledger query service + Local(LocalQueryService), + /// Target a specific node (probably over rpc instead of reqwest lol...) + /// + /// Requires cannon to have an associated test_id + Node(NodeKey), +} + +impl LedgerQueryService { + pub fn needs_test_id(&self) -> bool { + match self { + LedgerQueryService::Node(_) => true, + LedgerQueryService::Local(LocalQueryService { sync_from, .. }) => sync_from.is_some(), + _ => false, + } + } +} + +/// Which service is providing the compute power for executing transactions +#[derive(Debug)] +pub enum ComputeTarget { + /// Use the agent pool to generate executions + AgentPool, + /// Use demox' API to generate executions + Demox, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +pub enum CreditsTxMode { + BondPublic, + UnbondPublic, + TransferPublic, + TransferPublicToPrivate, + // cannot run these in aot mode + TransferPrivate, + TransferPrivateToPublic, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +pub enum TxMode { + Credits(CreditsTxMode), + // TODO: Program(program, func, input types??) 
+} + +#[derive(Debug)] +pub enum TxSource { + /// Read transactions from a file + AoTPlayback { + storage_id: usize, + // filename for the tx list + name: String, + // TODO: is this enum a config or state enum? + // if it solely config, we may need to put a nonblocking appender + // somewhere else + }, + /// Generate transactions in real time + RealTime { + query: LedgerQueryService, + compute: ComputeTarget, + + tx_modes: HashSet, + + /// how many transactions to buffer before firing a burst + min_buffer_size: usize, + }, +} + +impl TxSource { + pub fn needs_test_id(&self) -> bool { + match self { + TxSource::RealTime { query, .. } => query.needs_test_id(), + _ => false, + } + } +} diff --git a/crates/snot/src/server/mod.rs b/crates/snot/src/server/mod.rs index 5a3353ef..378c3cfd 100644 --- a/crates/snot/src/server/mod.rs +++ b/crates/snot/src/server/mod.rs @@ -29,6 +29,7 @@ use self::{ rpc::ControlRpcServer, }; use crate::{ + cannon::router::redirect_cannon_routes, cli::Cli, server::rpc::{MuxedMessageIncoming, MuxedMessageOutgoing}, state::{Agent, AppState, GlobalState}, @@ -51,6 +52,7 @@ pub async fn start(cli: Cli) -> Result<()> { storage: Default::default(), tests_counter: Default::default(), tests: Default::default(), + cannons: Default::default(), }; let app = Router::new() @@ -58,6 +60,7 @@ pub async fn start(cli: Cli) -> Result<()> { .nest("/api/v1", api::routes()) // /env//ledger/* - ledger query service reverse proxying /mainnet/latest/stateRoot .nest("/content", content::init_routes(&state).await) + .nest("/cannons", redirect_cannon_routes()) .with_state(Arc::new(state)) .layer( TraceLayer::new_for_http() diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs index 74be5924..7d1d3055 100644 --- a/crates/snot/src/state.rs +++ b/crates/snot/src/state.rs @@ -13,17 +13,18 @@ use bimap::BiMap; use jwt::SignWithKey; use snot_common::{ rpc::agent::{AgentServiceClient, ReconcileError}, - state::{AgentState, PortConfig}, + state::{AgentState, NodeKey, PortConfig}, }; use surrealdb::{engine::local::Db, Surreal}; use tarpc::{client::RpcError, context}; use tokio::sync::RwLock; use crate::{ + cannon::TestCannon, cli::Cli, schema::storage::LoadedStorage, server::jwt::{Claims, JWT_NONCE, JWT_SECRET}, - testing::Test, + testing::{Test, TestPeer}, }; pub type AgentId = usize; @@ -39,6 +40,7 @@ pub struct GlobalState { /// A map from ephemeral integer storage ID to actual storage ID. pub storage_ids: RwLock>, pub storage: RwLock>, + pub cannons: RwLock>>, pub tests_counter: AtomicUsize, pub tests: RwLock>, @@ -183,6 +185,10 @@ impl AgentClient { pub async fn reconcile(&self, to: AgentState) -> Result, RpcError> { self.0.reconcile(context::current(), to).await } + + pub async fn get_state_root(&self) -> Result { + Ok(self.0.get_state_root(context::current()).await??) + } } #[derive(Debug, Clone)] @@ -255,4 +261,28 @@ impl GlobalState { }) .collect() } + + /// Lookup an rpc client by agent id. + /// Locks pools for reading + pub async fn get_client(&self, id: AgentId) -> Option { + self.pool + .read() + .await + .get(&id) + .and_then(|a| a.client_owned()) + } + + /// Lookup a test agent id by test id and node key. 
+ /// Locks tests for reading + pub async fn get_test_agent(&self, test_id: usize, node: &NodeKey) -> Option { + self.tests + .read() + .await + .get(&test_id) + .and_then(|t| t.node_map.get_by_left(node)) + .and_then(|id| match id { + TestPeer::Internal(id) => Some(*id), + TestPeer::External => None, + }) + } } From e6b29bba3462ea4894f4615bf4ed2995659b8f92 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 25 Mar 2024 12:23:25 -0500 Subject: [PATCH 03/11] refactor(cannon): move state out of config types --- crates/snot/src/cannon/mod.rs | 116 +++++++++++++++++++++++++----- crates/snot/src/cannon/net.rs | 7 ++ crates/snot/src/cannon/sink.rs | 11 ++- crates/snot/src/cannon/source.rs | 48 +++++++------ crates/snot/src/schema/storage.rs | 21 +++--- crates/snot/src/state.rs | 22 ++---- crates/snot/src/testing.rs | 21 ++++-- 7 files changed, 170 insertions(+), 76 deletions(-) create mode 100644 crates/snot/src/cannon/net.rs diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 8829ca1f..c9b5ed31 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -1,8 +1,12 @@ +mod net; pub mod router; pub mod sink; pub mod source; -use std::sync::Arc; +use std::{ + collections::HashSet, + sync::{atomic::AtomicU32, Arc}, +}; use anyhow::{bail, ensure, Result}; @@ -12,7 +16,12 @@ use tokio::{ }; use tracing::warn; -use crate::{cannon::source::LedgerQueryService, state::GlobalState}; +use crate::{ + cannon::source::LedgerQueryService, + schema::{storage::LoadedStorage, timeline::EventDuration}, + state::GlobalState, + testing::Test, +}; use self::{sink::TxSink, source::TxSource}; @@ -45,7 +54,8 @@ burst mode?? */ -/// Transaction cannon +/// Transaction cannon state +/// using the `TxSource` and `TxSink` for configuration. #[derive(Debug)] pub struct TestCannon { // a copy of the global state @@ -54,38 +64,98 @@ pub struct TestCannon { source: TxSource, sink: TxSink, - /// channel to send transactions to the the task - tx_sender: UnboundedSender, + /// How long this cannon will be fired for + duration: CannonDuration, - /// The test_id associated with this cannon. + /// The test_id/storage associated with this cannon. /// To point at an external node, create a topology with external node - test_id: Option, + /// To generate ahead-of-time, upload a test with a timeline referencing a + /// cannon pointing at a file + env: CannonEnv, + + /// Local query service port. Only present if the TxSource uses a local query source. + query_port: Option, // TODO: run the actual cannon in this task task: AsyncMutex, + + /// channel to send transactions to the the task + tx_sender: UnboundedSender, + fired_txs: AtomicU32, +} + +#[derive(Clone, Debug)] +struct CannonEnv { + test: Arc, + storage: Arc, +} + +#[derive(Clone, Debug)] +pub enum CannonDuration { + Forever, + Timeline(EventDuration), + Count(u32), } impl TestCannon { - pub fn new( + /// Create a new active transaction cannon + /// with the given source and sink. + /// + /// Locks the global state's tests and storage for reading. 
+ pub async fn new( global_state: Arc, source: TxSource, sink: TxSink, - test_id: Option, + duration: CannonDuration, + test_id: usize, ) -> Result { ensure!( (source.needs_test_id() || sink.needs_test_id()) != test_id.is_some(), "Test ID must be provided if either source or sink requires it" ); - // TODO: maybe Arc, then pass it to this task + // mapping with async is ugly and blocking_read is scary + let env = { + let Some(test) = global_state.tests.read().await.get(&test_id).cloned() else { + bail!("test {test_id} not found") + }; + + let storage_lock = global_state.storage.read().await; + let Some(storage) = storage_lock.get(&test.storage_id).cloned() else { + bail!("test {test_id} storage {} not found", test.storage_id) + }; + + CannonEnv { test, storage } + }; + let env2 = env.clone(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let tx_sender = tx.clone(); + let query_port = source.get_query_port()?; + + let fired_txs = AtomicU32::new(0); + let handle = tokio::spawn(async move { // TODO: write tx to sink at desired rate let _tx = rx.recv().await; + // TODO: if a sink or a source uses node_keys or storage + // env will be used + println!("{}", env2.storage.id); + + // compare the tx id to an authorization id + let _pending_txs = HashSet::::new(); + + // TODO: if a local query service exists, spawn it here + // kill on drop + + // TODO: determine the rate that transactions need to be created + // based on the sink + + // TODO: if source is realtime, generate authorizations and + // send them to any available agent + std::future::pending::<()>().await }); @@ -93,9 +163,12 @@ impl TestCannon { global_state, source, sink, - test_id, + env, tx_sender, + query_port, task: AsyncMutex::new(handle.abort_handle()), + fired_txs, + duration, }) } @@ -104,13 +177,17 @@ impl TestCannon { pub async fn proxy_state_root(&self) -> Result { match &self.source { TxSource::RealTime { query, .. } => match query { - LedgerQueryService::Local(qs) => qs.get_state_root().await, + LedgerQueryService::Local(qs) => { + if let Some(port) = self.query_port { + qs.get_state_root(port).await + } else { + bail!("cannon is missing a query port") + } + } LedgerQueryService::Node(key) => { // test_id must be Some because LedgerQueryService::Node requires it - let Some(agent_id) = self - .global_state - .get_test_agent(self.test_id.unwrap(), key) - .await + let Some(agent_id) = + self.env.as_ref().and_then(|t| t.test.get_agent_by_key(key)) else { bail!("cannon target agent not found") }; @@ -144,3 +221,10 @@ impl TestCannon { Ok(()) } } + +impl Drop for TestCannon { + fn drop(&mut self) { + // cancel the task on drop + self.task.blocking_lock().abort(); + } +} diff --git a/crates/snot/src/cannon/net.rs b/crates/snot/src/cannon/net.rs new file mode 100644 index 00000000..6edb20e9 --- /dev/null +++ b/crates/snot/src/cannon/net.rs @@ -0,0 +1,7 @@ +use std::net::{Ipv4Addr, SocketAddrV4, TcpListener}; + +/// Get an available port on the local machine. 
+pub fn get_available_port() -> Option { + let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0); + Some(TcpListener::bind(addr).ok()?.local_addr().ok()?.port()) +} diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index 67b2eac8..b7888d98 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -1,12 +1,11 @@ -use tokio::process::Child; +use serde::Deserialize; use crate::schema::NodeTargets; -#[derive(Debug)] +#[derive(Debug, Deserialize)] pub enum TxSink { /// Write transactions to a file AoTRecord { - storage_id: usize, /// filename for the recording txs list name: String, }, @@ -14,10 +13,8 @@ pub enum TxSink { AoTAppend { // information for running .. another ledger service // solely for appending blocks to a ledger... - storage_id: usize, - child: Child, - port: u16, - + // storage_id: usize, + // port: u16, /// Number of transactions per block tx_per_block: u32, }, diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index 2850b9f1..18654e92 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -1,19 +1,19 @@ use std::collections::HashSet; -use anyhow::Result; +use anyhow::{anyhow, Result}; +use serde::Deserialize; use snot_common::state::NodeKey; -use tokio::process::Child; + +use super::net::get_available_port; /// Represents an instance of a local query service. -#[derive(Debug)] +#[derive(Debug, Deserialize)] pub struct LocalQueryService { - /// child process running the ledger query service - child: Child, /// Ledger & genesis block to use - pub storage_id: usize, + // pub storage_id: usize, /// port to host the service on (needs to be unused by other cannons and services) /// this port will be use when forwarding requests to the local query service - pub port: u16, + // pub port: u16, // TODO debate this /// An optional node to sync blocks from... @@ -31,8 +31,8 @@ impl LocalQueryService { // TODO: cache this when sync_from is false /// Fetch the state root from the local query service /// (non-cached) - pub async fn get_state_root(&self) -> Result { - let url = format!("http://127.0.0.1:{}/mainnet/latest/stateRoot", self.port); + pub async fn get_state_root(&self, port: u16) -> Result { + let url = format!("http://127.0.0.1:{}/mainnet/latest/stateRoot", port); let response = reqwest::get(&url).await?; Ok(response.text().await?) } @@ -41,7 +41,7 @@ impl LocalQueryService { /// Used to determine the redirection for the following paths: /// /cannon//mainnet/latest/stateRoot /// /cannon//mainnet/transaction/broadcast -#[derive(Debug)] +#[derive(Debug, Deserialize)] pub enum LedgerQueryService { /// Use the local ledger query service Local(LocalQueryService), @@ -56,13 +56,12 @@ impl LedgerQueryService { match self { LedgerQueryService::Node(_) => true, LedgerQueryService::Local(LocalQueryService { sync_from, .. 
}) => sync_from.is_some(), - _ => false, } } } /// Which service is providing the compute power for executing transactions -#[derive(Debug)] +#[derive(Debug, Deserialize)] pub enum ComputeTarget { /// Use the agent pool to generate executions AgentPool, @@ -70,7 +69,7 @@ pub enum ComputeTarget { Demox, } -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Hash, PartialEq, Eq, Deserialize)] pub enum CreditsTxMode { BondPublic, UnbondPublic, @@ -81,22 +80,18 @@ pub enum CreditsTxMode { TransferPrivateToPublic, } -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Hash, PartialEq, Eq, Deserialize)] pub enum TxMode { Credits(CreditsTxMode), // TODO: Program(program, func, input types??) } -#[derive(Debug)] +#[derive(Debug, Deserialize)] pub enum TxSource { /// Read transactions from a file AoTPlayback { - storage_id: usize, - // filename for the tx list + // filename from the storage for the tx list name: String, - // TODO: is this enum a config or state enum? - // if it solely config, we may need to put a nonblocking appender - // somewhere else }, /// Generate transactions in real time RealTime { @@ -117,4 +112,17 @@ impl TxSource { _ => false, } } + + /// Get an available port for the query service if applicable + pub fn get_query_port(&self) -> Result> { + matches!( + self, + TxSource::RealTime { + query: LedgerQueryService::Local(_), + .. + } + ) + .then(|| get_available_port().ok_or(anyhow!("could not get an available port"))) + .transpose() + } } diff --git a/crates/snot/src/schema/storage.rs b/crates/snot/src/schema/storage.rs index 33b1ab85..e877404c 100644 --- a/crates/snot/src/schema/storage.rs +++ b/crates/snot/src/schema/storage.rs @@ -3,7 +3,10 @@ use std::{ ops::Deref, path::PathBuf, process::Stdio, - sync::atomic::{AtomicUsize, Ordering}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; use anyhow::{anyhow, ensure}; @@ -296,16 +299,14 @@ impl Document { let int_id = STORAGE_ID_INT.fetch_add(1, Ordering::Relaxed); storage_lock.insert(int_id, id.to_owned()); + let storage = LoadedStorage { + id: id.to_owned(), + path: base.clone(), + committee: read_to_addrs(pick_commitee_addr, base.join("committee.json")).await?, + accounts, + }; let mut storage_lock = state.storage.write().await; - storage_lock.insert( - int_id, - LoadedStorage { - id: id.to_owned(), - path: base.clone(), - committee: read_to_addrs(pick_commitee_addr, base.join("committee.json")).await?, - accounts, - }, - ); + storage_lock.insert(int_id, Arc::new(storage)); Ok(int_id) } diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs index 7d1d3055..b05cfdce 100644 --- a/crates/snot/src/state.rs +++ b/crates/snot/src/state.rs @@ -13,7 +13,7 @@ use bimap::BiMap; use jwt::SignWithKey; use snot_common::{ rpc::agent::{AgentServiceClient, ReconcileError}, - state::{AgentState, NodeKey, PortConfig}, + state::{AgentState, PortConfig}, }; use surrealdb::{engine::local::Db, Surreal}; use tarpc::{client::RpcError, context}; @@ -24,7 +24,7 @@ use crate::{ cli::Cli, schema::storage::LoadedStorage, server::jwt::{Claims, JWT_NONCE, JWT_SECRET}, - testing::{Test, TestPeer}, + testing::Test, }; pub type AgentId = usize; @@ -39,11 +39,11 @@ pub struct GlobalState { pub pool: RwLock>, /// A map from ephemeral integer storage ID to actual storage ID. pub storage_ids: RwLock>, - pub storage: RwLock>, + pub storage: RwLock>>, pub cannons: RwLock>>, pub tests_counter: AtomicUsize, - pub tests: RwLock>, + pub tests: RwLock>>, } /// This is the representation of a public addr or a list of internal addrs. 
@@ -271,18 +271,4 @@ impl GlobalState { .get(&id) .and_then(|a| a.client_owned()) } - - /// Lookup a test agent id by test id and node key. - /// Locks tests for reading - pub async fn get_test_agent(&self, test_id: usize, node: &NodeKey) -> Option { - self.tests - .read() - .await - .get(&test_id) - .and_then(|t| t.node_map.get_by_left(node)) - .and_then(|id| match id { - TestPeer::Internal(id) => Some(*id), - TestPeer::External => None, - }) - } } diff --git a/crates/snot/src/testing.rs b/crates/snot/src/testing.rs index 94234357..041f5943 100644 --- a/crates/snot/src/testing.rs +++ b/crates/snot/src/testing.rs @@ -1,4 +1,7 @@ -use std::{fmt::Display, sync::atomic::Ordering}; +use std::{ + fmt::Display, + sync::{atomic::Ordering, Arc}, +}; use anyhow::{anyhow, bail, ensure}; use bimap::{BiHashMap, BiMap}; @@ -172,7 +175,7 @@ impl Test { }; let test_id = state.tests_counter.fetch_add(1, Ordering::Relaxed); - state_lock.insert(test_id, test); + state_lock.insert(test_id, Arc::new(test)); drop(state_lock); // reconcile the nodes @@ -243,6 +246,14 @@ impl Test { Ok(()) } + + /// Lookup a test agent id by node key. + pub fn get_agent_by_key(&self, key: &NodeKey) -> Option { + self.node_map.get_by_left(key).and_then(|id| match id { + TestPeer::Internal(id) => Some(*id), + TestPeer::External => None, + }) + } } /// Reconcile all associated nodes with their initial state. @@ -331,11 +342,11 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul }; // get the internal agent ID from the node key - let Some(TestPeer::Internal(id)) = test.node_map.get_by_left(key) else { + let Some(id) = test.get_agent_by_key(key) else { bail!("expected internal agent peer for node with key {key}") }; - let Some(client) = pool_lock.get(id).and_then(|a| a.client_owned()) else { + let Some(client) = pool_lock.get(&id).and_then(|a| a.client_owned()) else { continue; }; @@ -349,7 +360,7 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul node_state.validators = matching_nodes(key, &node.validators, true)?; let agent_state = AgentState::Node(storage_id, node_state); - agent_ids.push(*id); + agent_ids.push(id); handles.push(tokio::spawn(async move { client .reconcile(agent_state.clone()) From 26de0beab3fecc6559e75b7e1676dae11a236850 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 25 Mar 2024 12:30:40 -0500 Subject: [PATCH 04/11] fix(cannon): fix compile errors --- crates/snot/src/cannon/mod.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index c9b5ed31..2a754da0 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -109,10 +109,10 @@ impl TestCannon { duration: CannonDuration, test_id: usize, ) -> Result { - ensure!( - (source.needs_test_id() || sink.needs_test_id()) != test_id.is_some(), - "Test ID must be provided if either source or sink requires it" - ); + // ensure!( + // (source.needs_test_id() || sink.needs_test_id()) != test_id.is_some(), + // "Test ID must be provided if either source or sink requires it" + // ); // mapping with async is ugly and blocking_read is scary let env = { @@ -186,9 +186,7 @@ impl TestCannon { } LedgerQueryService::Node(key) => { // test_id must be Some because LedgerQueryService::Node requires it - let Some(agent_id) = - self.env.as_ref().and_then(|t| t.test.get_agent_by_key(key)) - else { + let Some(agent_id) = self.env.test.get_agent_by_key(key) else { bail!("cannon target agent not found") }; From 
43aa8ef414410019fc0c3152024c86af6580efd5 Mon Sep 17 00:00:00 2001 From: gluax <16431709+gluax@users.noreply.github.com> Date: Mon, 25 Mar 2024 17:30:22 -0700 Subject: [PATCH 05/11] feat(snot): initial planning and changes for remote genesis blocks --- Cargo.lock | 2 + crates/snot/Cargo.toml | 1 + crates/snot/src/schema/storage.rs | 69 ++++++++++++++++++++----------- specs/test-1-validators.yaml | 2 +- specs/test-4-clients-canary.yaml | 37 +++++++++++++++++ 5 files changed, 85 insertions(+), 26 deletions(-) create mode 100644 specs/test-4-clients-canary.yaml diff --git a/Cargo.lock b/Cargo.lock index 18d3a962..d9014737 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5287,6 +5287,7 @@ dependencies = [ "tracing", "tracing-appender", "tracing-subscriber", + "url", "wildmatch 2.3.3", ] @@ -6314,6 +6315,7 @@ dependencies = [ "form_urlencoded", "idna 0.5.0", "percent-encoding", + "serde", ] [[package]] diff --git a/crates/snot/Cargo.toml b/crates/snot/Cargo.toml index 6eeac400..6027c503 100644 --- a/crates/snot/Cargo.toml +++ b/crates/snot/Cargo.toml @@ -33,4 +33,5 @@ tower-http.workspace = true tracing-appender.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +url = { workspace = true, features = ["serde"] } wildmatch = "2.3.3" diff --git a/crates/snot/src/schema/storage.rs b/crates/snot/src/schema/storage.rs index e877404c..a2df73f7 100644 --- a/crates/snot/src/schema/storage.rs +++ b/crates/snot/src/schema/storage.rs @@ -18,9 +18,8 @@ use serde::{ use tokio::process::Command; use tracing::warn; -use crate::state::GlobalState; - use super::nodes::KeySource; +use crate::state::GlobalState; /// A storage document. Explains how storage for a test should be set up. #[derive(Deserialize, Debug, Clone)] @@ -32,6 +31,7 @@ pub struct Document { #[serde(default)] pub prefer_existing: bool, pub generate: Option, + pub connect: Option, } /// Data generation instructions. @@ -46,10 +46,19 @@ pub struct StorageGeneration { #[serde(default)] pub ledger: LedgerGeneration, + #[serde(default)] + pub accounts: Vec, + #[serde(default)] pub transactions: Vec, } +#[derive(Deserialize, Debug, Clone)] +pub struct Accounts { + pub file: PathBuf, + pub total: u64, +} + // TODO: I don't know what this type should look like #[derive(Deserialize, Debug, Clone)] pub struct Transaction { @@ -159,7 +168,7 @@ impl From for String { } impl Document { - pub async fn prepare(&self, state: &GlobalState) -> anyhow::Result { + pub async fn prepare(self, state: &GlobalState) -> anyhow::Result { static STORAGE_ID_INT: AtomicUsize = AtomicUsize::new(0); let id = String::from(self.id.clone()); @@ -178,7 +187,7 @@ impl Document { // TODO: respect self.prefer_existing - match self.generate.clone() { + match self.generate { // generate the block and ledger if we have generation params Some(mut generation) => 'generate: { // warn if an existing block/ledger already exists @@ -202,28 +211,38 @@ impl Document { .join("../../target/release/snarkos-aot"), ); let output = base.join(&generation.genesis.output); - let res = Command::new(bin) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .arg("genesis") - .arg("--output") - .arg(&output) - .arg("--committee-size") - .arg(generation.genesis.committee.to_string()) - .arg("--committee-output") - .arg(base.join("committee.json")) - .arg("--additional-accounts") - .arg(generation.genesis.additional_accounts.to_string()) - .arg("--additional-accounts-output") - .arg(base.join("accounts.json")) - .arg("--ledger") - .arg(base.join("ledger")) - .spawn()? 
- .wait() - .await?; - if !res.success() { - warn!("failed to run genesis generation command..."); + match self.connect { + Some(url) => { + let res = reqwest::get(url).await?.error_for_status()?.bytes().await?; + + tokio::fs::write(&output, res).await?; + } + None => { + let res = Command::new(bin) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .arg("genesis") + .arg("--output") + .arg(&output) + .arg("--committee-size") + .arg(generation.genesis.committee.to_string()) + .arg("--committee-output") + .arg(base.join("committee.json")) + .arg("--additional-accounts") + .arg(generation.genesis.additional_accounts.to_string()) + .arg("--additional-accounts-output") + .arg(base.join("accounts.json")) + .arg("--ledger") + .arg(base.join("ledger")) + .spawn()? + .wait() + .await?; + + if !res.success() { + warn!("failed to run genesis generation command..."); + } + } } if tokio::fs::try_exists(&output).await.is_err() { diff --git a/specs/test-1-validators.yaml b/specs/test-1-validators.yaml index 417d6125..71612908 100644 --- a/specs/test-1-validators.yaml +++ b/specs/test-1-validators.yaml @@ -12,7 +12,7 @@ generate: --- version: nodes.snarkos.testing.monadic.us/v1 -name: 4-validators +name: 1-validator nodes: # validator/test: diff --git a/specs/test-4-clients-canary.yaml b/specs/test-4-clients-canary.yaml new file mode 100644 index 00000000..efac6c2a --- /dev/null +++ b/specs/test-4-clients-canary.yaml @@ -0,0 +1,37 @@ +--- +version: storage.snarkos.testing.monadic.us/v1 + +id: canary-clients +name: canary-clients + +fetch: https://pub-d74d58a8616c4d54bc1a948b4d001970.r2.dev/block.genesis + +generate: + path: ./tests/canary-clients + + accounts: + - file: "accounts.json" + total: 5 + +--- +version: nodes.snarkos.testing.monadic.us/v1 +name: 4-clients-canary + +external: + validator/1@canarynet: + node: 11.12.13.14:4130 + + validator/2@canarynet: + node: 11.12.13.14:4130 + + client/1@canarynet: + node: 11.12.13.14:4130 + +nodes: + client/test: + key: accounts.$ + replicas: 4 + height: 0 + validators: [] + peers: ["*/*@canarynet"] + From 74e4adb97993ce35c69d5932c6242f3be63cf710 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 25 Mar 2024 19:40:04 -0500 Subject: [PATCH 06/11] fix(agent): missing json in reqwest --- crates/snot-agent/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/snot-agent/Cargo.toml b/crates/snot-agent/Cargo.toml index e3e37c83..bcb50eac 100644 --- a/crates/snot-agent/Cargo.toml +++ b/crates/snot-agent/Cargo.toml @@ -15,7 +15,7 @@ futures-util.workspace = true http.workspace = true httpdate = "1.0.3" local-ip-address = "0.6.1" -reqwest = { workspace = true, features = ["stream"] } +reqwest = { workspace = true, features = ["stream", "json"] } snot-common = { path = "../snot-common" } tarpc.workspace = true tokio.workspace = true From 3300e806b6ce8e01c9c712e8e3b1c7c245e65a25 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 25 Mar 2024 19:41:37 -0500 Subject: [PATCH 07/11] refactor: move test storage into arc, load cannons into test --- crates/snot-common/src/state.rs | 3 +- crates/snot/src/cannon/mod.rs | 36 +++----------------- crates/snot/src/cannon/sink.rs | 8 +---- crates/snot/src/cannon/source.rs | 33 +++++++++--------- crates/snot/src/schema/cannon.rs | 13 +++++++ crates/snot/src/schema/mod.rs | 4 +++ crates/snot/src/schema/storage.rs | 17 ++++------ crates/snot/src/schema/timeline.rs | 15 ++++++--- crates/snot/src/testing.rs | 54 +++++++++++++++++++----------- 9 files changed, 91 insertions(+), 92 deletions(-) create mode 
100644 crates/snot/src/schema/cannon.rs diff --git a/crates/snot-common/src/state.rs b/crates/snot-common/src/state.rs index 754d0150..98c68453 100644 --- a/crates/snot-common/src/state.rs +++ b/crates/snot-common/src/state.rs @@ -16,7 +16,8 @@ pub enum AgentState { #[default] // A node in the inventory can function as a transaction cannon Inventory, - Node(StorageId, NodeState), + /// Test id mapping to node state + Node(usize, NodeState), } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 2a754da0..1ec08675 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -8,7 +8,7 @@ use std::{ sync::{atomic::AtomicU32, Arc}, }; -use anyhow::{bail, ensure, Result}; +use anyhow::{bail, Result}; use tokio::{ sync::{mpsc::UnboundedSender, Mutex as AsyncMutex}, @@ -16,12 +16,7 @@ use tokio::{ }; use tracing::warn; -use crate::{ - cannon::source::LedgerQueryService, - schema::{storage::LoadedStorage, timeline::EventDuration}, - state::GlobalState, - testing::Test, -}; +use crate::{cannon::source::LedgerQueryService, state::GlobalState, testing::Test}; use self::{sink::TxSink, source::TxSource}; @@ -64,9 +59,6 @@ pub struct TestCannon { source: TxSource, sink: TxSink, - /// How long this cannon will be fired for - duration: CannonDuration, - /// The test_id/storage associated with this cannon. /// To point at an external node, create a topology with external node /// To generate ahead-of-time, upload a test with a timeline referencing a @@ -87,14 +79,6 @@ pub struct TestCannon { #[derive(Clone, Debug)] struct CannonEnv { test: Arc, - storage: Arc, -} - -#[derive(Clone, Debug)] -pub enum CannonDuration { - Forever, - Timeline(EventDuration), - Count(u32), } impl TestCannon { @@ -106,26 +90,15 @@ impl TestCannon { global_state: Arc, source: TxSource, sink: TxSink, - duration: CannonDuration, test_id: usize, ) -> Result { - // ensure!( - // (source.needs_test_id() || sink.needs_test_id()) != test_id.is_some(), - // "Test ID must be provided if either source or sink requires it" - // ); - // mapping with async is ugly and blocking_read is scary let env = { let Some(test) = global_state.tests.read().await.get(&test_id).cloned() else { bail!("test {test_id} not found") }; - let storage_lock = global_state.storage.read().await; - let Some(storage) = storage_lock.get(&test.storage_id).cloned() else { - bail!("test {test_id} storage {} not found", test.storage_id) - }; - - CannonEnv { test, storage } + CannonEnv { test } }; let env2 = env.clone(); @@ -142,7 +115,7 @@ impl TestCannon { // TODO: if a sink or a source uses node_keys or storage // env will be used - println!("{}", env2.storage.id); + println!("{}", env2.test.storage.id); // compare the tx id to an authorization id let _pending_txs = HashSet::::new(); @@ -168,7 +141,6 @@ impl TestCannon { query_port, task: AsyncMutex::new(handle.abort_handle()), fired_txs, - duration, }) } diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index b7888d98..8065dd4f 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -2,7 +2,7 @@ use serde::Deserialize; use crate::schema::NodeTargets; -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub enum TxSink { /// Write transactions to a file AoTRecord { @@ -33,9 +33,3 @@ pub enum TxSink { tx_delay_ms: u32, }, } - -impl TxSink { - pub fn needs_test_id(&self) -> bool { - matches!(self, Self::RealTime { .. 
}) - } -} diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index 18654e92..2d7fb880 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -1,13 +1,15 @@ -use std::collections::HashSet; +use std::{collections::HashSet, default}; use anyhow::{anyhow, Result}; use serde::Deserialize; use snot_common::state::NodeKey; +use crate::schema::nodes::KeySource; + use super::net::get_available_port; /// Represents an instance of a local query service. -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct LocalQueryService { /// Ledger & genesis block to use // pub storage_id: usize, @@ -41,7 +43,7 @@ impl LocalQueryService { /// Used to determine the redirection for the following paths: /// /cannon//mainnet/latest/stateRoot /// /cannon//mainnet/transaction/broadcast -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub enum LedgerQueryService { /// Use the local ledger query service Local(LocalQueryService), @@ -61,15 +63,16 @@ impl LedgerQueryService { } /// Which service is providing the compute power for executing transactions -#[derive(Debug, Deserialize)] +#[derive(Default, Clone, Debug, Deserialize)] pub enum ComputeTarget { /// Use the agent pool to generate executions + #[default] AgentPool, /// Use demox' API to generate executions Demox, } -#[derive(Debug, Hash, PartialEq, Eq, Deserialize)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize)] pub enum CreditsTxMode { BondPublic, UnbondPublic, @@ -80,13 +83,13 @@ pub enum CreditsTxMode { TransferPrivateToPublic, } -#[derive(Debug, Hash, PartialEq, Eq, Deserialize)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize)] pub enum TxMode { Credits(CreditsTxMode), // TODO: Program(program, func, input types??) } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub enum TxSource { /// Read transactions from a file AoTPlayback { @@ -98,21 +101,19 @@ pub enum TxSource { query: LedgerQueryService, compute: ComputeTarget, + /// defaults to TransferPublic tx_modes: HashSet, - /// how many transactions to buffer before firing a burst - min_buffer_size: usize, + /// private keys for making transactions + /// defaults to committee keys + private_keys: Vec, + /// addreses for transaction targets + /// defaults to committee addresses + addresses: Vec, }, } impl TxSource { - pub fn needs_test_id(&self) -> bool { - match self { - TxSource::RealTime { query, .. } => query.needs_test_id(), - _ => false, - } - } - /// Get an available port for the query service if applicable pub fn get_query_port(&self) -> Result> { matches!( diff --git a/crates/snot/src/schema/cannon.rs b/crates/snot/src/schema/cannon.rs new file mode 100644 index 00000000..3d0ff4d5 --- /dev/null +++ b/crates/snot/src/schema/cannon.rs @@ -0,0 +1,13 @@ +use serde::Deserialize; + +use crate::cannon::{sink::TxSink, source::TxSource}; + +/// A document describing the node infrastructure for a test. 
+#[derive(Deserialize, Debug, Clone)] +pub struct Document { + pub name: String, + pub description: Option, + + pub source: TxSource, + pub sink: TxSink, +} diff --git a/crates/snot/src/schema/mod.rs b/crates/snot/src/schema/mod.rs index fef59d3f..9a504717 100644 --- a/crates/snot/src/schema/mod.rs +++ b/crates/snot/src/schema/mod.rs @@ -9,6 +9,7 @@ use serde::{ use snot_common::state::{NodeKey, NodeType}; use wildmatch::WildMatch; +pub mod cannon; pub mod infrastructure; pub mod nodes; pub mod outcomes; @@ -38,6 +39,9 @@ pub enum ItemDocument { #[serde(rename = "outcomes.snarkos.testing.monadic.us/v1")] Outcomes(Box), + + #[serde(rename = "cannon.snarkos.testing.monadic.us/v1")] + Cannon(Box), } /// One or more deserialized node targets. Composed of one or more diff --git a/crates/snot/src/schema/storage.rs b/crates/snot/src/schema/storage.rs index a2df73f7..345a2fef 100644 --- a/crates/snot/src/schema/storage.rs +++ b/crates/snot/src/schema/storage.rs @@ -168,7 +168,7 @@ impl From for String { } impl Document { - pub async fn prepare(self, state: &GlobalState) -> anyhow::Result { + pub async fn prepare(self, state: &GlobalState) -> anyhow::Result> { static STORAGE_ID_INT: AtomicUsize = AtomicUsize::new(0); let id = String::from(self.id.clone()); @@ -189,7 +189,7 @@ impl Document { match self.generate { // generate the block and ledger if we have generation params - Some(mut generation) => 'generate: { + Some(generation) => 'generate: { // warn if an existing block/ledger already exists if exists { // TODO: is this the behavior we want? @@ -200,11 +200,6 @@ impl Document { tokio::fs::create_dir_all(&base).await?; } - generation.genesis = GenesisGeneration { - output: base.join(generation.genesis.output), - ..generation.genesis - }; - // generate the genesis block using the aot cli let bin = std::env::var("AOT_BIN").map(PathBuf::from).unwrap_or( PathBuf::from(env!("CARGO_MANIFEST_DIR")) @@ -318,16 +313,16 @@ impl Document { let int_id = STORAGE_ID_INT.fetch_add(1, Ordering::Relaxed); storage_lock.insert(int_id, id.to_owned()); - let storage = LoadedStorage { + let storage = Arc::new(LoadedStorage { id: id.to_owned(), path: base.clone(), committee: read_to_addrs(pick_commitee_addr, base.join("committee.json")).await?, accounts, - }; + }); let mut storage_lock = state.storage.write().await; - storage_lock.insert(int_id, Arc::new(storage)); + storage_lock.insert(int_id, storage.clone()); - Ok(int_id) + Ok(storage) } } diff --git a/crates/snot/src/schema/timeline.rs b/crates/snot/src/schema/timeline.rs index 76192a37..ad97e164 100644 --- a/crates/snot/src/schema/timeline.rs +++ b/crates/snot/src/schema/timeline.rs @@ -1,10 +1,13 @@ -use std::{fmt, path::PathBuf, time::Duration}; +use std::{fmt, time::Duration}; use indexmap::IndexMap; use serde::{ de::{Error, Visitor}, Deserialize, Deserializer, }; +use snot_common::state::NodeKey; + +use super::NodeTargets; /// A document describing a test's event timeline. 
#[derive(Deserialize, Debug, Clone)] @@ -176,8 +179,10 @@ impl<'de> Deserialize<'de> for EventDuration { #[derive(Deserialize, Debug, Clone)] pub struct TxCannon { - pub target: String, - pub source: PathBuf, - pub total: u64, - pub tps: u32, + pub name: String, + pub count: u64, + /// overwrite the query's source node + pub query: Option, + /// overwrite the cannon sink target + pub target: Option, } diff --git a/crates/snot/src/testing.rs b/crates/snot/src/testing.rs index 041f5943..d7184da2 100644 --- a/crates/snot/src/testing.rs +++ b/crates/snot/src/testing.rs @@ -1,6 +1,10 @@ use std::{ + collections::HashMap, fmt::Display, - sync::{atomic::Ordering, Arc}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, }; use anyhow::{anyhow, bail, ensure}; @@ -12,19 +16,27 @@ use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey}; use tracing::{info, warn}; use crate::{ + cannon::{sink::TxSink, source::TxSource, TestCannon}, schema::{ nodes::{ExternalNode, Node}, + storage::LoadedStorage, ItemDocument, NodeTargets, }, state::GlobalState, }; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Test { - pub storage_id: usize, + pub storage: Arc, pub node_map: BiMap, pub initial_nodes: IndexMap, - // TODO: GlobalStorage.storage should maybe be here instead + + /// Map of transaction files to their respective counters + pub transaction_counters: HashMap, + /// Map of cannon ids to their cannon configurations + pub cannon_configs: HashMap, + /// Map of cannon ids to their cannon instances + pub cannons: HashMap>, } #[derive(Debug, Clone)] @@ -73,18 +85,25 @@ impl Test { ) -> anyhow::Result { let mut state_lock = state.tests.write().await; - let mut storage_id = None; + let mut storage = None; let mut node_map = BiHashMap::default(); let mut initial_nodes = IndexMap::default(); + let mut cannon_configs = HashMap::new(); + let mut cannons = HashMap::new(); for document in documents { match document { - ItemDocument::Storage(storage) => { - let int_id = storage.prepare(state).await?; - if storage_id.is_none() { - storage_id = Some(int_id); + ItemDocument::Storage(doc) => { + if storage.is_none() { + storage = Some(doc.prepare(state).await?); + } else { + bail!("multiple storage documents found in test") } } + ItemDocument::Cannon(cannon) => { + cannon_configs.insert(cannon.name.to_owned(), (cannon.source, cannon.sink)); + cannons.insert(cannon.name, Vec::new()); + } ItemDocument::Nodes(nodes) => { // flatten replicas for (doc_node_key, mut doc_node) in nodes.nodes { @@ -169,9 +188,12 @@ impl Test { } let test = Test { - storage_id: storage_id.ok_or_else(|| anyhow!("test is missing storage document"))?, + storage: storage.ok_or_else(|| anyhow!("test is missing storage document"))?, node_map, initial_nodes, + transaction_counters: HashMap::new(), + cannon_configs, + cannons, }; let test_id = state.tests_counter.fetch_add(1, Ordering::Relaxed); @@ -266,14 +288,6 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul .get(id) .ok_or_else(|| anyhow!("test not found"))?; - // get the numeric storage ID from the string storage ID - let storage_id = test.storage_id; - - // obtain the actual storage - let Some(storage) = state.storage.read().await.get(&storage_id).cloned() else { - bail!("test {id} storage {storage_id} not found...") - }; - let pool_lock = state.pool.read().await; // Lookup agent peers given a node key @@ -355,11 +369,11 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul node_state.private_key = node .key .as_ref() - 
.and_then(|key| storage.lookup_keysource(key)); + .and_then(|key| test.storage.lookup_keysource(key)); node_state.peers = matching_nodes(key, &node.peers, false)?; node_state.validators = matching_nodes(key, &node.validators, true)?; - let agent_state = AgentState::Node(storage_id, node_state); + let agent_state = AgentState::Node(id, node_state); agent_ids.push(id); handles.push(tokio::spawn(async move { client From 5b325785baf78bd011d2f3594f4b5eba2c6bb8f5 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 25 Mar 2024 19:46:22 -0500 Subject: [PATCH 08/11] refactor: s/test/env --- crates/snot/src/cannon/mod.rs | 25 ++++----- crates/snot/src/cannon/sink.rs | 4 +- crates/snot/src/cannon/source.rs | 15 +---- crates/snot/src/server/api.rs | 12 ++-- crates/snot/src/server/mod.rs | 4 +- crates/snot/src/state.rs | 10 ++-- crates/snot/src/testing.rs | 94 ++++++++++++++++---------------- 7 files changed, 74 insertions(+), 90 deletions(-) diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 1ec08675..8ee5bb77 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -16,7 +16,7 @@ use tokio::{ }; use tracing::warn; -use crate::{cannon::source::LedgerQueryService, state::GlobalState, testing::Test}; +use crate::{cannon::source::LedgerQueryService, state::GlobalState, testing::Environment}; use self::{sink::TxSink, source::TxSource}; @@ -52,7 +52,7 @@ burst mode?? /// Transaction cannon state /// using the `TxSource` and `TxSink` for configuration. #[derive(Debug)] -pub struct TestCannon { +pub struct CannonInstance { // a copy of the global state global_state: Arc, @@ -63,7 +63,7 @@ pub struct TestCannon { /// To point at an external node, create a topology with external node /// To generate ahead-of-time, upload a test with a timeline referencing a /// cannon pointing at a file - env: CannonEnv, + env: Arc, /// Local query service port. Only present if the TxSource uses a local query source. query_port: Option, @@ -76,12 +76,7 @@ pub struct TestCannon { fired_txs: AtomicU32, } -#[derive(Clone, Debug)] -struct CannonEnv { - test: Arc, -} - -impl TestCannon { +impl CannonInstance { /// Create a new active transaction cannon /// with the given source and sink. 
/// @@ -94,11 +89,11 @@ impl TestCannon { ) -> Result { // mapping with async is ugly and blocking_read is scary let env = { - let Some(test) = global_state.tests.read().await.get(&test_id).cloned() else { + let Some(env) = global_state.envs.read().await.get(&test_id).cloned() else { bail!("test {test_id} not found") }; - CannonEnv { test } + env }; let env2 = env.clone(); @@ -115,7 +110,7 @@ impl TestCannon { // TODO: if a sink or a source uses node_keys or storage // env will be used - println!("{}", env2.test.storage.id); + println!("{}", env2.storage.id); // compare the tx id to an authorization id let _pending_txs = HashSet::::new(); @@ -157,8 +152,8 @@ impl TestCannon { } } LedgerQueryService::Node(key) => { - // test_id must be Some because LedgerQueryService::Node requires it - let Some(agent_id) = self.env.test.get_agent_by_key(key) else { + // env_id must be Some because LedgerQueryService::Node requires it + let Some(agent_id) = self.env.get_agent_by_key(key) else { bail!("cannon target agent not found") }; @@ -192,7 +187,7 @@ impl TestCannon { } } -impl Drop for TestCannon { +impl Drop for CannonInstance { fn drop(&mut self) { // cancel the task on drop self.task.blocking_lock().abort(); diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index 8065dd4f..c8cff9e7 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -18,11 +18,11 @@ pub enum TxSink { /// Number of transactions per block tx_per_block: u32, }, - /// Send transactions to nodes in a test + /// Send transactions to nodes in a env RealTime { /// The nodes to send transactions to /// - /// Requires cannon to have an associated test_id + /// Requires cannon to have an associated env_id target: NodeTargets, /// How long between each burst of transactions diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index 2d7fb880..14b00c71 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, default}; +use std::collections::HashSet; use anyhow::{anyhow, Result}; use serde::Deserialize; @@ -25,7 +25,7 @@ pub struct LocalQueryService { /// When present, the cannon will update the ledger service from this node /// if the node is out of sync, it will corrupt the ledger... /// - /// requires cannon to have an associated test_id + /// requires cannon to have an associated env_id pub sync_from: Option, } @@ -49,19 +49,10 @@ pub enum LedgerQueryService { Local(LocalQueryService), /// Target a specific node (probably over rpc instead of reqwest lol...) /// - /// Requires cannon to have an associated test_id + /// Requires cannon to have an associated env_id Node(NodeKey), } -impl LedgerQueryService { - pub fn needs_test_id(&self) -> bool { - match self { - LedgerQueryService::Node(_) => true, - LedgerQueryService::Local(LocalQueryService { sync_from, .. 
}) => sync_from.is_some(), - } - } -} - /// Which service is providing the compute power for executing transactions #[derive(Default, Clone, Debug, Deserialize)] pub enum ComputeTarget { diff --git a/crates/snot/src/server/api.rs b/crates/snot/src/server/api.rs index 28c1daff..9a7bcbdf 100644 --- a/crates/snot/src/server/api.rs +++ b/crates/snot/src/server/api.rs @@ -9,7 +9,7 @@ use serde::Deserialize; use serde_json::json; use super::AppState; -use crate::testing::Test; +use crate::testing::Environment; pub(super) fn routes() -> Router { Router::new() @@ -54,7 +54,7 @@ async fn get_agents(state: State) -> impl IntoResponse { } async fn post_test_prepare(state: State, body: String) -> Response { - let documents = match Test::deserialize(&body) { + let documents = match Environment::deserialize(&body) { Ok(documents) => documents, Err(e) => { return ( @@ -70,8 +70,8 @@ async fn post_test_prepare(state: State, body: String) -> Response { // TODO: clean up existing test - match Test::prepare(documents, &state).await { - Ok(test_id) => (StatusCode::OK, Json(json!({ "id": test_id }))).into_response(), + match Environment::prepare(documents, &state).await { + Ok(env_id) => (StatusCode::OK, Json(json!({ "id": env_id }))).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": format!("{e}") })), @@ -81,10 +81,10 @@ async fn post_test_prepare(state: State, body: String) -> Response { } async fn delete_test( - Path(test_id): Path, + Path(env_id): Path, State(state): State, ) -> impl IntoResponse { - match Test::cleanup(&test_id, &state).await { + match Environment::cleanup(&env_id, &state).await { Ok(_) => StatusCode::OK.into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/crates/snot/src/server/mod.rs b/crates/snot/src/server/mod.rs index 378c3cfd..4586f257 100644 --- a/crates/snot/src/server/mod.rs +++ b/crates/snot/src/server/mod.rs @@ -50,8 +50,8 @@ pub async fn start(cli: Cli) -> Result<()> { pool: Default::default(), storage_ids: Default::default(), storage: Default::default(), - tests_counter: Default::default(), - tests: Default::default(), + envs_counter: Default::default(), + envs: Default::default(), cannons: Default::default(), }; diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs index b05cfdce..241d20fb 100644 --- a/crates/snot/src/state.rs +++ b/crates/snot/src/state.rs @@ -20,11 +20,11 @@ use tarpc::{client::RpcError, context}; use tokio::sync::RwLock; use crate::{ - cannon::TestCannon, + cannon::CannonInstance, cli::Cli, schema::storage::LoadedStorage, server::jwt::{Claims, JWT_NONCE, JWT_SECRET}, - testing::Test, + testing::Environment, }; pub type AgentId = usize; @@ -40,10 +40,10 @@ pub struct GlobalState { /// A map from ephemeral integer storage ID to actual storage ID. pub storage_ids: RwLock>, pub storage: RwLock>>, - pub cannons: RwLock>>, + pub cannons: RwLock>>, - pub tests_counter: AtomicUsize, - pub tests: RwLock>>, + pub envs_counter: AtomicUsize, + pub envs: RwLock>>, } /// This is the representation of a public addr or a list of internal addrs. 
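
A note on the lookup pattern this rename settles on: environments now live in `GlobalState.envs` behind a tokio `RwLock`, keyed by the numeric env id, and callers (the cannon constructor and the proxy router later in this series) clone the `Arc` out of the map rather than holding the read guard. Below is a minimal sketch of that pattern, assuming the field types shown in `state.rs` above; the helper function itself is illustrative only and does not exist in the patch, which inlines this at each call site.

```rust
use std::sync::Arc;

// Illustrative helper only; mirrors `state.envs.read().await.get(&env_id).cloned()`
// as used by the cannon constructor and router handlers.
async fn env_by_id(state: &GlobalState, env_id: usize) -> Option<Arc<Environment>> {
    // hold the read lock just long enough to clone the Arc out of the map
    let envs = state.envs.read().await;
    envs.get(&env_id).cloned()
}
```

Cloning the `Arc` and dropping the guard (the handlers do this with a block expression) keeps the lock hold time short before the subsequent `.await`s on agent RPCs.
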
diff --git a/crates/snot/src/testing.rs b/crates/snot/src/testing.rs index d7184da2..74bb2fc5 100644 --- a/crates/snot/src/testing.rs +++ b/crates/snot/src/testing.rs @@ -16,7 +16,7 @@ use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey}; use tracing::{info, warn}; use crate::{ - cannon::{sink::TxSink, source::TxSource, TestCannon}, + cannon::{sink::TxSink, source::TxSource, CannonInstance}, schema::{ nodes::{ExternalNode, Node}, storage::LoadedStorage, @@ -26,22 +26,22 @@ use crate::{ }; #[derive(Debug)] -pub struct Test { +pub struct Environment { pub storage: Arc, - pub node_map: BiMap, - pub initial_nodes: IndexMap, + pub node_map: BiMap, + pub initial_nodes: IndexMap, /// Map of transaction files to their respective counters pub transaction_counters: HashMap, /// Map of cannon ids to their cannon configurations pub cannon_configs: HashMap, /// Map of cannon ids to their cannon instances - pub cannons: HashMap>, + pub cannons: HashMap>, } #[derive(Debug, Clone)] /// The effective test state of a node. -pub enum TestNode { +pub enum EnvNode { Internal(Node), External(ExternalNode), } @@ -50,21 +50,21 @@ pub enum TestNode { /// A way of looking up a peer in the test state. /// Could technically use AgentPeer like this but it would have needless port /// information -pub enum TestPeer { +pub enum EnvPeer { Internal(AgentId), External, } -impl Display for TestPeer { +impl Display for EnvPeer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - TestPeer::Internal(id) => write!(f, "agent {id}"), - TestPeer::External => write!(f, "external node"), + EnvPeer::Internal(id) => write!(f, "agent {id}"), + EnvPeer::External => write!(f, "external node"), } } } -impl Test { +impl Environment { /// Deserialize (YAML) many documents into a `Vec` of documents. pub fn deserialize(str: &str) -> Result, anyhow::Error> { serde_yaml::Deserializer::from_str(str) @@ -77,13 +77,13 @@ impl Test { /// Prepare a test. This will set the current test on the GlobalState. 
/// - /// **This will error if the current test is not unset before calling to + /// **This will error if the current env is not unset before calling to /// ensure tests are properly cleaned up.** pub async fn prepare( documents: Vec, state: &GlobalState, ) -> anyhow::Result { - let mut state_lock = state.tests.write().await; + let mut state_lock = state.envs.write().await; let mut storage = None; let mut node_map = BiHashMap::default(); @@ -97,7 +97,7 @@ impl Test { if storage.is_none() { storage = Some(doc.prepare(state).await?); } else { - bail!("multiple storage documents found in test") + bail!("multiple storage documents found in env") } } ItemDocument::Cannon(cannon) => { @@ -131,7 +131,7 @@ impl Test { if let Some(key) = node.key.take() { node.key = Some(key.with_index(i)) } - ent.insert(TestNode::Internal(node)) + ent.insert(EnvNode::Internal(node)) } }; } @@ -158,7 +158,7 @@ impl Test { initial_nodes .keys() .cloned() - .zip(available_agent.map(|agent| TestPeer::Internal(agent.id()))), + .zip(available_agent.map(|agent| EnvPeer::Internal(agent.id()))), ); info!("delegated {} nodes to agents", node_map.len()); @@ -171,7 +171,7 @@ impl Test { for (node_key, node) in &nodes.external { match initial_nodes.entry(node_key.clone()) { Entry::Occupied(ent) => bail!("duplicate node key: {}", ent.key()), - Entry::Vacant(ent) => ent.insert(TestNode::External(node.to_owned())), + Entry::Vacant(ent) => ent.insert(EnvNode::External(node.to_owned())), }; } node_map.extend( @@ -179,7 +179,7 @@ impl Test { .external .keys() .cloned() - .map(|k| (k, TestPeer::External)), + .map(|k| (k, EnvPeer::External)), ) } @@ -187,8 +187,8 @@ impl Test { } } - let test = Test { - storage: storage.ok_or_else(|| anyhow!("test is missing storage document"))?, + let env = Environment { + storage: storage.ok_or_else(|| anyhow!("env is missing storage document"))?, node_map, initial_nodes, transaction_counters: HashMap::new(), @@ -196,34 +196,34 @@ impl Test { cannons, }; - let test_id = state.tests_counter.fetch_add(1, Ordering::Relaxed); - state_lock.insert(test_id, Arc::new(test)); + let env_id = state.envs_counter.fetch_add(1, Ordering::Relaxed); + state_lock.insert(env_id, Arc::new(env)); drop(state_lock); // reconcile the nodes - initial_reconcile(&test_id, state).await?; + initial_reconcile(&env_id, state).await?; - Ok(test_id) + Ok(env_id) } pub async fn cleanup(id: &usize, state: &GlobalState) -> anyhow::Result<()> { - // clear the test state - info!("clearing test {id} state..."); - let Some(test) = ({ - let mut state_lock = state.tests.write().await; + // clear the env state + info!("clearing env {id} state..."); + let Some(env) = ({ + let mut state_lock = state.envs.write().await; state_lock.remove(id) }) else { - bail!("test {id} not found") + bail!("env {id} not found") }; // reconcile all online agents let (ids, handles): (Vec<_>, Vec<_>) = { let agents = state.pool.read().await; - test.node_map + env.node_map .right_values() - // find all agents associated with the test + // find all agents associated with the env .filter_map(|peer| match peer { - TestPeer::Internal(id) => agents.get(id), + EnvPeer::Internal(id) => agents.get(id), _ => None, }) // map the agents to rpc clients @@ -269,11 +269,11 @@ impl Test { Ok(()) } - /// Lookup a test agent id by node key. + /// Lookup a env agent id by node key. 
pub fn get_agent_by_key(&self, key: &NodeKey) -> Option { self.node_map.get_by_left(key).and_then(|id| match id { - TestPeer::Internal(id) => Some(*id), - TestPeer::External => None, + EnvPeer::Internal(id) => Some(*id), + EnvPeer::External => None, }) } } @@ -283,19 +283,17 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul let mut handles = vec![]; let mut agent_ids = vec![]; { - let tests_lock = state.tests.read().await; - let test = tests_lock - .get(id) - .ok_or_else(|| anyhow!("test not found"))?; + let envs_lock = state.envs.read().await; + let env = envs_lock.get(id).ok_or_else(|| anyhow!("env not found"))?; let pool_lock = state.pool.read().await; // Lookup agent peers given a node key - let node_to_agent = |key: &NodeKey, node: &TestPeer, is_validator: bool| { + let node_to_agent = |key: &NodeKey, node: &EnvPeer, is_validator: bool| { // get the internal agent ID from the node key match node { // internal peers are mapped to internal agents - TestPeer::Internal(id) => { + EnvPeer::Internal(id) => { let Some(agent) = pool_lock.get(id) else { bail!("agent {id} not found in pool") }; @@ -310,8 +308,8 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul )) } // external peers are mapped to external nodes - TestPeer::External => { - let Some(TestNode::External(external)) = test.initial_nodes.get(key) else { + EnvPeer::External => { + let Some(EnvNode::External(external)) = env.initial_nodes.get(key) else { bail!("external node with key {key} not found") }; @@ -343,20 +341,20 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul // alternatively, use a more efficient data structure for // storing node keys - test.node_map + env.node_map .iter() .filter(|(k, _)| *k != key && target.matches(k)) .map(|(k, v)| node_to_agent(k, v, is_validator)) .collect() }; - for (key, node) in &test.initial_nodes { - let TestNode::Internal(node) = node else { + for (key, node) in &env.initial_nodes { + let EnvNode::Internal(node) = node else { continue; }; // get the internal agent ID from the node key - let Some(id) = test.get_agent_by_key(key) else { + let Some(id) = env.get_agent_by_key(key) else { bail!("expected internal agent peer for node with key {key}") }; @@ -369,7 +367,7 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul node_state.private_key = node .key .as_ref() - .and_then(|key| test.storage.lookup_keysource(key)); + .and_then(|key| env.storage.lookup_keysource(key)); node_state.peers = matching_nodes(key, &node.peers, false)?; node_state.validators = matching_nodes(key, &node.validators, true)?; From df0e68cd502a089aac8bba34616c8152f2258bb9 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 25 Mar 2024 19:52:25 -0500 Subject: [PATCH 09/11] refactor: enum renaming for clarity --- crates/snot/src/cannon/mod.rs | 4 ++-- crates/snot/src/cannon/sink.rs | 20 ++++++++++---------- crates/snot/src/cannon/source.rs | 2 +- crates/snot/src/schema/timeline.rs | 4 ++-- specs/test-4-validators.yaml | 2 +- test-spec.yaml | 7 +++++++ 6 files changed, 23 insertions(+), 16 deletions(-) diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 8ee5bb77..f9044152 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -166,7 +166,7 @@ impl CannonInstance { client.get_state_root().await } }, - TxSource::AoTPlayback { .. } => { + TxSource::Playback { .. 
} => { bail!("cannon is configured to playback from file.") } } @@ -179,7 +179,7 @@ impl CannonInstance { TxSource::RealTime { .. } => { self.tx_sender.send(body)?; } - TxSource::AoTPlayback { .. } => { + TxSource::Playback { .. } => { warn!("cannon received broadcasted transaction in playback mode. ignoring.") } } diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index c8cff9e7..688618d4 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -5,19 +5,19 @@ use crate::schema::NodeTargets; #[derive(Clone, Debug, Deserialize)] pub enum TxSink { /// Write transactions to a file - AoTRecord { + Record { /// filename for the recording txs list name: String, }, - /// Write transactions to a ledger query service - AoTAppend { - // information for running .. another ledger service - // solely for appending blocks to a ledger... - // storage_id: usize, - // port: u16, - /// Number of transactions per block - tx_per_block: u32, - }, + //// Write transactions to a ledger query service + // AoTAppend { + // // information for running .. another ledger service + // // solely for appending blocks to a ledger... + // // storage_id: usize, + // // port: u16, + // /// Number of transactions per block + // tx_per_block: u32, + // }, /// Send transactions to nodes in a env RealTime { /// The nodes to send transactions to diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index 14b00c71..f55f78e1 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -83,7 +83,7 @@ pub enum TxMode { #[derive(Clone, Debug, Deserialize)] pub enum TxSource { /// Read transactions from a file - AoTPlayback { + Playback { // filename from the storage for the tx list name: String, }, diff --git a/crates/snot/src/schema/timeline.rs b/crates/snot/src/schema/timeline.rs index ad97e164..bc577a58 100644 --- a/crates/snot/src/schema/timeline.rs +++ b/crates/snot/src/schema/timeline.rs @@ -45,7 +45,7 @@ pub enum Action { /// Update the given nodes to an offline state Offline(NodeTarget), /// Fire transactions from a source file at a target node - Cannon(Vec), + Cannon(Vec), /// Set the height of some nodes' ledgers Height(IndexMap), } @@ -178,7 +178,7 @@ impl<'de> Deserialize<'de> for EventDuration { } #[derive(Deserialize, Debug, Clone)] -pub struct TxCannon { +pub struct SpawnCannon { pub name: String, pub count: u64, /// overwrite the query's source node diff --git a/specs/test-4-validators.yaml b/specs/test-4-validators.yaml index 2e77ad50..7385dc92 100644 --- a/specs/test-4-validators.yaml +++ b/specs/test-4-validators.yaml @@ -10,7 +10,7 @@ name: 4-validators nodes: validator/test: - replicas: 4 + replicas: 2 key: committee.$ height: 0 validators: [validator/*] diff --git a/test-spec.yaml b/test-spec.yaml index 8c9ab1ed..0c396482 100644 --- a/test-spec.yaml +++ b/test-spec.yaml @@ -6,8 +6,14 @@ name: private tx, 500 rounds, 4 accounts description: | This ledger was built to test private transactions +fetch: https://example.com/genesis.block + # instructions for generating this test generate: + accounts: + - file: accounts.json + total: 5 + # genesis generation genesis: seed: 0 @@ -15,6 +21,7 @@ generate: committee-balances: 10_000_000_000_000 additional-accounts: 10 additional-balances: 100_000_000_000 + # ledger setup ledger: blocks: 100 From c3b355039748bc57e7bcee2d5d1e3cd32af98e2e Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 25 Mar 2024 20:03:32 -0500 Subject: [PATCH 10/11] refactor(cannons): move cannons 
out of global state into env state --- crates/snot/src/cannon/router.rs | 31 +++++++++++++++++++++---------- crates/snot/src/server/mod.rs | 1 - crates/snot/src/state.rs | 2 -- crates/snot/src/testing.rs | 13 +++++++------ 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/crates/snot/src/cannon/router.rs b/crates/snot/src/cannon/router.rs index 489f9c76..239b4b2d 100644 --- a/crates/snot/src/cannon/router.rs +++ b/crates/snot/src/cannon/router.rs @@ -11,18 +11,25 @@ use crate::state::AppState; pub(crate) fn redirect_cannon_routes() -> Router { Router::new() - .route("/:id/mainnet/latest/stateRoot", get(state_root)) - .route("/:id/mainnet/transaction/broadcast", post(transaction)) + .route("/:env/:id/mainnet/latest/stateRoot", get(state_root)) + .route("/:env/:id/mainnet/transaction/broadcast", post(transaction)) } -async fn state_root(Path(cannon_id): Path, state: State) -> Response { - let Some(cannon) = ({ - let cannons = state.cannons.read().await; - cannons.get(&cannon_id).cloned() +async fn state_root( + Path((env_id, cannon_id)): Path<(usize, usize)>, + state: State, +) -> Response { + let Some(env) = ({ + let env = state.envs.read().await; + env.get(&env_id).cloned() }) else { return StatusCode::NOT_FOUND.into_response(); }; + let Some(cannon) = env.cannons.get(&cannon_id) else { + return StatusCode::NOT_FOUND.into_response(); + }; + match cannon.proxy_state_root().await { // the nodes expect this state root to be string escaped json Ok(root) => Json(json!(root)).into_response(), @@ -35,17 +42,21 @@ async fn state_root(Path(cannon_id): Path, state: State) -> Res } async fn transaction( - Path(cannon_id): Path, + Path((env_id, cannon_id)): Path<(usize, usize)>, state: State, body: String, ) -> Response { - let Some(cannon) = ({ - let cannons = state.cannons.read().await; - cannons.get(&cannon_id).cloned() + let Some(env) = ({ + let env = state.envs.read().await; + env.get(&env_id).cloned() }) else { return StatusCode::NOT_FOUND.into_response(); }; + let Some(cannon) = env.cannons.get(&cannon_id) else { + return StatusCode::NOT_FOUND.into_response(); + }; + match cannon.proxy_broadcast(body) { Ok(_) => StatusCode::OK.into_response(), Err(e) => ( diff --git a/crates/snot/src/server/mod.rs b/crates/snot/src/server/mod.rs index 4586f257..a69d8641 100644 --- a/crates/snot/src/server/mod.rs +++ b/crates/snot/src/server/mod.rs @@ -52,7 +52,6 @@ pub async fn start(cli: Cli) -> Result<()> { storage: Default::default(), envs_counter: Default::default(), envs: Default::default(), - cannons: Default::default(), }; let app = Router::new() diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs index 241d20fb..031a54d9 100644 --- a/crates/snot/src/state.rs +++ b/crates/snot/src/state.rs @@ -20,7 +20,6 @@ use tarpc::{client::RpcError, context}; use tokio::sync::RwLock; use crate::{ - cannon::CannonInstance, cli::Cli, schema::storage::LoadedStorage, server::jwt::{Claims, JWT_NONCE, JWT_SECRET}, @@ -40,7 +39,6 @@ pub struct GlobalState { /// A map from ephemeral integer storage ID to actual storage ID. 
pub storage_ids: RwLock>, pub storage: RwLock>>, - pub cannons: RwLock>>, pub envs_counter: AtomicUsize, pub envs: RwLock>>, diff --git a/crates/snot/src/testing.rs b/crates/snot/src/testing.rs index 74bb2fc5..ba949a62 100644 --- a/crates/snot/src/testing.rs +++ b/crates/snot/src/testing.rs @@ -2,7 +2,7 @@ use std::{ collections::HashMap, fmt::Display, sync::{ - atomic::{AtomicU32, Ordering}, + atomic::{AtomicU32, AtomicUsize, Ordering}, Arc, }, }; @@ -35,8 +35,10 @@ pub struct Environment { pub transaction_counters: HashMap, /// Map of cannon ids to their cannon configurations pub cannon_configs: HashMap, + /// To help generate the id of the new cannon. + pub cannons_counter: AtomicUsize, /// Map of cannon ids to their cannon instances - pub cannons: HashMap>, + pub cannons: HashMap, } #[derive(Debug, Clone)] @@ -89,7 +91,6 @@ impl Environment { let mut node_map = BiHashMap::default(); let mut initial_nodes = IndexMap::default(); let mut cannon_configs = HashMap::new(); - let mut cannons = HashMap::new(); for document in documents { match document { @@ -102,7 +103,6 @@ impl Environment { } ItemDocument::Cannon(cannon) => { cannon_configs.insert(cannon.name.to_owned(), (cannon.source, cannon.sink)); - cannons.insert(cannon.name, Vec::new()); } ItemDocument::Nodes(nodes) => { // flatten replicas @@ -191,9 +191,10 @@ impl Environment { storage: storage.ok_or_else(|| anyhow!("env is missing storage document"))?, node_map, initial_nodes, - transaction_counters: HashMap::new(), + transaction_counters: Default::default(), cannon_configs, - cannons, + cannons_counter: Default::default(), + cannons: Default::default(), }; let env_id = state.envs_counter.fetch_add(1, Ordering::Relaxed); From 025762f2fe86f85f503797c46374406cdaf5597a Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 25 Mar 2024 20:17:21 -0500 Subject: [PATCH 11/11] fix(cannon): fix reference cycle --- crates/snot/src/cannon/mod.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index f9044152..6773b4d0 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -5,7 +5,7 @@ pub mod source; use std::{ collections::HashSet, - sync::{atomic::AtomicU32, Arc}, + sync::{atomic::AtomicU32, Arc, Weak}, }; use anyhow::{bail, Result}; @@ -63,7 +63,7 @@ pub struct CannonInstance { /// To point at an external node, create a topology with external node /// To generate ahead-of-time, upload a test with a timeline referencing a /// cannon pointing at a file - env: Arc, + env: Weak, /// Local query service port. Only present if the TxSource uses a local query source. query_port: Option, @@ -131,7 +131,7 @@ impl CannonInstance { global_state, source, sink, - env, + env: Arc::downgrade(&env), tx_sender, query_port, task: AsyncMutex::new(handle.abort_handle()), @@ -152,8 +152,12 @@ impl CannonInstance { } } LedgerQueryService::Node(key) => { + let Some(env) = self.env.upgrade() else { + unreachable!("called from a place where env is present") + }; + // env_id must be Some because LedgerQueryService::Node requires it - let Some(agent_id) = self.env.get_agent_by_key(key) else { + let Some(agent_id) = env.get_agent_by_key(key) else { bail!("cannon target agent not found") };
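
The reference-cycle fix in this last patch reduces to a small standalone example: an `Environment` owns its `CannonInstance`s, so a cannon that also held an `Arc<Environment>` would keep the environment (and therefore itself) alive indefinitely. Holding a `Weak` back-reference and upgrading at the point of use breaks the cycle. A minimal sketch with simplified stand-in types, not the real structs:

```rust
use std::sync::{Arc, Weak};

struct Env {
    cannons: Vec<Cannon>,
}

struct Cannon {
    // was Arc<Env>; together with Env -> Cannon ownership that formed a cycle
    env: Weak<Env>,
}

fn fire(cannon: &Cannon) {
    // upgrade only where the environment is known to still be alive
    if let Some(env) = cannon.env.upgrade() {
        println!("env has {} cannon(s)", env.cannons.len());
    }
}

fn main() {
    let env = Arc::new(Env { cannons: Vec::new() });
    let cannon = Cannon { env: Arc::downgrade(&env) };
    fire(&cannon);
}
```

In the real code the upgrade happens inside `proxy_state_root`, where the `unreachable!` documents the assumption that the environment is still present whenever the proxy is invoked.
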