From 3c696d187511159ba1099692972dcdbb32e269d6 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Tue, 26 Mar 2024 00:53:36 -0500 Subject: [PATCH 1/8] feat(cannon): fs drain, auth gen, agent auth exec, local ledger service --- Cargo.lock | 2 + Cargo.toml | 1 + crates/snot-agent/Cargo.toml | 2 +- crates/snot-agent/src/rpc.rs | 29 +++++ crates/snot-common/Cargo.toml | 1 + crates/snot-common/src/rpc/agent.rs | 13 ++ crates/snot/Cargo.toml | 1 + crates/snot/src/cannon/authorized.rs | 61 +++++++++ crates/snot/src/cannon/fs_drain.rs | 44 +++++++ crates/snot/src/cannon/mod.rs | 177 ++++++++++++++++++++------- crates/snot/src/cannon/source.rs | 58 ++++++++- crates/snot/src/cli.rs | 3 + crates/snot/src/schema/storage.rs | 44 +++++++ crates/snot/src/testing.rs | 28 ++++- 14 files changed, 409 insertions(+), 55 deletions(-) create mode 100644 crates/snot/src/cannon/authorized.rs create mode 100644 crates/snot/src/cannon/fs_drain.rs diff --git a/Cargo.lock b/Cargo.lock index 9e6dae30..1cfc2458 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5352,6 +5352,7 @@ dependencies = [ "bincode", "clap", "duration-str", + "external-ip", "futures-util", "hmac", "indexmap 2.2.5", @@ -5420,6 +5421,7 @@ dependencies = [ "lazy_static", "regex", "serde", + "serde_json", "tarpc", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 463a7891..53937234 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ bimap = "0.6" bincode = "1.3" clap = { version = "4.5", features = ["derive"] } colored = "2" +external-ip = "4.2.0" futures = "0.3" futures-util = "0.3.30" http = "1.1" diff --git a/crates/snot-agent/Cargo.toml b/crates/snot-agent/Cargo.toml index 60ac4d85..dc657148 100644 --- a/crates/snot-agent/Cargo.toml +++ b/crates/snot-agent/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" anyhow.workspace = true bincode.workspace = true clap.workspace = true -external-ip = "4.2.0" +external-ip.workspace = true futures.workspace = true futures-util.workspace = true http.workspace = true diff --git a/crates/snot-agent/src/rpc.rs b/crates/snot-agent/src/rpc.rs index 31110521..6e49148b 100644 --- a/crates/snot-agent/src/rpc.rs +++ b/crates/snot-agent/src/rpc.rs @@ -406,4 +406,33 @@ impl AgentService for AgentRpcServer { AgentMetric::Tps => metrics.tps.get(), } } + + async fn execute_authorization( + self, + _: context::Context, + _env_id: usize, + query: String, + auth: String, + ) -> Result<(), AgentError> { + info!("executing authorization..."); + // TODO: ensure binary associated with this env_id is present + + let res = Command::new(self.state.cli.path.join(SNARKOS_FILE)) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .arg("execute") + .arg("--query") + .arg(&format!("http://{}{query}", self.state.endpoint)) + .arg(auth) + .spawn() + .map_err(|_| AgentError::FailedToSpawnProcess)? + .wait() + .await + .map_err(|_| AgentError::ProcessFailed)?; + + if !res.success() { + return Err(AgentError::ProcessFailed); + } + Ok(()) + } } diff --git a/crates/snot-common/Cargo.toml b/crates/snot-common/Cargo.toml index feae152d..9e9d4cba 100644 --- a/crates/snot-common/Cargo.toml +++ b/crates/snot-common/Cargo.toml @@ -8,6 +8,7 @@ futures.workspace = true lazy_static.workspace = true regex.workspace = true serde.workspace = true +serde_json.workspace = true tarpc.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/snot-common/src/rpc/agent.rs b/crates/snot-common/src/rpc/agent.rs index 77f3c156..32cc73b4 100644 --- a/crates/snot-common/src/rpc/agent.rs +++ b/crates/snot-common/src/rpc/agent.rs @@ -22,6 +22,15 @@ pub trait AgentService { /// Get the state root from the running node async fn get_state_root() -> Result; + + /// Locally execute an authorization, using the given query + /// environment id is passed so the agent can determine which aot binary to use + async fn execute_authorization( + env_id: usize, + query: String, + auth: String, + ) -> Result<(), AgentError>; + async fn get_metric(metric: AgentMetric) -> f64; } @@ -45,6 +54,10 @@ pub enum AgentError { FailedToParseJson, #[error("failed to make a request")] FailedToMakeRequest, + #[error("failed to spawn a process")] + FailedToSpawnProcess, + #[error("process failed")] + ProcessFailed, } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum AgentMetric { diff --git a/crates/snot/Cargo.toml b/crates/snot/Cargo.toml index 6027c503..a49f799a 100644 --- a/crates/snot/Cargo.toml +++ b/crates/snot/Cargo.toml @@ -10,6 +10,7 @@ bimap.workspace = true bincode.workspace = true clap.workspace = true duration-str = { version = "0.7", default-features = false } +external-ip.workspace = true futures-util.workspace = true hmac = "0.12.1" indexmap.workspace = true diff --git a/crates/snot/src/cannon/authorized.rs b/crates/snot/src/cannon/authorized.rs new file mode 100644 index 00000000..33946477 --- /dev/null +++ b/crates/snot/src/cannon/authorized.rs @@ -0,0 +1,61 @@ +use std::path::PathBuf; + +use anyhow::{ensure, Result}; +use tokio::process::Command; + +#[derive(Clone, Debug)] +pub enum Authorize { + TransferPublic { + private_key: String, + recipient: String, + amount: u64, + priority_fee: u64, + }, +} + +impl Authorize { + pub async fn run(self, bin: &PathBuf) -> Result { + let mut command = Command::new(bin); + command + .stdout(std::io::stdout()) + .stderr(std::io::stderr()) + .arg("aot") + .arg("authorize"); + + match self { + Self::TransferPublic { + private_key, + recipient, + amount, + priority_fee, + } => { + command + .arg("transfer-public") + .arg("--private-key") + .arg(private_key) + .arg("--recipient") + .arg(recipient) + .arg("--amount") + .arg(amount.to_string()) + .arg("--priority-fee") + .arg(priority_fee.to_string()); + } + } + + command.arg("--broadcast").arg("true"); + + let res = command.output().await?; + + let blob: serde_json::Value = serde_json::from_slice(&res.stdout)?; + + ensure!(blob.is_object(), "expected JSON object in response"); + ensure!( + blob.get("function").is_some() + && blob.get("fee").is_some() + && blob.get("broadcast").is_some(), + "expected function, fee, and broadcast fields in response" + ); + + Ok(blob) + } +} diff --git a/crates/snot/src/cannon/fs_drain.rs b/crates/snot/src/cannon/fs_drain.rs new file mode 100644 index 00000000..dbbefec0 --- /dev/null +++ b/crates/snot/src/cannon/fs_drain.rs @@ -0,0 +1,44 @@ +use std::{ + fs::File, + io::{BufRead, BufReader}, + sync::{Arc, Mutex}, +}; + +use anyhow::{bail, Result}; + +use crate::schema::storage::LoadedStorage; + +#[derive(Debug)] +pub struct TransactionDrain(Mutex>>); + +impl TransactionDrain { + /// Create a new transaction drain + pub fn new(storage: Arc, source: &str) -> Result { + let source = storage.path.join(source); + + let Ok(f) = File::open(&source) else { + bail!("error opening transaction source file: {source:?}"); + }; + + Ok(Self(Mutex::new(Some(BufReader::new(f))))) + } + + /// Read the next line from the transaction drain + pub fn next(&self) -> Result> { + let Ok(mut lock) = self.0.lock() else { + bail!("error locking transaction drain"); + }; + + if lock.is_none() { + return Ok(None); + } + + let mut buf = String::new(); + // read a line and clear the lock on EOF + if lock.as_mut().unwrap().read_line(&mut buf)? == 0 { + *lock = None; + return Ok(None); + } + Ok(Some(buf)) + } +} diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 9465bc2f..07badcd1 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -1,22 +1,30 @@ +pub mod authorized; +pub mod fs_drain; mod net; pub mod router; pub mod sink; pub mod source; use std::{ - collections::HashSet, - sync::{atomic::AtomicU32, Arc, Weak}, + collections::{HashSet, VecDeque}, + process::Stdio, + sync::{atomic::AtomicUsize, Arc, Weak}, }; -use anyhow::{bail, Result}; - +use anyhow::{bail, ensure, Result}; +use serde_json::json; use tokio::{ - sync::{mpsc::UnboundedSender, Mutex as AsyncMutex}, - task::AbortHandle, + process::Command, + sync::{mpsc::UnboundedSender, Mutex as AsyncMutex, OnceCell}, + task::{AbortHandle, JoinHandle}, }; use tracing::warn; -use crate::{cannon::source::LedgerQueryService, state::GlobalState, testing::Environment}; +use crate::{ + cannon::source::{ComputeTarget, LedgerQueryService}, + state::GlobalState, + testing::Environment, +}; use self::{sink::TxSink, source::TxSource}; @@ -73,7 +81,24 @@ pub struct CannonInstance { /// channel to send transactions to the the task tx_sender: UnboundedSender, - fired_txs: AtomicU32, + fired_txs: AtomicUsize, +} + +async fn get_host(state: &GlobalState) -> Option { + static ONCE: OnceCell> = OnceCell::const_new(); + match state.cli.hostname.as_ref() { + Some(host) => Some(host.to_owned()), + None => ONCE + .get_or_init(|| async { + let sources: external_ip::Sources = external_ip::get_http_sources(); + let consensus = external_ip::ConsensusBuilder::new() + .add_sources(sources) + .build(); + consensus.get_consensus().await.map(|a| a.to_string()) + }) + .await + .to_owned(), + } } impl CannonInstance { @@ -83,63 +108,127 @@ impl CannonInstance { /// Locks the global state's tests and storage for reading. pub async fn new( global_state: Arc, + cannon_id: usize, env: Arc, source: TxSource, sink: TxSink, ) -> Result { - let env2 = env.clone(); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let tx_sender = tx.clone(); let query_port = source.get_query_port()?; - let source2 = source.clone(); - - let fired_txs = AtomicU32::new(0); - - let handle = tokio::spawn(async move { - // TODO: write tx to sink at desired rate - let _tx = rx.recv().await; - - // spawn child process for ledger service if the source is local - let _child = if let Some(_port) = query_port { - // TODO: spawn ledger service - // kill on drop - // LedgerQueryService::Local(qs).run(port) - - // generate the genesis block using the aot cli - // let bin = std::env::var("AOT_BIN").map(PathBuf::from).unwrap_or( - // PathBuf::from(env!("CARGO_MANIFEST_DIR")) - // .join("../../target/release/snarkos-aot"), - // ); - // Command::new() - todo!() - }; - - // if tx source is playback, read lines from the transaction file - if let TxSource::Playback { name: _name } = source2 { - // TODO: use the env.transaction_counters to determine which - // line to read from the file + let env2 = env.clone(); + let source2 = source.clone(); + let state = global_state.clone(); + + let fired_txs = AtomicUsize::new(0); + + // buffer for transactions + let tx_queue = VecDeque::::new(); + + // spawn child process for ledger service if the source is local + let mut child = if let Some(_port) = query_port { + // TODO: make a copy of this ledger dir to prevent locks + let child = Command::new(&env.aot_bin) + .kill_on_drop(true) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .arg("ledger") + .arg("-l") + .arg(env.storage.path.join("ledger")) + .arg("-g") + .arg(env.storage.path.join("genesis.block")) + .arg("query") + .spawn() + .map_err(|e| anyhow::anyhow!("error spawning query service: {e}"))?; + Some(child) + } else { + None + }; + + // when in playback mode, ensure the drain exists + let (drain, query_path) = match &source { + TxSource::Playback { name } => { + let drain = env.tx_drains.get(name).cloned(); + ensure!(drain.is_some(), "transaction drain not found: {name}"); + (drain, None) + } + TxSource::RealTime { compute, .. } => { + let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id); + let query = match compute { + // agents already know the host of the control plane + ComputeTarget::AgentPool => suffix, + // demox needs to locate it + ComputeTarget::Demox { .. } => { + let Some(host) = get_host(&state).await else { + bail!("no --host configured for demox based cannon"); + }; + format!("http://{host}:{}{suffix}", global_state.cli.port) + } + }; + (None, Some(query)) } + }; + + let handle: JoinHandle> = tokio::spawn(async move { + // effectively make this be the source of pooling requests + + let gen_tx = || async { + match &source2 { + TxSource::Playback { .. } => { + // if tx source is playback, read lines from the transaction file + let Some(transaction) = drain.unwrap().next()? else { + bail!("source out of transactions") + }; + tx.send(transaction)?; + Ok(()) + } + TxSource::RealTime { compute, .. } => { + // TODO: if source is realtime, generate authorizations and + // send them to any available agent + + let auth = source2.get_auth(&env2)?.run(&env2.aot_bin).await?; + match compute { + ComputeTarget::AgentPool => { + todo!("find an agent, call the .execute_authorization api") + } + ComputeTarget::Demox { url } => { + let _body = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "generateTransaction", + "params": { + "authorization": serde_json::to_string(&auth["authorization"])?, + "fee": serde_json::to_string(&auth["fee"])?, + "url": query_path, + "broadcast": true, + } + }); + + todo!("post on {url}") + } + } + } + } + }; - println!("{}", env2.storage.id); + // TODO: build a buffer deep enough to satisfy a few seconds of transactions + // TODO: as the buffer is drained, queue up more generated transactions // compare the tx id to an authorization id let _pending_txs = HashSet::::new(); // env2.storage.lookup_keysource_pk(key) - // TODO: if a local query service exists, spawn it here - // kill on drop - // TODO: determine the rate that transactions need to be created // based on the sink - // TODO: if source is realtime, generate authorizations and - // send them to any available agent + if let Some(mut child) = child.take() { + child.wait().await?; + } - std::future::pending::<()>().await + Ok(()) }); Ok(Self { diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index f55f78e1..4bf7be0f 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -1,12 +1,12 @@ use std::collections::HashSet; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use serde::Deserialize; use snot_common::state::NodeKey; -use crate::schema::nodes::KeySource; +use crate::{schema::nodes::KeySource, testing::Environment}; -use super::net::get_available_port; +use super::{authorized::Authorize, net::get_available_port}; /// Represents an instance of a local query service. #[derive(Clone, Debug, Deserialize)] @@ -60,7 +60,7 @@ pub enum ComputeTarget { #[default] AgentPool, /// Use demox' API to generate executions - Demox, + Demox { url: String }, } #[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize)] @@ -117,4 +117,54 @@ impl TxSource { .then(|| get_available_port().ok_or(anyhow!("could not get an available port"))) .transpose() } + + pub fn get_auth(&self, env: &Environment) -> Result { + match self { + TxSource::RealTime { + tx_modes, + private_keys, + addresses, + .. + } => { + let sample_pk = || { + private_keys + .get(rand::random::() % private_keys.len()) + .and_then(|k| env.storage.sample_keysource_pk(k)) + .ok_or(bail!("error selecting a valid private key")) + }; + let sample_addr = || { + addresses + .get(rand::random::() % addresses.len()) + .and_then(|k| env.storage.sample_keysource_addr(k)) + .ok_or(bail!("error selecting a valid private key")) + }; + + let Some(mode) = tx_modes + .iter() + .nth(rand::random::() % tx_modes.len()) + else { + bail!("no tx modes available for this cannon instance??") + }; + + let auth = match mode { + TxMode::Credits(credit) => match credit { + CreditsTxMode::BondPublic => todo!(), + CreditsTxMode::UnbondPublic => todo!(), + CreditsTxMode::TransferPublic => Authorize::TransferPublic { + private_key: sample_pk()?, + recipient: sample_addr()?, + amount: 1, + priority_fee: 0, + }, + CreditsTxMode::TransferPublicToPrivate => todo!(), + CreditsTxMode::TransferPrivate => todo!(), + CreditsTxMode::TransferPrivateToPublic => todo!(), + }, + }; + + Ok(auth) + } + _ => Err(anyhow!("cannot authorize playback transactions")), + } + } } diff --git a/crates/snot/src/cli.rs b/crates/snot/src/cli.rs index 1a38f866..2c1dfd1a 100644 --- a/crates/snot/src/cli.rs +++ b/crates/snot/src/cli.rs @@ -11,4 +11,7 @@ pub struct Cli { #[arg(long, default_value = "snot-control-data")] /// Path to the directory containing the stored data pub path: PathBuf, + + #[arg(long)] + pub hostname: Option, } diff --git a/crates/snot/src/schema/storage.rs b/crates/snot/src/schema/storage.rs index 93056bab..6ae5fb26 100644 --- a/crates/snot/src/schema/storage.rs +++ b/crates/snot/src/schema/storage.rs @@ -374,4 +374,48 @@ impl LoadedStorage { KeySource::Named(_name, None) => None, } } + + pub fn sample_keysource_pk(&self, key: &KeySource) -> Option { + match key { + KeySource::Literal(pk) => Some(pk.clone()), + KeySource::Committee(Some(i)) => self.committee.get_index(*i).map(|(_, pk)| pk.clone()), + KeySource::Committee(None) => self + .committee + .values() + .nth(rand::random::() % self.committee.len()) + .cloned(), + KeySource::Named(name, Some(i)) => self + .accounts + .get(name) + .and_then(|a| a.get_index(*i).map(|(_, pk)| pk.clone())), + KeySource::Named(name, None) => self.accounts.get(name).and_then(|a| { + a.values() + .nth(rand::random::() % self.accounts.len()) + .cloned() + }), + } + } + + pub fn sample_keysource_addr(&self, key: &KeySource) -> Option { + match key { + KeySource::Literal(addr) => Some(addr.clone()), + KeySource::Committee(Some(i)) => { + self.committee.get_index(*i).map(|(addr, _)| addr.clone()) + } + KeySource::Committee(None) => self + .committee + .keys() + .nth(rand::random::() % self.committee.len()) + .cloned(), + KeySource::Named(name, Some(i)) => self + .accounts + .get(name) + .and_then(|a| a.get_index(*i).map(|(addr, _)| addr.clone())), + KeySource::Named(name, None) => self.accounts.get(name).and_then(|a| { + a.keys() + .nth(rand::random::() % self.accounts.len()) + .cloned() + }), + } + } } diff --git a/crates/snot/src/testing.rs b/crates/snot/src/testing.rs index fe9ff723..881d6f20 100644 --- a/crates/snot/src/testing.rs +++ b/crates/snot/src/testing.rs @@ -3,7 +3,7 @@ use std::{ fmt::Display, path::PathBuf, sync::{ - atomic::{AtomicU32, AtomicUsize, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, }; @@ -17,7 +17,7 @@ use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey}; use tracing::{info, warn}; use crate::{ - cannon::{sink::TxSink, source::TxSource, CannonInstance}, + cannon::{fs_drain::TransactionDrain, sink::TxSink, source::TxSource, CannonInstance}, schema::{ nodes::{ExternalNode, Node}, storage::LoadedStorage, @@ -28,13 +28,14 @@ use crate::{ #[derive(Debug)] pub struct Environment { + pub id: usize, pub storage: Arc, pub node_map: BiMap, pub initial_nodes: IndexMap, pub aot_bin: PathBuf, /// Map of transaction files to their respective counters - pub transaction_counters: HashMap, + pub tx_drains: HashMap>, /// Map of cannon ids to their cannon configurations pub cannon_configs: HashMap, /// To help generate the id of the new cannon. @@ -93,6 +94,7 @@ impl Environment { let mut node_map = BiHashMap::default(); let mut initial_nodes = IndexMap::default(); let mut cannon_configs = HashMap::new(); + let mut tx_drains = HashMap::new(); for document in documents { match document { @@ -189,11 +191,26 @@ impl Environment { } } + let storage = storage.ok_or_else(|| anyhow!("env is missing storage document"))?; + + // review cannon configurations to ensure all playback sources have a real file + // backing them + for (source, _) in cannon_configs.values() { + if let TxSource::Playback { name } = source { + tx_drains.insert( + name.to_owned(), + Arc::new(TransactionDrain::new(storage.clone(), name)?), + ); + } + } + + let env_id = state.envs_counter.fetch_add(1, Ordering::Relaxed); let env = Environment { - storage: storage.ok_or_else(|| anyhow!("env is missing storage document"))?, + id: env_id, + storage, node_map, initial_nodes, - transaction_counters: Default::default(), + tx_drains, cannon_configs, cannons_counter: Default::default(), cannons: Default::default(), @@ -208,7 +225,6 @@ impl Environment { }, }; - let env_id = state.envs_counter.fetch_add(1, Ordering::Relaxed); state_lock.insert(env_id, Arc::new(env)); drop(state_lock); From 3116dc8cf7689043ab3cf366531889f8879b62b9 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Tue, 26 Mar 2024 13:41:03 -0500 Subject: [PATCH 2/8] fix(cannon): fix invalid aot args --- crates/snot/src/cannon/authorized.rs | 2 +- crates/snot/src/cannon/mod.rs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/snot/src/cannon/authorized.rs b/crates/snot/src/cannon/authorized.rs index 33946477..c1c8a83a 100644 --- a/crates/snot/src/cannon/authorized.rs +++ b/crates/snot/src/cannon/authorized.rs @@ -42,7 +42,7 @@ impl Authorize { } } - command.arg("--broadcast").arg("true"); + command.arg("--broadcast"); let res = command.output().await?; diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 1e8db9c6..fa5cd966 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -128,7 +128,7 @@ impl CannonInstance { let tx_queue = VecDeque::::new(); // spawn child process for ledger service if the source is local - let mut child = if let Some(_port) = query_port { + let mut child = if let Some(port) = query_port { // TODO: make a copy of this ledger dir to prevent locks let child = Command::new(&env.aot_bin) .kill_on_drop(true) @@ -140,6 +140,11 @@ impl CannonInstance { .arg("-g") .arg(env.storage.path.join("genesis.block")) .arg("query") + .arg("--port") + .arg(port.to_string()) + .arg("--bind") + .arg("127.0.0.1") // only bind to localhost as this is a private process + .arg("--readonly") .spawn() .map_err(|e| anyhow::anyhow!("error spawning query service: {e}"))?; Some(child) From de7a2832a4266506afeeeee6bf3b3da440e87e62 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Wed, 27 Mar 2024 01:29:47 -0500 Subject: [PATCH 3/8] feat(cannon): tx gen timing, tx writer --- crates/aot/src/ledger/truncate.rs | 24 ++-- crates/snot-agent/src/rpc.rs | 26 ++++ crates/snot-common/src/rpc/agent.rs | 3 + .../snot/src/cannon/{fs_drain.rs => file.rs} | 34 ++++- crates/snot/src/cannon/mod.rs | 124 +++++++++++++++--- crates/snot/src/cannon/sink.rs | 88 +++++++++++++ crates/snot/src/env/mod.rs | 62 ++++++--- crates/snot/src/state.rs | 38 +++++- 8 files changed, 350 insertions(+), 49 deletions(-) rename crates/snot/src/cannon/{fs_drain.rs => file.rs} (53%) diff --git a/crates/aot/src/ledger/truncate.rs b/crates/aot/src/ledger/truncate.rs index 263509c1..2dcda1d8 100644 --- a/crates/aot/src/ledger/truncate.rs +++ b/crates/aot/src/ledger/truncate.rs @@ -46,7 +46,7 @@ impl Truncate { for i in 1..target_height { let block = db_ledger.get_block(i)?; let buf = block.to_bytes_le()?; - tracing::info!("Writing block {i}... {}", buf.len()); + // println!("Writing block {i}... {}", buf.len()); unistd::write(&write_fd, &(buf.len() as u32).to_le_bytes())?; unistd::write(&write_fd, &buf)?; @@ -77,14 +77,22 @@ impl Truncate { while read < amount as usize { read += unistd::read(read_fd, &mut buf[read..])?; } - tracing::info!( - "Reading block {}... {}", - db_ledger.latest_height() + 1, - buf.len() - ); - let block = Block::from_bytes_le(&buf)?; - db_ledger.advance_to_next_block(&block)?; + if db_ledger.latest_height() + 1 != block.height() { + println!( + "Skipping block {}, waiting for {}", + block.height(), + db_ledger.latest_height() + 1, + ); + } else { + println!( + "Reading block {}... {}", + db_ledger.latest_height() + 1, + buf.len() + ); + + db_ledger.advance_to_next_block(&block)?; + } } unistd::close(read_fd.as_raw_fd())?; diff --git a/crates/snot-agent/src/rpc.rs b/crates/snot-agent/src/rpc.rs index 161db360..42ee9d5a 100644 --- a/crates/snot-agent/src/rpc.rs +++ b/crates/snot-agent/src/rpc.rs @@ -405,6 +405,32 @@ impl AgentService for AgentRpcServer { .map_err(|_| AgentError::FailedToParseJson) } + async fn broadcast_tx(self, _: context::Context, tx: String) -> Result<(), AgentError> { + if !matches!( + self.state.agent_state.read().await.deref(), + AgentState::Node(_, _) + ) { + return Err(AgentError::InvalidState); + } + + let url = format!( + "http://127.0.0.1:{}/mainnet/transaction/broadcast", + self.state.cli.rest + ); + let response = reqwest::Client::new() + .post(url) + .header("Content-Type", "application/json") + .body(tx) + .send() + .await + .map_err(|_| AgentError::FailedToMakeRequest)?; + if response.status().is_success() { + Ok(()) + } else { + Err(AgentError::FailedToMakeRequest) + } + } + async fn get_metric(self, _: context::Context, metric: AgentMetric) -> f64 { let metrics = self.state.metrics.read().await; diff --git a/crates/snot-common/src/rpc/agent.rs b/crates/snot-common/src/rpc/agent.rs index 32cc73b4..89f558e1 100644 --- a/crates/snot-common/src/rpc/agent.rs +++ b/crates/snot-common/src/rpc/agent.rs @@ -23,6 +23,9 @@ pub trait AgentService { /// Get the state root from the running node async fn get_state_root() -> Result; + /// Broadcast a transaction locally + async fn broadcast_tx(tx: String) -> Result<(), AgentError>; + /// Locally execute an authorization, using the given query /// environment id is passed so the agent can determine which aot binary to use async fn execute_authorization( diff --git a/crates/snot/src/cannon/fs_drain.rs b/crates/snot/src/cannon/file.rs similarity index 53% rename from crates/snot/src/cannon/fs_drain.rs rename to crates/snot/src/cannon/file.rs index dbbefec0..14e65073 100644 --- a/crates/snot/src/cannon/fs_drain.rs +++ b/crates/snot/src/cannon/file.rs @@ -1,6 +1,6 @@ use std::{ fs::File, - io::{BufRead, BufReader}, + io::{BufRead, BufReader, BufWriter, Write}, sync::{Arc, Mutex}, }; @@ -42,3 +42,35 @@ impl TransactionDrain { Ok(Some(buf)) } } + +#[derive(Debug)] +pub struct TransactionSink(Mutex>>); + +impl TransactionSink { + /// Create a new transaction sink + pub fn new(storage: Arc, target: &str) -> Result { + let target = storage.path.join(target); + + let Ok(f) = File::options().create(true).append(true).open(&target) else { + bail!("error opening transaction target file: {target:?}"); + }; + + Ok(Self(Mutex::new(Some(BufWriter::new(f))))) + } + + /// Write a line to the transaction sink + pub fn write(&self, line: &str) -> Result<()> { + let Ok(mut lock) = self.0.lock() else { + bail!("error locking transaction sink"); + }; + + if lock.is_none() { + return Ok(()); + } + + let writer = lock.as_mut().unwrap(); + writer.write_all(line.as_bytes())?; + writer.write_all(b"\n")?; + Ok(()) + } +} diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index fa5cd966..cb93a547 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -1,5 +1,5 @@ pub mod authorized; -pub mod fs_drain; +pub mod file; mod net; pub mod router; pub mod sink; @@ -13,6 +13,7 @@ use std::{ use anyhow::{bail, ensure, Result}; use serde_json::json; +use snot_common::state::{AgentPeer, AgentState}; use tokio::{ process::Command, sync::{mpsc::UnboundedSender, Mutex as AsyncMutex, OnceCell}, @@ -23,7 +24,7 @@ use tracing::warn; use self::{sink::TxSink, source::TxSource}; use crate::{ cannon::source::{ComputeTarget, LedgerQueryService}, - env::Environment, + env::{Environment, PortType}, state::GlobalState, }; @@ -78,6 +79,7 @@ pub struct CannonInstance { // TODO: run the actual cannon in this task task: AsyncMutex, + child: Option, /// channel to send transactions to the the task tx_sender: UnboundedSender, @@ -118,8 +120,9 @@ impl CannonInstance { let query_port = source.get_query_port()?; - let env2 = env.clone(); + let env2 = Arc::downgrade(&env); let source2 = source.clone(); + let sink2 = sink.clone(); let state = global_state.clone(); let fired_txs = AtomicUsize::new(0); @@ -128,7 +131,7 @@ impl CannonInstance { let tx_queue = VecDeque::::new(); // spawn child process for ledger service if the source is local - let mut child = if let Some(port) = query_port { + let child = if let Some(port) = query_port { // TODO: make a copy of this ledger dir to prevent locks let child = Command::new(&env.aot_bin) .kill_on_drop(true) @@ -153,11 +156,11 @@ impl CannonInstance { }; // when in playback mode, ensure the drain exists - let (drain, query_path) = match &source { + let (drain_pipe, query_path) = match &source { TxSource::Playback { name } => { - let drain = env.tx_drains.get(name).cloned(); - ensure!(drain.is_some(), "transaction drain not found: {name}"); - (drain, None) + let pipe = env.tx_pipe.drains.get(name).cloned(); + ensure!(pipe.is_some(), "transaction drain not found: {name}"); + (pipe, None) } TxSource::RealTime { compute, .. } => { let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id); @@ -176,6 +179,15 @@ impl CannonInstance { } }; + let sink_pipe = match &sink { + TxSink::Record { name } => { + let pipe = env.tx_pipe.sinks.get(name).cloned(); + ensure!(pipe.is_some(), "transaction sink not found: {name}"); + pipe + } + _ => None, + }; + let handle: JoinHandle> = tokio::spawn(async move { // effectively make this be the source of pooling requests @@ -183,7 +195,7 @@ impl CannonInstance { match &source2 { TxSource::Playback { .. } => { // if tx source is playback, read lines from the transaction file - let Some(transaction) = drain.unwrap().next()? else { + let Some(transaction) = drain_pipe.unwrap().next()? else { bail!("source out of transactions") }; tx.send(transaction)?; @@ -193,10 +205,39 @@ impl CannonInstance { // TODO: if source is realtime, generate authorizations and // send them to any available agent - let auth = source2.get_auth(&env2)?.run(&env2.aot_bin).await?; + let env = env2 + .upgrade() + .ok_or_else(|| anyhow::anyhow!("env dropped"))?; + + let auth = source2.get_auth(&env)?.run(&env.aot_bin).await?; match compute { ComputeTarget::AgentPool => { - todo!("find an agent, call the .execute_authorization api") + // find a client, mark it as busy + let Some((client, _busy)) = + state.pool.read().await.values().find_map(|a| { + if !a.is_busy() + && matches!(a.state(), AgentState::Inventory) + { + a.client_owned().map(|c| (c, a.make_busy())) + } else { + None + } + }) + else { + bail!("no agents available to execute authorization") + }; + + // execute the authorization + client + .execute_authorization( + env2.upgrade() + .ok_or_else(|| anyhow::anyhow!("env dropped"))? + .id, + query_path.unwrap(), + serde_json::from_value(auth)?, + ) + .await?; + Ok(()) } ComputeTarget::Demox { url } => { let _body = json!({ @@ -206,7 +247,7 @@ impl CannonInstance { "params": { "authorization": serde_json::to_string(&auth["authorization"])?, "fee": serde_json::to_string(&auth["fee"])?, - "url": query_path, + "url": query_path.unwrap(), "broadcast": true, } }); @@ -218,21 +259,61 @@ impl CannonInstance { } }; - // TODO: build a buffer deep enough to satisfy a few seconds of transactions - // TODO: as the buffer is drained, queue up more generated transactions - // compare the tx id to an authorization id let _pending_txs = HashSet::::new(); - // env2.storage.lookup_keysource_pk(key) - - // TODO: determine the rate that transactions need to be created - // based on the sink + // build a timer that keeps track of the expected sink speed + let mut timer = sink2.timer(10000); - if let Some(mut child) = child.take() { - child.wait().await?; + loop { + tokio::select! { + _ = timer.next() => { + // gen_tx.clone()().await?; + todo!("queue a transaction generation") + } + Some(tx) = rx.recv() => { + match &sink2 { + TxSink::Record { .. } => { + sink_pipe.clone().unwrap().write(&tx)?; + } + TxSink::RealTime { target, .. } => { + let pool = state.pool.read().await; + let nodes = env2.upgrade().ok_or_else(|| anyhow::anyhow!("env dropped"))? + .matching_nodes(target, &pool, PortType::Rest) + .collect::>(); + let Some(node) = nodes.get(rand::random::() % nodes.len()) else { + bail!("no nodes available to broadcast transactions") + }; + match node { + AgentPeer::Internal(id, _) => { + let Some(client) = pool[id].client_owned() else { + bail!("target node was offline"); + }; + + client.broadcast_tx(tx).await?; + } + AgentPeer::External(addr) => { + let url = format!("http://{addr}/mainnet/transaction/broadcast"); + ensure!( + reqwest::Client::new() + .post(url) + .header("Content-Type", "application/json") + .body(tx) + .send() + .await? + .status() + .is_success(), + "failed to post transaction to external target node {addr}" + ); + } + } + } + } + } + } } + #[allow(unreachable_code)] Ok(()) }); @@ -243,6 +324,7 @@ impl CannonInstance { env: Arc::downgrade(&env), tx_sender, query_port, + child, task: AsyncMutex::new(handle.abort_handle()), fired_txs, }) diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index 688618d4..4d45febc 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -1,3 +1,5 @@ +use std::{future, time::Duration}; + use serde::Deserialize; use crate::schema::NodeTargets; @@ -33,3 +35,89 @@ pub enum TxSink { tx_delay_ms: u32, }, } + +impl TxSink { + pub fn timer(&self, count: usize) -> Timer { + match self { + TxSink::Record { .. } => Timer { + state: TimerState::Active(0), + count, + burst_rate: Duration::from_secs(1), + burst_size: 1, + fire_rate: Duration::ZERO, + }, + TxSink::RealTime { + burst_delay_ms, + tx_per_burst, + tx_delay_ms, + .. + } => Timer { + state: TimerState::Active(*tx_per_burst as usize), + count, + burst_rate: Duration::from_millis(*burst_delay_ms as u64), + burst_size: *tx_per_burst, + fire_rate: Duration::from_millis(*tx_delay_ms as u64), + }, + } + } +} + +pub struct Timer { + count: usize, + burst_rate: Duration, + burst_size: u32, + fire_rate: Duration, + state: TimerState, +} + +enum TimerState { + /// wait the `fire_rate` duration + Active(usize), + /// wait the `burst_rate` duration + Waiting, + /// wait forever + Done, +} + +impl Timer { + /* + + example for burst 6, size 3, + wait is `=`, active is `-,` fire is `>` + [======>-->-->======>-->-->======] + + + */ + + pub async fn next(&mut self) { + self.state = match self.state { + TimerState::Active(remaining) => { + tokio::time::sleep(self.fire_rate).await; + + // we reach this point by having waited before, so we remove one + match remaining.saturating_sub(1) { + // if the count was 1, wait the full burst time + 0 => TimerState::Waiting, + // if the count was nonzero, wait at least 1 more fire time + n => TimerState::Active(n), + } + } + TimerState::Waiting => { + self.count.saturating_sub(1); + tokio::time::sleep(self.burst_rate).await; + match self.count { + // if count is empty, the next sleep will be permanent + 0 => TimerState::Done, + + _ => match self.burst_size { + // if the burst size is 0, do a full burst wait + 0 => TimerState::Waiting, + // if the burst size is nonzero, wait for the shorter burst latency + _ => TimerState::Active(self.burst_size as usize), + }, + } + } + TimerState::Done => future::pending().await, + }; + } +} diff --git a/crates/snot/src/env/mod.rs b/crates/snot/src/env/mod.rs index 173345f4..19796b8b 100644 --- a/crates/snot/src/env/mod.rs +++ b/crates/snot/src/env/mod.rs @@ -19,7 +19,12 @@ use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey}; use tracing::{info, warn}; use crate::{ - cannon::{fs_drain::TransactionDrain, sink::TxSink, source::TxSource, CannonInstance}, + cannon::{ + file::{TransactionDrain, TransactionSink}, + sink::TxSink, + source::TxSource, + CannonInstance, + }, schema::{ nodes::{ExternalNode, Node}, storage::LoadedStorage, @@ -38,7 +43,7 @@ pub struct Environment { pub aot_bin: PathBuf, /// Map of transaction files to their respective counters - pub tx_drains: HashMap>, + pub tx_pipe: TxPipes, /// Map of cannon ids to their cannon configurations pub cannon_configs: HashMap, /// To help generate the id of the new cannon. @@ -49,6 +54,12 @@ pub struct Environment { pub timeline: Vec, } +#[derive(Debug, Clone, Default)] +pub struct TxPipes { + pub drains: HashMap>, + pub sinks: HashMap>, +} + #[derive(Debug, Clone)] /// The effective test state of a node. pub enum EnvNode { @@ -74,6 +85,12 @@ impl Display for EnvPeer { } } +pub enum PortType { + Node, + Bft, + Rest, +} + impl Environment { /// Deserialize (YAML) many documents into a `Vec` of documents. pub fn deserialize(str: &str) -> Result, anyhow::Error> { @@ -101,7 +118,7 @@ impl Environment { let mut node_map = BiHashMap::default(); let mut initial_nodes = IndexMap::default(); let mut cannon_configs = HashMap::new(); - let mut tx_drains = HashMap::new(); + let mut tx_pipe = TxPipes::default(); let mut timeline = vec![]; for document in documents { @@ -207,15 +224,22 @@ impl Environment { let storage = storage.ok_or_else(|| anyhow!("env is missing storage document"))?; - // review cannon configurations to ensure all playback sources have a real file - // backing them - for (source, _) in cannon_configs.values() { + // review cannon configurations to ensure all playback sources and sinks + // have a real file backing them + for (source, sink) in cannon_configs.values() { if let TxSource::Playback { name } = source { - tx_drains.insert( + tx_pipe.drains.insert( name.to_owned(), Arc::new(TransactionDrain::new(storage.clone(), name)?), ); } + + if let TxSink::Record { name } = sink { + tx_pipe.sinks.insert( + name.to_owned(), + Arc::new(TransactionSink::new(storage.clone(), name)?), + ); + } } let env_id = ENVS_COUNTER.fetch_add(1, Ordering::Relaxed); @@ -224,7 +248,7 @@ impl Environment { storage, node_map, initial_nodes, - tx_drains, + tx_pipe, cannon_configs, cannons_counter: Default::default(), cannons: Default::default(), @@ -325,7 +349,7 @@ impl Environment { &'a self, targets: &'a NodeTargets, pool: &'a HashMap, - is_validator: bool, + port_type: PortType, ) -> impl Iterator + 'a { self.node_map .iter() @@ -338,9 +362,10 @@ impl Environment { Some(AgentPeer::Internal( *id, - match is_validator { - true => agent.bft_port(), - false => agent.node_port(), + match port_type { + PortType::Bft => agent.bft_port(), + PortType::Node => agent.node_port(), + PortType::Rest => agent.rest_port(), }, )) } @@ -350,9 +375,10 @@ impl Environment { return None; }; - Some(AgentPeer::External(match is_validator { - true => external.bft?, - false => external.node?, + Some(AgentPeer::External(match port_type { + PortType::Bft => external.bft?, + PortType::Node => external.node?, + PortType::Rest => external.rest?, })) } }) @@ -363,7 +389,7 @@ impl Environment { targets: &'a NodeTargets, pool: &'a HashMap, ) -> impl Iterator + 'a { - self.matching_nodes(targets, pool, false /* don't care about this */) + self.matching_nodes(targets, pool, PortType::Node) // ignore node type .filter_map(|agent_peer| match agent_peer { AgentPeer::Internal(id, _) => pool.get(&id), AgentPeer::External(_) => None, @@ -410,12 +436,12 @@ pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> anyhow::Re }; node_state.peers = env - .matching_nodes(&node.peers, &*pool_lock, false) + .matching_nodes(&node.peers, &*pool_lock, PortType::Node) .filter(not_me) .collect(); node_state.validators = env - .matching_nodes(&node.validators, &*pool_lock, true) + .matching_nodes(&node.validators, &*pool_lock, PortType::Bft) .filter(not_me) .collect(); diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs index e636b55a..aea32426 100644 --- a/crates/snot/src/state.rs +++ b/crates/snot/src/state.rs @@ -54,11 +54,17 @@ pub struct Agent { connection: AgentConnection, state: AgentState, + busy: Arc, + /// The external address of the agent, along with its local addresses. ports: Option, addrs: Option, } +#[derive(Debug)] +/// Apparently `const* ()` is not send, so this is a workaround +pub struct Busy; + pub struct AgentClient(AgentServiceClient); impl Agent { @@ -68,6 +74,7 @@ impl Agent { Self { id, + busy: Arc::new(Busy), claims: Claims { id, nonce: *JWT_NONCE, @@ -92,11 +99,21 @@ impl Agent { external.is_some() || !internal.is_empty() } - /// Check if a test is inventory state + /// Check if an agent is in inventory state pub fn is_inventory(&self) -> bool { matches!(self.state, AgentState::Inventory) } + /// Check if a agent is working on an authorization + pub fn is_busy(&self) -> bool { + Arc::strong_count(&self.busy) > 1 + } + + /// Mark an agent as busy. This is used to prevent multiple authorizations + pub fn make_busy(&self) -> Arc { + Arc::clone(&self.busy) + } + /// The ID of this agent. pub fn id(&self) -> usize { self.id @@ -192,6 +209,25 @@ impl AgentClient { pub async fn get_state_root(&self) -> Result { Ok(self.0.get_state_root(context::current()).await??) } + + pub async fn execute_authorization( + &self, + env_id: usize, + query: String, + auth: String, + ) -> Result<()> { + self.0 + .execute_authorization(context::current(), env_id, query, auth) + .await? + .map_err(anyhow::Error::from) + } + + pub async fn broadcast_tx(&self, tx: String) -> Result<()> { + self.0 + .broadcast_tx(context::current(), tx) + .await? + .map_err(anyhow::Error::from) + } } #[derive(Debug, Clone)] From e2d3c10267b4c09f554fa787e6e7065bf1a923d0 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Wed, 27 Mar 2024 23:35:25 -0500 Subject: [PATCH 4/8] feat(cannon): rename some fields, make cannon types serialize --- crates/snot-common/src/state.rs | 9 ++++ crates/snot/src/cannon/mod.rs | 8 ++-- crates/snot/src/cannon/sink.rs | 40 +++++++++++++++-- crates/snot/src/cannon/source.rs | 63 ++++++++++++++++++++++---- crates/snot/src/env/mod.rs | 12 ++--- crates/snot/src/schema/mod.rs | 47 +++++++++++++++++++- crates/snot/src/schema/nodes.rs | 76 ++++++++++++++++++++------------ specs/test-cannon-record.yaml | 20 +++++++++ 8 files changed, 221 insertions(+), 54 deletions(-) create mode 100644 specs/test-cannon-record.yaml diff --git a/crates/snot-common/src/state.rs b/crates/snot-common/src/state.rs index 0fa663ef..b69c2566 100644 --- a/crates/snot-common/src/state.rs +++ b/crates/snot-common/src/state.rs @@ -199,3 +199,12 @@ impl Display for NodeKey { Ok(()) } } + +impl Serialize for NodeKey { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index cb93a547..88159b82 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -157,7 +157,7 @@ impl CannonInstance { // when in playback mode, ensure the drain exists let (drain_pipe, query_path) = match &source { - TxSource::Playback { name } => { + TxSource::Playback { file_name: name } => { let pipe = env.tx_pipe.drains.get(name).cloned(); ensure!(pipe.is_some(), "transaction drain not found: {name}"); (pipe, None) @@ -166,7 +166,7 @@ impl CannonInstance { let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id); let query = match compute { // agents already know the host of the control plane - ComputeTarget::AgentPool => suffix, + ComputeTarget::Agent => suffix, // demox needs to locate it ComputeTarget::Demox { .. } => { let Some(host) = get_host(&state).await else { @@ -180,7 +180,7 @@ impl CannonInstance { }; let sink_pipe = match &sink { - TxSink::Record { name } => { + TxSink::Record { file_name: name } => { let pipe = env.tx_pipe.sinks.get(name).cloned(); ensure!(pipe.is_some(), "transaction sink not found: {name}"); pipe @@ -211,7 +211,7 @@ impl CannonInstance { let auth = source2.get_auth(&env)?.run(&env.aot_bin).await?; match compute { - ComputeTarget::AgentPool => { + ComputeTarget::Agent => { // find a client, mark it as busy let Some((client, _busy)) = state.pool.read().await.values().find_map(|a| { diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index 4d45febc..44723176 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -1,15 +1,17 @@ use std::{future, time::Duration}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use crate::schema::NodeTargets; -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case", untagged)] pub enum TxSink { /// Write transactions to a file + #[serde(rename_all = "kebab-case")] Record { /// filename for the recording txs list - name: String, + file_name: String, }, //// Write transactions to a ledger query service // AoTAppend { @@ -21,6 +23,7 @@ pub enum TxSink { // tx_per_block: u32, // }, /// Send transactions to nodes in a env + #[serde(rename_all = "kebab-case")] RealTime { /// The nodes to send transactions to /// @@ -103,7 +106,7 @@ impl Timer { } } TimerState::Waiting => { - self.count.saturating_sub(1); + self.count = self.count.saturating_sub(1); tokio::time::sleep(self.burst_rate).await; match self.count { // if count is empty, the next sleep will be permanent @@ -121,3 +124,32 @@ impl Timer { }; } } + +// I use this to generate example yaml... +/* #[cfg(test)] +mod test { + use super::*; + use crate::schema::NodeTarget; + use std::str::FromStr; + + #[test] + fn what_does_it_look_like() { + println!( + "{}", + serde_yaml::to_string(&TxSink::Record { + file_name: "test".to_string(), + }) + .unwrap() + ); + println!( + "{}", + serde_yaml::to_string(&TxSink::RealTime { + target: NodeTargets::One(NodeTarget::from_str("validator/1").unwrap()), + burst_delay_ms: 5, + tx_per_burst: 5, + tx_delay_ms: 5 + }) + .unwrap() + ); + } +} */ diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index 6e76821f..64232ac9 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use anyhow::{anyhow, bail, Result}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use snot_common::state::NodeKey; use crate::{env::Environment, schema::nodes::KeySource}; @@ -9,7 +9,7 @@ use crate::{env::Environment, schema::nodes::KeySource}; use super::{authorized::Authorize, net::get_available_port}; /// Represents an instance of a local query service. -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct LocalQueryService { /// Ledger & genesis block to use // pub storage_id: usize, @@ -26,6 +26,7 @@ pub struct LocalQueryService { /// if the node is out of sync, it will corrupt the ledger... /// /// requires cannon to have an associated env_id + #[serde(skip_serializing_if = "Option::is_none")] pub sync_from: Option, } @@ -43,7 +44,8 @@ impl LocalQueryService { /// Used to determine the redirection for the following paths: /// /cannon//mainnet/latest/stateRoot /// /cannon//mainnet/transaction/broadcast -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case", tag = "mode")] pub enum LedgerQueryService { /// Use the local ledger query service Local(LocalQueryService), @@ -54,16 +56,18 @@ pub enum LedgerQueryService { } /// Which service is providing the compute power for executing transactions -#[derive(Default, Clone, Debug, Deserialize)] +#[derive(Default, Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] pub enum ComputeTarget { /// Use the agent pool to generate executions #[default] - AgentPool, + Agent, /// Use demox' API to generate executions Demox { url: String }, } -#[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] pub enum CreditsTxMode { BondPublic, UnbondPublic, @@ -74,20 +78,24 @@ pub enum CreditsTxMode { TransferPrivateToPublic, } -#[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] pub enum TxMode { Credits(CreditsTxMode), // TODO: Program(program, func, input types??) } -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case", untagged)] pub enum TxSource { /// Read transactions from a file + #[serde(rename_all = "kebab-case")] Playback { // filename from the storage for the tx list - name: String, + file_name: String, }, /// Generate transactions in real time + #[serde(rename_all = "kebab-case")] RealTime { query: LedgerQueryService, compute: ComputeTarget, @@ -168,3 +176,40 @@ impl TxSource { } } } + +// I use this to generate example yaml... +/* #[cfg(test)] +mod test { + use super::*; + use crate::{ + cannon::source::{ + ComputeTarget, CreditsTxMode, LedgerQueryService, LocalQueryService, TxMode, + }, + schema::nodes::KeySource, + }; + use std::str::FromStr; + + #[test] + fn what_does_it_look_like() { + println!( + "{}", + serde_yaml::to_string(&TxSource::Playback { + file_name: "test".to_string(), + }) + .unwrap() + ); + println!( + "{}", + serde_yaml::to_string(&TxSource::RealTime { + query: LedgerQueryService::Local(LocalQueryService { sync_from: None }), + compute: ComputeTarget::Agent, + tx_modes: [TxMode::Credits(CreditsTxMode::TransferPublic)] + .into_iter() + .collect(), + private_keys: vec![KeySource::from_str("committee.$").unwrap()], + addresses: vec![KeySource::from_str("committee.$").unwrap()], + }) + .unwrap() + ); + } +} */ diff --git a/crates/snot/src/env/mod.rs b/crates/snot/src/env/mod.rs index 19796b8b..0efc3526 100644 --- a/crates/snot/src/env/mod.rs +++ b/crates/snot/src/env/mod.rs @@ -227,17 +227,17 @@ impl Environment { // review cannon configurations to ensure all playback sources and sinks // have a real file backing them for (source, sink) in cannon_configs.values() { - if let TxSource::Playback { name } = source { + if let TxSource::Playback { file_name } = source { tx_pipe.drains.insert( - name.to_owned(), - Arc::new(TransactionDrain::new(storage.clone(), name)?), + file_name.to_owned(), + Arc::new(TransactionDrain::new(storage.clone(), file_name)?), ); } - if let TxSink::Record { name } = sink { + if let TxSink::Record { file_name } = sink { tx_pipe.sinks.insert( - name.to_owned(), - Arc::new(TransactionSink::new(storage.clone(), name)?), + file_name.to_owned(), + Arc::new(TransactionSink::new(storage.clone(), file_name)?), ); } } diff --git a/crates/snot/src/schema/mod.rs b/crates/snot/src/schema/mod.rs index 85d41d86..fe274d07 100644 --- a/crates/snot/src/schema/mod.rs +++ b/crates/snot/src/schema/mod.rs @@ -1,10 +1,11 @@ -use std::str::FromStr; +use std::{fmt::Display, str::FromStr}; use lazy_static::lazy_static; use regex::Regex; use serde::{ de::{Error, Visitor}, - Deserialize, + ser::SerializeSeq, + Deserialize, Serialize, }; use snot_common::state::{NodeKey, NodeType}; use wildmatch::WildMatch; @@ -100,6 +101,25 @@ lazy_static! { .unwrap(); } +impl Serialize for NodeTargets { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + NodeTargets::None => serializer.serialize_seq(Some(0))?.end(), + NodeTargets::One(target) => serializer.serialize_str(&target.to_string()), + NodeTargets::Many(targets) => { + let mut seq = serializer.serialize_seq(Some(targets.len()))?; + for target in targets { + seq.serialize_element(&target.to_string())?; + } + seq.end() + } + } + } +} + /// A **single** matched node target. Use [`NodeTargets`] when deserializing /// from documents. #[derive(Debug, Clone)] @@ -156,6 +176,29 @@ impl FromStr for NodeTarget { } } +impl Display for NodeTarget { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}/{}{}", + match self.ty { + NodeTargetType::All => "*".to_owned(), + NodeTargetType::One(ty) => ty.to_string(), + }, + match &self.id { + NodeTargetId::All => "*".to_owned(), + NodeTargetId::WildcardPattern(pattern) => pattern.to_string(), + NodeTargetId::Literal(id) => id.to_owned(), + }, + match &self.ns { + NodeTargetNamespace::All => "@*".to_owned(), + NodeTargetNamespace::Local => "".to_owned(), + NodeTargetNamespace::Literal(ns) => format!("@{}", ns), + } + ) + } +} + #[derive(Debug, Clone, Copy)] pub enum NodeTargetType { /// Matches all node types. diff --git a/crates/snot/src/schema/nodes.rs b/crates/snot/src/schema/nodes.rs index ceda118c..7fe25a8a 100644 --- a/crates/snot/src/schema/nodes.rs +++ b/crates/snot/src/schema/nodes.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{fmt::Display, net::SocketAddr, str::FromStr}; use indexmap::IndexMap; use lazy_static::lazy_static; @@ -100,22 +100,45 @@ impl<'de> Visitor<'de> for KeySourceVisitor { where E: serde::de::Error, { + KeySource::from_str(v).map_err(E::custom) + } +} + +impl<'de> Deserialize<'de> for KeySource { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_str(KeySourceVisitor) + } +} + +impl Serialize for KeySource { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_str(&self.to_string()) + } +} + +impl FromStr for KeySource { + type Err = &'static str; + + fn from_str(s: &str) -> Result { // use KeySource::Literal(String) when the string is 59 characters long and starts with "APrivateKey1zkp" // use KeySource::Commitee(Option) when the string is "committee.0" or "committee.$" // use KeySource::Named(String, Option) when the string is "\w+.0" or "\w+.$" // aleo private key - if v.len() == 59 && v.starts_with("APrivateKey1") { - return Ok(KeySource::Literal(v.to_string())); + if s.len() == 59 && s.starts_with("APrivateKey1") { + return Ok(KeySource::Literal(s.to_string())); // committee key - } else if let Some(index) = v.strip_prefix("committee.") { + } else if let Some(index) = s.strip_prefix("committee.") { if index == "$" { return Ok(KeySource::Committee(None)); } let replica = index .parse() - .map_err(|_e| E::custom("committee index must be a positive number"))?; + .map_err(|_e| "committee index must be a positive number")?; return Ok(KeySource::Committee(Some(replica))); } @@ -125,42 +148,37 @@ impl<'de> Visitor<'de> for KeySourceVisitor { regex::Regex::new(r"^(?P\w+)\.(?P\d+|\$)$").unwrap(); } let groups = NAMED_KEYSOURCE_REGEX - .captures(v) - .ok_or_else(|| E::custom("invalid key source"))?; + .captures(s) + .ok_or("invalid key source")?; let name = groups.name("name").unwrap().as_str().to_string(); let idx = match groups.name("idx").unwrap().as_str() { "$" => None, idx => Some( idx.parse() - .map_err(|_e| E::custom("index must be a positive number"))?, + .map_err(|_e| "index must be a positive number")?, ), }; Ok(KeySource::Named(name, idx)) } } -impl<'de> Deserialize<'de> for KeySource { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - deserializer.deserialize_str(KeySourceVisitor) - } -} - -impl Serialize for KeySource { - fn serialize(&self, serializer: S) -> Result { - match self { - KeySource::Literal(key) => serializer.serialize_str(key), - KeySource::Committee(None) => serializer.serialize_str("committee.$"), - KeySource::Committee(Some(idx)) => { - serializer.serialize_str(&format!("committee.{}", idx)) +impl Display for KeySource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + KeySource::Literal(key) => key.to_owned(), + KeySource::Committee(None) => "committee.$".to_owned(), + KeySource::Committee(Some(idx)) => { + format!("committee.{}", idx) + } + KeySource::Named(name, None) => format!("{}.{}", name, "$"), + KeySource::Named(name, Some(idx)) => { + format!("{}.{}", name, idx) + } } - KeySource::Named(name, None) => serializer.serialize_str(&format!("{}.{}", name, "$")), - KeySource::Named(name, Some(idx)) => { - serializer.serialize_str(&format!("{}.{}", name, idx)) - } - } + ) } } diff --git a/specs/test-cannon-record.yaml b/specs/test-cannon-record.yaml new file mode 100644 index 00000000..2943627f --- /dev/null +++ b/specs/test-cannon-record.yaml @@ -0,0 +1,20 @@ +--- +version: storage.snarkos.testing.monadic.us/v1 + +id: base +name: base-ledger + +--- +version: cannon.snarkos.testing.monadic.us/v1 + +name: commitee-tx-public + +source: + query: { mode: local } + compute: agent + tx-modes: [transfer-public] + private-keys: [committee.$] + addresses: [committee.$] + +sink: + file-name: txs.json From 09f0b587c9067fea0cd79abebc927a8b206b1ea5 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Thu, 28 Mar 2024 02:46:17 -0500 Subject: [PATCH 5/8] feat(cannon): WIP cannon timeline integration tokio extravaganza --- crates/snot-agent/src/main.rs | 4 +- crates/snot-agent/src/metrics/mod.rs | 2 +- crates/snot-cli/src/commands/env.rs | 6 +- crates/snot/src/cannon/file.rs | 3 + crates/snot/src/cannon/mod.rs | 458 ++++++++++++++++----------- crates/snot/src/cannon/router.rs | 6 +- crates/snot/src/cannon/sink.rs | 7 +- crates/snot/src/env/mod.rs | 16 +- crates/snot/src/env/timeline.rs | 123 +++++-- crates/snot/src/schema/timeline.rs | 4 +- crates/snot/src/server/mod.rs | 4 +- scripts/env_start.sh | 9 + specs/test-cannon-record.yaml | 12 +- 13 files changed, 423 insertions(+), 231 deletions(-) create mode 100755 scripts/env_start.sh diff --git a/crates/snot-agent/src/main.rs b/crates/snot-agent/src/main.rs index 8c40a7b2..099abb00 100644 --- a/crates/snot-agent/src/main.rs +++ b/crates/snot-agent/src/main.rs @@ -208,7 +208,7 @@ async fn main() { msg = server_response_out.recv() => { let msg = msg.expect("internal RPC channel closed"); let bin = bincode::serialize(&MuxedMessageOutgoing::Agent(msg)).expect("failed to serialize response"); - if let Err(_) = ws_stream.send(tungstenite::Message::Binary(bin)).await { + if (ws_stream.send(tungstenite::Message::Binary(bin)).await).is_err() { error!("The connection to the control plane was interrupted"); break 'event; } @@ -218,7 +218,7 @@ async fn main() { msg = client_request_out.recv() => { let msg = msg.expect("internal RPC channel closed"); let bin = bincode::serialize(&MuxedMessageOutgoing::Control(msg)).expect("failed to serialize request"); - if let Err(_) = ws_stream.send(tungstenite::Message::Binary(bin)).await { + if (ws_stream.send(tungstenite::Message::Binary(bin)).await).is_err() { error!("The connection to the control plane was interrupted"); break 'event; } diff --git a/crates/snot-agent/src/metrics/mod.rs b/crates/snot-agent/src/metrics/mod.rs index b6c119ee..b25f3065 100644 --- a/crates/snot-agent/src/metrics/mod.rs +++ b/crates/snot-agent/src/metrics/mod.rs @@ -67,7 +67,7 @@ pub fn init(state: Arc) { } /// Parse the metrics blob when scraping the snarkOS Prometheus exporter. -fn parse_metrics<'a>(source: &'a str) -> ParsedMetrics<'a> { +fn parse_metrics(source: &str) -> ParsedMetrics<'_> { source .split('\n') .map(str::trim) diff --git a/crates/snot-cli/src/commands/env.rs b/crates/snot-cli/src/commands/env.rs index 9507c9e2..f013b373 100644 --- a/crates/snot-cli/src/commands/env.rs +++ b/crates/snot-cli/src/commands/env.rs @@ -46,7 +46,7 @@ impl Env { let ep = format!("{}/api/v1/env/prepare", self.url); let file: String = std::fs::read_to_string(spec)?; - let id: Value = client.post(&ep).body(file).send()?.json()?; + let id: Value = client.post(ep).body(file).send()?.json()?; println!("{}", serde_json::to_string(&id)?); Ok(()) } @@ -54,14 +54,14 @@ impl Env { Start { id } => { let ep = format!("{}/api/v1/env/{id}", self.url); - client.post(&ep).send()?; + client.post(ep).send()?; Ok(()) } Stop { id } => { let ep = format!("{}/api/v1/env/{id}", self.url); - client.delete(&ep).send()?; + client.delete(ep).send()?; Ok(()) } } diff --git a/crates/snot/src/cannon/file.rs b/crates/snot/src/cannon/file.rs index 14e65073..3940b5b0 100644 --- a/crates/snot/src/cannon/file.rs +++ b/crates/snot/src/cannon/file.rs @@ -5,6 +5,7 @@ use std::{ }; use anyhow::{bail, Result}; +use tracing::debug; use crate::schema::storage::LoadedStorage; @@ -15,6 +16,7 @@ impl TransactionDrain { /// Create a new transaction drain pub fn new(storage: Arc, source: &str) -> Result { let source = storage.path.join(source); + debug!("opening tx drain @ {source:?}"); let Ok(f) = File::open(&source) else { bail!("error opening transaction source file: {source:?}"); @@ -50,6 +52,7 @@ impl TransactionSink { /// Create a new transaction sink pub fn new(storage: Arc, target: &str) -> Result { let target = storage.path.join(target); + debug!("opening tx sink @ {target:?}"); let Ok(f) = File::options().create(true).append(true).open(&target) else { bail!("error opening transaction target file: {target:?}"); diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 88159b82..3fb64376 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -6,20 +6,23 @@ pub mod sink; pub mod source; use std::{ - collections::{HashSet, VecDeque}, process::Stdio, - sync::{atomic::AtomicUsize, Arc, Weak}, + sync::{atomic::AtomicUsize, Arc, Mutex, Weak}, }; use anyhow::{bail, ensure, Result}; +use futures_util::{stream::FuturesUnordered, StreamExt}; use serde_json::json; use snot_common::state::{AgentPeer, AgentState}; use tokio::{ process::Command, - sync::{mpsc::UnboundedSender, Mutex as AsyncMutex, OnceCell}, - task::{AbortHandle, JoinHandle}, + sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + OnceCell, + }, + task::AbortHandle, }; -use tracing::warn; +use tracing::{debug, warn}; use self::{sink::TxSink, source::TxSource}; use crate::{ @@ -61,6 +64,7 @@ burst mode?? /// using the `TxSource` and `TxSink` for configuration. #[derive(Debug)] pub struct CannonInstance { + id: usize, // a copy of the global state global_state: Arc, @@ -78,12 +82,18 @@ pub struct CannonInstance { query_port: Option, // TODO: run the actual cannon in this task - task: AsyncMutex, + pub task: Mutex>, + + /// Child process must exist for the duration of the cannon instance. + /// This value is never used + #[allow(dead_code)] child: Option, /// channel to send transactions to the the task tx_sender: UnboundedSender, - fired_txs: AtomicUsize, + tx_receiver: Option>, + fired_txs: Arc, + tx_count: usize, } async fn get_host(state: &GlobalState) -> Option { @@ -110,25 +120,15 @@ impl CannonInstance { /// Locks the global state's tests and storage for reading. pub async fn new( global_state: Arc, - cannon_id: usize, + id: usize, env: Arc, source: TxSource, sink: TxSink, + count: usize, ) -> Result { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let tx_sender = tx.clone(); - + let (tx_sender, tx_receiver) = tokio::sync::mpsc::unbounded_channel(); let query_port = source.get_query_port()?; - - let env2 = Arc::downgrade(&env); - let source2 = source.clone(); - let sink2 = sink.clone(); - let state = global_state.clone(); - - let fired_txs = AtomicUsize::new(0); - - // buffer for transactions - let tx_queue = VecDeque::::new(); + let fired_txs = Arc::new(AtomicUsize::new(0)); // spawn child process for ledger service if the source is local let child = if let Some(port) = query_port { @@ -155,181 +155,57 @@ impl CannonInstance { None }; - // when in playback mode, ensure the drain exists - let (drain_pipe, query_path) = match &source { - TxSource::Playback { file_name: name } => { - let pipe = env.tx_pipe.drains.get(name).cloned(); - ensure!(pipe.is_some(), "transaction drain not found: {name}"); - (pipe, None) - } - TxSource::RealTime { compute, .. } => { - let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id); - let query = match compute { - // agents already know the host of the control plane - ComputeTarget::Agent => suffix, - // demox needs to locate it - ComputeTarget::Demox { .. } => { - let Some(host) = get_host(&state).await else { - bail!("no --host configured for demox based cannon"); - }; - format!("http://{host}:{}{suffix}", global_state.cli.port) - } - }; - (None, Some(query)) - } - }; - - let sink_pipe = match &sink { - TxSink::Record { file_name: name } => { - let pipe = env.tx_pipe.sinks.get(name).cloned(); - ensure!(pipe.is_some(), "transaction sink not found: {name}"); - pipe - } - _ => None, - }; - - let handle: JoinHandle> = tokio::spawn(async move { - // effectively make this be the source of pooling requests - - let gen_tx = || async { - match &source2 { - TxSource::Playback { .. } => { - // if tx source is playback, read lines from the transaction file - let Some(transaction) = drain_pipe.unwrap().next()? else { - bail!("source out of transactions") - }; - tx.send(transaction)?; - Ok(()) - } - TxSource::RealTime { compute, .. } => { - // TODO: if source is realtime, generate authorizations and - // send them to any available agent - - let env = env2 - .upgrade() - .ok_or_else(|| anyhow::anyhow!("env dropped"))?; - - let auth = source2.get_auth(&env)?.run(&env.aot_bin).await?; - match compute { - ComputeTarget::Agent => { - // find a client, mark it as busy - let Some((client, _busy)) = - state.pool.read().await.values().find_map(|a| { - if !a.is_busy() - && matches!(a.state(), AgentState::Inventory) - { - a.client_owned().map(|c| (c, a.make_busy())) - } else { - None - } - }) - else { - bail!("no agents available to execute authorization") - }; - - // execute the authorization - client - .execute_authorization( - env2.upgrade() - .ok_or_else(|| anyhow::anyhow!("env dropped"))? - .id, - query_path.unwrap(), - serde_json::from_value(auth)?, - ) - .await?; - Ok(()) - } - ComputeTarget::Demox { url } => { - let _body = json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "generateTransaction", - "params": { - "authorization": serde_json::to_string(&auth["authorization"])?, - "fee": serde_json::to_string(&auth["fee"])?, - "url": query_path.unwrap(), - "broadcast": true, - } - }); - - todo!("post on {url}") - } - } - } - } - }; - - // compare the tx id to an authorization id - let _pending_txs = HashSet::::new(); - - // build a timer that keeps track of the expected sink speed - let mut timer = sink2.timer(10000); - - loop { - tokio::select! { - _ = timer.next() => { - // gen_tx.clone()().await?; - todo!("queue a transaction generation") - } - Some(tx) = rx.recv() => { - match &sink2 { - TxSink::Record { .. } => { - sink_pipe.clone().unwrap().write(&tx)?; - } - TxSink::RealTime { target, .. } => { - let pool = state.pool.read().await; - let nodes = env2.upgrade().ok_or_else(|| anyhow::anyhow!("env dropped"))? - .matching_nodes(target, &pool, PortType::Rest) - .collect::>(); - let Some(node) = nodes.get(rand::random::() % nodes.len()) else { - bail!("no nodes available to broadcast transactions") - }; - match node { - AgentPeer::Internal(id, _) => { - let Some(client) = pool[id].client_owned() else { - bail!("target node was offline"); - }; - - client.broadcast_tx(tx).await?; - } - AgentPeer::External(addr) => { - let url = format!("http://{addr}/mainnet/transaction/broadcast"); - ensure!( - reqwest::Client::new() - .post(url) - .header("Content-Type", "application/json") - .body(tx) - .send() - .await? - .status() - .is_success(), - "failed to post transaction to external target node {addr}" - ); - } - } - } - } - } - } - } - - #[allow(unreachable_code)] - Ok(()) - }); - Ok(Self { + id, global_state, source, sink, env: Arc::downgrade(&env), tx_sender, + tx_receiver: Some(tx_receiver), query_port, child, - task: AsyncMutex::new(handle.abort_handle()), + task: Mutex::new(None), fired_txs, + tx_count: count, + }) + } + + pub fn ctx(&mut self) -> Result { + let Some(rx) = self.tx_receiver.take() else { + bail!("cannon already spawned") + }; + + Ok(ExecutionContext { + id: self.id, + env: self.env.clone(), + source: self.source.clone(), + sink: self.sink.clone(), + fired_txs: self.fired_txs.clone(), + state: self.global_state.clone(), + tx_count: self.tx_count, + tx: self.tx_sender.clone(), + rx, }) } + pub async fn spawn_local(&mut self) -> Result<()> { + let ctx = self.ctx()?; + + let handle = tokio::task::spawn_local(async move { ctx.spawn().await }); + self.task + .lock() + .as_mut() + .unwrap() + .replace(handle.abort_handle()); + + Ok(()) + } + + pub async fn spawn(&mut self) -> Result<()> { + self.ctx()?.spawn().await + } + /// Called by axum to forward /cannon//mainnet/latest/stateRoot /// to the ledger query service's /mainnet/latest/stateRoot pub async fn proxy_state_root(&self) -> Result { @@ -372,6 +248,7 @@ impl CannonInstance { pub fn proxy_broadcast(&self, body: String) -> Result<()> { match &self.source { TxSource::RealTime { .. } => { + debug!("received tx from broadcast path"); self.tx_sender.send(body)?; } TxSource::Playback { .. } => { @@ -385,6 +262,217 @@ impl CannonInstance { impl Drop for CannonInstance { fn drop(&mut self) { // cancel the task on drop - self.task.blocking_lock().abort(); + if let Ok(lock) = self.task.lock() { + if let Some(handle) = lock.as_ref() { + handle.abort(); + } + } + } +} + +/// Information a transaction cannon needs for execution via spawned task +pub struct ExecutionContext { + state: Arc, + /// The cannon's id + id: usize, + /// The environment associated with this cannon + env: Weak, + source: TxSource, + sink: TxSink, + fired_txs: Arc, + tx_count: usize, + tx: UnboundedSender, + rx: UnboundedReceiver, +} + +impl ExecutionContext { + pub async fn spawn(self) -> Result<()> { + let ExecutionContext { + id: cannon_id, + env: env2, + source, + sink, + fired_txs, + tx_count, + state, + tx, + mut rx, + } = self; + + debug!("spawning cannon {cannon_id}"); + + let Some(env) = env2.upgrade() else { + bail!("env dropped") + }; + + // when in playback mode, ensure the drain exists + let (drain_pipe, query_path) = match &source { + TxSource::Playback { file_name: name } => { + let pipe = env.tx_pipe.drains.get(name).cloned(); + ensure!(pipe.is_some(), "transaction drain not found: {name}"); + (pipe, None) + } + TxSource::RealTime { compute, .. } => { + let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id); + let query = match compute { + // agents already know the host of the control plane + ComputeTarget::Agent => suffix, + // demox needs to locate it + ComputeTarget::Demox { .. } => { + let Some(host) = get_host(&state).await else { + bail!("no --host configured for demox based cannon"); + }; + format!("http://{host}:{}{suffix}", state.cli.port) + } + }; + debug!("using realtime query {query}"); + (None, Some(query)) + } + }; + + let sink_pipe = match &sink { + TxSink::Record { file_name: name } => { + let pipe = env.tx_pipe.sinks.get(name).cloned(); + ensure!(pipe.is_some(), "transaction sink not found: {name}"); + pipe + } + _ => None, + }; + + // effectively make this be the source of pooling requests + + let gen_tx = || async { + match &source { + TxSource::Playback { .. } => { + // if tx source is playback, read lines from the transaction file + let Some(transaction) = drain_pipe.unwrap().next()? else { + bail!("source out of transactions") + }; + tx.send(transaction)?; + Ok(()) + } + TxSource::RealTime { compute, .. } => { + // TODO: if source is realtime, generate authorizations and + // send them to any available agent + + let env = env2 + .upgrade() + .ok_or_else(|| anyhow::anyhow!("env dropped"))?; + + debug!("generating authorization..."); + let auth = source.get_auth(&env)?.run(&env.aot_bin).await?; + match compute { + ComputeTarget::Agent => { + // find a client, mark it as busy + let Some((client, _busy)) = + state.pool.read().await.values().find_map(|a| { + if !a.is_busy() && matches!(a.state(), AgentState::Inventory) { + a.client_owned().map(|c| (c, a.make_busy())) + } else { + None + } + }) + else { + bail!("no agents available to execute authorization") + }; + debug!("firing auth at agent"); + + // execute the authorization + client + .execute_authorization( + env2.upgrade() + .ok_or_else(|| anyhow::anyhow!("env dropped"))? + .id, + query_path.unwrap(), + serde_json::from_value(auth)?, + ) + .await?; + Ok(()) + } + ComputeTarget::Demox { url } => { + let _body = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "generateTransaction", + "params": { + "authorization": serde_json::to_string(&auth["authorization"])?, + "fee": serde_json::to_string(&auth["fee"])?, + "url": query_path.unwrap(), + "broadcast": true, + } + }); + + todo!("post on {url}") + } + } + } + } + }; + + // build a timer that keeps track of the expected sink speed + let mut timer = sink.timer(tx_count); + + let mut container = FuturesUnordered::new(); + + loop { + tokio::select! { + Some(res) = container.next() => { + if let Err(e) = res { + warn!("transaction gen failed: {e}"); + } + }, + _ = timer.next() => { + debug!("queue new transaction"); + // queue a new transaction + container.push(gen_tx.clone()()); + } + Some(tx) = rx.recv() => { + let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if fired_count >= tx_count { + break; + } + match &sink { + TxSink::Record { .. } => { + debug!("writing tx to file"); + sink_pipe.clone().unwrap().write(&tx)?; + } + TxSink::RealTime { target, .. } => { + let pool = state.pool.read().await; + let nodes = env2.upgrade().ok_or_else(|| anyhow::anyhow!("env dropped"))? + .matching_nodes(target, &pool, PortType::Rest) + .collect::>(); + let Some(node) = nodes.get(rand::random::() % nodes.len()) else { + bail!("no nodes available to broadcast transactions") + }; + match node { + AgentPeer::Internal(id, _) => { + let Some(client) = pool[id].client_owned() else { + bail!("target node was offline"); + }; + + client.broadcast_tx(tx).await?; + } + AgentPeer::External(addr) => { + let url = format!("http://{addr}/mainnet/transaction/broadcast"); + ensure!( + reqwest::Client::new() + .post(url) + .header("Content-Type", "application/json") + .body(tx) + .send() + .await? + .status() + .is_success(), + "failed to post transaction to external target node {addr}" + ); + } + } + } + } + } + } + } + + Ok(()) } } diff --git a/crates/snot/src/cannon/router.rs b/crates/snot/src/cannon/router.rs index 851cdf5a..b6ccc1f2 100644 --- a/crates/snot/src/cannon/router.rs +++ b/crates/snot/src/cannon/router.rs @@ -26,7 +26,8 @@ async fn state_root( return StatusCode::NOT_FOUND.into_response(); }; - let Some(cannon) = env.cannons.get(&cannon_id) else { + let cannon_lock = env.cannons.read().await; + let Some(cannon) = cannon_lock.get(&cannon_id) else { return StatusCode::NOT_FOUND.into_response(); }; @@ -53,7 +54,8 @@ async fn transaction( return StatusCode::NOT_FOUND.into_response(); }; - let Some(cannon) = env.cannons.get(&cannon_id) else { + let cannon_lock = env.cannons.read().await; + let Some(cannon) = cannon_lock.get(&cannon_id) else { return StatusCode::NOT_FOUND.into_response(); }; diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index 44723176..a00ef43b 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -85,10 +85,15 @@ enum TimerState { impl Timer { /* - example for burst 6, size 3, + example for burst 6, size 3 wait is `=`, active is `-,` fire is `>` [======>-->-->======>-->-->======] + example for burst 6, size 2 + [======>-->======>-->======] + + example for burst 6, size 1/0, + [======>======>======] */ diff --git a/crates/snot/src/env/mod.rs b/crates/snot/src/env/mod.rs index 4a1e5bac..b5c08f75 100644 --- a/crates/snot/src/env/mod.rs +++ b/crates/snot/src/env/mod.rs @@ -16,7 +16,10 @@ use futures_util::future::join_all; use indexmap::{map::Entry, IndexMap}; use serde::Deserialize; use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey}; -use tokio::{sync::Mutex, task::JoinHandle}; +use tokio::{ + sync::{Mutex, RwLock}, + task::JoinHandle, +}; use tracing::{info, warn}; use self::timeline::{reconcile_agents, ExecutionError}; @@ -51,7 +54,7 @@ pub struct Environment { /// To help generate the id of the new cannon. pub cannons_counter: AtomicUsize, /// Map of cannon ids to their cannon instances - pub cannons: HashMap, + pub cannons: Arc>>, pub timeline: Vec, pub timeline_handle: Mutex>>>, @@ -433,18 +436,15 @@ pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> anyhow::Re .as_ref() .and_then(|key| env.storage.lookup_keysource_pk(key)); - let not_me = |agent: &AgentPeer| match agent { - AgentPeer::Internal(candidate_id, _) if *candidate_id == id => false, - _ => true, - }; + let not_me = |agent: &AgentPeer| !matches!(agent, AgentPeer::Internal(candidate_id, _) if *candidate_id == id); node_state.peers = env - .matching_nodes(&node.peers, &*pool_lock, PortType::Node) + .matching_nodes(&node.peers, &pool_lock, PortType::Node) .filter(not_me) .collect(); node_state.validators = env - .matching_nodes(&node.validators, &*pool_lock, PortType::Bft) + .matching_nodes(&node.validators, &pool_lock, PortType::Bft) .filter(not_me) .collect(); diff --git a/crates/snot/src/env/timeline.rs b/crates/snot/src/env/timeline.rs index 7685a1de..c7b6d119 100644 --- a/crates/snot/src/env/timeline.rs +++ b/crates/snot/src/env/timeline.rs @@ -1,6 +1,6 @@ use std::{ collections::{hash_map::Entry, HashMap}, - sync::Arc, + sync::{atomic::Ordering, Arc}, }; use anyhow::bail; @@ -8,10 +8,15 @@ use futures_util::future::join_all; use snot_common::state::AgentState; use thiserror::Error; use tokio::{select, sync::RwLock, task::JoinError}; -use tracing::info; +use tracing::{debug, info, warn}; use super::Environment; use crate::{ + cannon::{ + sink::TxSink, + source::{LedgerQueryService, TxSource}, + CannonInstance, + }, schema::timeline::{Action, ActionInstance, EventDuration}, state::{Agent, AgentClient, AgentId, GlobalState}, }; @@ -24,6 +29,10 @@ pub enum ExecutionError { Reconcile(#[from] BatchReconcileError), #[error("join error: {0}")] Join(#[from] JoinError), + #[error("unknown cannon: {0}")] + UnknownCannon(String), + #[error("cannon error: {0}")] + Cannon(anyhow::Error), } /// The tuple to pass into `reconcile_agents`. @@ -43,8 +52,6 @@ pub async fn reconcile_agents( where I: Iterator, { - use tracing::{info, warn}; - let mut handles = vec![]; let mut agent_ids = vec![]; @@ -98,12 +105,17 @@ where } impl Environment { - pub async fn execute(state: Arc, id: usize) -> anyhow::Result<()> { - let env = Arc::clone(match state.envs.read().await.get(&id) { + pub async fn execute(state: Arc, env_id: usize) -> anyhow::Result<()> { + let env = Arc::clone(match state.envs.read().await.get(&env_id) { Some(env) => env, - None => bail!("no env with id {id}"), + None => bail!("no env with id {env_id}"), }); + info!( + "starting timeline playback for env {env_id} with {} events", + env.timeline.len() + ); + let handle_lock_env = Arc::clone(&env); let mut handle_lock = handle_lock_env.timeline_handle.lock().await; @@ -117,10 +129,12 @@ impl Environment { *handle_lock = Some(tokio::spawn(async move { for event in env.timeline.iter() { + debug!("next event in timeline {event:?}"); let pool = state.pool.read().await; // task handles that must be awaited for this timeline event - let mut awaiting_handles = vec![]; + let mut awaiting_handles: Vec>> = + vec![]; // add a duration sleep if a duration was specified if let Some(duration) = &event.duration { @@ -180,30 +194,89 @@ impl Environment { let online = matches!(action, Action::Online(_)); - for agent in env.matching_agents(targets, &*pool) { + for agent in env.matching_agents(targets, &pool) { set_node_field!(agent, online = online); } } - Action::Cannon(_) => unimplemented!(), + Action::Cannon(cannons) => { + debug!("enter cannon action, {} cannons", cannons.len()); + for cannon in cannons.iter() { + debug!("iter cannon {cannon:?} configs: {:?}", env.cannon_configs); + let cannon_id = env.cannons_counter.fetch_add(1, Ordering::Relaxed); + let Some((mut source, mut sink)) = + env.cannon_configs.get(&cannon.name).cloned() + else { + debug!("unknown cannon"); + return Err(ExecutionError::UnknownCannon(cannon.name.clone())); + }; + + // override the query and target if they are specified + if let (Some(q), TxSource::RealTime { query, .. }) = + (&cannon.query, &mut source) + { + *query = LedgerQueryService::Node(q.clone()); + }; + + if let (Some(t), TxSink::RealTime { target, .. }) = + (&cannon.target, &mut sink) + { + *target = t.clone(); + }; + let count = cannon.count; + + let mut instance = CannonInstance::new( + state.clone(), + cannon_id, + env.clone(), + source, + sink, + count, + ) + .await + .map_err(ExecutionError::Cannon)?; + + if *awaited { + // debug!("instance started await mode"); + awaiting_handles.push(tokio::task::spawn_local(async move { + debug!("spawn start"); + + instance.spawn().await.map_err(ExecutionError::Cannon)?; + debug!("spawn complete"); + Ok::<_, ExecutionError>(()) + })); + todo!() + } else { + instance + .spawn_local() + .await + .map_err(ExecutionError::Cannon)?; + env.cannons.write().await.insert(cannon_id, instance); + } + } + + todo!() + } Action::Height(_) => unimplemented!(), }; } drop(pool); - let task_state = Arc::clone(&state); - let reconcile_handle = tokio::spawn(async move { - reconcile_agents( - pending_reconciliations.into_iter().map(|(_, v)| v), - &task_state.pool, - ) - .await - }); - - // await the reconciliation if any of the actions were `.await` - if reconcile_async { - awaiting_handles.push(reconcile_handle); + // if there are any pending reconciliations, + if !pending_reconciliations.is_empty() { + // reconcile all nodes + let task_state = Arc::clone(&state); + let reconcile_handle = tokio::spawn(async move { + reconcile_agents(pending_reconciliations.into_values(), &task_state.pool) + .await?; + Ok(()) + }); + + // await the reconciliation if any of the actions were `.await` + if reconcile_async { + awaiting_handles.push(reconcile_handle); + } } let handles_fut = join_all(awaiting_handles.into_iter()); @@ -212,8 +285,8 @@ impl Environment { let handles_result = match &event.timeout { // apply a timeout to `handles_fut` Some(timeout) => match timeout { - EventDuration::Time(duration) => select! { - _ = tokio::time::sleep(*duration) => continue, + EventDuration::Time(timeout_duration) => select! { + _ = tokio::time::sleep(*timeout_duration) => continue, res = handles_fut => res, }, @@ -227,7 +300,7 @@ impl Environment { for result in handles_result.into_iter() { match result { Ok(Ok(())) => (), - Ok(Err(e)) => return Err(ExecutionError::Reconcile(e)), + Ok(e) => return e, Err(e) => return Err(ExecutionError::Join(e)), } } diff --git a/crates/snot/src/schema/timeline.rs b/crates/snot/src/schema/timeline.rs index cdd76626..6e15e2b0 100644 --- a/crates/snot/src/schema/timeline.rs +++ b/crates/snot/src/schema/timeline.rs @@ -12,6 +12,8 @@ use super::NodeTargets; /// A document describing a test's event timeline. #[derive(Deserialize, Debug, Clone)] pub struct Document { + pub name: String, + pub description: Option, pub timeline: Vec, } @@ -180,7 +182,7 @@ impl<'de> Deserialize<'de> for EventDuration { #[derive(Deserialize, Debug, Clone)] pub struct SpawnCannon { pub name: String, - pub count: u64, + pub count: usize, /// overwrite the query's source node pub query: Option, /// overwrite the cannon sink target diff --git a/crates/snot/src/server/mod.rs b/crates/snot/src/server/mod.rs index 96c4c308..36b931e3 100644 --- a/crates/snot/src/server/mod.rs +++ b/crates/snot/src/server/mod.rs @@ -225,7 +225,7 @@ async fn handle_socket(mut socket: WebSocket, headers: HeaderMap, state: AppStat msg = client_request_out.recv() => { let msg = msg.expect("internal RPC channel closed"); let bin = bincode::serialize(&MuxedMessageOutgoing::Agent(msg)).expect("failed to serialize request"); - if let Err(_) = socket.send(Message::Binary(bin)).await { + if (socket.send(Message::Binary(bin)).await).is_err() { break; } } @@ -234,7 +234,7 @@ async fn handle_socket(mut socket: WebSocket, headers: HeaderMap, state: AppStat msg = server_response_out.recv() => { let msg = msg.expect("internal RPC channel closed"); let bin = bincode::serialize(&MuxedMessageOutgoing::Control(msg)).expect("failed to serialize response"); - if let Err(_) = socket.send(Message::Binary(bin)).await { + if (socket.send(Message::Binary(bin)).await).is_err() { break; } } diff --git a/scripts/env_start.sh b/scripts/env_start.sh new file mode 100755 index 00000000..bea687b2 --- /dev/null +++ b/scripts/env_start.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +# check if index is set +if [ -z "$1" ]; then + echo "Usage: $0 " + exit 1 +fi + +curl -v -X POST http://localhost:1234/api/v1/env/$1 diff --git a/specs/test-cannon-record.yaml b/specs/test-cannon-record.yaml index 2943627f..1c4458a9 100644 --- a/specs/test-cannon-record.yaml +++ b/specs/test-cannon-record.yaml @@ -7,7 +7,7 @@ name: base-ledger --- version: cannon.snarkos.testing.monadic.us/v1 -name: commitee-tx-public +name: committee-tx-public source: query: { mode: local } @@ -18,3 +18,13 @@ source: sink: file-name: txs.json + +--- +version: timeline.snarkos.testing.monadic.us/v1 + +name: tx-local + +timeline: + - cannon.await: + - name: committee-tx-public + count: 5 From 111bda1bd9fb93d54cfcf692dfebc00f029646db Mon Sep 17 00:00:00 2001 From: Meshiest Date: Thu, 28 Mar 2024 20:41:25 -0500 Subject: [PATCH 6/8] feat(cannon): functional agent source, file sink cannon --- crates/snot-agent/src/rpc.rs | 17 ++++-- crates/snot/Cargo.toml | 2 +- crates/snot/src/cannon/authorized.rs | 13 +++-- crates/snot/src/cannon/file.rs | 1 + crates/snot/src/cannon/mod.rs | 86 +++++++++++++--------------- crates/snot/src/cannon/router.rs | 50 ++++++++++++---- crates/snot/src/cannon/sink.rs | 41 ++++++++++--- crates/snot/src/cannon/source.rs | 2 +- crates/snot/src/env/mod.rs | 3 +- crates/snot/src/env/timeline.rs | 23 ++++---- crates/snot/src/main.rs | 3 + scripts/probe.mjs | 43 ++++++++++++++ specs/test-cannon-record.yaml | 3 +- 13 files changed, 198 insertions(+), 89 deletions(-) create mode 100755 scripts/probe.mjs diff --git a/crates/snot-agent/src/rpc.rs b/crates/snot-agent/src/rpc.rs index 42ee9d5a..39a16d41 100644 --- a/crates/snot-agent/src/rpc.rs +++ b/crates/snot-agent/src/rpc.rs @@ -449,20 +449,27 @@ impl AgentService for AgentRpcServer { info!("executing authorization..."); // TODO: ensure binary associated with this env_id is present - let res = Command::new(self.state.cli.path.join(SNARKOS_FILE)) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) + let res = Command::new(dbg!(self.state.cli.path.join(SNARKOS_FILE))) + .stdout(std::io::stdout()) + .stderr(std::io::stderr()) .arg("execute") .arg("--query") .arg(&format!("http://{}{query}", self.state.endpoint)) .arg(auth) .spawn() - .map_err(|_| AgentError::FailedToSpawnProcess)? + .map_err(|e| { + warn!("failed to spawn auth exec process: {e}"); + AgentError::FailedToSpawnProcess + })? .wait() .await - .map_err(|_| AgentError::ProcessFailed)?; + .map_err(|e| { + warn!("auth exec process failed: {e}"); + AgentError::ProcessFailed + })?; if !res.success() { + warn!("auth exec process exited with status: {res}"); return Err(AgentError::ProcessFailed); } Ok(()) diff --git a/crates/snot/Cargo.toml b/crates/snot/Cargo.toml index 6e24068c..b3b9088a 100644 --- a/crates/snot/Cargo.toml +++ b/crates/snot/Cargo.toml @@ -19,7 +19,7 @@ lazy_static.workspace = true rand.workspace = true rand_chacha.workspace = true regex.workspace = true -reqwest = { workspace = true, features = ["stream"] } +reqwest = { workspace = true, features = ["stream", "json"] } serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true diff --git a/crates/snot/src/cannon/authorized.rs b/crates/snot/src/cannon/authorized.rs index c1c8a83a..5d7580f3 100644 --- a/crates/snot/src/cannon/authorized.rs +++ b/crates/snot/src/cannon/authorized.rs @@ -19,7 +19,6 @@ impl Authorize { command .stdout(std::io::stdout()) .stderr(std::io::stderr()) - .arg("aot") .arg("authorize"); match self { @@ -46,13 +45,19 @@ impl Authorize { let res = command.output().await?; + if !res.status.success() { + return Err(anyhow::anyhow!( + "command failed with status {}: {}", + res.status, + String::from_utf8_lossy(&res.stderr) + )); + } + let blob: serde_json::Value = serde_json::from_slice(&res.stdout)?; ensure!(blob.is_object(), "expected JSON object in response"); ensure!( - blob.get("function").is_some() - && blob.get("fee").is_some() - && blob.get("broadcast").is_some(), + blob.get("function").is_some() && blob.get("broadcast").is_some(), "expected function, fee, and broadcast fields in response" ); diff --git a/crates/snot/src/cannon/file.rs b/crates/snot/src/cannon/file.rs index 3940b5b0..aebec2e2 100644 --- a/crates/snot/src/cannon/file.rs +++ b/crates/snot/src/cannon/file.rs @@ -74,6 +74,7 @@ impl TransactionSink { let writer = lock.as_mut().unwrap(); writer.write_all(line.as_bytes())?; writer.write_all(b"\n")?; + writer.flush()?; Ok(()) } } diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 3fb64376..69513a61 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -7,22 +7,19 @@ pub mod source; use std::{ process::Stdio, - sync::{atomic::AtomicUsize, Arc, Mutex, Weak}, + sync::{atomic::AtomicUsize, Arc, OnceLock, Weak}, }; use anyhow::{bail, ensure, Result}; use futures_util::{stream::FuturesUnordered, StreamExt}; use serde_json::json; -use snot_common::state::{AgentPeer, AgentState}; +use snot_common::state::AgentPeer; use tokio::{ process::Command, - sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - OnceCell, - }, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, task::AbortHandle, }; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; use self::{sink::TxSink, source::TxSource}; use crate::{ @@ -82,7 +79,7 @@ pub struct CannonInstance { query_port: Option, // TODO: run the actual cannon in this task - pub task: Mutex>, + pub task: Option, /// Child process must exist for the duration of the cannon instance. /// This value is never used @@ -96,20 +93,20 @@ pub struct CannonInstance { tx_count: usize, } +#[tokio::main] +async fn get_external_ip() -> Option { + let sources: external_ip::Sources = external_ip::get_http_sources(); + let consensus = external_ip::ConsensusBuilder::new() + .add_sources(sources) + .build(); + consensus.get_consensus().await.map(|s| s.to_string()) +} + async fn get_host(state: &GlobalState) -> Option { - static ONCE: OnceCell> = OnceCell::const_new(); + static ONCE: OnceLock> = OnceLock::new(); match state.cli.hostname.as_ref() { Some(host) => Some(host.to_owned()), - None => ONCE - .get_or_init(|| async { - let sources: external_ip::Sources = external_ip::get_http_sources(); - let consensus = external_ip::ConsensusBuilder::new() - .add_sources(sources) - .build(); - consensus.get_consensus().await.map(|a| a.to_string()) - }) - .await - .to_owned(), + None => ONCE.get_or_init(get_external_ip).to_owned(), } } @@ -165,7 +162,7 @@ impl CannonInstance { tx_receiver: Some(tx_receiver), query_port, child, - task: Mutex::new(None), + task: None, fired_txs, tx_count: count, }) @@ -192,12 +189,8 @@ impl CannonInstance { pub async fn spawn_local(&mut self) -> Result<()> { let ctx = self.ctx()?; - let handle = tokio::task::spawn_local(async move { ctx.spawn().await }); - self.task - .lock() - .as_mut() - .unwrap() - .replace(handle.abort_handle()); + let handle = tokio::task::spawn(async move { ctx.spawn().await }); + self.task = Some(handle.abort_handle()); Ok(()) } @@ -262,10 +255,9 @@ impl CannonInstance { impl Drop for CannonInstance { fn drop(&mut self) { // cancel the task on drop - if let Ok(lock) = self.task.lock() { - if let Some(handle) = lock.as_ref() { - handle.abort(); - } + debug!("dropping cannon {}", self.id); + if let Some(handle) = self.task.take() { + handle.abort(); } } } @@ -299,11 +291,12 @@ impl ExecutionContext { mut rx, } = self; - debug!("spawning cannon {cannon_id}"); - let Some(env) = env2.upgrade() else { bail!("env dropped") }; + let env_id = env.id; + + trace!("cannon {env_id}/{cannon_id} spawned"); // when in playback mode, ensure the drain exists let (drain_pipe, query_path) = match &source { @@ -325,15 +318,15 @@ impl ExecutionContext { format!("http://{host}:{}{suffix}", state.cli.port) } }; - debug!("using realtime query {query}"); + trace!("cannon {cannon_id}/{env_id} using realtime query {query}"); (None, Some(query)) } }; let sink_pipe = match &sink { - TxSink::Record { file_name: name } => { - let pipe = env.tx_pipe.sinks.get(name).cloned(); - ensure!(pipe.is_some(), "transaction sink not found: {name}"); + TxSink::Record { file_name, .. } => { + let pipe = env.tx_pipe.sinks.get(file_name).cloned(); + ensure!(pipe.is_some(), "transaction sink not found: {file_name}"); pipe } _ => None, @@ -359,14 +352,14 @@ impl ExecutionContext { .upgrade() .ok_or_else(|| anyhow::anyhow!("env dropped"))?; - debug!("generating authorization..."); + trace!("cannon {cannon_id}/{env_id} generating authorization..."); let auth = source.get_auth(&env)?.run(&env.aot_bin).await?; match compute { ComputeTarget::Agent => { // find a client, mark it as busy let Some((client, _busy)) = state.pool.read().await.values().find_map(|a| { - if !a.is_busy() && matches!(a.state(), AgentState::Inventory) { + if !a.is_busy() && a.is_inventory() { a.client_owned().map(|c| (c, a.make_busy())) } else { None @@ -375,7 +368,6 @@ impl ExecutionContext { else { bail!("no agents available to execute authorization") }; - debug!("firing auth at agent"); // execute the authorization client @@ -384,9 +376,10 @@ impl ExecutionContext { .ok_or_else(|| anyhow::anyhow!("env dropped"))? .id, query_path.unwrap(), - serde_json::from_value(auth)?, + serde_json::to_string(&auth)?, ) .await?; + Ok(()) } ComputeTarget::Demox { url } => { @@ -418,19 +411,16 @@ impl ExecutionContext { tokio::select! { Some(res) = container.next() => { if let Err(e) = res { + timer.undo(); warn!("transaction gen failed: {e}"); } }, _ = timer.next() => { - debug!("queue new transaction"); // queue a new transaction container.push(gen_tx.clone()()); } Some(tx) = rx.recv() => { - let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if fired_count >= tx_count { - break; - } + let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; match &sink { TxSink::Record { .. } => { debug!("writing tx to file"); @@ -469,6 +459,12 @@ impl ExecutionContext { } } } + + if fired_count >= tx_count { + debug!("finished firing txs"); + break; + } + debug!("fired {fired_count}/{tx_count} txs"); } } } diff --git a/crates/snot/src/cannon/router.rs b/crates/snot/src/cannon/router.rs index b6ccc1f2..b24ffffb 100644 --- a/crates/snot/src/cannon/router.rs +++ b/crates/snot/src/cannon/router.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use axum::{ extract::{Path, State}, response::{IntoResponse, Response}, @@ -23,23 +25,51 @@ async fn state_root( let env = state.envs.read().await; env.get(&env_id).cloned() }) else { - return StatusCode::NOT_FOUND.into_response(); + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "environment not found" })), + ) + .into_response(); }; let cannon_lock = env.cannons.read().await; let Some(cannon) = cannon_lock.get(&cannon_id) else { - return StatusCode::NOT_FOUND.into_response(); + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "cannon not found" })), + ) + .into_response(); }; - match cannon.proxy_state_root().await { - // the nodes expect this state root to be string escaped json - Ok(root) => Json(json!(root)).into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("{e}")})), - ) - .into_response(), + // TODO: lock this with a mutex or something so that multiple route callers can't bombard the cannon with proxy_state_root call attempts + let mut attempts = 0; + loop { + attempts += 1; + match cannon.proxy_state_root().await { + Ok(root) => break Json(root).into_response(), + + Err(e) if attempts > 5 => { + break ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": "non-responsive query node", "inner": format!("{e}") })), + ) + .into_response() + } + + _ => attempts += 1, + } + tokio::time::sleep(Duration::from_secs(1)).await; } + + // match cannon.proxy_state_root().await { + // // the nodes expect this state root to be string escaped json + // Ok(root) => Json(root).into_response(), + // Err(e) => ( + // StatusCode::INTERNAL_SERVER_ERROR, + // Json(json!({ "error": format!("{e}")})), + // ) + // .into_response(), + // } } async fn transaction( diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index a00ef43b..0ce79881 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -1,9 +1,14 @@ use std::{future, time::Duration}; use serde::{Deserialize, Serialize}; +use tokio::time::Instant; use crate::schema::NodeTargets; +fn one_thousand() -> u32 { + 1000 +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case", untagged)] pub enum TxSink { @@ -12,6 +17,8 @@ pub enum TxSink { Record { /// filename for the recording txs list file_name: String, + #[serde(default = "one_thousand")] + tx_request_delay_ms: u32, }, //// Write transactions to a ledger query service // AoTAppend { @@ -42,10 +49,14 @@ pub enum TxSink { impl TxSink { pub fn timer(&self, count: usize) -> Timer { match self { - TxSink::Record { .. } => Timer { - state: TimerState::Active(0), + TxSink::Record { + tx_request_delay_ms, + .. + } => Timer { + last_shot: Instant::now(), + state: TimerState::Waiting, count, - burst_rate: Duration::from_secs(1), + burst_rate: Duration::from_millis(*tx_request_delay_ms as u64), burst_size: 1, fire_rate: Duration::ZERO, }, @@ -55,7 +66,8 @@ impl TxSink { tx_delay_ms, .. } => Timer { - state: TimerState::Active(*tx_per_burst as usize), + last_shot: Instant::now(), + state: TimerState::Waiting, count, burst_rate: Duration::from_millis(*burst_delay_ms as u64), burst_size: *tx_per_burst, @@ -71,8 +83,10 @@ pub struct Timer { burst_size: u32, fire_rate: Duration, state: TimerState, + last_shot: Instant, } +#[derive(Debug)] enum TimerState { /// wait the `fire_rate` duration Active(usize), @@ -97,10 +111,19 @@ impl Timer { */ + pub fn undo(&mut self) { + self.count += 1; + if matches!(self.state, TimerState::Done) { + self.state = TimerState::Waiting; + } + } + pub async fn next(&mut self) { self.state = match self.state { TimerState::Active(remaining) => { - tokio::time::sleep(self.fire_rate).await; + tokio::time::sleep_until(self.last_shot + self.fire_rate).await; + self.last_shot = Instant::now(); + self.count = self.count.saturating_sub(1); // we reach this point by having waited before, so we remove one match remaining.saturating_sub(1) { @@ -111,17 +134,19 @@ impl Timer { } } TimerState::Waiting => { + tokio::time::sleep_until(self.last_shot + self.burst_rate).await; + self.last_shot = Instant::now(); self.count = self.count.saturating_sub(1); - tokio::time::sleep(self.burst_rate).await; + match self.count { // if count is empty, the next sleep will be permanent 0 => TimerState::Done, - _ => match self.burst_size { + _ => match self.burst_size.saturating_sub(1) { // if the burst size is 0, do a full burst wait 0 => TimerState::Waiting, // if the burst size is nonzero, wait for the shorter burst latency - _ => TimerState::Active(self.burst_size as usize), + shots => TimerState::Active((shots as usize).min(self.count)), }, } } diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index 64232ac9..af91470d 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -37,7 +37,7 @@ impl LocalQueryService { pub async fn get_state_root(&self, port: u16) -> Result { let url = format!("http://127.0.0.1:{}/mainnet/latest/stateRoot", port); let response = reqwest::get(&url).await?; - Ok(response.text().await?) + Ok(response.json().await?) } } diff --git a/crates/snot/src/env/mod.rs b/crates/snot/src/env/mod.rs index b5c08f75..258ba3cb 100644 --- a/crates/snot/src/env/mod.rs +++ b/crates/snot/src/env/mod.rs @@ -240,7 +240,7 @@ impl Environment { ); } - if let TxSink::Record { file_name } = sink { + if let TxSink::Record { file_name, .. } = sink { tx_pipe.sinks.insert( file_name.to_owned(), Arc::new(TransactionSink::new(storage.clone(), file_name)?), @@ -271,7 +271,6 @@ impl Environment { timeline_handle: Default::default(), }; - let env_id = ENVS_COUNTER.fetch_add(1, Ordering::Relaxed); state_lock.insert(env_id, Arc::new(env)); drop(state_lock); diff --git a/crates/snot/src/env/timeline.rs b/crates/snot/src/env/timeline.rs index c7b6d119..b65e6ad3 100644 --- a/crates/snot/src/env/timeline.rs +++ b/crates/snot/src/env/timeline.rs @@ -200,14 +200,11 @@ impl Environment { } Action::Cannon(cannons) => { - debug!("enter cannon action, {} cannons", cannons.len()); for cannon in cannons.iter() { - debug!("iter cannon {cannon:?} configs: {:?}", env.cannon_configs); let cannon_id = env.cannons_counter.fetch_add(1, Ordering::Relaxed); let Some((mut source, mut sink)) = env.cannon_configs.get(&cannon.name).cloned() else { - debug!("unknown cannon"); return Err(ExecutionError::UnknownCannon(cannon.name.clone())); }; @@ -237,25 +234,27 @@ impl Environment { .map_err(ExecutionError::Cannon)?; if *awaited { + let ctx = instance.ctx().unwrap(); + let env = env.clone(); + // debug!("instance started await mode"); - awaiting_handles.push(tokio::task::spawn_local(async move { - debug!("spawn start"); + awaiting_handles.push(tokio::task::spawn(async move { + let res = ctx.spawn().await; - instance.spawn().await.map_err(ExecutionError::Cannon)?; - debug!("spawn complete"); - Ok::<_, ExecutionError>(()) + // remove the cannon after the task is complete + env.cannons.write().await.remove(&cannon_id); + res.map_err(ExecutionError::Cannon) })); - todo!() } else { instance .spawn_local() .await .map_err(ExecutionError::Cannon)?; - env.cannons.write().await.insert(cannon_id, instance); } - } - todo!() + // insert the cannon + env.cannons.write().await.insert(cannon_id, instance); + } } Action::Height(_) => unimplemented!(), }; diff --git a/crates/snot/src/main.rs b/crates/snot/src/main.rs index 924c6de7..2be2eb3d 100644 --- a/crates/snot/src/main.rs +++ b/crates/snot/src/main.rs @@ -22,6 +22,9 @@ async fn main() { let env_filter = env_filter .parse_lossy("") + .add_directive("hyper_util=off".parse().unwrap()) + .add_directive("hyper=off".parse().unwrap()) + .add_directive("reqwest=off".parse().unwrap()) .add_directive("surrealdb_core=off".parse().unwrap()) .add_directive("surrealdb=off".parse().unwrap()) .add_directive("tungstenite=off".parse().unwrap()) diff --git a/scripts/probe.mjs b/scripts/probe.mjs new file mode 100755 index 00000000..8c2820cf --- /dev/null +++ b/scripts/probe.mjs @@ -0,0 +1,43 @@ +#!/usr/bin/env node + +const pending = process.argv.filter(arg => arg.match(/^(\d+\.){3}\d+:\d+$/)); + +const seen = new Set(); +const offline = new Set(); +let peer; + +while ((peer = pending.shift())) { + if (seen.has(peer)) continue; + seen.add(peer); + + console.log(`Scanning peers from ${peer}...`); + try { + const res = await fetch(`http://${peer}/mainnet/peers/all`); + const peers = await res.json(); + + for (let p of peers) { + p = p.replace(/413/, '303'); + if (seen.has(p)) continue; + + pending.push(p); + console.log(`- found ${p}`); + } + } catch (err) { + console.error(`Error scanning ${peer}: ${err.message}`); + offline.add(peer); + } +} + +for (const peer of seen) { + if (offline.has(peer)) continue; + + try { + const res = await fetch(`http://${peer}/mainnet/latest/block`); + const block = await res.json(); + console.log( + `${peer} - ${block.header.metadata.height} ${block.block_hash}` + ); + } catch (err) { + console.error(`Error fetching block from ${peer}: ${err.message}`); + } +} diff --git a/specs/test-cannon-record.yaml b/specs/test-cannon-record.yaml index 1c4458a9..86d5a41e 100644 --- a/specs/test-cannon-record.yaml +++ b/specs/test-cannon-record.yaml @@ -17,6 +17,7 @@ source: addresses: [committee.$] sink: + tx-request-delay-ms: 1000 file-name: txs.json --- @@ -27,4 +28,4 @@ name: tx-local timeline: - cannon.await: - name: committee-tx-public - count: 5 + count: 10 From 4a108f425e502385260610a374a60faa3f9ac0db Mon Sep 17 00:00:00 2001 From: Meshiest Date: Fri, 29 Mar 2024 13:22:36 -0500 Subject: [PATCH 7/8] refactor(cannon): code cleanup, streamline auth sourcing, test replay sink --- crates/snot/src/cannon/file.rs | 2 +- crates/snot/src/cannon/mod.rs | 395 ++++++++++++++++++------------- crates/snot/src/cannon/router.rs | 50 +++- crates/snot/src/cannon/sink.rs | 75 ++++-- crates/snot/src/cannon/source.rs | 68 +++++- crates/snot/src/env/timeline.rs | 10 +- specs/test-cannon-replay.yaml | 27 +++ 7 files changed, 426 insertions(+), 201 deletions(-) create mode 100644 specs/test-cannon-replay.yaml diff --git a/crates/snot/src/cannon/file.rs b/crates/snot/src/cannon/file.rs index aebec2e2..23ccec36 100644 --- a/crates/snot/src/cannon/file.rs +++ b/crates/snot/src/cannon/file.rs @@ -72,7 +72,7 @@ impl TransactionSink { } let writer = lock.as_mut().unwrap(); - writer.write_all(line.as_bytes())?; + writer.write_all(line.trim().as_bytes())?; writer.write_all(b"\n")?; writer.flush()?; Ok(()) diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 69513a61..e2dcde9d 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -12,18 +12,24 @@ use std::{ use anyhow::{bail, ensure, Result}; use futures_util::{stream::FuturesUnordered, StreamExt}; -use serde_json::json; use snot_common::state::AgentPeer; use tokio::{ process::Command, sync::mpsc::{UnboundedReceiver, UnboundedSender}, task::AbortHandle, }; -use tracing::{debug, trace, warn}; +use tracing::{info, trace, warn}; -use self::{sink::TxSink, source::TxSource}; +use self::{ + file::{TransactionDrain, TransactionSink}, + sink::TxSink, + source::TxSource, +}; use crate::{ - cannon::source::{ComputeTarget, LedgerQueryService}, + cannon::{ + sink::Timer, + source::{ComputeTarget, QueryTarget}, + }, env::{Environment, PortType}, state::GlobalState, }; @@ -57,6 +63,8 @@ burst mode?? */ +pub type Authorization = serde_json::Value; + /// Transaction cannon state /// using the `TxSource` and `TxSink` for configuration. #[derive(Debug)] @@ -88,7 +96,9 @@ pub struct CannonInstance { /// channel to send transactions to the the task tx_sender: UnboundedSender, - tx_receiver: Option>, + /// channel to send authorizations to the the task + auth_sender: UnboundedSender, + fired_txs: Arc, tx_count: usize, } @@ -110,6 +120,11 @@ async fn get_host(state: &GlobalState) -> Option { } } +pub struct CannonReceivers { + transactions: UnboundedReceiver, + authorizations: UnboundedReceiver, +} + impl CannonInstance { /// Create a new active transaction cannon /// with the given source and sink. @@ -122,7 +137,7 @@ impl CannonInstance { source: TxSource, sink: TxSink, count: usize, - ) -> Result { + ) -> Result<(Self, CannonReceivers)> { let (tx_sender, tx_receiver) = tokio::sync::mpsc::unbounded_channel(); let query_port = source.get_query_port()?; let fired_txs = Arc::new(AtomicUsize::new(0)); @@ -152,27 +167,31 @@ impl CannonInstance { None }; - Ok(Self { - id, - global_state, - source, - sink, - env: Arc::downgrade(&env), - tx_sender, - tx_receiver: Some(tx_receiver), - query_port, - child, - task: None, - fired_txs, - tx_count: count, - }) + let (auth_sender, auth_receiver) = tokio::sync::mpsc::unbounded_channel(); + + Ok(( + Self { + id, + global_state, + source, + sink, + env: Arc::downgrade(&env), + tx_sender, + auth_sender, + query_port, + child, + task: None, + fired_txs, + tx_count: count, + }, + CannonReceivers { + transactions: tx_receiver, + authorizations: auth_receiver, + }, + )) } - pub fn ctx(&mut self) -> Result { - let Some(rx) = self.tx_receiver.take() else { - bail!("cannon already spawned") - }; - + pub fn ctx(&self) -> Result { Ok(ExecutionContext { id: self.id, env: self.env.clone(), @@ -181,37 +200,37 @@ impl CannonInstance { fired_txs: self.fired_txs.clone(), state: self.global_state.clone(), tx_count: self.tx_count, - tx: self.tx_sender.clone(), - rx, + tx_sender: self.tx_sender.clone(), + auth_sender: self.auth_sender.clone(), }) } - pub async fn spawn_local(&mut self) -> Result<()> { + pub async fn spawn_local(&mut self, rx: CannonReceivers) -> Result<()> { let ctx = self.ctx()?; - let handle = tokio::task::spawn(async move { ctx.spawn().await }); + let handle = tokio::task::spawn(async move { ctx.spawn(rx).await }); self.task = Some(handle.abort_handle()); Ok(()) } - pub async fn spawn(&mut self) -> Result<()> { - self.ctx()?.spawn().await + pub async fn spawn(&mut self, rx: CannonReceivers) -> Result<()> { + self.ctx()?.spawn(rx).await } /// Called by axum to forward /cannon//mainnet/latest/stateRoot /// to the ledger query service's /mainnet/latest/stateRoot pub async fn proxy_state_root(&self) -> Result { match &self.source { - TxSource::RealTime { query, .. } => match query { - LedgerQueryService::Local(qs) => { + TxSource::RealTime { query, .. } | TxSource::Listen { query, .. } => match query { + QueryTarget::Local(qs) => { if let Some(port) = self.query_port { qs.get_state_root(port).await } else { bail!("cannon is missing a query port") } } - LedgerQueryService::Node(key) => { + QueryTarget::Node(key) => { let Some(env) = self.env.upgrade() else { unreachable!("called from a place where env is present") }; @@ -240,8 +259,7 @@ impl CannonInstance { /// to the desired sink pub fn proxy_broadcast(&self, body: String) -> Result<()> { match &self.source { - TxSource::RealTime { .. } => { - debug!("received tx from broadcast path"); + TxSource::RealTime { .. } | TxSource::Listen { .. } => { self.tx_sender.send(body)?; } TxSource::Playback { .. } => { @@ -250,12 +268,27 @@ impl CannonInstance { } Ok(()) } + + /// Called by axum to forward /cannon//auth to a listen source + pub fn proxy_auth(&self, body: Authorization) -> Result<()> { + match &self.source { + TxSource::Listen { .. } => { + self.auth_sender.send(body)?; + } + TxSource::RealTime { .. } => { + warn!("cannon received broadcasted transaction in realtime mode. ignoring.") + } + TxSource::Playback { .. } => { + warn!("cannon received broadcasted transaction in playback mode. ignoring.") + } + } + Ok(()) + } } impl Drop for CannonInstance { fn drop(&mut self) { // cancel the task on drop - debug!("dropping cannon {}", self.id); if let Some(handle) = self.task.take() { handle.abort(); } @@ -273,30 +306,29 @@ pub struct ExecutionContext { sink: TxSink, fired_txs: Arc, tx_count: usize, - tx: UnboundedSender, - rx: UnboundedReceiver, + tx_sender: UnboundedSender, + auth_sender: UnboundedSender, } impl ExecutionContext { - pub async fn spawn(self) -> Result<()> { + pub async fn spawn(self, mut rx: CannonReceivers) -> Result<()> { let ExecutionContext { id: cannon_id, - env: env2, + env: env_weak, source, sink, fired_txs, tx_count, state, - tx, - mut rx, - } = self; + .. + } = &self; - let Some(env) = env2.upgrade() else { + let Some(env) = env_weak.upgrade() else { bail!("env dropped") }; let env_id = env.id; - trace!("cannon {env_id}/{cannon_id} spawned"); + trace!("cannon {env_id}.{cannon_id} spawned"); // when in playback mode, ensure the drain exists let (drain_pipe, query_path) = match &source { @@ -305,20 +337,20 @@ impl ExecutionContext { ensure!(pipe.is_some(), "transaction drain not found: {name}"); (pipe, None) } - TxSource::RealTime { compute, .. } => { + TxSource::RealTime { compute, .. } | TxSource::Listen { compute, .. } => { let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id); let query = match compute { // agents already know the host of the control plane ComputeTarget::Agent => suffix, // demox needs to locate it ComputeTarget::Demox { .. } => { - let Some(host) = get_host(&state).await else { + let Some(host) = get_host(state).await else { bail!("no --host configured for demox based cannon"); }; format!("http://{host}:{}{suffix}", state.cli.port) } }; - trace!("cannon {cannon_id}/{env_id} using realtime query {query}"); + trace!("cannon {env_id}.{cannon_id} using realtime query {query}"); (None, Some(query)) } }; @@ -332,143 +364,168 @@ impl ExecutionContext { _ => None, }; - // effectively make this be the source of pooling requests - - let gen_tx = || async { - match &source { - TxSource::Playback { .. } => { - // if tx source is playback, read lines from the transaction file - let Some(transaction) = drain_pipe.unwrap().next()? else { - bail!("source out of transactions") - }; - tx.send(transaction)?; - Ok(()) - } - TxSource::RealTime { compute, .. } => { - // TODO: if source is realtime, generate authorizations and - // send them to any available agent - - let env = env2 - .upgrade() - .ok_or_else(|| anyhow::anyhow!("env dropped"))?; - - trace!("cannon {cannon_id}/{env_id} generating authorization..."); - let auth = source.get_auth(&env)?.run(&env.aot_bin).await?; - match compute { - ComputeTarget::Agent => { - // find a client, mark it as busy - let Some((client, _busy)) = - state.pool.read().await.values().find_map(|a| { - if !a.is_busy() && a.is_inventory() { - a.client_owned().map(|c| (c, a.make_busy())) - } else { - None - } - }) - else { - bail!("no agents available to execute authorization") - }; - - // execute the authorization - client - .execute_authorization( - env2.upgrade() - .ok_or_else(|| anyhow::anyhow!("env dropped"))? - .id, - query_path.unwrap(), - serde_json::to_string(&auth)?, - ) - .await?; - - Ok(()) - } - ComputeTarget::Demox { url } => { - let _body = json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "generateTransaction", - "params": { - "authorization": serde_json::to_string(&auth["authorization"])?, - "fee": serde_json::to_string(&auth["fee"])?, - "url": query_path.unwrap(), - "broadcast": true, - } - }); - - todo!("post on {url}") - } - } - } - } - }; - // build a timer that keeps track of the expected sink speed - let mut timer = sink.timer(tx_count); + // if the source is listen, the sink's rate is ignored + let mut timer = matches!(source, TxSource::Listen { .. }) + .then(Timer::never) + .unwrap_or_else(|| sink.timer(*tx_count)); - let mut container = FuturesUnordered::new(); + let mut tx_reqs = FuturesUnordered::new(); + let mut auth_execs = FuturesUnordered::new(); + let mut tx_shots = FuturesUnordered::new(); loop { tokio::select! { - Some(res) = container.next() => { + // ------------------------ + // Work generation + // ------------------------ + + // when the timer resolves, request a new transaction + _ = timer.next() => { + tx_reqs.push(self.request_tx(drain_pipe.clone())); + } + // receive authorizations and forward the executions to the compute target + Some(auth) = rx.authorizations.recv() => { + auth_execs.push(self.execute_auth(auth, query_path.clone().unwrap())); + } + // receive transactions and forward them to the sink target + Some(tx) = rx.transactions.recv() => { + tx_shots.push(self.fire_tx(sink_pipe.clone(), tx)); + } + + // ------------------------ + // Work results + // ------------------------ + + Some(res) = tx_reqs.next() => { + match res { + // if the request was successful, continue + Ok(true) => {} + // if the source is depleted, break the loop + Ok(false) => { + info!("cannon {env_id}.{cannon_id} source depleted after {} txs", fired_txs.load(std::sync::atomic::Ordering::Relaxed)); + break; + }, + // if the request failed, undo the timer to allow another transaction to replace the failure + Err(e) => { + warn!("cannon {env_id}.{cannon_id} transaction task failed: {e}"); + timer.undo(); + } + } + }, + Some(res) = auth_execs.next() => { if let Err(e) = res { + warn!("cannon {env_id}.{cannon_id} auth execute task failed: {e}"); timer.undo(); - warn!("transaction gen failed: {e}"); } }, - _ = timer.next() => { - // queue a new transaction - container.push(gen_tx.clone()()); - } - Some(tx) = rx.recv() => { - let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; - match &sink { - TxSink::Record { .. } => { - debug!("writing tx to file"); - sink_pipe.clone().unwrap().write(&tx)?; - } - TxSink::RealTime { target, .. } => { - let pool = state.pool.read().await; - let nodes = env2.upgrade().ok_or_else(|| anyhow::anyhow!("env dropped"))? - .matching_nodes(target, &pool, PortType::Rest) - .collect::>(); - let Some(node) = nodes.get(rand::random::() % nodes.len()) else { - bail!("no nodes available to broadcast transactions") - }; - match node { - AgentPeer::Internal(id, _) => { - let Some(client) = pool[id].client_owned() else { - bail!("target node was offline"); - }; - - client.broadcast_tx(tx).await?; - } - AgentPeer::External(addr) => { - let url = format!("http://{addr}/mainnet/transaction/broadcast"); - ensure!( - reqwest::Client::new() - .post(url) - .header("Content-Type", "application/json") - .body(tx) - .send() - .await? - .status() - .is_success(), - "failed to post transaction to external target node {addr}" - ); - } + Some(res) = tx_shots.next() => { + match res { + Ok(()) => { + let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + if fired_count >= *tx_count { + trace!("cannon {env_id}.{cannon_id} finished firing txs"); + break; } + trace!("cannon {env_id}.{cannon_id} fired {fired_count}/{tx_count} txs"); + } + Err(e) => { + warn!("cannon {env_id}.{cannon_id} failed to fire transaction {e}"); + timer.undo(); } } + }, + } + } + + Ok(()) + } + + /// Request a new transaction from the context's source + async fn request_tx(&self, drain_pipe: Option>) -> Result { + match &self.source { + TxSource::Playback { .. } => { + // if tx source is playback, read lines from the transaction file + let Some(transaction) = drain_pipe.unwrap().next()? else { + return Ok(false); + }; + self.tx_sender.send(transaction)?; + Ok(true) + } + TxSource::RealTime { .. } => { + let Some(env) = self.env.upgrade() else { + bail!("env dropped") + }; + trace!("cannon {}.{} generating authorization...", env.id, self.id); + + let auth = self.source.get_auth(&env)?.run(&env.aot_bin).await?; + self.auth_sender.send(auth)?; + Ok(true) + } + TxSource::Listen { .. } => { + unreachable!("listen mode cannot generate transactions") + } + } + } + + /// Execute an authorization on the source's compute target + async fn execute_auth(&self, auth: Authorization, query_path: String) -> Result<()> { + match &self.source { + TxSource::Playback { .. } => { + unreachable!("playback mode cannot receive authorizations") + } + TxSource::RealTime { compute, .. } | TxSource::Listen { compute, .. } => { + let env = self + .env + .upgrade() + .ok_or_else(|| anyhow::anyhow!("env dropped"))?; + compute.execute(&self.state, &env, query_path, auth).await + } + } + } + + /// Fire a transaction to the sink + async fn fire_tx(&self, sink_pipe: Option>, tx: String) -> Result<()> { + match &self.sink { + TxSink::Record { .. } => { + sink_pipe.unwrap().write(&tx)?; + } + TxSink::RealTime { target, .. } => { + let pool = self.state.pool.read().await; + let nodes = self + .env + .upgrade() + .ok_or_else(|| anyhow::anyhow!("env dropped"))? + .matching_nodes(target, &pool, PortType::Rest) + .collect::>(); + let Some(node) = nodes.get(rand::random::() % nodes.len()) else { + bail!("no nodes available to broadcast transactions") + }; + match node { + AgentPeer::Internal(id, _) => { + let Some(client) = pool[id].client_owned() else { + bail!("target agent {id} was offline"); + }; - if fired_count >= tx_count { - debug!("finished firing txs"); - break; + client.broadcast_tx(tx).await?; + } + AgentPeer::External(addr) => { + let url = format!("http://{addr}/mainnet/transaction/broadcast"); + ensure!( + reqwest::Client::new() + .post(url) + .header("Content-Type", "application/json") + .body(tx) + .send() + .await? + .status() + .is_success(), + "failed to post transaction to external target node {addr}" + ); } - debug!("fired {fired_count}/{tx_count} txs"); } } } - Ok(()) } } diff --git a/crates/snot/src/cannon/router.rs b/crates/snot/src/cannon/router.rs index b24ffffb..38a05661 100644 --- a/crates/snot/src/cannon/router.rs +++ b/crates/snot/src/cannon/router.rs @@ -11,10 +11,13 @@ use serde_json::json; use crate::state::AppState; +use super::Authorization; + pub(crate) fn redirect_cannon_routes() -> Router { Router::new() .route("/:cannon/mainnet/latest/stateRoot", get(state_root)) .route("/:cannon/mainnet/transaction/broadcast", post(transaction)) + .route("/:cannon/auth", post(authorization)) } async fn state_root( @@ -81,12 +84,20 @@ async fn transaction( let env = state.envs.read().await; env.get(&env_id).cloned() }) else { - return StatusCode::NOT_FOUND.into_response(); + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "environment not found" })), + ) + .into_response(); }; let cannon_lock = env.cannons.read().await; let Some(cannon) = cannon_lock.get(&cannon_id) else { - return StatusCode::NOT_FOUND.into_response(); + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "cannon not found" })), + ) + .into_response(); }; match cannon.proxy_broadcast(body) { @@ -98,3 +109,38 @@ async fn transaction( .into_response(), } } + +async fn authorization( + Path((env_id, cannon_id)): Path<(usize, usize)>, + state: State, + Json(body): Json, +) -> Response { + let Some(env) = ({ + let env = state.envs.read().await; + env.get(&env_id).cloned() + }) else { + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "environment not found" })), + ) + .into_response(); + }; + + let cannon_lock = env.cannons.read().await; + let Some(cannon) = cannon_lock.get(&cannon_id) else { + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "cannon not found" })), + ) + .into_response(); + }; + + match cannon.proxy_auth(body) { + Ok(_) => StatusCode::OK.into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("{e}")})), + ) + .into_response(), + } +} diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index 0ce79881..69bb64a0 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -37,6 +37,18 @@ pub enum TxSink { /// Requires cannon to have an associated env_id target: NodeTargets, + #[serde(flatten)] + // rate in which the transactions are sent + rate: FireRate, + }, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case", untagged)] +pub enum FireRate { + Never, + #[serde(rename_all = "kebab-case")] + Burst { /// How long between each burst of transactions burst_delay_ms: u32, /// How many transactions to fire off in each burst @@ -44,27 +56,20 @@ pub enum TxSink { /// How long between each transaction in a burst tx_delay_ms: u32, }, + #[serde(rename_all = "kebab-case")] + Repeat { + tx_delay_ms: u32, + }, } -impl TxSink { - pub fn timer(&self, count: usize) -> Timer { +impl FireRate { + fn as_timer(&self, count: usize) -> Timer { match self { - TxSink::Record { - tx_request_delay_ms, - .. - } => Timer { - last_shot: Instant::now(), - state: TimerState::Waiting, - count, - burst_rate: Duration::from_millis(*tx_request_delay_ms as u64), - burst_size: 1, - fire_rate: Duration::ZERO, - }, - TxSink::RealTime { + FireRate::Never => Timer::never(), + FireRate::Burst { burst_delay_ms, tx_per_burst, tx_delay_ms, - .. } => Timer { last_shot: Instant::now(), state: TimerState::Waiting, @@ -73,6 +78,29 @@ impl TxSink { burst_size: *tx_per_burst, fire_rate: Duration::from_millis(*tx_delay_ms as u64), }, + FireRate::Repeat { tx_delay_ms } => Timer { + last_shot: Instant::now(), + state: TimerState::Waiting, + count, + burst_rate: Duration::from_millis(*tx_delay_ms as u64), + burst_size: 1, + fire_rate: Duration::ZERO, + }, + } + } +} + +impl TxSink { + pub fn timer(&self, count: usize) -> Timer { + match self { + TxSink::Record { + tx_request_delay_ms, + .. + } => FireRate::Repeat { + tx_delay_ms: *tx_request_delay_ms, + } + .as_timer(count), + TxSink::RealTime { rate: speed, .. } => speed.as_timer(count), } } } @@ -92,8 +120,10 @@ enum TimerState { Active(usize), /// wait the `burst_rate` duration Waiting, - /// wait forever + /// wait forever, but available for undo Done, + /// wait forever. does not support undo + Never, } impl Timer { @@ -118,6 +148,17 @@ impl Timer { } } + pub fn never() -> Self { + Timer { + last_shot: Instant::now(), + state: TimerState::Never, + count: 0, + burst_rate: Duration::ZERO, + burst_size: 0, + fire_rate: Duration::ZERO, + } + } + pub async fn next(&mut self) { self.state = match self.state { TimerState::Active(remaining) => { @@ -150,7 +191,7 @@ impl Timer { }, } } - TimerState::Done => future::pending().await, + TimerState::Done | TimerState::Never => future::pending().await, }; } } diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index af91470d..f919d49e 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -2,15 +2,16 @@ use std::collections::HashSet; use anyhow::{anyhow, bail, Result}; use serde::{Deserialize, Serialize}; +use serde_json::json; use snot_common::state::NodeKey; -use crate::{env::Environment, schema::nodes::KeySource}; +use crate::{env::Environment, schema::nodes::KeySource, state::GlobalState}; use super::{authorized::Authorize, net::get_available_port}; /// Represents an instance of a local query service. #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct LocalQueryService { +pub struct LocalService { /// Ledger & genesis block to use // pub storage_id: usize, /// port to host the service on (needs to be unused by other cannons and services) @@ -30,7 +31,7 @@ pub struct LocalQueryService { pub sync_from: Option, } -impl LocalQueryService { +impl LocalService { // TODO: cache this when sync_from is false /// Fetch the state root from the local query service /// (non-cached) @@ -46,9 +47,9 @@ impl LocalQueryService { /// /cannon//mainnet/transaction/broadcast #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case", tag = "mode")] -pub enum LedgerQueryService { +pub enum QueryTarget { /// Use the local ledger query service - Local(LocalQueryService), + Local(LocalService), /// Target a specific node (probably over rpc instead of reqwest lol...) /// /// Requires cannon to have an associated env_id @@ -97,7 +98,7 @@ pub enum TxSource { /// Generate transactions in real time #[serde(rename_all = "kebab-case")] RealTime { - query: LedgerQueryService, + query: QueryTarget, compute: ComputeTarget, /// defaults to TransferPublic @@ -110,6 +111,12 @@ pub enum TxSource { /// defaults to committee addresses addresses: Vec, }, + /// Receive authorizations from a persistent path /api/v1/env/:env_id/cannons/:id/auth + #[serde(rename_all = "kebab-case")] + Listen { + query: QueryTarget, + compute: ComputeTarget, + }, } impl TxSource { @@ -118,7 +125,7 @@ impl TxSource { matches!( self, TxSource::RealTime { - query: LedgerQueryService::Local(_), + query: QueryTarget::Local(_), .. } ) @@ -177,6 +184,53 @@ impl TxSource { } } +impl ComputeTarget { + pub async fn execute( + &self, + state: &GlobalState, + env: &Environment, + query_path: String, + auth: serde_json::Value, + ) -> Result<()> { + match self { + ComputeTarget::Agent => { + // find a client, mark it as busy + let Some((client, _busy)) = state.pool.read().await.values().find_map(|a| { + if !a.is_busy() && a.is_inventory() { + a.client_owned().map(|c| (c, a.make_busy())) + } else { + None + } + }) else { + bail!("no agents available to execute authorization") + }; + + // execute the authorization + client + .execute_authorization(env.id, query_path, serde_json::to_string(&auth)?) + .await?; + + Ok(()) + } + ComputeTarget::Demox { url } => { + let _body = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "generateTransaction", + "params": { + "authorization": serde_json::to_string(&auth["authorization"])?, + "fee": serde_json::to_string(&auth["fee"])?, + "url": query_path, + "broadcast": true, + } + }); + + todo!("post on {url}") + } + } + } +} + // I use this to generate example yaml... /* #[cfg(test)] mod test { diff --git a/crates/snot/src/env/timeline.rs b/crates/snot/src/env/timeline.rs index b65e6ad3..6f35cf62 100644 --- a/crates/snot/src/env/timeline.rs +++ b/crates/snot/src/env/timeline.rs @@ -14,7 +14,7 @@ use super::Environment; use crate::{ cannon::{ sink::TxSink, - source::{LedgerQueryService, TxSource}, + source::{QueryTarget, TxSource}, CannonInstance, }, schema::timeline::{Action, ActionInstance, EventDuration}, @@ -212,7 +212,7 @@ impl Environment { if let (Some(q), TxSource::RealTime { query, .. }) = (&cannon.query, &mut source) { - *query = LedgerQueryService::Node(q.clone()); + *query = QueryTarget::Node(q.clone()); }; if let (Some(t), TxSink::RealTime { target, .. }) = @@ -222,7 +222,7 @@ impl Environment { }; let count = cannon.count; - let mut instance = CannonInstance::new( + let (mut instance, rx) = CannonInstance::new( state.clone(), cannon_id, env.clone(), @@ -239,7 +239,7 @@ impl Environment { // debug!("instance started await mode"); awaiting_handles.push(tokio::task::spawn(async move { - let res = ctx.spawn().await; + let res = ctx.spawn(rx).await; // remove the cannon after the task is complete env.cannons.write().await.remove(&cannon_id); @@ -247,7 +247,7 @@ impl Environment { })); } else { instance - .spawn_local() + .spawn_local(rx) .await .map_err(ExecutionError::Cannon)?; } diff --git a/specs/test-cannon-replay.yaml b/specs/test-cannon-replay.yaml new file mode 100644 index 00000000..a5952af8 --- /dev/null +++ b/specs/test-cannon-replay.yaml @@ -0,0 +1,27 @@ +--- +version: storage.snarkos.testing.monadic.us/v1 + +id: base +name: base-ledger + +--- +version: cannon.snarkos.testing.monadic.us/v1 + +name: committee-tx-public + +source: + file-name: txs.json + +sink: + tx-request-delay-ms: 1000 + file-name: txs-sink.json + +--- +version: timeline.snarkos.testing.monadic.us/v1 + +name: tx-local + +timeline: + - cannon.await: + - name: committee-tx-public + count: 10 From 35132a05a027f0198b46fb96f4a2ff6a1a5c7a8f Mon Sep 17 00:00:00 2001 From: Meshiest Date: Fri, 29 Mar 2024 17:12:47 -0500 Subject: [PATCH 8/8] fix(aot): authorization cost fix --- crates/aot/src/authorized.rs | 67 +++++++++++++++++++++++++++++- crates/snot/src/cannon/mod.rs | 5 +++ specs/test-4-validator-cannon.yaml | 43 +++++++++++++++++++ 3 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 specs/test-4-validator-cannon.yaml diff --git a/crates/aot/src/authorized.rs b/crates/aot/src/authorized.rs index b9bd1b22..437c12d5 100644 --- a/crates/aot/src/authorized.rs +++ b/crates/aot/src/authorized.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Result}; use clap::Args; use rand::{CryptoRng, Rng}; use snarkvm::{ + console::program::Network as NetworkTrait, ledger::{ query::Query, store::{helpers::memory::ConsensusMemory, ConsensusStore}, @@ -11,6 +12,8 @@ use snarkvm::{ prelude::{ de, Deserialize, DeserializeExt, Deserializer, Serialize, SerializeStruct, Serializer, }, + synthesizer::process::cost_in_microcredits, + utilities::ToBytes, }; use tracing::error; @@ -111,8 +114,8 @@ impl Authorized { // Retrieve the execution ID. let execution_id: snarkvm::prelude::Field = function.to_execution_id()?; // Determine the base fee in microcredits. - let base_fee_in_microcredits = 0; - // get_base_fee_in_microcredits(program_id, function_name)?; + let base_fee_in_microcredits = estimate_cost(&function)?; + // Authorize the fee. let fee = match base_fee_in_microcredits == 0 && priority_fee_in_microcredits == 0 { true => None, @@ -181,6 +184,66 @@ impl Authorized { } } +fn estimate_cost(func: &Authorization) -> Result { + let transitions = func.transitions(); + + let storage_cost = { + let mut cost = 0u64; + + cost += 1; // execution version, 1 byte + cost += 1; // number of transitions, 1 byte + + // write each transition + for transition in transitions.values() { + cost += transition.to_bytes_le()?.len() as u64; + } + + // state root (this is 32 bytes) + cost += ::StateRoot::default() + .to_bytes_le()? + .len() as u64; + + // proof option is_some (1 byte) + cost += 1; + // Proof version + cost += 1; + + cost += 956; // size of proof with 1 batch size + + /* cost += varuna::Proof::<::PairingCurve>::new( + todo!("batch_sizes"), + todo!("commitments"), + todo!("evaluations"), + todo!("prover_third_message"), + todo!("prover_fourth_message"), + todo!("pc_proof"), + )? + .to_bytes_le()? + .len() as u64; */ + + cost + }; + //execution.size_in_bytes().map_err(|e| e.to_string())?; + + // Compute the finalize cost in microcredits. + let mut finalize_cost = 0u64; + // Iterate over the transitions to accumulate the finalize cost. + for (_key, transition) in transitions { + // Retrieve the function name, program id, and program. + let function_name = transition.function_name(); + let stack = PROCESS.get_stack(transition.program_id())?; + let cost = cost_in_microcredits(stack, function_name)?; + + // Accumulate the finalize cost. + if let Some(cost) = finalize_cost.checked_add(cost) { + finalize_cost = cost; + } else { + bail!("The finalize cost computation overflowed for an execution") + }; + } + Ok(storage_cost + finalize_cost) +} + impl Serialize for Authorized { /// Serializes the authorization into string or bytes. fn serialize(&self, serializer: S) -> Result { diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index e2dcde9d..bfe83bbe 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -498,6 +498,11 @@ impl ExecutionContext { .ok_or_else(|| anyhow::anyhow!("env dropped"))? .matching_nodes(target, &pool, PortType::Rest) .collect::>(); + + if nodes.is_empty() { + bail!("no nodes available to broadcast transactions") + } + let Some(node) = nodes.get(rand::random::() % nodes.len()) else { bail!("no nodes available to broadcast transactions") }; diff --git a/specs/test-4-validator-cannon.yaml b/specs/test-4-validator-cannon.yaml new file mode 100644 index 00000000..14ed6309 --- /dev/null +++ b/specs/test-4-validator-cannon.yaml @@ -0,0 +1,43 @@ +--- +version: storage.snarkos.testing.monadic.us/v1 + +id: base +name: base-ledger + +--- +version: nodes.snarkos.testing.monadic.us/v1 +name: 4-validators + +nodes: + validator/test: + replicas: 4 + key: committee.$ + height: 0 + validators: [validator/*] + peers: [] + +--- +version: cannon.snarkos.testing.monadic.us/v1 + +name: committee-tx-public + +source: + file-name: txs.json + +sink: + target: validator/test-1 + tx-delay-ms: 1000 + +--- +version: timeline.snarkos.testing.monadic.us/v1 + +name: tx-local + +timeline: + - cannon.await: + - name: committee-tx-public + count: 10 + - offline.await: + - validator/test-0 + - validator/test-2 + - validator/test-3