From 4a108f425e502385260610a374a60faa3f9ac0db Mon Sep 17 00:00:00 2001 From: Meshiest Date: Fri, 29 Mar 2024 13:22:36 -0500 Subject: [PATCH] refactor(cannon): code cleanup, streamline auth sourcing, test replay sink --- crates/snot/src/cannon/file.rs | 2 +- crates/snot/src/cannon/mod.rs | 395 ++++++++++++++++++------------- crates/snot/src/cannon/router.rs | 50 +++- crates/snot/src/cannon/sink.rs | 75 ++++-- crates/snot/src/cannon/source.rs | 68 +++++- crates/snot/src/env/timeline.rs | 10 +- specs/test-cannon-replay.yaml | 27 +++ 7 files changed, 426 insertions(+), 201 deletions(-) create mode 100644 specs/test-cannon-replay.yaml diff --git a/crates/snot/src/cannon/file.rs b/crates/snot/src/cannon/file.rs index aebec2e2..23ccec36 100644 --- a/crates/snot/src/cannon/file.rs +++ b/crates/snot/src/cannon/file.rs @@ -72,7 +72,7 @@ impl TransactionSink { } let writer = lock.as_mut().unwrap(); - writer.write_all(line.as_bytes())?; + writer.write_all(line.trim().as_bytes())?; writer.write_all(b"\n")?; writer.flush()?; Ok(()) diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index 69513a61..e2dcde9d 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -12,18 +12,24 @@ use std::{ use anyhow::{bail, ensure, Result}; use futures_util::{stream::FuturesUnordered, StreamExt}; -use serde_json::json; use snot_common::state::AgentPeer; use tokio::{ process::Command, sync::mpsc::{UnboundedReceiver, UnboundedSender}, task::AbortHandle, }; -use tracing::{debug, trace, warn}; +use tracing::{info, trace, warn}; -use self::{sink::TxSink, source::TxSource}; +use self::{ + file::{TransactionDrain, TransactionSink}, + sink::TxSink, + source::TxSource, +}; use crate::{ - cannon::source::{ComputeTarget, LedgerQueryService}, + cannon::{ + sink::Timer, + source::{ComputeTarget, QueryTarget}, + }, env::{Environment, PortType}, state::GlobalState, }; @@ -57,6 +63,8 @@ burst mode?? */ +pub type Authorization = serde_json::Value; + /// Transaction cannon state /// using the `TxSource` and `TxSink` for configuration. #[derive(Debug)] @@ -88,7 +96,9 @@ pub struct CannonInstance { /// channel to send transactions to the the task tx_sender: UnboundedSender, - tx_receiver: Option>, + /// channel to send authorizations to the the task + auth_sender: UnboundedSender, + fired_txs: Arc, tx_count: usize, } @@ -110,6 +120,11 @@ async fn get_host(state: &GlobalState) -> Option { } } +pub struct CannonReceivers { + transactions: UnboundedReceiver, + authorizations: UnboundedReceiver, +} + impl CannonInstance { /// Create a new active transaction cannon /// with the given source and sink. @@ -122,7 +137,7 @@ impl CannonInstance { source: TxSource, sink: TxSink, count: usize, - ) -> Result { + ) -> Result<(Self, CannonReceivers)> { let (tx_sender, tx_receiver) = tokio::sync::mpsc::unbounded_channel(); let query_port = source.get_query_port()?; let fired_txs = Arc::new(AtomicUsize::new(0)); @@ -152,27 +167,31 @@ impl CannonInstance { None }; - Ok(Self { - id, - global_state, - source, - sink, - env: Arc::downgrade(&env), - tx_sender, - tx_receiver: Some(tx_receiver), - query_port, - child, - task: None, - fired_txs, - tx_count: count, - }) + let (auth_sender, auth_receiver) = tokio::sync::mpsc::unbounded_channel(); + + Ok(( + Self { + id, + global_state, + source, + sink, + env: Arc::downgrade(&env), + tx_sender, + auth_sender, + query_port, + child, + task: None, + fired_txs, + tx_count: count, + }, + CannonReceivers { + transactions: tx_receiver, + authorizations: auth_receiver, + }, + )) } - pub fn ctx(&mut self) -> Result { - let Some(rx) = self.tx_receiver.take() else { - bail!("cannon already spawned") - }; - + pub fn ctx(&self) -> Result { Ok(ExecutionContext { id: self.id, env: self.env.clone(), @@ -181,37 +200,37 @@ impl CannonInstance { fired_txs: self.fired_txs.clone(), state: self.global_state.clone(), tx_count: self.tx_count, - tx: self.tx_sender.clone(), - rx, + tx_sender: self.tx_sender.clone(), + auth_sender: self.auth_sender.clone(), }) } - pub async fn spawn_local(&mut self) -> Result<()> { + pub async fn spawn_local(&mut self, rx: CannonReceivers) -> Result<()> { let ctx = self.ctx()?; - let handle = tokio::task::spawn(async move { ctx.spawn().await }); + let handle = tokio::task::spawn(async move { ctx.spawn(rx).await }); self.task = Some(handle.abort_handle()); Ok(()) } - pub async fn spawn(&mut self) -> Result<()> { - self.ctx()?.spawn().await + pub async fn spawn(&mut self, rx: CannonReceivers) -> Result<()> { + self.ctx()?.spawn(rx).await } /// Called by axum to forward /cannon//mainnet/latest/stateRoot /// to the ledger query service's /mainnet/latest/stateRoot pub async fn proxy_state_root(&self) -> Result { match &self.source { - TxSource::RealTime { query, .. } => match query { - LedgerQueryService::Local(qs) => { + TxSource::RealTime { query, .. } | TxSource::Listen { query, .. } => match query { + QueryTarget::Local(qs) => { if let Some(port) = self.query_port { qs.get_state_root(port).await } else { bail!("cannon is missing a query port") } } - LedgerQueryService::Node(key) => { + QueryTarget::Node(key) => { let Some(env) = self.env.upgrade() else { unreachable!("called from a place where env is present") }; @@ -240,8 +259,7 @@ impl CannonInstance { /// to the desired sink pub fn proxy_broadcast(&self, body: String) -> Result<()> { match &self.source { - TxSource::RealTime { .. } => { - debug!("received tx from broadcast path"); + TxSource::RealTime { .. } | TxSource::Listen { .. } => { self.tx_sender.send(body)?; } TxSource::Playback { .. } => { @@ -250,12 +268,27 @@ impl CannonInstance { } Ok(()) } + + /// Called by axum to forward /cannon//auth to a listen source + pub fn proxy_auth(&self, body: Authorization) -> Result<()> { + match &self.source { + TxSource::Listen { .. } => { + self.auth_sender.send(body)?; + } + TxSource::RealTime { .. } => { + warn!("cannon received broadcasted transaction in realtime mode. ignoring.") + } + TxSource::Playback { .. } => { + warn!("cannon received broadcasted transaction in playback mode. ignoring.") + } + } + Ok(()) + } } impl Drop for CannonInstance { fn drop(&mut self) { // cancel the task on drop - debug!("dropping cannon {}", self.id); if let Some(handle) = self.task.take() { handle.abort(); } @@ -273,30 +306,29 @@ pub struct ExecutionContext { sink: TxSink, fired_txs: Arc, tx_count: usize, - tx: UnboundedSender, - rx: UnboundedReceiver, + tx_sender: UnboundedSender, + auth_sender: UnboundedSender, } impl ExecutionContext { - pub async fn spawn(self) -> Result<()> { + pub async fn spawn(self, mut rx: CannonReceivers) -> Result<()> { let ExecutionContext { id: cannon_id, - env: env2, + env: env_weak, source, sink, fired_txs, tx_count, state, - tx, - mut rx, - } = self; + .. + } = &self; - let Some(env) = env2.upgrade() else { + let Some(env) = env_weak.upgrade() else { bail!("env dropped") }; let env_id = env.id; - trace!("cannon {env_id}/{cannon_id} spawned"); + trace!("cannon {env_id}.{cannon_id} spawned"); // when in playback mode, ensure the drain exists let (drain_pipe, query_path) = match &source { @@ -305,20 +337,20 @@ impl ExecutionContext { ensure!(pipe.is_some(), "transaction drain not found: {name}"); (pipe, None) } - TxSource::RealTime { compute, .. } => { + TxSource::RealTime { compute, .. } | TxSource::Listen { compute, .. } => { let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id); let query = match compute { // agents already know the host of the control plane ComputeTarget::Agent => suffix, // demox needs to locate it ComputeTarget::Demox { .. } => { - let Some(host) = get_host(&state).await else { + let Some(host) = get_host(state).await else { bail!("no --host configured for demox based cannon"); }; format!("http://{host}:{}{suffix}", state.cli.port) } }; - trace!("cannon {cannon_id}/{env_id} using realtime query {query}"); + trace!("cannon {env_id}.{cannon_id} using realtime query {query}"); (None, Some(query)) } }; @@ -332,143 +364,168 @@ impl ExecutionContext { _ => None, }; - // effectively make this be the source of pooling requests - - let gen_tx = || async { - match &source { - TxSource::Playback { .. } => { - // if tx source is playback, read lines from the transaction file - let Some(transaction) = drain_pipe.unwrap().next()? else { - bail!("source out of transactions") - }; - tx.send(transaction)?; - Ok(()) - } - TxSource::RealTime { compute, .. } => { - // TODO: if source is realtime, generate authorizations and - // send them to any available agent - - let env = env2 - .upgrade() - .ok_or_else(|| anyhow::anyhow!("env dropped"))?; - - trace!("cannon {cannon_id}/{env_id} generating authorization..."); - let auth = source.get_auth(&env)?.run(&env.aot_bin).await?; - match compute { - ComputeTarget::Agent => { - // find a client, mark it as busy - let Some((client, _busy)) = - state.pool.read().await.values().find_map(|a| { - if !a.is_busy() && a.is_inventory() { - a.client_owned().map(|c| (c, a.make_busy())) - } else { - None - } - }) - else { - bail!("no agents available to execute authorization") - }; - - // execute the authorization - client - .execute_authorization( - env2.upgrade() - .ok_or_else(|| anyhow::anyhow!("env dropped"))? - .id, - query_path.unwrap(), - serde_json::to_string(&auth)?, - ) - .await?; - - Ok(()) - } - ComputeTarget::Demox { url } => { - let _body = json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "generateTransaction", - "params": { - "authorization": serde_json::to_string(&auth["authorization"])?, - "fee": serde_json::to_string(&auth["fee"])?, - "url": query_path.unwrap(), - "broadcast": true, - } - }); - - todo!("post on {url}") - } - } - } - } - }; - // build a timer that keeps track of the expected sink speed - let mut timer = sink.timer(tx_count); + // if the source is listen, the sink's rate is ignored + let mut timer = matches!(source, TxSource::Listen { .. }) + .then(Timer::never) + .unwrap_or_else(|| sink.timer(*tx_count)); - let mut container = FuturesUnordered::new(); + let mut tx_reqs = FuturesUnordered::new(); + let mut auth_execs = FuturesUnordered::new(); + let mut tx_shots = FuturesUnordered::new(); loop { tokio::select! { - Some(res) = container.next() => { + // ------------------------ + // Work generation + // ------------------------ + + // when the timer resolves, request a new transaction + _ = timer.next() => { + tx_reqs.push(self.request_tx(drain_pipe.clone())); + } + // receive authorizations and forward the executions to the compute target + Some(auth) = rx.authorizations.recv() => { + auth_execs.push(self.execute_auth(auth, query_path.clone().unwrap())); + } + // receive transactions and forward them to the sink target + Some(tx) = rx.transactions.recv() => { + tx_shots.push(self.fire_tx(sink_pipe.clone(), tx)); + } + + // ------------------------ + // Work results + // ------------------------ + + Some(res) = tx_reqs.next() => { + match res { + // if the request was successful, continue + Ok(true) => {} + // if the source is depleted, break the loop + Ok(false) => { + info!("cannon {env_id}.{cannon_id} source depleted after {} txs", fired_txs.load(std::sync::atomic::Ordering::Relaxed)); + break; + }, + // if the request failed, undo the timer to allow another transaction to replace the failure + Err(e) => { + warn!("cannon {env_id}.{cannon_id} transaction task failed: {e}"); + timer.undo(); + } + } + }, + Some(res) = auth_execs.next() => { if let Err(e) = res { + warn!("cannon {env_id}.{cannon_id} auth execute task failed: {e}"); timer.undo(); - warn!("transaction gen failed: {e}"); } }, - _ = timer.next() => { - // queue a new transaction - container.push(gen_tx.clone()()); - } - Some(tx) = rx.recv() => { - let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; - match &sink { - TxSink::Record { .. } => { - debug!("writing tx to file"); - sink_pipe.clone().unwrap().write(&tx)?; - } - TxSink::RealTime { target, .. } => { - let pool = state.pool.read().await; - let nodes = env2.upgrade().ok_or_else(|| anyhow::anyhow!("env dropped"))? - .matching_nodes(target, &pool, PortType::Rest) - .collect::>(); - let Some(node) = nodes.get(rand::random::() % nodes.len()) else { - bail!("no nodes available to broadcast transactions") - }; - match node { - AgentPeer::Internal(id, _) => { - let Some(client) = pool[id].client_owned() else { - bail!("target node was offline"); - }; - - client.broadcast_tx(tx).await?; - } - AgentPeer::External(addr) => { - let url = format!("http://{addr}/mainnet/transaction/broadcast"); - ensure!( - reqwest::Client::new() - .post(url) - .header("Content-Type", "application/json") - .body(tx) - .send() - .await? - .status() - .is_success(), - "failed to post transaction to external target node {addr}" - ); - } + Some(res) = tx_shots.next() => { + match res { + Ok(()) => { + let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + if fired_count >= *tx_count { + trace!("cannon {env_id}.{cannon_id} finished firing txs"); + break; } + trace!("cannon {env_id}.{cannon_id} fired {fired_count}/{tx_count} txs"); + } + Err(e) => { + warn!("cannon {env_id}.{cannon_id} failed to fire transaction {e}"); + timer.undo(); } } + }, + } + } + + Ok(()) + } + + /// Request a new transaction from the context's source + async fn request_tx(&self, drain_pipe: Option>) -> Result { + match &self.source { + TxSource::Playback { .. } => { + // if tx source is playback, read lines from the transaction file + let Some(transaction) = drain_pipe.unwrap().next()? else { + return Ok(false); + }; + self.tx_sender.send(transaction)?; + Ok(true) + } + TxSource::RealTime { .. } => { + let Some(env) = self.env.upgrade() else { + bail!("env dropped") + }; + trace!("cannon {}.{} generating authorization...", env.id, self.id); + + let auth = self.source.get_auth(&env)?.run(&env.aot_bin).await?; + self.auth_sender.send(auth)?; + Ok(true) + } + TxSource::Listen { .. } => { + unreachable!("listen mode cannot generate transactions") + } + } + } + + /// Execute an authorization on the source's compute target + async fn execute_auth(&self, auth: Authorization, query_path: String) -> Result<()> { + match &self.source { + TxSource::Playback { .. } => { + unreachable!("playback mode cannot receive authorizations") + } + TxSource::RealTime { compute, .. } | TxSource::Listen { compute, .. } => { + let env = self + .env + .upgrade() + .ok_or_else(|| anyhow::anyhow!("env dropped"))?; + compute.execute(&self.state, &env, query_path, auth).await + } + } + } + + /// Fire a transaction to the sink + async fn fire_tx(&self, sink_pipe: Option>, tx: String) -> Result<()> { + match &self.sink { + TxSink::Record { .. } => { + sink_pipe.unwrap().write(&tx)?; + } + TxSink::RealTime { target, .. } => { + let pool = self.state.pool.read().await; + let nodes = self + .env + .upgrade() + .ok_or_else(|| anyhow::anyhow!("env dropped"))? + .matching_nodes(target, &pool, PortType::Rest) + .collect::>(); + let Some(node) = nodes.get(rand::random::() % nodes.len()) else { + bail!("no nodes available to broadcast transactions") + }; + match node { + AgentPeer::Internal(id, _) => { + let Some(client) = pool[id].client_owned() else { + bail!("target agent {id} was offline"); + }; - if fired_count >= tx_count { - debug!("finished firing txs"); - break; + client.broadcast_tx(tx).await?; + } + AgentPeer::External(addr) => { + let url = format!("http://{addr}/mainnet/transaction/broadcast"); + ensure!( + reqwest::Client::new() + .post(url) + .header("Content-Type", "application/json") + .body(tx) + .send() + .await? + .status() + .is_success(), + "failed to post transaction to external target node {addr}" + ); } - debug!("fired {fired_count}/{tx_count} txs"); } } } - Ok(()) } } diff --git a/crates/snot/src/cannon/router.rs b/crates/snot/src/cannon/router.rs index b24ffffb..38a05661 100644 --- a/crates/snot/src/cannon/router.rs +++ b/crates/snot/src/cannon/router.rs @@ -11,10 +11,13 @@ use serde_json::json; use crate::state::AppState; +use super::Authorization; + pub(crate) fn redirect_cannon_routes() -> Router { Router::new() .route("/:cannon/mainnet/latest/stateRoot", get(state_root)) .route("/:cannon/mainnet/transaction/broadcast", post(transaction)) + .route("/:cannon/auth", post(authorization)) } async fn state_root( @@ -81,12 +84,20 @@ async fn transaction( let env = state.envs.read().await; env.get(&env_id).cloned() }) else { - return StatusCode::NOT_FOUND.into_response(); + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "environment not found" })), + ) + .into_response(); }; let cannon_lock = env.cannons.read().await; let Some(cannon) = cannon_lock.get(&cannon_id) else { - return StatusCode::NOT_FOUND.into_response(); + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "cannon not found" })), + ) + .into_response(); }; match cannon.proxy_broadcast(body) { @@ -98,3 +109,38 @@ async fn transaction( .into_response(), } } + +async fn authorization( + Path((env_id, cannon_id)): Path<(usize, usize)>, + state: State, + Json(body): Json, +) -> Response { + let Some(env) = ({ + let env = state.envs.read().await; + env.get(&env_id).cloned() + }) else { + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "environment not found" })), + ) + .into_response(); + }; + + let cannon_lock = env.cannons.read().await; + let Some(cannon) = cannon_lock.get(&cannon_id) else { + return ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "cannon not found" })), + ) + .into_response(); + }; + + match cannon.proxy_auth(body) { + Ok(_) => StatusCode::OK.into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("{e}")})), + ) + .into_response(), + } +} diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs index 0ce79881..69bb64a0 100644 --- a/crates/snot/src/cannon/sink.rs +++ b/crates/snot/src/cannon/sink.rs @@ -37,6 +37,18 @@ pub enum TxSink { /// Requires cannon to have an associated env_id target: NodeTargets, + #[serde(flatten)] + // rate in which the transactions are sent + rate: FireRate, + }, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case", untagged)] +pub enum FireRate { + Never, + #[serde(rename_all = "kebab-case")] + Burst { /// How long between each burst of transactions burst_delay_ms: u32, /// How many transactions to fire off in each burst @@ -44,27 +56,20 @@ pub enum TxSink { /// How long between each transaction in a burst tx_delay_ms: u32, }, + #[serde(rename_all = "kebab-case")] + Repeat { + tx_delay_ms: u32, + }, } -impl TxSink { - pub fn timer(&self, count: usize) -> Timer { +impl FireRate { + fn as_timer(&self, count: usize) -> Timer { match self { - TxSink::Record { - tx_request_delay_ms, - .. - } => Timer { - last_shot: Instant::now(), - state: TimerState::Waiting, - count, - burst_rate: Duration::from_millis(*tx_request_delay_ms as u64), - burst_size: 1, - fire_rate: Duration::ZERO, - }, - TxSink::RealTime { + FireRate::Never => Timer::never(), + FireRate::Burst { burst_delay_ms, tx_per_burst, tx_delay_ms, - .. } => Timer { last_shot: Instant::now(), state: TimerState::Waiting, @@ -73,6 +78,29 @@ impl TxSink { burst_size: *tx_per_burst, fire_rate: Duration::from_millis(*tx_delay_ms as u64), }, + FireRate::Repeat { tx_delay_ms } => Timer { + last_shot: Instant::now(), + state: TimerState::Waiting, + count, + burst_rate: Duration::from_millis(*tx_delay_ms as u64), + burst_size: 1, + fire_rate: Duration::ZERO, + }, + } + } +} + +impl TxSink { + pub fn timer(&self, count: usize) -> Timer { + match self { + TxSink::Record { + tx_request_delay_ms, + .. + } => FireRate::Repeat { + tx_delay_ms: *tx_request_delay_ms, + } + .as_timer(count), + TxSink::RealTime { rate: speed, .. } => speed.as_timer(count), } } } @@ -92,8 +120,10 @@ enum TimerState { Active(usize), /// wait the `burst_rate` duration Waiting, - /// wait forever + /// wait forever, but available for undo Done, + /// wait forever. does not support undo + Never, } impl Timer { @@ -118,6 +148,17 @@ impl Timer { } } + pub fn never() -> Self { + Timer { + last_shot: Instant::now(), + state: TimerState::Never, + count: 0, + burst_rate: Duration::ZERO, + burst_size: 0, + fire_rate: Duration::ZERO, + } + } + pub async fn next(&mut self) { self.state = match self.state { TimerState::Active(remaining) => { @@ -150,7 +191,7 @@ impl Timer { }, } } - TimerState::Done => future::pending().await, + TimerState::Done | TimerState::Never => future::pending().await, }; } } diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index af91470d..f919d49e 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -2,15 +2,16 @@ use std::collections::HashSet; use anyhow::{anyhow, bail, Result}; use serde::{Deserialize, Serialize}; +use serde_json::json; use snot_common::state::NodeKey; -use crate::{env::Environment, schema::nodes::KeySource}; +use crate::{env::Environment, schema::nodes::KeySource, state::GlobalState}; use super::{authorized::Authorize, net::get_available_port}; /// Represents an instance of a local query service. #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct LocalQueryService { +pub struct LocalService { /// Ledger & genesis block to use // pub storage_id: usize, /// port to host the service on (needs to be unused by other cannons and services) @@ -30,7 +31,7 @@ pub struct LocalQueryService { pub sync_from: Option, } -impl LocalQueryService { +impl LocalService { // TODO: cache this when sync_from is false /// Fetch the state root from the local query service /// (non-cached) @@ -46,9 +47,9 @@ impl LocalQueryService { /// /cannon//mainnet/transaction/broadcast #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case", tag = "mode")] -pub enum LedgerQueryService { +pub enum QueryTarget { /// Use the local ledger query service - Local(LocalQueryService), + Local(LocalService), /// Target a specific node (probably over rpc instead of reqwest lol...) /// /// Requires cannon to have an associated env_id @@ -97,7 +98,7 @@ pub enum TxSource { /// Generate transactions in real time #[serde(rename_all = "kebab-case")] RealTime { - query: LedgerQueryService, + query: QueryTarget, compute: ComputeTarget, /// defaults to TransferPublic @@ -110,6 +111,12 @@ pub enum TxSource { /// defaults to committee addresses addresses: Vec, }, + /// Receive authorizations from a persistent path /api/v1/env/:env_id/cannons/:id/auth + #[serde(rename_all = "kebab-case")] + Listen { + query: QueryTarget, + compute: ComputeTarget, + }, } impl TxSource { @@ -118,7 +125,7 @@ impl TxSource { matches!( self, TxSource::RealTime { - query: LedgerQueryService::Local(_), + query: QueryTarget::Local(_), .. } ) @@ -177,6 +184,53 @@ impl TxSource { } } +impl ComputeTarget { + pub async fn execute( + &self, + state: &GlobalState, + env: &Environment, + query_path: String, + auth: serde_json::Value, + ) -> Result<()> { + match self { + ComputeTarget::Agent => { + // find a client, mark it as busy + let Some((client, _busy)) = state.pool.read().await.values().find_map(|a| { + if !a.is_busy() && a.is_inventory() { + a.client_owned().map(|c| (c, a.make_busy())) + } else { + None + } + }) else { + bail!("no agents available to execute authorization") + }; + + // execute the authorization + client + .execute_authorization(env.id, query_path, serde_json::to_string(&auth)?) + .await?; + + Ok(()) + } + ComputeTarget::Demox { url } => { + let _body = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "generateTransaction", + "params": { + "authorization": serde_json::to_string(&auth["authorization"])?, + "fee": serde_json::to_string(&auth["fee"])?, + "url": query_path, + "broadcast": true, + } + }); + + todo!("post on {url}") + } + } + } +} + // I use this to generate example yaml... /* #[cfg(test)] mod test { diff --git a/crates/snot/src/env/timeline.rs b/crates/snot/src/env/timeline.rs index b65e6ad3..6f35cf62 100644 --- a/crates/snot/src/env/timeline.rs +++ b/crates/snot/src/env/timeline.rs @@ -14,7 +14,7 @@ use super::Environment; use crate::{ cannon::{ sink::TxSink, - source::{LedgerQueryService, TxSource}, + source::{QueryTarget, TxSource}, CannonInstance, }, schema::timeline::{Action, ActionInstance, EventDuration}, @@ -212,7 +212,7 @@ impl Environment { if let (Some(q), TxSource::RealTime { query, .. }) = (&cannon.query, &mut source) { - *query = LedgerQueryService::Node(q.clone()); + *query = QueryTarget::Node(q.clone()); }; if let (Some(t), TxSink::RealTime { target, .. }) = @@ -222,7 +222,7 @@ impl Environment { }; let count = cannon.count; - let mut instance = CannonInstance::new( + let (mut instance, rx) = CannonInstance::new( state.clone(), cannon_id, env.clone(), @@ -239,7 +239,7 @@ impl Environment { // debug!("instance started await mode"); awaiting_handles.push(tokio::task::spawn(async move { - let res = ctx.spawn().await; + let res = ctx.spawn(rx).await; // remove the cannon after the task is complete env.cannons.write().await.remove(&cannon_id); @@ -247,7 +247,7 @@ impl Environment { })); } else { instance - .spawn_local() + .spawn_local(rx) .await .map_err(ExecutionError::Cannon)?; } diff --git a/specs/test-cannon-replay.yaml b/specs/test-cannon-replay.yaml new file mode 100644 index 00000000..a5952af8 --- /dev/null +++ b/specs/test-cannon-replay.yaml @@ -0,0 +1,27 @@ +--- +version: storage.snarkos.testing.monadic.us/v1 + +id: base +name: base-ledger + +--- +version: cannon.snarkos.testing.monadic.us/v1 + +name: committee-tx-public + +source: + file-name: txs.json + +sink: + tx-request-delay-ms: 1000 + file-name: txs-sink.json + +--- +version: timeline.snarkos.testing.monadic.us/v1 + +name: tx-local + +timeline: + - cannon.await: + - name: committee-tx-public + count: 10