diff --git a/Cargo.lock b/Cargo.lock index 1a7c55d7..872f6333 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -204,7 +204,7 @@ dependencies = [ "http 0.2.12", "log", "url", - "wildmatch", + "wildmatch 1.1.0", ] [[package]] @@ -2824,6 +2824,7 @@ version = "0.1.0" dependencies = [ "aleo-std", "anyhow", + "axum", "clap", "colored", "crossterm", @@ -2844,6 +2845,7 @@ dependencies = [ "snot-common", "tikv-jemallocator", "tokio", + "tower-http", "tracing", "tracing-appender", "tracing-flame", @@ -3903,7 +3905,9 @@ dependencies = [ "tokio", "tower-http", "tracing", + "tracing-appender", "tracing-subscriber", + "wildmatch 2.3.3", ] [[package]] @@ -3925,6 +3929,7 @@ dependencies = [ "tokio", "tokio-tungstenite", "tracing", + "tracing-appender", "tracing-subscriber", ] @@ -4784,6 +4789,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f44b95f62d34113cf558c93511ac93027e03e9c29a60dd0fd70e6e025c7270a" +[[package]] +name = "wildmatch" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "939e59c1bc731542357fdaad98b209ef78c8743d652bb61439d16b16a79eb025" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 37966150..e1eb9296 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ resolver = "2" [workspace.dependencies] aleo-std = "=0.1.24" +axum = "0.7.4" anyhow = "1" bimap = "0.6" bincode = "1.3" @@ -32,6 +33,7 @@ serde_yaml = "0.9" tarpc = { version = "0.34", features = ["tokio1", "serde1"] } thiserror = "1.0" tokio = { version = "1", features = ["full", "macros"] } +tower-http = { version = "0.5.2", features = ["fs", "trace"] } tracing = "0.1" tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/aot/Cargo.toml b/crates/aot/Cargo.toml index 9c9d3c34..80f90c20 100644 --- a/crates/aot/Cargo.toml +++ b/crates/aot/Cargo.toml @@ -12,6 +12,7 @@ node = ["snarkos-node", "crossterm"] [dependencies] aleo-std.workspace = true anyhow.workspace = true +axum.workspace = true clap.workspace = true colored.workspace = true indexmap.workspace = true @@ -30,6 +31,7 @@ snarkos-node = { workspace = true, optional = true } snarkvm.workspace = true snot-common.workspace = true tokio.workspace = true +tower-http.workspace = true tracing-flame = "0.2.0" tracing-appender.workspace = true tracing-subscriber.workspace = true diff --git a/crates/aot/src/authorized.rs b/crates/aot/src/authorized.rs index bfd0ac7a..b9bd1b22 100644 --- a/crates/aot/src/authorized.rs +++ b/crates/aot/src/authorized.rs @@ -12,6 +12,7 @@ use snarkvm::{ de, Deserialize, DeserializeExt, Deserializer, Serialize, SerializeStruct, Serializer, }, }; +use tracing::error; use crate::{ credits::PROCESS, Aleo, Authorization, DbLedger, MemVM, Network, PrivateKey, Transaction, Value, @@ -36,12 +37,45 @@ pub enum ExecutionMode<'a> { pub struct Execute { pub authorization: Authorized, #[arg(short, long)] - pub query: Option, + pub query: String, } impl Execute { pub fn parse(self) -> Result<()> { - Ok(()) + let broadcast = self.authorization.broadcast; + // execute the transaction + let tx = self.authorization.execute_local( + None, + &mut rand::thread_rng(), + Some(self.query.to_owned()), + )?; + + if !broadcast { + println!("{}", serde_json::to_string(&tx)?); + return Ok(()); + } + + // Broadcast the transaction. 
+ tracing::info!("broadcasting transaction..."); + tracing::debug!("{}", serde_json::to_string(&tx)?); + let response = reqwest::blocking::Client::new() + .post(format!("{}/mainnet/transaction/broadcast", self.query)) + .header("Content-Type", "application/json") + .json(&tx) + .send()?; + + // Ensure the response is successful. + if response.status().is_success() { + // Return the transaction. + println!("{}", response.text()?); + Ok(()) + // Return the error. + } else { + let status = response.status(); + let err = response.text()?; + error!("broadcast failed with code {}: {}", status, err); + bail!(err) + } } } @@ -100,7 +134,7 @@ impl Authorized { let response = reqwest::blocking::Client::new() .post(format!("{api_url}/execute")) .header("Content-Type", "application/json") - .body(serde_json::to_string(&self)?) + .json(&self) .send()?; // Ensure the response is successful. diff --git a/crates/aot/src/cli.rs b/crates/aot/src/cli.rs index 6255cc99..9e26247a 100644 --- a/crates/aot/src/cli.rs +++ b/crates/aot/src/cli.rs @@ -133,7 +133,7 @@ impl Cli { // Add layer redirecting logs to the file layers.push( - tracing_subscriber::fmt::Layer::default() + tracing_subscriber::fmt::layer() .with_ansi(false) .with_writer(non_blocking) .with_filter(filter2) @@ -148,7 +148,7 @@ impl Cli { guards.push(g); layers.push( - tracing_subscriber::fmt::Layer::default() + tracing_subscriber::fmt::layer() .with_ansi(io::stdout().is_tty()) .with_writer(stdout) .with_filter(filter) @@ -157,11 +157,7 @@ impl Cli { } else { let (stderr, g) = tracing_appender::non_blocking(io::stderr()); guards.push(g); - layers.push( - tracing_subscriber::fmt::Layer::default() - .with_writer(stderr) - .boxed(), - ); + layers.push(tracing_subscriber::fmt::layer().with_writer(stderr).boxed()); }; let subscriber = tracing_subscriber::registry::Registry::default().with(layers); diff --git a/crates/aot/src/ledger/mod.rs b/crates/aot/src/ledger/mod.rs index f6fecd56..c1f29d9b 100644 --- a/crates/aot/src/ledger/mod.rs +++ b/crates/aot/src/ledger/mod.rs @@ -9,6 +9,7 @@ use crate::{authorized::Execute, Address, PrivateKey}; pub mod add; pub mod distribute; pub mod init; +pub mod query; pub mod truncate; pub mod tx; pub mod util; @@ -41,10 +42,14 @@ macro_rules! 
comma_separated { type Err = anyhow::Error; fn from_str(s: &str) -> Result<Self, Self::Err> { - Ok(Self(s.split(',') - .map(|i| <$item>::from_str(i)) - .collect::<Result<Vec<$item>, <$item as FromStr>::Err>>() - .map_err(anyhow::Error::from)?)) + if s.is_empty() { + return Ok(Self(Vec::new())); + } + + Ok(Self(s.split(',') + .map(|i| <$item>::from_str(i)) + .collect::<Result<Vec<$item>, <$item as FromStr>::Err>>() + .map_err(anyhow::Error::from)?)) } } @@ -92,6 +97,7 @@ pub enum Commands { Distribute(distribute::Distribute), Truncate(truncate::Truncate), Execute(Execute), + Query(query::LedgerQuery), } impl Ledger { @@ -141,6 +147,11 @@ impl Ledger { println!("{}", serde_json::to_string(&tx)?); Ok(()) } + + Commands::Query(query) => { + let ledger = util::open_ledger(genesis, ledger)?; + query.parse(&ledger) + } } } } diff --git a/crates/aot/src/ledger/query.rs b/crates/aot/src/ledger/query.rs new file mode 100644 index 00000000..0850b826 --- /dev/null +++ b/crates/aot/src/ledger/query.rs @@ -0,0 +1,146 @@ +use std::{fs::File, io::Write, ops::Deref, path::PathBuf, sync::Arc}; + +use anyhow::Result; +use axum::{ + extract::{self, State}, + response::IntoResponse, + routing::{get, post}, + Json, Router, +}; +use clap::Args; +use reqwest::StatusCode; +use serde_json::json; +use tracing_appender::non_blocking::NonBlocking; + +use crate::{Block, DbLedger, Transaction}; + +#[derive(Debug, Args, Clone)] +/// Receive inquiries on /mainnet/latest/stateRoot +pub struct LedgerQuery { + #[arg(long, default_value = "3030")] + /// Port to listen on for incoming messages + pub port: u16, + + #[arg(long)] + /// When true, the POST /block endpoint will not be available + pub readonly: bool, + + #[arg(long)] + /// Receive messages from /mainnet/transaction/broadcast and record them to the output + pub record: bool, + + #[arg(long, short, default_value = "transactions.json")] + /// Path to the file where received transactions are recorded + pub output: PathBuf, +} + +struct LedgerState { + readonly: bool, + ledger: DbLedger, + appender: Option<NonBlocking>, +} + +type AppState = Arc<LedgerState>; + +impl LedgerQuery { + #[tokio::main] + pub async fn parse(self, ledger: &DbLedger) -> Result<()> { + let (appender, _guard) = if self.record { + let (appender, guard) = tracing_appender::non_blocking( + File::options() + .create(true) + .append(true) + .open(self.output.clone()) + .expect("Failed to open the file for writing transactions"), + ); + (Some(appender), Some(guard)) + } else { + (None, None) + }; + + let state = LedgerState { + readonly: self.readonly, + ledger: ledger.clone(), + appender, + }; + + let app = Router::new() + .route("/mainnet/latest/stateRoot", get(Self::latest_state_root)) + .route("/mainnet/block/height/latest", get(Self::latest_height)) + .route("/mainnet/block/hash/latest", get(Self::latest_hash)) + .route("/mainnet/transaction/broadcast", post(Self::broadcast_tx)) + .route("/block", post(Self::add_block)) + .with_state(Arc::new(state)); + + let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port)).await?; + tracing::info!("listening on: {:?}", listener.local_addr().unwrap()); + axum::serve(listener, app).await?; + + Ok(()) + } + + async fn latest_state_root(state: State<AppState>) -> impl IntoResponse { + Json(json!(state.ledger.latest_state_root())) + } + + async fn latest_height(state: State<AppState>) -> impl IntoResponse { + Json(json!(state.ledger.latest_height())) + } + + async fn latest_hash(state: State<AppState>) -> impl IntoResponse { + Json(json!(state.ledger.latest_hash())) + } + + async fn broadcast_tx( + state: State<AppState>, + payload: extract::Json<Transaction>, + ) -> impl
IntoResponse { + let Ok(tx_json) = serde_json::to_string(payload.deref()) else { + return StatusCode::BAD_REQUEST; + }; + + if let Some(mut a) = state.appender.clone() { + match write!(a, "{}", tx_json) { + Ok(_) => StatusCode::OK, + Err(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } else { + println!("{}", tx_json); + StatusCode::OK + } + } + + async fn add_block(state: State, payload: extract::Json) -> impl IntoResponse { + if state.readonly { + return (StatusCode::FORBIDDEN, Json(json!({"error": "readonly"}))); + } + + if state.ledger.latest_hash() != payload.previous_hash() + || state.ledger.latest_state_root() != payload.previous_state_root() + || state.ledger.latest_height() + 1 != payload.height() + { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "invalid block"})), + ); + } + + if let Err(e) = state + .ledger + .check_next_block(&payload, &mut rand::thread_rng()) + { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("failed to validate block: {e}")})), + ); + } + + match state.ledger.advance_to_next_block(&payload) { + Ok(_) => (StatusCode::OK, Json(json!({"status": "ok"}))), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("failed to advance block: {e}")})), + ), + } + } +} diff --git a/crates/aot/src/runner.rs b/crates/aot/src/runner.rs index c961d15e..0d0bc6f6 100644 --- a/crates/aot/src/runner.rs +++ b/crates/aot/src/runner.rs @@ -45,10 +45,10 @@ pub struct Runner { pub rest: u16, /// Specify the IP address and port of the peer(s) to connect to - #[clap(long = "peers")] + #[clap(long = "peers", default_value = "")] pub peers: Addrs, /// Specify the IP address and port of the validator(s) to connect to - #[clap(long = "validators")] + #[clap(long = "validators", default_value = "")] pub validators: Addrs, /// Specify the requests per second (RPS) rate limit per IP for the REST /// server diff --git a/crates/snot-agent/Cargo.toml b/crates/snot-agent/Cargo.toml index 04acbbe3..dfdb142e 100644 --- a/crates/snot-agent/Cargo.toml +++ b/crates/snot-agent/Cargo.toml @@ -20,5 +20,6 @@ snot-common = { path = "../snot-common" } tarpc.workspace = true tokio.workspace = true tokio-tungstenite = "0.21.0" +tracing-appender.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/crates/snot-agent/src/cli.rs b/crates/snot-agent/src/cli.rs index bea5929f..5c9ce37a 100644 --- a/crates/snot-agent/src/cli.rs +++ b/crates/snot-agent/src/cli.rs @@ -8,6 +8,8 @@ use clap::Parser; pub const ENV_ENDPOINT: &str = "SNOT_ENDPOINT"; pub const ENV_ENDPOINT_DEFAULT: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234); +// TODO: allow agents to define preferred internal/external addrs + #[derive(Debug, Parser)] pub struct Cli { #[arg(long)] diff --git a/crates/snot-agent/src/main.rs b/crates/snot-agent/src/main.rs index 84751ef8..efbb5376 100644 --- a/crates/snot-agent/src/main.rs +++ b/crates/snot-agent/src/main.rs @@ -6,6 +6,7 @@ mod state; use std::{ env, + os::unix::fs::PermissionsExt, path::Path, sync::{Arc, Mutex}, time::Duration, @@ -37,15 +38,36 @@ use crate::state::GlobalState; #[tokio::main] async fn main() { + let (stdout, _guard) = tracing_appender::non_blocking(std::io::stdout()); + + let output: tracing_subscriber::fmt::Layer< + _, + tracing_subscriber::fmt::format::DefaultFields, + tracing_subscriber::fmt::format::Format, + tracing_appender::non_blocking::NonBlocking, + > = tracing_subscriber::fmt::layer().with_writer(stdout); + + let output = if cfg!(debug_assertions) 
{ + output.with_file(true).with_line_number(true) + } else { + output + }; + tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) + .with_env_var("RUST_LOG") + .with_default_directive(LevelFilter::TRACE.into()) .parse_lossy("") + .add_directive("neli=off".parse().unwrap()) + .add_directive("hyper_util=off".parse().unwrap()) + .add_directive("reqwest=off".parse().unwrap()) + .add_directive("tungstenite=off".parse().unwrap()) + .add_directive("tokio_tungstenite=off".parse().unwrap()) .add_directive("tarpc::client=ERROR".parse().unwrap()) .add_directive("tarpc::server=ERROR".parse().unwrap()), ) - .with(tracing_subscriber::fmt::layer()) + .with(output) .try_init() .unwrap(); @@ -97,8 +119,17 @@ async fn main() { .await .expect("failed to acquire snarkOS binary"); + // create rpc channels + let (client_response_in, client_transport, mut client_request_out) = RpcTransport::new(); + let (server_request_in, server_transport, mut server_response_out) = RpcTransport::new(); + + // set up the client, facing the control plane + let client = + ControlServiceClient::new(tarpc::client::Config::default(), client_transport).spawn(); + // create the client state let state = Arc::new(GlobalState { + client, external_addr, internal_addrs, cli: args, @@ -107,16 +138,9 @@ async fn main() { agent_state: Default::default(), reconcilation_handle: Default::default(), child: Default::default(), + resolved_addrs: Default::default(), }); - // create rpc channels - let (client_response_in, client_transport, mut client_request_out) = RpcTransport::new(); - let (server_request_in, server_transport, mut server_response_out) = RpcTransport::new(); - - // set up the client, facing the control plane - let _client = - ControlServiceClient::new(tarpc::client::Config::default(), client_transport).spawn(); - // initialize and start the rpc server let rpc_server = tarpc::server::BaseChannel::with_defaults(server_transport); tokio::spawn( @@ -282,6 +306,9 @@ async fn check_binary(base_url: &str, path: &Path) -> anyhow::Result<()> { file.write_all(&chunk?).await?; } + // ensure the permissions are set + tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).await?; + Ok(()) } diff --git a/crates/snot-agent/src/rpc.rs b/crates/snot-agent/src/rpc.rs index ac2c4e8a..a95fad25 100644 --- a/crates/snot-agent/src/rpc.rs +++ b/crates/snot-agent/src/rpc.rs @@ -1,4 +1,4 @@ -use std::{net::IpAddr, ops::Deref, process::Stdio, sync::Arc}; +use std::{collections::HashSet, net::IpAddr, ops::Deref, process::Stdio, sync::Arc}; use snot_common::{ rpc::{ @@ -6,14 +6,14 @@ use snot_common::{ control::{ControlServiceRequest, ControlServiceResponse}, MuxMessage, }, - state::AgentState, + state::{AgentId, AgentPeer, AgentState, PortConfig}, }; use tarpc::{context, ClientMessage, Response}; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, }; -use tracing::{debug, info, warn, Level}; +use tracing::{debug, error, info, warn, Level}; use crate::{api, state::AppState}; @@ -70,11 +70,14 @@ impl AgentService for AgentRpcServer { _: context::Context, target: AgentState, ) -> Result<(), ReconcileError> { + info!("beginning reconcilation..."); + // acquire the handle lock let mut handle_container = self.state.reconcilation_handle.lock().await; // abort if we are already reconciling if let Some(handle) = handle_container.take() { + info!("aborting previous reconcilation task..."); handle.abort(); } @@ -87,6 +90,7 @@ impl AgentService for AgentRpcServer { 
match agent_state_lock.deref() { // kill existing child if running AgentState::Node(_, node) if node.online => { + info!("cleaning up snarkos process..."); if let Some(mut child) = state.child.write().await.take() { child.kill().await.expect("failed to kill child process"); } @@ -137,12 +141,14 @@ impl AgentService for AgentRpcServer { }; let genesis_url = format!( - "http://{}/api/storage/{storage_id}/genesis", + "http://{}/api/v1/storage/{storage_id}/genesis", &state.endpoint ); - let ledger_url = - format!("http://{}/api/storage/{storage_id}/ledger", &state.endpoint); + let ledger_url = format!( + "http://{}/api/v1/storage/{storage_id}/ledger", + &state.endpoint + ); // download the genesis block api::download_file(genesis_url, base_path.join(SNARKOS_GENESIS_FILE)) @@ -162,7 +168,7 @@ impl AgentService for AgentRpcServer { // use `tar` to decompress the storage let mut tar_child = Command::new("tar") .current_dir(base_path) - .arg("-xzf") + .arg("xzf") .arg(LEDGER_STORAGE_FILE) .kill_on_drop(true) .spawn() @@ -188,7 +194,7 @@ impl AgentService for AgentRpcServer { } // reconcile towards new state - match target { + match target.clone() { // do nothing on inventory state AgentState::Inventory => (), @@ -197,16 +203,16 @@ impl AgentService for AgentRpcServer { let mut child_lock = state.child.write().await; let mut command = Command::new(state.cli.path.join(SNARKOS_FILE)); - // TODO: more args command + // .kill_on_drop(true) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .stdin(Stdio::null()) - .arg("run") + // .stdin(Stdio::null()) .arg("--log") .arg(state.cli.path.join(SNARKOS_LOG_FILE)) + .arg("run") .arg("--type") - .arg(node.ty.flag()) + .arg(node.ty.to_string()) // storage configuration .arg("--genesis") .arg(state.cli.path.join(SNARKOS_GENESIS_FILE)) @@ -226,47 +232,97 @@ impl AgentService for AgentRpcServer { command.arg("--private-key").arg(pk); } - if !node.peers.is_empty() { - // TODO: add peers - - // TODO: local caching of agent IDs, map agent ID to - // IP/port + // Find agents that do not have cached addresses + let unresolved_addrs: HashSet = { + let resolved_addrs = state.resolved_addrs.read().await; + node.peers + .iter() + .chain(node.validators.iter()) + .filter_map(|p| { + if let AgentPeer::Internal(id, _) = p { + (!resolved_addrs.contains_key(id)).then_some(*id) + } else { + None + } + }) + .collect() + }; + + // Fetch all unresolved addresses and update the cache + if !unresolved_addrs.is_empty() { + tracing::debug!("need to resolve addrs: {unresolved_addrs:?}"); + let new_addrs = state + .client + .resolve_addrs(context::current(), unresolved_addrs) + .await + .map_err(|err| { + error!("rpc error while resolving addresses: {err}"); + ReconcileError::Unknown + })? 
+ .map_err(ReconcileError::ResolveAddrError)?; + tracing::debug!("resolved new addrs: {new_addrs:?}"); + state.resolved_addrs.write().await.extend(new_addrs); } - // TODO: same for validators - - // TODO: ensure node is not killed if the reconciled state is the same + if !node.peers.is_empty() { + command + .arg("--peers") + .arg(state.agentpeers_to_cli(&node.peers).await.join(",")); + } - // ensure the previos node is properly killed - if let Some(mut child) = child_lock.take() { - if let Err(e) = child.kill().await { - warn!("failed to kill old node: {e}") - } + if !node.validators.is_empty() { + command + .arg("--validators") + .arg(state.agentpeers_to_cli(&node.validators).await.join(",")); } if node.online { + tracing::trace!("spawning node process..."); + tracing::debug!("node command: {command:?}"); let mut child = command.spawn().expect("failed to start child"); // start a new task to log stdout // TODO: probably also want to read stderr - let stdout = child.stdout.take().unwrap(); + let stdout: tokio::process::ChildStdout = child.stdout.take().unwrap(); + let stderr: tokio::process::ChildStderr = child.stderr.take().unwrap(); + tokio::spawn(async move { let child_span = tracing::span!(Level::INFO, "child process stdout"); let _enter = child_span.enter(); - let mut reader = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = reader.next_line().await { - info!(line); + let mut reader1 = BufReader::new(stdout).lines(); + let mut reader2 = BufReader::new(stderr).lines(); + + loop { + tokio::select! { + Ok(line) = reader1.next_line() => { + if let Some(line) = line { + info!(line); + } else { + break; + } + } + Ok(Some(line)) = reader2.next_line() => { + error!(line); + } + } } }); *child_lock = Some(child); - // todo: check to ensure the node actually comes online by hitting the REST latest block + // todo: check to ensure the node actually comes online + // by hitting the REST latest block + } else { + tracing::debug!("skipping node spawn"); } } } + // After completing the reconcilation, update the agent state + let mut agent_state = state.agent_state.write().await; + *agent_state = target; + Ok(()) }); @@ -297,8 +353,15 @@ impl AgentService for AgentRpcServer { res } - #[doc = r" Control plane asks the agent for its external network address, along with local addrs."] - async fn get_addrs(self, _: context::Context) -> (Option, Vec) { - (self.state.external_addr, self.state.internal_addrs.clone()) + async fn get_addrs(self, _: context::Context) -> (PortConfig, Option, Vec) { + ( + PortConfig { + bft: self.state.cli.bft, + node: self.state.cli.node, + rest: self.state.cli.rest, + }, + self.state.external_addr, + self.state.internal_addrs.clone(), + ) } } diff --git a/crates/snot-agent/src/state.rs b/crates/snot-agent/src/state.rs index f02108de..20133c6f 100644 --- a/crates/snot-agent/src/state.rs +++ b/crates/snot-agent/src/state.rs @@ -1,9 +1,13 @@ use std::{ + collections::HashMap, net::{IpAddr, SocketAddr}, sync::{Arc, Mutex}, }; -use snot_common::state::AgentState; +use snot_common::{ + rpc::control::ControlServiceClient, + state::{AgentId, AgentPeer, AgentState}, +}; use tokio::{ process::Child, sync::{Mutex as AsyncMutex, RwLock}, @@ -16,6 +20,8 @@ pub type AppState = Arc; /// Global state for this agent runner. 
pub struct GlobalState { + pub client: ControlServiceClient, + pub external_addr: Option<IpAddr>, pub internal_addrs: Vec<IpAddr>, pub cli: Cli, @@ -25,4 +31,24 @@ pub reconcilation_handle: AsyncMutex>, pub child: RwLock<Option<Child>>, /* TODO: this may need to be handled by an owning thread, * not sure yet */ + // Map of agent IDs to their resolved addresses. + pub resolved_addrs: RwLock<HashMap<AgentId, IpAddr>>, +} + +impl GlobalState { + // Resolve the addresses of the given agents. + // Locks resolve_addrs + pub async fn agentpeers_to_cli(&self, peers: &[AgentPeer]) -> Vec<String> { + let resolved_addrs = self.resolved_addrs.read().await; + peers + .iter() + .filter_map(|p| match p { + AgentPeer::Internal(id, port) => resolved_addrs + .get(id) + .copied() + .map(|addr| std::net::SocketAddr::new(addr, *port).to_string()), + AgentPeer::External(addr) => Some(addr.to_string()), + }) + .collect::<Vec<_>>() + } } diff --git a/crates/snot-common/src/rpc/agent.rs b/crates/snot-common/src/rpc/agent.rs index b91a2df9..08a48343 100644 --- a/crates/snot-common/src/rpc/agent.rs +++ b/crates/snot-common/src/rpc/agent.rs @@ -3,7 +3,9 @@ use std::net::IpAddr; use serde::{Deserialize, Serialize}; use thiserror::Error; -use crate::state::AgentState; +use crate::state::{AgentState, PortConfig}; + +use super::control::ResolveError; /// The RPC service that agents implement as a server. #[tarpc::service] @@ -12,7 +14,7 @@ pub trait AgentService { async fn keep_jwt(jwt: String); /// Control plane asks the agent for its external network address, along with local addrs. - async fn get_addrs() -> (Option<IpAddr>, Vec<IpAddr>); + async fn get_addrs() -> (PortConfig, Option<IpAddr>, Vec<IpAddr>); /// Control plane instructs the agent to reconcile towards a particular /// state. @@ -25,6 +27,8 @@ pub enum ReconcileError { Aborted, #[error("failed to download the specified storage")] StorageAcquireError, + #[error("failed to resolve addresses of stated peers")] + ResolveAddrError(ResolveError), #[error("unknown error")] Unknown, } diff --git a/crates/snot-common/src/rpc/control.rs b/crates/snot-common/src/rpc/control.rs index 81ade673..f5a3f039 100644 --- a/crates/snot-common/src/rpc/control.rs +++ b/crates/snot-common/src/rpc/control.rs @@ -1,4 +1,27 @@ +use std::{ + collections::{HashMap, HashSet}, + net::IpAddr, +}; + +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +use crate::state::AgentId; + #[tarpc::service] pub trait ControlService { async fn placeholder() -> String; + + /// Resolve the addresses of the given agents. 
+ async fn resolve_addrs( + peers: HashSet, + ) -> Result, ResolveError>; +} + +#[derive(Debug, Error, Serialize, Deserialize)] +pub enum ResolveError { + #[error("source agent not found")] + SourceAgentNotFound, + #[error("agent has no addresses")] + AgentHasNoAddresses, } diff --git a/crates/snot-common/src/state.rs b/crates/snot-common/src/state.rs index e47476ea..330d27bf 100644 --- a/crates/snot-common/src/state.rs +++ b/crates/snot-common/src/state.rs @@ -30,6 +30,23 @@ pub struct NodeState { pub validators: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PortConfig { + pub bft: u16, + pub node: u16, + pub rest: u16, +} + +impl Display for PortConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "bft: {}, node: {}, rest: {}", + self.bft, self.node, self.rest + ) + } +} + // // agent code // impl AgentState { // async fn reconcile(&self, target: AgentState) { @@ -91,10 +108,28 @@ pub enum HeightRequest { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum AgentPeer { - Internal(AgentId), + Internal(AgentId, u16), External(SocketAddr), } +impl AgentPeer { + /// Get the port from the peer + pub fn port(&self) -> u16 { + match self { + Self::Internal(_, port) => *port, + Self::External(addr) => addr.port(), + } + } + + /// Return a new peer with the given port. + pub fn with_port(&self, port: u16) -> Self { + match self { + Self::Internal(ip, _) => Self::Internal(*ip, port), + Self::External(addr) => Self::External(SocketAddr::new(addr.ip(), port)), + } + } +} + // /// The state reported by an agent. // #[derive(Debug, Default, Clone, Serialize, Deserialize, Hash)] // pub struct ResolvedState { @@ -140,6 +175,7 @@ pub struct NodeKey { } #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] pub enum NodeType { Client, Validator, diff --git a/crates/snot/Cargo.toml b/crates/snot/Cargo.toml index 1826ab38..701cf24c 100644 --- a/crates/snot/Cargo.toml +++ b/crates/snot/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] anyhow.workspace = true -axum = { version = "0.7.4", features = ["ws"] } +axum = { workspace = true, features = ["ws"] } bimap.workspace = true bincode.workspace = true clap.workspace = true @@ -25,6 +25,8 @@ sha2 = "0.10.8" snot-common = { path = "../snot-common" } tarpc.workspace = true tokio.workspace = true -tower-http = { version = "0.5.2", features = ["fs"] } +tower-http.workspace = true +tracing-appender.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +wildmatch = "2.3.3" diff --git a/crates/snot/src/cli.rs b/crates/snot/src/cli.rs index 7ba94ada..1a38f866 100644 --- a/crates/snot/src/cli.rs +++ b/crates/snot/src/cli.rs @@ -8,7 +8,7 @@ pub struct Cli { /// Control plane server port pub port: u16, - #[arg(long, default_value = "./snot-control-data")] + #[arg(long, default_value = "snot-control-data")] /// Path to the directory containing the stored data pub path: PathBuf, } diff --git a/crates/snot/src/main.rs b/crates/snot/src/main.rs index 81f3b5d9..eb2fb190 100644 --- a/crates/snot/src/main.rs +++ b/crates/snot/src/main.rs @@ -1,3 +1,5 @@ +use std::io; + use clap::Parser; use cli::Cli; use tracing::level_filters::LevelFilter; @@ -11,15 +13,32 @@ pub mod testing; #[tokio::main] async fn main() { + let env_filter = if cfg!(debug_assertions) { + tracing_subscriber::EnvFilter::builder().with_default_directive(LevelFilter::TRACE.into()) + } else { + 
tracing_subscriber::EnvFilter::builder().with_default_directive(LevelFilter::INFO.into()) + }; + + let env_filter = env_filter + .parse_lossy("") + .add_directive("tungstenite=off".parse().unwrap()) + .add_directive("tokio_tungstenite=off".parse().unwrap()) + .add_directive("tarpc::client=ERROR".parse().unwrap()) + .add_directive("tarpc::server=ERROR".parse().unwrap()); + + let (stdout, _guard) = tracing_appender::non_blocking(io::stdout()); + + let output = tracing_subscriber::fmt::layer().with_writer(stdout); + + let output = if cfg!(debug_assertions) { + output.with_file(true).with_line_number(true) + } else { + output + }; + tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .parse_lossy("") - .add_directive("tarpc::client=ERROR".parse().unwrap()) - .add_directive("tarpc::server=ERROR".parse().unwrap()), - ) - .with(tracing_subscriber::fmt::layer()) + .with(env_filter) + .with(output) .try_init() .unwrap(); diff --git a/crates/snot/src/schema/mod.rs b/crates/snot/src/schema/mod.rs index ebb0ad6b..fef59d3f 100644 --- a/crates/snot/src/schema/mod.rs +++ b/crates/snot/src/schema/mod.rs @@ -7,6 +7,7 @@ use serde::{ Deserialize, }; use snot_common::state::{NodeKey, NodeType}; +use wildmatch::WildMatch; pub mod infrastructure; pub mod nodes; @@ -187,3 +188,45 @@ impl From for NodeTarget { } } } + +impl NodeTarget { + pub fn matches(&self, key: &NodeKey) -> bool { + (match self.ty { + NodeTargetType::All => true, + NodeTargetType::One(ty) => ty == key.ty, + }) && (match self.id { + NodeTargetId::All => true, + NodeTargetId::WildcardPattern(ref pattern) => WildMatch::new(pattern).matches(&key.id), + NodeTargetId::Literal(ref id) => &key.id == id, + }) && (match self.ns { + NodeTargetNamespace::All => true, + NodeTargetNamespace::Local => key.ns.is_none() || key.ns == Some("local".into()), + NodeTargetNamespace::Literal(ref ns) => { + ns == "local" && key.ns.is_none() + || key.ns.as_ref().map_or(false, |key_ns| key_ns == ns) + } + }) + } +} + +impl NodeTargets { + pub fn is_empty(&self) -> bool { + if matches!(self, &NodeTargets::None) { + return true; + } + + if let NodeTargets::Many(targets) = self { + return targets.is_empty(); + } + + false + } + + pub fn matches(&self, key: &NodeKey) -> bool { + match self { + NodeTargets::None => false, + NodeTargets::One(target) => target.matches(key), + NodeTargets::Many(targets) => targets.iter().any(|target| target.matches(key)), + } + } +} diff --git a/crates/snot/src/schema/nodes.rs b/crates/snot/src/schema/nodes.rs index cec39d12..82217e53 100644 --- a/crates/snot/src/schema/nodes.rs +++ b/crates/snot/src/schema/nodes.rs @@ -9,6 +9,9 @@ use super::{NodeKey, NodeTargets}; /// A document describing the node infrastructure for a test. #[derive(Deserialize, Debug, Clone)] pub struct Document { + pub name: String, + pub description: Option, + #[serde(default)] pub external: IndexMap, @@ -26,10 +29,17 @@ pub struct ExternalNode { pub rest: Option, } +// zander forgive me -isaac +fn please_be_online() -> bool { + true +} + // TODO: could use some more clarification on some of these fields /// A node in the testing infrastructure. #[derive(Deserialize, Debug, Clone)] pub struct Node { + #[serde(default = "please_be_online")] + pub online: bool, /// When specified, creates a group of nodes, all with the same /// configuration. 
pub replicas: Option, @@ -39,6 +49,7 @@ pub struct Node { // `Committee(usize)`, `Named(String)`, `Literal(String)` pub key: Option, /// The storage ID to use when starting the node. + /// TODO: move this outside of the node. this is a setting for the swarm pub storage: String, /// Height of ledger to inherit. /// @@ -62,11 +73,10 @@ impl Node { // TODO height: (0, HeightRequest::Top), - // TODO: should this be online? - online: true, - // TODO: resolve validators + online: self.online, + + // these are resolved later validators: vec![], - // TODO: resolve peers peers: vec![], } } diff --git a/crates/snot/src/schema/storage.rs b/crates/snot/src/schema/storage.rs index acd4dbdb..85470bb1 100644 --- a/crates/snot/src/schema/storage.rs +++ b/crates/snot/src/schema/storage.rs @@ -19,6 +19,7 @@ pub struct Document { pub name: String, pub description: Option, /// Prefer using existing storage instead of generating new stuff. + #[serde(default)] pub prefer_existing: bool, pub generate: Option, } @@ -30,7 +31,9 @@ pub struct StorageGeneration { pub path: PathBuf, // TODO: individually validate arguments, or just pass them like this? + #[serde(default)] pub genesis: GenesisGeneration, + #[serde(default)] pub ledger: LedgerGeneration, #[serde(default)] @@ -149,6 +152,9 @@ impl Document { // TODO: is this the behavior we want? warn!("the specified storage ID {id} already exists, using that one instead"); break 'generate; + } else { + tracing::debug!("generating storage for {id}"); + tokio::fs::create_dir_all(&base).await?; } generation.genesis = GenesisGeneration { @@ -163,10 +169,13 @@ impl Document { // }; // generate the genesis block using the aot cli - Command::new("./target/release/snarkos-aot") + let bin = std::env::var("AOT_BIN").map(PathBuf::from).unwrap_or( + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("../../target/release/snarkos-aot"), + ); + let res = Command::new(bin) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .current_dir(&base) .arg("genesis") .arg("--output") .arg(&generation.genesis.output) @@ -178,10 +187,44 @@ impl Document { .arg("5") .arg("--additional-accounts-output") .arg(base.join("accounts.json")) + .arg("--ledger") + .arg(base.join("ledger")) .spawn()? .wait() .await?; + if !res.success() { + warn!("failed to run genesis generation command..."); + } + + if tokio::fs::try_exists(&generation.genesis.output) + .await + .is_err() + { + anyhow::bail!("failed to generate {:#?}", generation.genesis.output); + } + + let res = Command::new("tar") + .current_dir(&base) + .arg("czf") + .arg("ledger.tar.gz") // TODO: move constants from client... + .arg("ledger/*") + .kill_on_drop(true) + .spawn()? 
+ .wait() + .await?; + + if !res.success() { + warn!("error running tar command..."); + } + + if tokio::fs::try_exists(&base.join("ledger.tar.gz")) + .await + .is_err() + { + anyhow::bail!("failed to tar the ledger"); + } + // TODO: transactions } diff --git a/crates/snot/src/server/api.rs b/crates/snot/src/server/api.rs index 511dfa17..ffee498b 100644 --- a/crates/snot/src/server/api.rs +++ b/crates/snot/src/server/api.rs @@ -20,6 +20,7 @@ pub(super) fn routes() -> Router { } #[derive(Deserialize)] +#[serde(rename_all = "lowercase")] enum StorageType { Genesis, Ledger, @@ -27,7 +28,7 @@ enum StorageType { async fn redirect_storage( Path((storage_id, ty)): Path<(usize, StorageType)>, - State(state): State, + state: State, ) -> Response { let Some(real_id) = state.storage.read().await.get_by_left(&storage_id).cloned() else { return StatusCode::NOT_FOUND.into_response(); @@ -41,14 +42,21 @@ async fn redirect_storage( Redirect::temporary(&format!("/content/storage/{real_id}/{filename}")).into_response() } -async fn get_agents(State(state): State) -> impl IntoResponse { +async fn get_agents(state: State) -> impl IntoResponse { // TODO: return actual relevant info about agents Json(json!({ "count": state.pool.read().await.len() })) } -async fn post_test_prepare(State(state): State, body: String) -> Response { - let Ok(documents) = Test::deserialize(&body) else { - return StatusCode::BAD_REQUEST.into_response(); +async fn post_test_prepare(state: State, body: String) -> Response { + let documents = match Test::deserialize(&body) { + Ok(documents) => documents, + Err(e) => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": format!("{e}")})), + ) + .into_response(); + } }; // TODO: some live state to report to the calling CLI or something would be @@ -56,6 +64,8 @@ async fn post_test_prepare(State(state): State, body: String) -> Respo // TODO: clean up existing test + // TODO: support concurrent tests + return test id + match Test::prepare(documents, &state).await { Ok(_) => StatusCode::OK.into_response(), Err(e) => ( @@ -68,7 +78,11 @@ async fn post_test_prepare(State(state): State, body: String) -> Respo async fn delete_test(State(state): State) -> impl IntoResponse { match Test::cleanup(&state).await { - Ok(_) => StatusCode::OK, - Err(_) => StatusCode::INTERNAL_SERVER_ERROR, + Ok(_) => StatusCode::OK.into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("{e}") })), + ) + .into_response(), } } diff --git a/crates/snot/src/server/content.rs b/crates/snot/src/server/content.rs index a3f3cf04..6023a0e9 100644 --- a/crates/snot/src/server/content.rs +++ b/crates/snot/src/server/content.rs @@ -7,6 +7,7 @@ use crate::state::GlobalState; pub(super) async fn init_routes(state: &GlobalState) -> Router { // create storage path let storage_path = state.cli.path.join("storage"); + tracing::debug!("storage path: {:?}", storage_path); tokio::fs::create_dir_all(&storage_path) .await .expect("failed to create ledger storage path"); @@ -15,5 +16,5 @@ pub(super) async fn init_routes(state: &GlobalState) -> Router { // the snarkOS binary .route_service("/snarkos", ServeFile::new("./target/release/snarkos-aot")) // ledger/block storage derived from tests (.tar.gz'd) - .route_service("/storage", ServeDir::new(storage_path)) + .nest_service("/storage", ServeDir::new(storage_path)) } diff --git a/crates/snot/src/server/mod.rs b/crates/snot/src/server/mod.rs index d242aade..f5cb6999 100644 --- a/crates/snot/src/server/mod.rs +++ b/crates/snot/src/server/mod.rs 
@@ -1,14 +1,15 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use ::jwt::VerifyWithKey; use anyhow::Result; use axum::{ + body::Body, extract::{ ws::{Message, WebSocket}, State, WebSocketUpgrade, }, - http::HeaderMap, - response::IntoResponse, + http::{HeaderMap, Request}, + response::{IntoResponse, Response}, routing::get, Router, }; @@ -19,7 +20,8 @@ use snot_common::{ }; use tarpc::server::Channel; use tokio::select; -use tracing::{info, warn}; +use tower_http::trace::{DefaultMakeSpan, TraceLayer}; +use tracing::{info, warn, Span}; use self::{ jwt::{Claims, JWT_NONCE, JWT_SECRET}, @@ -48,7 +50,18 @@ pub async fn start(cli: Cli) -> Result<()> { .route("/agent", get(agent_ws_handler)) .nest("/api/v1", api::routes()) .nest("/content", content::init_routes(&state).await) - .with_state(Arc::new(state)); + .with_state(Arc::new(state)) + .layer( + TraceLayer::new_for_http() + .make_span_with(DefaultMakeSpan::new().include_headers(true)) + .on_request(|request: &Request, _span: &Span| { + tracing::info!("req {} - {}", request.method(), request.uri()); + }) + .on_response(|response: &Response, _latency: Duration, span: &Span| { + span.record("status_code", &tracing::field::display(response.status())); + tracing::info!("res {}", response.status()) + }), + ); let listener = tokio::net::TcpListener::bind("0.0.0.0:1234").await?; axum::serve(listener, app).await?; @@ -153,10 +166,11 @@ async fn handle_socket(mut socket: WebSocket, headers: HeaderMap, state: AppStat // fetch the agent's network addresses on connect/reconnect let state2 = state.clone(); tokio::spawn(async move { - if let Ok((external, internal)) = client.get_addrs(tarpc::context::current()).await { + if let Ok((ports, external, internal)) = client.get_addrs(tarpc::context::current()).await { let mut state = state2.pool.write().await; if let Some(agent) = state.get_mut(&id) { - info!("agent {id} addrs: {external:?} {internal:?}"); + info!("agent {id} addrs: {external:?} {internal:?} @ {ports}"); + agent.set_ports(ports); agent.set_addrs(external, internal); } } diff --git a/crates/snot/src/server/rpc.rs b/crates/snot/src/server/rpc.rs index 52dadc49..40a16cee 100644 --- a/crates/snot/src/server/rpc.rs +++ b/crates/snot/src/server/rpc.rs @@ -1,11 +1,20 @@ -use snot_common::rpc::{ - agent::{AgentServiceRequest, AgentServiceResponse}, - control::{ControlService, ControlServiceRequest, ControlServiceResponse}, - MuxMessage, +use std::{ + collections::{HashMap, HashSet}, + net::IpAddr, +}; + +use snot_common::{ + rpc::{ + agent::{AgentServiceRequest, AgentServiceResponse}, + control::{ControlService, ControlServiceRequest, ControlServiceResponse, ResolveError}, + MuxMessage, + }, + state::AgentId, }; use tarpc::{context, ClientMessage, Response}; use super::AppState; +use crate::state::resolve_addrs; /// A multiplexed message, incoming on the websocket. 
pub type MuxedMessageIncoming = @@ -25,4 +34,19 @@ impl ControlService for ControlRpcServer { async fn placeholder(self, _: context::Context) -> String { "Hello, world".into() } + + async fn resolve_addrs( + self, + _: context::Context, + mut peers: HashSet<AgentId>, + ) -> Result<HashMap<AgentId, IpAddr>, ResolveError> { + peers.insert(self.agent); + + let addr_map = self + .state + .get_addr_map(Some(&peers)) + .await + .map_err(|_| ResolveError::AgentHasNoAddresses)?; + resolve_addrs(&addr_map, self.agent, &peers).map_err(|_| ResolveError::SourceAgentNotFound) + } } diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs index 6c72e125..77f302b4 100644 --- a/crates/snot/src/state.rs +++ b/crates/snot/src/state.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, net::IpAddr, sync::{ atomic::{AtomicUsize, Ordering}, @@ -8,11 +8,12 @@ time::Instant, }; +use anyhow::{anyhow, Result}; use bimap::BiMap; use jwt::SignWithKey; use snot_common::{ rpc::agent::{AgentServiceClient, ReconcileError}, - state::AgentState, + state::{AgentState, PortConfig}, }; use tarpc::{client::RpcError, context}; use tokio::sync::RwLock; @@ -34,9 +35,13 @@ pub struct GlobalState { pub pool: RwLock>, /// A map from ephemeral integer storage ID to actual storage ID. pub storage: RwLock<BiMap<usize, String>>, + // TODO: support concurrent tests pub test: RwLock<Option<Test>>, } +/// This is the representation of a public addr or a list of internal addrs. +pub type AgentAddrs = (Option<IpAddr>, Vec<IpAddr>); + /// An active agent, known by the control plane. #[derive(Debug)] pub struct Agent { @@ -46,7 +51,8 @@ state: AgentState, /// The external address of the agent, along with its local addresses. - addrs: Option<(Option<IpAddr>, Vec<IpAddr>)>, + ports: Option<PortConfig>, + addrs: Option<AgentAddrs>, } pub struct AgentClient(AgentServiceClient); @@ -64,6 +70,7 @@ impl Agent { }, connection: AgentConnection::Online(rpc), state: Default::default(), + ports: None, addrs: None, } } @@ -72,6 +79,15 @@ matches!(self.connection, AgentConnection::Online(_)) } + /// Whether this agent is capable of being a node in the network. + pub fn is_node_capable(&self) -> bool { + if !self.is_connected() || self.addrs.is_none() { + return false; + }; + let (external, internal) = self.addrs.as_ref().unwrap(); + external.is_some() || !internal.is_empty() + } + /// The ID of this agent. pub fn id(&self) -> usize { self.id } @@ -124,7 +140,30 @@ self.state = state; } - /// Set the external and internal addresses of the agent. This does **not** trigger a reconcile + /// Set the ports of the agent. This does **not** trigger a reconcile + pub fn set_ports(&mut self, ports: PortConfig) { + self.ports = Some(ports); + } + + // Gets the bft port of the agent. Assumes the agent is ready, returns 0 if not. + pub fn bft_port(&self) -> u16 { + self.ports.as_ref().map(|p| p.bft).unwrap_or_default() + } + + // Gets the node port of the agent. Assumes the agent is ready, returns 0 if + // not. + pub fn node_port(&self) -> u16 { + self.ports.as_ref().map(|p| p.node).unwrap_or_default() + } + + // Gets the rest port of the agent. Assumes the agent is ready, returns 0 if + // not. + pub fn rest_port(&self) -> u16 { + self.ports.as_ref().map(|p| p.rest).unwrap_or_default() + } + + /// Set the external and internal addresses of the agent. 
This does **not** + /// trigger a reconcile pub fn set_addrs(&mut self, external_addr: Option<IpAddr>, internal_addrs: Vec<IpAddr>) { self.addrs = Some((external_addr, internal_addrs)); } @@ -141,3 +180,69 @@ pub enum AgentConnection { Online(AgentServiceClient), Offline { since: Instant }, } + +pub type AddrMap = HashMap<AgentId, AgentAddrs>; + +/// Given a map of addresses, resolve the addresses of a set of peers relative +/// to a source agent. +pub fn resolve_addrs( + addr_map: &AddrMap, + src: AgentId, + peers: &HashSet<AgentId>, +) -> Result<HashMap<AgentId, IpAddr>> { + let src_addrs = addr_map + .get(&src) + .ok_or_else(|| anyhow!("source agent not found"))?; + + let all_internal = addr_map.values().all(|(ext, _)| ext.is_none()); + + Ok(peers + .iter() + .filter_map(|id| { + // ignore the source agent + if *id == src { + return None; + } + + // if the agent has no addresses, skip it + let Some(addrs) = addr_map.get(id) else { + return None; + }; + + // if there are no external addresses in the entire addr map, + // use the first internal address + if all_internal { + return addrs.1.first().copied().map(|addr| (*id, addr)); + } + + match (src_addrs.0, addrs.0, addrs.1.first()) { + // if peers have the same external address, use the first internal address + (Some(src_ext), Some(peer_ext), Some(peer_int)) if src_ext == peer_ext => { + Some((*id, *peer_int)) + } + // otherwise use the external address + (_, Some(peer_ext), _) => Some((*id, peer_ext)), + _ => None, + } + }) + .collect()) +} +impl GlobalState { + /// Get a peer-to-addr mapping for a set of agents + /// Locks pools for reading + pub async fn get_addr_map(&self, filter: Option<&HashSet<AgentId>>) -> Result<AddrMap> { + self.pool + .read() + .await + .iter() + .filter(|(id, _)| filter.is_none() || filter.is_some_and(|p| p.contains(id))) + .map(|(id, agent)| { + let addrs = agent + .addrs + .as_ref() + .ok_or_else(|| anyhow!("agent has no addresses"))?; + Ok((*id, addrs.clone())) + }) + .collect() + } +} diff --git a/crates/snot/src/testing.rs b/crates/snot/src/testing.rs index c40506ea..87e6762b 100644 --- a/crates/snot/src/testing.rs +++ b/crates/snot/src/testing.rs @@ -1,28 +1,50 @@ -use anyhow::{bail, ensure}; +use anyhow::{anyhow, bail, ensure}; use bimap::BiMap; use futures_util::future::join_all; use indexmap::{map::Entry, IndexMap}; use serde::Deserialize; -use snot_common::state::{AgentPeer, AgentState, NodeKey}; +use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey}; use tracing::{info, warn}; use crate::{ - schema::{nodes::Node, ItemDocument}, + schema::{ + nodes::{ExternalNode, Node}, + ItemDocument, NodeTargets, + }, state::GlobalState, }; #[derive(Debug, Clone)] pub struct Test { - pub node_map: BiMap<NodeKey, AgentPeer>, - pub initial_nodes: IndexMap<NodeKey, Node>, + pub node_map: BiMap<NodeKey, TestPeer>, + pub initial_nodes: IndexMap<NodeKey, TestNode>, // TODO: GlobalStorage.storage should maybe be here instead } +#[derive(Debug, Clone)] +/// The effective test state of a node. +pub enum TestNode { + Internal(Node), + External(ExternalNode), +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +/// A way of looking up a peer in the test state. +/// Could technically use AgentPeer like this but it would have needless port +/// information +pub enum TestPeer { + Internal(AgentId), + External, +} + impl Test { /// Deserialize (YAML) many documents into a `Vec` of documents. 
- pub fn deserialize(str: &str) -> Result, serde_yaml::Error> { + pub fn deserialize(str: &str) -> Result, anyhow::Error> { serde_yaml::Deserializer::from_str(str) - .map(ItemDocument::deserialize) + .enumerate() + .map(|(i, doc)| { + ItemDocument::deserialize(doc).map_err(|e| anyhow!("document {i}: {e}")) + }) .collect() } @@ -64,18 +86,21 @@ impl Test { match test.initial_nodes.entry(node_key) { Entry::Occupied(ent) => bail!("duplicate node key: {}", ent.key()), - Entry::Vacant(ent) => ent.insert(doc_node.to_owned()), + Entry::Vacant(ent) => { + // replace the key with a new one + let mut node = doc_node.to_owned(); + if let Some(key) = node.key.take() { + node.key = Some(key.replace('$', &i.to_string())) + } + ent.insert(TestNode::Internal(node)) + } }; } } - // TODO: external nodes - // for (node_key, node) in nodes.external { - // } - // delegate agents to become nodes let pool = state.pool.read().await; - let online_agents = pool.values().filter(|a| a.is_connected()); + let online_agents = pool.values().filter(|a| a.is_node_capable()); let num_online_agents = online_agents.clone().count(); ensure!( @@ -92,8 +117,24 @@ impl Test { test.initial_nodes .keys() .cloned() - .zip(online_agents.map(|agent| AgentPeer::Internal(agent.id()))), + .zip(online_agents.map(|agent| TestPeer::Internal(agent.id()))), ); + + // append external nodes to the node map + + for (node_key, node) in &nodes.external { + match test.initial_nodes.entry(node_key.clone()) { + Entry::Occupied(ent) => bail!("duplicate node key: {}", ent.key()), + Entry::Vacant(ent) => ent.insert(TestNode::External(node.to_owned())), + }; + } + test.node_map.extend( + nodes + .external + .keys() + .cloned() + .map(|k| (k, TestPeer::External)), + ) } _ => warn!("ignored unimplemented document type"), @@ -110,40 +151,58 @@ impl Test { Ok(()) } - pub async fn cleanup(state: &GlobalState) -> anyhow::Result<()> { - let mut state_lock = state.test.write().await; - let mut agents = state.pool.write().await; + // TODO: cleanup by test id, rather than cleanup EVERY agent... 
- *state_lock = None; + pub async fn cleanup(state: &GlobalState) -> anyhow::Result<()> { + // clear the test state + { + info!("clearing test state..."); + let mut state_lock = state.test.write().await; + *state_lock = None; + } // reconcile all online agents - let handles = agents - .values() - .filter_map(|agent| agent.client_owned()) - .map(|client| { - tokio::spawn(async move { client.reconcile(AgentState::Inventory).await }) - }); + let (ids, handles): (Vec<_>, Vec<_>) = { + let agents = state.pool.read().await; + agents + .values() + .filter_map(|agent| agent.client_owned().map(|client| (agent.id(), client))) + .map(|(id, client)| { + ( + id, + tokio::spawn(async move { client.reconcile(AgentState::Inventory).await }), + ) + }) + .unzip() + }; + info!("inventorying {} agents...", ids.len()); let reconciliations = join_all(handles).await; + info!("reconcile done, updating agent states..."); - for (agent, result) in agents.values_mut().zip(reconciliations) { + let mut agents = state.pool.write().await; + let mut success = 0; + let num_reconciles = ids.len(); + for (id, result) in ids.into_iter().zip(reconciliations) { match result { // oh god - Ok(Ok(Ok(()))) => agent.set_state(AgentState::Inventory), + Ok(Ok(Ok(()))) => { + if let Some(agent) = agents.get_mut(&id) { + agent.set_state(AgentState::Inventory); + success += 1; + } else { + warn!("agent {id} not found in pool after successful reconcile") + } + } // reconcile error - Ok(Ok(Err(e))) => warn!( - "agent {} experienced a reconcilation error: {e}", - agent.id() - ), + Ok(Ok(Err(e))) => warn!("agent {id} experienced a reconcilation error: {e}",), // could be a tokio error or an RPC error - _ => warn!( - "agent {} failed to cleanup for an unknown reason", - agent.id() - ), + _ => warn!("agent {id} failed to cleanup for an unknown reason"), } } + info!("cleanup result: {success}/{num_reconciles} agents inventoried"); Ok(()) } @@ -151,49 +210,111 @@ impl Test { /// Reconcile all associated nodes with their initial state. 
pub async fn initial_reconcile(state: &GlobalState) -> anyhow::Result<()> { - let test_lock = state.test.read().await; - let mut pool_lock = state.pool.write().await; - let storage_lock = state.storage.read().await; - - let test = test_lock.as_ref().unwrap(); - let mut handles = vec![]; let mut agent_ids = vec![]; - for (key, node) in &test.initial_nodes { - // get the numeric storage ID from the string storage ID - let storage_id = match storage_lock.get_by_right(&node.storage) { - Some(id) => *id, - None => bail!("invalid storage ID specified for node"), - }; - - // get the internal agent ID from the node key - let Some(AgentPeer::Internal(id)) = test.node_map.get_by_left(key) else { - continue; + { + let test_lock = state.test.read().await; + let test = test_lock.as_ref().unwrap(); + let storage_lock = state.storage.read().await; + let pool_lock = state.pool.read().await; + + // Lookup agent peers given a node key + let node_to_agent = |key: &NodeKey, node: &TestPeer, is_validator: bool| { + // get the internal agent ID from the node key + match node { + // internal peers are mapped to internal agents + TestPeer::Internal(id) => { + let Some(agent) = pool_lock.get(id) else { + bail!("agent {id} not found in pool") + }; + + Ok(AgentPeer::Internal( + *id, + if is_validator { + agent.bft_port() + } else { + agent.node_port() + }, + )) + } + // external peers are mapped to external nodes + TestPeer::External => { + let Some(TestNode::External(external)) = test.initial_nodes.get(key) else { + bail!("external node with key {key} not found") + }; + + Ok(AgentPeer::External(if is_validator { + external + .bft + .ok_or_else(|| anyhow!("external node {key} is missing BFT port"))? + } else { + external + .node + .ok_or_else(|| anyhow!("external node {key} is missing Node port"))? + })) + } + } }; - let Some(agent) = pool_lock.get(&id) else { - continue; - }; + let matching_nodes = |key: &NodeKey, target: &NodeTargets, is_validator: bool| { + if target.is_empty() { + return Ok(vec![]); + } - let Some(client) = agent.client_owned() else { - continue; + // this can't really be cleverly optimized into + // a single lookup at the moment because we don't treat @local + // as a None namespace... 
+ test.node_map + .iter() + .filter(|(k, _)| *k != key && target.matches(k)) + .map(|(k, v)| node_to_agent(k, v, is_validator)) + .collect() }; - let agent_state = AgentState::Node(storage_id, node.into_state(key.ty)); - agent_ids.push(id); - handles.push(tokio::spawn( - async move { client.reconcile(agent_state).await }, - )); + for (key, node) in &test.initial_nodes { + let TestNode::Internal(node) = node else { + continue; + }; + // get the numeric storage ID from the string storage ID + let storage_id = match storage_lock.get_by_right(&node.storage) { + Some(id) => *id, + None => bail!("invalid storage ID specified for node"), + }; + + // get the internal agent ID from the node key + let Some(TestPeer::Internal(id)) = test.node_map.get_by_left(key) else { + bail!("expected internal agent peer for node with key {key}") + }; + + let Some(client) = pool_lock.get(id).and_then(|a| a.client_owned()) else { + continue; + }; + + // resolve the peers and validators + let mut node_state = node.into_state(key.ty); + node_state.peers = matching_nodes(key, &node.peers, false)?; + node_state.validators = matching_nodes(key, &node.validators, true)?; + + let agent_state = AgentState::Node(storage_id, node_state); + agent_ids.push(*id); + handles.push(tokio::spawn( + async move { client.reconcile(agent_state).await }, + )); + } } let num_attempted_reconciliations = handles.len(); + + info!("waiting for reconcile..."); let reconciliations = join_all(handles).await; + info!("reconcile done, updating agent states..."); + let mut pool_lock = state.pool.write().await; let mut success = 0; for (agent_id, result) in agent_ids.into_iter().zip(reconciliations) { // safety: we acquired this before when building handles, agent_id wouldn't be // here if the corresponding agent didn't exist - let agent = pool_lock.get_mut(agent_id).unwrap(); + let agent = pool_lock.get_mut(&agent_id).unwrap(); match result { // oh god diff --git a/scripts/abort_test.sh b/scripts/abort_test.sh new file mode 100755 index 00000000..8e6be265 --- /dev/null +++ b/scripts/abort_test.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +curl 'http://127.0.0.1:1234/api/v1/test' -X DELETE + +# TODO: specify the test to abort + +# # check if index is set +# if [ ! 
-f "$1" ]; then +# echo "Usage: $0 " +# exit 1 +# fi +# curl -H "Content-Type: application/json" http://localhost:1234/api/v1/test/prepare -d "$(cat $1)" diff --git a/scripts/measure_tps.sh b/scripts/measure_tps.sh new file mode 100755 index 00000000..3cfd792a --- /dev/null +++ b/scripts/measure_tps.sh @@ -0,0 +1,93 @@ +#!/usr/bin/env bash + +ENDPOINT="$1" +NUM_BLOCKS="$2" + +# check if mode is client, validator, or prover +if [ -z "$ENDPOINT" ]; then + echo "usage: $0 [num_blocks]" + exit 1 +fi + +# check if index is set +if [ -z "$NUM_BLOCKS" ]; then + NUM_BLOCKS=10 +fi + +HEIGHT="$(curl -s $ENDPOINT/mainnet/latest/height)" +if [ -z "$HEIGHT" ]; then + echo "error: failed to get height from $ENDPOINT" + exit 1 +fi + +if [ "$HEIGHT" -lt "$NUM_BLOCKS" ]; then + echo "error: ledger is shorter than test range ($HEIGHT < $NUM_BLOCKS)" + exit 1 +fi + +if [ "$NUM_BLOCKS" -lt 2 ]; then + echo "error: [num_blocks] must be at least 2" + exit 1 +fi + + +_YEL=$(tput setaf 3) +_RES=$(tput sgr0) +_BLD=$(tput bold) + +echo "fetching blocks $_YEL$((HEIGHT-NUM_BLOCKS+1))$_RES to $_YEL$HEIGHT$_RES" + +TOTAL_TX=0 +TOTAL_BLOCKS=0 +FIRST_TIMESTAMP="" +LAST_TIMESTAMP="" +MIN_BLOCK_TIME=999 +MAX_BLOCK_TIME=0 + +prev_block_time="" + +# get all the blocks from (HEIGHT-NUM_BLOCKS) to HEIGHT +for i in $(seq $((HEIGHT-NUM_BLOCKS+1)) $HEIGHT); do + block="$(curl -s $ENDPOINT/mainnet/block/$i)" + + # update timestamps + LAST_TIMESTAMP="$(echo $block | jq '.header.metadata.timestamp')" + if [ -z "$FIRST_TIMESTAMP" ]; then + FIRST_TIMESTAMP="$LAST_TIMESTAMP" + prev_block_time="$LAST_TIMESTAMP" + else + # only increment transactions if this is not the first block + # because the first block is the timestamp for the next block's transactions + num_tx=$(echo $block | jq '.transactions | length') + TOTAL_TX=$(expr $num_tx + $TOTAL_TX) + TOTAL_BLOCKS=$(expr $TOTAL_BLOCKS + 1) + + # calculate min/max block times + block_time=$(expr $LAST_TIMESTAMP - $prev_block_time) + if [ "$block_time" -lt "$MIN_BLOCK_TIME" ]; then + MIN_BLOCK_TIME=$block_time + fi + if [ "$block_time" -gt "$MAX_BLOCK_TIME" ]; then + MAX_BLOCK_TIME=$block_time + fi + prev_block_time=$LAST_TIMESTAMP + fi +done + +SPAN=$(expr $LAST_TIMESTAMP - $FIRST_TIMESTAMP) + +echo " ${_BLD}Duration$_RES" +echo " first block: $(date -d @$FIRST_TIMESTAMP)" +echo " last block: $(date -d @$LAST_TIMESTAMP)" +echo " total time: $_YEL$SPAN seconds$_RES" +echo " min block time: $_YEL$MIN_BLOCK_TIME seconds$_RES" +echo " max block time: $_YEL$MAX_BLOCK_TIME seconds$_RES" + +echo "" +echo " ${_BLD}Blocks$_RES" +echo " total block count: $_YEL$TOTAL_BLOCKS$_RES" +echo " avg block time: $_YEL$(awk "BEGIN{print $SPAN / $TOTAL_BLOCKS}") seconds$_RES" +echo "" +echo " ${_BLD}Transactions$_RES" +echo " total transactions: $_YEL$TOTAL_TX$_RES" +echo " avg tps: $_YEL$(awk "BEGIN{print $TOTAL_TX / $SPAN}") tx/s$_RES" diff --git a/scripts/run_test.sh b/scripts/run_test.sh new file mode 100755 index 00000000..d940f660 --- /dev/null +++ b/scripts/run_test.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +# check if index is set +if [ ! 
-f "$1" ]; then + echo "Usage: $0 <test-spec-file>" + exit 1 +fi + +curl -H "Content-Type: application/json" http://localhost:1234/api/v1/test/prepare -d "$(cat $1)" diff --git a/specs/test-4-validators.yaml b/specs/test-4-validators.yaml new file mode 100644 index 00000000..e6b43f81 --- /dev/null +++ b/specs/test-4-validators.yaml @@ -0,0 +1,47 @@ +--- +version: storage.snarkos.testing.monadic.us/v1 + +# ADDITIONAL storage information to be run for this test +# this will run a second test with the same topology but different ledger +id: base +name: base-ledger + +generate: + path: ./tests/base + +--- +version: nodes.snarkos.testing.monadic.us/v1 + +name: 4-validators + +nodes: + # validator/test: + # replicas: 4 + # key: committee.$ + # height: 0 + # validators: [validator/*] + # peers: [] + validator/0: + key: APrivateKey1zkp8CZNn3yeCseEtxuVPbDCwSyhGW6yZKUYKfgXmcpoGPWH + height: 0 + validators: [validator/*] + peers: [] + storage: base + validator/1: + key: APrivateKey1zkp2RWGDcde3efb89rjhME1VYA8QMxcxep5DShNBR6n8Yjh + height: 0 + validators: [validator/*] + peers: [] + storage: base + validator/2: + key: APrivateKey1zkp2GUmKbVsuc1NSj28pa1WTQuZaK5f1DQJAT6vPcHyWokG + height: 0 + validators: [validator/*] + peers: [] + storage: base + validator/3: + key: APrivateKey1zkpBjpEgLo4arVUkQmcLdKQMiAKGaHAQVVwmF8HQby8vdYs + height: 0 + validators: [validator/*] + peers: [] + storage: base