Skip to content

Commit

Permalink
feat(cannon): functional agent source, file sink cannon
Browse files Browse the repository at this point in the history
  • Loading branch information
Meshiest committed Mar 29, 2024
1 parent 09f0b58 commit 111bda1
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 89 deletions.
17 changes: 12 additions & 5 deletions crates/snot-agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,20 +449,27 @@ impl AgentService for AgentRpcServer {
info!("executing authorization...");
// TODO: ensure binary associated with this env_id is present

let res = Command::new(self.state.cli.path.join(SNARKOS_FILE))
.stdout(Stdio::piped())
.stderr(Stdio::piped())
let res = Command::new(dbg!(self.state.cli.path.join(SNARKOS_FILE)))
.stdout(std::io::stdout())
.stderr(std::io::stderr())
.arg("execute")
.arg("--query")
.arg(&format!("http://{}{query}", self.state.endpoint))
.arg(auth)
.spawn()
.map_err(|_| AgentError::FailedToSpawnProcess)?
.map_err(|e| {
warn!("failed to spawn auth exec process: {e}");
AgentError::FailedToSpawnProcess
})?
.wait()
.await
.map_err(|_| AgentError::ProcessFailed)?;
.map_err(|e| {
warn!("auth exec process failed: {e}");
AgentError::ProcessFailed
})?;

if !res.success() {
warn!("auth exec process exited with status: {res}");
return Err(AgentError::ProcessFailed);
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/snot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ lazy_static.workspace = true
rand.workspace = true
rand_chacha.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["stream"] }
reqwest = { workspace = true, features = ["stream", "json"] }
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
Expand Down
13 changes: 9 additions & 4 deletions crates/snot/src/cannon/authorized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ impl Authorize {
command
.stdout(std::io::stdout())
.stderr(std::io::stderr())
.arg("aot")
.arg("authorize");

match self {
Expand All @@ -46,13 +45,19 @@ impl Authorize {

let res = command.output().await?;

if !res.status.success() {
return Err(anyhow::anyhow!(
"command failed with status {}: {}",
res.status,
String::from_utf8_lossy(&res.stderr)
));
}

let blob: serde_json::Value = serde_json::from_slice(&res.stdout)?;

ensure!(blob.is_object(), "expected JSON object in response");
ensure!(
blob.get("function").is_some()
&& blob.get("fee").is_some()
&& blob.get("broadcast").is_some(),
blob.get("function").is_some() && blob.get("broadcast").is_some(),
"expected function, fee, and broadcast fields in response"
);

Expand Down
1 change: 1 addition & 0 deletions crates/snot/src/cannon/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl TransactionSink {
let writer = lock.as_mut().unwrap();
writer.write_all(line.as_bytes())?;
writer.write_all(b"\n")?;
writer.flush()?;
Ok(())
}
}
86 changes: 41 additions & 45 deletions crates/snot/src/cannon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,19 @@ pub mod source;

use std::{
process::Stdio,
sync::{atomic::AtomicUsize, Arc, Mutex, Weak},
sync::{atomic::AtomicUsize, Arc, OnceLock, Weak},
};

use anyhow::{bail, ensure, Result};
use futures_util::{stream::FuturesUnordered, StreamExt};
use serde_json::json;
use snot_common::state::{AgentPeer, AgentState};
use snot_common::state::AgentPeer;
use tokio::{
process::Command,
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
OnceCell,
},
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::AbortHandle,
};
use tracing::{debug, warn};
use tracing::{debug, trace, warn};

use self::{sink::TxSink, source::TxSource};
use crate::{
Expand Down Expand Up @@ -82,7 +79,7 @@ pub struct CannonInstance {
query_port: Option<u16>,

// TODO: run the actual cannon in this task
pub task: Mutex<Option<AbortHandle>>,
pub task: Option<AbortHandle>,

/// Child process must exist for the duration of the cannon instance.
/// This value is never used
Expand All @@ -96,20 +93,20 @@ pub struct CannonInstance {
tx_count: usize,
}

#[tokio::main]
async fn get_external_ip() -> Option<String> {
let sources: external_ip::Sources = external_ip::get_http_sources();
let consensus = external_ip::ConsensusBuilder::new()
.add_sources(sources)
.build();
consensus.get_consensus().await.map(|s| s.to_string())
}

async fn get_host(state: &GlobalState) -> Option<String> {
static ONCE: OnceCell<Option<String>> = OnceCell::const_new();
static ONCE: OnceLock<Option<String>> = OnceLock::new();
match state.cli.hostname.as_ref() {
Some(host) => Some(host.to_owned()),
None => ONCE
.get_or_init(|| async {
let sources: external_ip::Sources = external_ip::get_http_sources();
let consensus = external_ip::ConsensusBuilder::new()
.add_sources(sources)
.build();
consensus.get_consensus().await.map(|a| a.to_string())
})
.await
.to_owned(),
None => ONCE.get_or_init(get_external_ip).to_owned(),
}
}

Expand Down Expand Up @@ -165,7 +162,7 @@ impl CannonInstance {
tx_receiver: Some(tx_receiver),
query_port,
child,
task: Mutex::new(None),
task: None,
fired_txs,
tx_count: count,
})
Expand All @@ -192,12 +189,8 @@ impl CannonInstance {
pub async fn spawn_local(&mut self) -> Result<()> {
let ctx = self.ctx()?;

let handle = tokio::task::spawn_local(async move { ctx.spawn().await });
self.task
.lock()
.as_mut()
.unwrap()
.replace(handle.abort_handle());
let handle = tokio::task::spawn(async move { ctx.spawn().await });
self.task = Some(handle.abort_handle());

Ok(())
}
Expand Down Expand Up @@ -262,10 +255,9 @@ impl CannonInstance {
impl Drop for CannonInstance {
fn drop(&mut self) {
// cancel the task on drop
if let Ok(lock) = self.task.lock() {
if let Some(handle) = lock.as_ref() {
handle.abort();
}
debug!("dropping cannon {}", self.id);
if let Some(handle) = self.task.take() {
handle.abort();
}
}
}
Expand Down Expand Up @@ -299,11 +291,12 @@ impl ExecutionContext {
mut rx,
} = self;

debug!("spawning cannon {cannon_id}");

let Some(env) = env2.upgrade() else {
bail!("env dropped")
};
let env_id = env.id;

trace!("cannon {env_id}/{cannon_id} spawned");

// when in playback mode, ensure the drain exists
let (drain_pipe, query_path) = match &source {
Expand All @@ -325,15 +318,15 @@ impl ExecutionContext {
format!("http://{host}:{}{suffix}", state.cli.port)
}
};
debug!("using realtime query {query}");
trace!("cannon {cannon_id}/{env_id} using realtime query {query}");
(None, Some(query))
}
};

let sink_pipe = match &sink {
TxSink::Record { file_name: name } => {
let pipe = env.tx_pipe.sinks.get(name).cloned();
ensure!(pipe.is_some(), "transaction sink not found: {name}");
TxSink::Record { file_name, .. } => {
let pipe = env.tx_pipe.sinks.get(file_name).cloned();
ensure!(pipe.is_some(), "transaction sink not found: {file_name}");
pipe
}
_ => None,
Expand All @@ -359,14 +352,14 @@ impl ExecutionContext {
.upgrade()
.ok_or_else(|| anyhow::anyhow!("env dropped"))?;

debug!("generating authorization...");
trace!("cannon {cannon_id}/{env_id} generating authorization...");
let auth = source.get_auth(&env)?.run(&env.aot_bin).await?;
match compute {
ComputeTarget::Agent => {
// find a client, mark it as busy
let Some((client, _busy)) =
state.pool.read().await.values().find_map(|a| {
if !a.is_busy() && matches!(a.state(), AgentState::Inventory) {
if !a.is_busy() && a.is_inventory() {
a.client_owned().map(|c| (c, a.make_busy()))
} else {
None
Expand All @@ -375,7 +368,6 @@ impl ExecutionContext {
else {
bail!("no agents available to execute authorization")
};
debug!("firing auth at agent");

// execute the authorization
client
Expand All @@ -384,9 +376,10 @@ impl ExecutionContext {
.ok_or_else(|| anyhow::anyhow!("env dropped"))?
.id,
query_path.unwrap(),
serde_json::from_value(auth)?,
serde_json::to_string(&auth)?,
)
.await?;

Ok(())
}
ComputeTarget::Demox { url } => {
Expand Down Expand Up @@ -418,19 +411,16 @@ impl ExecutionContext {
tokio::select! {
Some(res) = container.next() => {
if let Err(e) = res {
timer.undo();
warn!("transaction gen failed: {e}");
}
},
_ = timer.next() => {
debug!("queue new transaction");
// queue a new transaction
container.push(gen_tx.clone()());
}
Some(tx) = rx.recv() => {
let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if fired_count >= tx_count {
break;
}
let fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
match &sink {
TxSink::Record { .. } => {
debug!("writing tx to file");
Expand Down Expand Up @@ -469,6 +459,12 @@ impl ExecutionContext {
}
}
}

if fired_count >= tx_count {
debug!("finished firing txs");
break;
}
debug!("fired {fired_count}/{tx_count} txs");
}
}
}
Expand Down
50 changes: 40 additions & 10 deletions crates/snot/src/cannon/router.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use axum::{
extract::{Path, State},
response::{IntoResponse, Response},
Expand All @@ -23,23 +25,51 @@ async fn state_root(
let env = state.envs.read().await;
env.get(&env_id).cloned()
}) else {
return StatusCode::NOT_FOUND.into_response();
return (
StatusCode::NOT_FOUND,
Json(json!({ "error": "environment not found" })),
)
.into_response();
};

let cannon_lock = env.cannons.read().await;
let Some(cannon) = cannon_lock.get(&cannon_id) else {
return StatusCode::NOT_FOUND.into_response();
return (
StatusCode::NOT_FOUND,
Json(json!({ "error": "cannon not found" })),
)
.into_response();
};

match cannon.proxy_state_root().await {
// the nodes expect this state root to be string escaped json
Ok(root) => Json(json!(root)).into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("{e}")})),
)
.into_response(),
// TODO: lock this with a mutex or something so that multiple route callers can't bombard the cannon with proxy_state_root call attempts
let mut attempts = 0;
loop {
attempts += 1;
match cannon.proxy_state_root().await {
Ok(root) => break Json(root).into_response(),

Err(e) if attempts > 5 => {
break (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "non-responsive query node", "inner": format!("{e}") })),
)
.into_response()
}

_ => attempts += 1,
}
tokio::time::sleep(Duration::from_secs(1)).await;
}

// match cannon.proxy_state_root().await {
// // the nodes expect this state root to be string escaped json
// Ok(root) => Json(root).into_response(),
// Err(e) => (
// StatusCode::INTERNAL_SERVER_ERROR,
// Json(json!({ "error": format!("{e}")})),
// )
// .into_response(),
// }
}

async fn transaction(
Expand Down
Loading

0 comments on commit 111bda1

Please sign in to comment.