diff --git a/crates/snot-agent/src/rpc.rs b/crates/snot-agent/src/rpc.rs index 42ee9d5a..39a16d41 100644 --- a/crates/snot-agent/src/rpc.rs +++ b/crates/snot-agent/src/rpc.rs @@ -449,20 +449,27 @@ impl AgentService for AgentRpcServer { info!("executing authorization..."); // TODO: ensure binary associated with this env_id is present - let res = Command::new(self.state.cli.path.join(SNARKOS_FILE)) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) + let res = Command::new(dbg!(self.state.cli.path.join(SNARKOS_FILE))) + .stdout(std::io::stdout()) + .stderr(std::io::stderr()) .arg("execute") .arg("--query") .arg(&format!("http://{}{query}", self.state.endpoint)) .arg(auth) .spawn() - .map_err(|_| AgentError::FailedToSpawnProcess)? + .map_err(|e| { + warn!("failed to spawn auth exec process: {e}"); + AgentError::FailedToSpawnProcess + })? .wait() .await - .map_err(|_| AgentError::ProcessFailed)?; + .map_err(|e| { + warn!("auth exec process failed: {e}"); + AgentError::ProcessFailed + })?; if !res.success() { + warn!("auth exec process exited with status: {res}"); return Err(AgentError::ProcessFailed); } Ok(()) diff --git a/crates/snot/Cargo.toml b/crates/snot/Cargo.toml index 6e24068c..b3b9088a 100644 --- a/crates/snot/Cargo.toml +++ b/crates/snot/Cargo.toml @@ -19,7 +19,7 @@ lazy_static.workspace = true rand.workspace = true rand_chacha.workspace = true regex.workspace = true -reqwest = { workspace = true, features = ["stream"] } +reqwest = { workspace = true, features = ["stream", "json"] } serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true diff --git a/crates/snot/src/cannon/authorized.rs b/crates/snot/src/cannon/authorized.rs index c1c8a83a..5d7580f3 100644 --- a/crates/snot/src/cannon/authorized.rs +++ b/crates/snot/src/cannon/authorized.rs @@ -19,7 +19,6 @@ impl Authorize { command .stdout(std::io::stdout()) .stderr(std::io::stderr()) - .arg("aot") .arg("authorize"); match self { @@ -46,13 +45,19 @@ impl Authorize { let res = command.output().await?; + if !res.status.success() { + return Err(anyhow::anyhow!( + "command failed with status {}: {}", + res.status, + String::from_utf8_lossy(&res.stderr) + )); + } + let blob: serde_json::Value = serde_json::from_slice(&res.stdout)?; ensure!(blob.is_object(), "expected JSON object in response"); ensure!( - blob.get("function").is_some() - && blob.get("fee").is_some() - && blob.get("broadcast").is_some(), + blob.get("function").is_some() && blob.get("broadcast").is_some(), "expected function, fee, and broadcast fields in response" ); diff --git a/crates/snot/src/cannon/file.rs b/crates/snot/src/cannon/file.rs index 3940b5b0..aebec2e2 100644 --- a/crates/snot/src/cannon/file.rs +++ b/crates/snot/src/cannon/file.rs @@ -74,6 +74,7 @@ impl TransactionSink { let writer = lock.as_mut().unwrap(); writer.write_all(line.as_bytes())?; writer.write_all(b"\n")?; + writer.flush()?; Ok(()) } } diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 3fb64376..69513a61 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -7,22 +7,19 @@ pub mod source; use std::{ process::Stdio, - sync::{atomic::AtomicUsize, Arc, Mutex, Weak}, + sync::{atomic::AtomicUsize, Arc, OnceLock, Weak}, }; use anyhow::{bail, ensure, Result}; use futures_util::{stream::FuturesUnordered, StreamExt}; use serde_json::json; -use snot_common::state::{AgentPeer, AgentState}; +use snot_common::state::AgentPeer; use tokio::{ process::Command, - sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - OnceCell, - }, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, task::AbortHandle, }; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; use self::{sink::TxSink, source::TxSource}; use crate::{ @@ -82,7 +79,7 @@ pub struct CannonInstance { query_port: Option, // TODO: run the actual cannon in this task - pub task: Mutex>, + pub task: Option, /// Child process must exist for the duration of the cannon instance. /// This value is never used @@ -96,20 +93,20 @@ pub struct CannonInstance { tx_count: usize, } +#[tokio::main] +async fn get_external_ip() -> Option { + let sources: external_ip::Sources = external_ip::get_http_sources(); + let consensus = external_ip::ConsensusBuilder::new() + .add_sources(sources) + .build(); + consensus.get_consensus().await.map(|s| s.to_string()) +} + async fn get_host(state: &GlobalState) -> Option { - static ONCE: OnceCell> = OnceCell::const_new(); + static ONCE: OnceLock> = OnceLock::new(); match state.cli.hostname.as_ref() { Some(host) => Some(host.to_owned()), - None => ONCE - .get_or_init(|| async { - let sources: external_ip::Sources = external_ip::get_http_sources(); - let consensus = external_ip::ConsensusBuilder::new() - .add_sources(sources) - .build(); - consensus.get_consensus().await.map(|a| a.to_string()) - }) - .await - .to_owned(), + None => ONCE.get_or_init(get_external_ip).to_owned(), } } @@ -165,7 +162,7 @@ impl CannonInstance { tx_receiver: Some(tx_receiver), query_port, child, - task: Mutex::new(None), + task: None, fired_txs, tx_count: count, }) @@ -192,12 +189,8 @@ impl CannonInstance { pub async fn spawn_local(&mut self) -> Result<()> { let ctx = self.ctx()?; - let handle = tokio::task::spawn_local(async move { ctx.spawn().await }); - self.task - .lock() - .as_mut() - .unwrap() - .replace(handle.abort_handle()); + let handle = tokio::task::spawn(async move { ctx.spawn().await }); + self.task = Some(handle.abort_handle()); Ok(()) } @@ -262,10 +255,9 @@ impl CannonInstance { impl Drop for CannonInstance { fn drop(&mut self) { // cancel the task on drop - if let Ok(lock) = self.task.lock() { - if let Some(handle) = lock.as_ref() { - handle.abort(); - } + debug!("dropping cannon {}", self.id); + if let Some(handle) = self.task.take() { + handle.abort(); } } } @@ -299,11 +291,12 @@ impl ExecutionContext { mut rx, } = self; - debug!("spawning cannon {cannon_id}"); - let Some(env) = env2.upgrade() else { bail!("env dropped") }; + let env_id = env.id; + + trace!("cannon {env_id}/{cannon_id} spawned"); // when in playback mode, ensure the drain exists let (drain_pipe, query_path) = match &source { @@ -325,15 +318,15 @@ impl ExecutionContext { format!("http://{host}:{}{suffix}", state.cli.port) } }; - debug!("using realtime query {query}"); + trace!("cannon {cannon_id}/{env_id} using realtime query {query}"); (None, Some(query)) } }; let sink_pipe = match &sink { - TxSink::Record { file_name: name } => { - let pipe = env.tx_pipe.sinks.get(name).cloned(); - ensure!(pipe.is_some(), "transaction sink not found: {name}"); + TxSink::Record { file_name, .. } => { + let pipe = env.tx_pipe.sinks.get(file_name).cloned(); + ensure!(pipe.is_some(), "transaction sink not found: {file_name}"); pipe } _ => None, @@ -359,14 +352,14 @@ impl ExecutionContext { .upgrade() .ok_or_else(|| anyhow::anyhow!("env dropped"))?; - debug!("generating authorization..."); + trace!("cannon {cannon_id}/{env_id} generating authorization..."); let auth = source.get_auth(&env)?.run(&env.aot_bin).await?; match compute { ComputeTarget::Agent => { // find a client, mark it as busy let Some((client, _busy)) = state.pool.read().await.values().find_map(|a| { - if !a.is_busy() && matches!(a.state(), AgentState::Inventory) { + if !a.is_busy() && a.is_inventory() { a.client_owned().map(|c| (c, a.make_busy())) } else { None @@ -375,7 +368,6 @@ impl ExecutionContext { else { bail!("no agents available to execute authorization") }; - debug!("firing auth at agent"); // execute the authorization client @@ -384,9 +376,10 @@ impl ExecutionContext { .ok_or_else(|| anyhow::anyhow!("env dropped"))? .id, query_path.unwrap(), - serde_json::from_value(auth)?, + serde_json::to_string(&auth)?, ) .await?; + Ok(()) } ComputeTarget::Demox { url } => { @@ -418,19 +411,16 @@ impl ExecutionContext { tokio::select! { Some(res) = container.next() => { if let Err(e) = res { + timer.undo(); warn!("transaction gen failed: {e}"); } }, _ = timer.next() => { - debug!("queue new transaction"); // queue a new transaction container.push(gen_tx.clone()()); } Some(tx) = rx.recv() => { - let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if fired_count >= tx_count { - break; - } + let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; match &sink { TxSink::Record { .. } => { debug!("writing tx to file"); @@ -469,6 +459,12 @@ impl ExecutionContext { } } } + + if fired_count >= tx_count { + debug!("finished firing txs"); + break; + } + debug!("fired {fired_count}/{tx_count} txs"); } } } diff --git a/crates/snot/src/cannon/router.rs b/crates/snot/src/cannon/router.rs index b6ccc1f2..b24ffffb 100644 --- a/crates/snot/src/cannon/router.rs +++ b/crates/snot/src/cannon/router.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use axum::{ extract::{Path, State}, response::{IntoResponse, Response}, @@ -23,23 +25,51 @@ async fn state_root( let env = state.envs.read().await; env.get(&env_id).cloned() }) else { - return StatusCode::NOT_FOUND.into_response(); + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "environment not found" })), + ) + .into_response(); }; let cannon_lock = env.cannons.read().await; let Some(cannon) = cannon_lock.get(&cannon_id) else { - return StatusCode::NOT_FOUND.into_response(); + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "cannon not found" })), + ) + .into_response(); }; - match cannon.proxy_state_root().await { - // the nodes expect this state root to be string escaped json - Ok(root) => Json(json!(root)).into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("{e}")})), - ) - .into_response(), + // TODO: lock this with a mutex or something so that multiple route callers can't bombard the cannon with proxy_state_root call attempts + let mut attempts = 0; + loop { + attempts += 1; + match cannon.proxy_state_root().await { + Ok(root) => break Json(root).into_response(), + + Err(e) if attempts > 5 => { + break ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": "non-responsive query node", "inner": format!("{e}") })), + ) + .into_response() + } + + _ => attempts += 1, + } + tokio::time::sleep(Duration::from_secs(1)).await; } + + // match cannon.proxy_state_root().await { + // // the nodes expect this state root to be string escaped json + // Ok(root) => Json(root).into_response(), + // Err(e) => ( + // StatusCode::INTERNAL_SERVER_ERROR, + // Json(json!({ "error": format!("{e}")})), + // ) + // .into_response(), + // } } async fn transaction( diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index a00ef43b..0ce79881 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -1,9 +1,14 @@ use std::{future, time::Duration}; use serde::{Deserialize, Serialize}; +use tokio::time::Instant; use crate::schema::NodeTargets; +fn one_thousand() -> u32 { + 1000 +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case", untagged)] pub enum TxSink { @@ -12,6 +17,8 @@ pub enum TxSink { Record { /// filename for the recording txs list file_name: String, + #[serde(default = "one_thousand")] + tx_request_delay_ms: u32, }, //// Write transactions to a ledger query service // AoTAppend { @@ -42,10 +49,14 @@ pub enum TxSink { impl TxSink { pub fn timer(&self, count: usize) -> Timer { match self { - TxSink::Record { .. } => Timer { - state: TimerState::Active(0), + TxSink::Record { + tx_request_delay_ms, + .. + } => Timer { + last_shot: Instant::now(), + state: TimerState::Waiting, count, - burst_rate: Duration::from_secs(1), + burst_rate: Duration::from_millis(*tx_request_delay_ms as u64), burst_size: 1, fire_rate: Duration::ZERO, }, @@ -55,7 +66,8 @@ impl TxSink { tx_delay_ms, .. } => Timer { - state: TimerState::Active(*tx_per_burst as usize), + last_shot: Instant::now(), + state: TimerState::Waiting, count, burst_rate: Duration::from_millis(*burst_delay_ms as u64), burst_size: *tx_per_burst, @@ -71,8 +83,10 @@ pub struct Timer { burst_size: u32, fire_rate: Duration, state: TimerState, + last_shot: Instant, } +#[derive(Debug)] enum TimerState { /// wait the `fire_rate` duration Active(usize), @@ -97,10 +111,19 @@ impl Timer { */ + pub fn undo(&mut self) { + self.count += 1; + if matches!(self.state, TimerState::Done) { + self.state = TimerState::Waiting; + } + } + pub async fn next(&mut self) { self.state = match self.state { TimerState::Active(remaining) => { - tokio::time::sleep(self.fire_rate).await; + tokio::time::sleep_until(self.last_shot + self.fire_rate).await; + self.last_shot = Instant::now(); + self.count = self.count.saturating_sub(1); // we reach this point by having waited before, so we remove one match remaining.saturating_sub(1) { @@ -111,17 +134,19 @@ impl Timer { } } TimerState::Waiting => { + tokio::time::sleep_until(self.last_shot + self.burst_rate).await; + self.last_shot = Instant::now(); self.count = self.count.saturating_sub(1); - tokio::time::sleep(self.burst_rate).await; + match self.count { // if count is empty, the next sleep will be permanent 0 => TimerState::Done, - _ => match self.burst_size { + _ => match self.burst_size.saturating_sub(1) { // if the burst size is 0, do a full burst wait 0 => TimerState::Waiting, // if the burst size is nonzero, wait for the shorter burst latency - _ => TimerState::Active(self.burst_size as usize), + shots => TimerState::Active((shots as usize).min(self.count)), }, } } diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index 64232ac9..af91470d 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -37,7 +37,7 @@ impl LocalQueryService { pub async fn get_state_root(&self, port: u16) -> Result { let url = format!("http://127.0.0.1:{}/mainnet/latest/stateRoot", port); let response = reqwest::get(&url).await?; - Ok(response.text().await?) + Ok(response.json().await?) } } diff --git a/crates/snot/src/env/mod.rs b/crates/snot/src/env/mod.rs index b5c08f75..258ba3cb 100644 --- a/crates/snot/src/env/mod.rs +++ b/crates/snot/src/env/mod.rs @@ -240,7 +240,7 @@ impl Environment { ); } - if let TxSink::Record { file_name } = sink { + if let TxSink::Record { file_name, .. } = sink { tx_pipe.sinks.insert( file_name.to_owned(), Arc::new(TransactionSink::new(storage.clone(), file_name)?), @@ -271,7 +271,6 @@ impl Environment { timeline_handle: Default::default(), }; - let env_id = ENVS_COUNTER.fetch_add(1, Ordering::Relaxed); state_lock.insert(env_id, Arc::new(env)); drop(state_lock); diff --git a/crates/snot/src/env/timeline.rs b/crates/snot/src/env/timeline.rs index c7b6d119..b65e6ad3 100644 --- a/crates/snot/src/env/timeline.rs +++ b/crates/snot/src/env/timeline.rs @@ -200,14 +200,11 @@ impl Environment { } Action::Cannon(cannons) => { - debug!("enter cannon action, {} cannons", cannons.len()); for cannon in cannons.iter() { - debug!("iter cannon {cannon:?} configs: {:?}", env.cannon_configs); let cannon_id = env.cannons_counter.fetch_add(1, Ordering::Relaxed); let Some((mut source, mut sink)) = env.cannon_configs.get(&cannon.name).cloned() else { - debug!("unknown cannon"); return Err(ExecutionError::UnknownCannon(cannon.name.clone())); }; @@ -237,25 +234,27 @@ impl Environment { .map_err(ExecutionError::Cannon)?; if *awaited { + let ctx = instance.ctx().unwrap(); + let env = env.clone(); + // debug!("instance started await mode"); - awaiting_handles.push(tokio::task::spawn_local(async move { - debug!("spawn start"); + awaiting_handles.push(tokio::task::spawn(async move { + let res = ctx.spawn().await; - instance.spawn().await.map_err(ExecutionError::Cannon)?; - debug!("spawn complete"); - Ok::<_, ExecutionError>(()) + // remove the cannon after the task is complete + env.cannons.write().await.remove(&cannon_id); + res.map_err(ExecutionError::Cannon) })); - todo!() } else { instance .spawn_local() .await .map_err(ExecutionError::Cannon)?; - env.cannons.write().await.insert(cannon_id, instance); } - } - todo!() + // insert the cannon + env.cannons.write().await.insert(cannon_id, instance); + } } Action::Height(_) => unimplemented!(), }; diff --git a/crates/snot/src/main.rs b/crates/snot/src/main.rs index 924c6de7..2be2eb3d 100644 --- a/crates/snot/src/main.rs +++ b/crates/snot/src/main.rs @@ -22,6 +22,9 @@ async fn main() { let env_filter = env_filter .parse_lossy("") + .add_directive("hyper_util=off".parse().unwrap()) + .add_directive("hyper=off".parse().unwrap()) + .add_directive("reqwest=off".parse().unwrap()) .add_directive("surrealdb_core=off".parse().unwrap()) .add_directive("surrealdb=off".parse().unwrap()) .add_directive("tungstenite=off".parse().unwrap()) diff --git a/scripts/probe.mjs b/scripts/probe.mjs new file mode 100755 index 00000000..8c2820cf --- /dev/null +++ b/scripts/probe.mjs @@ -0,0 +1,43 @@ +#!/usr/bin/env node + +const pending = process.argv.filter(arg => arg.match(/^(\d+\.){3}\d+:\d+$/)); + +const seen = new Set(); +const offline = new Set(); +let peer; + +while ((peer = pending.shift())) { + if (seen.has(peer)) continue; + seen.add(peer); + + console.log(`Scanning peers from ${peer}...`); + try { + const res = await fetch(`http://${peer}/mainnet/peers/all`); + const peers = await res.json(); + + for (let p of peers) { + p = p.replace(/413/, '303'); + if (seen.has(p)) continue; + + pending.push(p); + console.log(`- found ${p}`); + } + } catch (err) { + console.error(`Error scanning ${peer}: ${err.message}`); + offline.add(peer); + } +} + +for (const peer of seen) { + if (offline.has(peer)) continue; + + try { + const res = await fetch(`http://${peer}/mainnet/latest/block`); + const block = await res.json(); + console.log( + `${peer} - ${block.header.metadata.height} ${block.block_hash}` + ); + } catch (err) { + console.error(`Error fetching block from ${peer}: ${err.message}`); + } +} diff --git a/specs/test-cannon-record.yaml b/specs/test-cannon-record.yaml index 1c4458a9..86d5a41e 100644 --- a/specs/test-cannon-record.yaml +++ b/specs/test-cannon-record.yaml @@ -17,6 +17,7 @@ source: addresses: [committee.$] sink: + tx-request-delay-ms: 1000 file-name: txs.json --- @@ -27,4 +28,4 @@ name: tx-local timeline: - cannon.await: - name: committee-tx-public - count: 5 + count: 10