Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent address resolution and various fixes for functional topology deploy #72

Merged
merged 15 commits into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ resolver = "2"

[workspace.dependencies]
aleo-std = "=0.1.24"
axum = "0.7.4"
anyhow = "1"
bimap = "0.6"
bincode = "1.3"
Expand All @@ -32,6 +33,7 @@ serde_yaml = "0.9"
tarpc = { version = "0.34", features = ["tokio1", "serde1"] }
thiserror = "1.0"
tokio = { version = "1", features = ["full", "macros"] }
tower-http = { version = "0.5.2", features = ["fs", "trace"] }
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/aot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ node = ["snarkos-node", "crossterm"]
[dependencies]
aleo-std.workspace = true
anyhow.workspace = true
axum.workspace = true
clap.workspace = true
colored.workspace = true
indexmap.workspace = true
Expand All @@ -30,6 +31,7 @@ snarkos-node = { workspace = true, optional = true }
snarkvm.workspace = true
snot-common.workspace = true
tokio.workspace = true
tower-http.workspace = true
tracing-flame = "0.2.0"
tracing-appender.workspace = true
tracing-subscriber.workspace = true
Expand Down
40 changes: 37 additions & 3 deletions crates/aot/src/authorized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use snarkvm::{
de, Deserialize, DeserializeExt, Deserializer, Serialize, SerializeStruct, Serializer,
},
};
use tracing::error;

use crate::{
credits::PROCESS, Aleo, Authorization, DbLedger, MemVM, Network, PrivateKey, Transaction, Value,
Expand All @@ -36,12 +37,45 @@ pub enum ExecutionMode<'a> {
pub struct Execute {
pub authorization: Authorized,
#[arg(short, long)]
pub query: Option<String>,
pub query: String,
}

impl Execute {
pub fn parse(self) -> Result<()> {
Ok(())
let broadcast = self.authorization.broadcast;
// execute the transaction
let tx = self.authorization.execute_local(
None,
&mut rand::thread_rng(),
Some(self.query.to_owned()),
)?;

if !broadcast {
println!("{}", serde_json::to_string(&tx)?);
return Ok(());
}

// Broadcast the transaction.
tracing::info!("broadcasting transaction...");
tracing::debug!("{}", serde_json::to_string(&tx)?);
let response = reqwest::blocking::Client::new()
.post(format!("{}/mainnet/transaction/broadcast", self.query))
.header("Content-Type", "application/json")
.json(&tx)
.send()?;

// Ensure the response is successful.
if response.status().is_success() {
// Return the transaction.
println!("{}", response.text()?);
Ok(())
// Return the error.
} else {
let status = response.status();
let err = response.text()?;
error!("broadcast failed with code {}: {}", status, err);
bail!(err)
}
}
}

Expand Down Expand Up @@ -100,7 +134,7 @@ impl Authorized {
let response = reqwest::blocking::Client::new()
.post(format!("{api_url}/execute"))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&self)?)
.json(&self)
.send()?;

// Ensure the response is successful.
Expand Down
10 changes: 3 additions & 7 deletions crates/aot/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Cli {

// Add layer redirecting logs to the file
layers.push(
tracing_subscriber::fmt::Layer::default()
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(non_blocking)
.with_filter(filter2)
Expand All @@ -148,7 +148,7 @@ impl Cli {
guards.push(g);

layers.push(
tracing_subscriber::fmt::Layer::default()
tracing_subscriber::fmt::layer()
.with_ansi(io::stdout().is_tty())
.with_writer(stdout)
.with_filter(filter)
Expand All @@ -157,11 +157,7 @@ impl Cli {
} else {
let (stderr, g) = tracing_appender::non_blocking(io::stderr());
guards.push(g);
layers.push(
tracing_subscriber::fmt::Layer::default()
.with_writer(stderr)
.boxed(),
);
layers.push(tracing_subscriber::fmt::layer().with_writer(stderr).boxed());
};

let subscriber = tracing_subscriber::registry::Registry::default().with(layers);
Expand Down
19 changes: 15 additions & 4 deletions crates/aot/src/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{authorized::Execute, Address, PrivateKey};
pub mod add;
pub mod distribute;
pub mod init;
pub mod query;
pub mod truncate;
pub mod tx;
pub mod util;
Expand Down Expand Up @@ -41,10 +42,14 @@ macro_rules! comma_separated {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(s.split(',')
.map(|i| <$item>::from_str(i))
.collect::<Result<Vec<_>, <$item as FromStr>::Err>>()
.map_err(anyhow::Error::from)?))
if s.is_empty() {
return Ok(Self(Vec::new()));
}

Ok(Self(s.split(',')
.map(|i| <$item>::from_str(i))
.collect::<Result<Vec<_>, <$item as FromStr>::Err>>()
.map_err(anyhow::Error::from)?))
}
}

Expand Down Expand Up @@ -92,6 +97,7 @@ pub enum Commands {
Distribute(distribute::Distribute),
Truncate(truncate::Truncate),
Execute(Execute),
Query(query::LedgerQuery),
}

impl Ledger {
Expand Down Expand Up @@ -141,6 +147,11 @@ impl Ledger {
println!("{}", serde_json::to_string(&tx)?);
Ok(())
}

Commands::Query(query) => {
let ledger = util::open_ledger(genesis, ledger)?;
query.parse(&ledger)
}
}
}
}
146 changes: 146 additions & 0 deletions crates/aot/src/ledger/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use std::{fs::File, io::Write, ops::Deref, path::PathBuf, sync::Arc};

use anyhow::Result;
use axum::{
extract::{self, State},
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use clap::Args;
use reqwest::StatusCode;
use serde_json::json;
use tracing_appender::non_blocking::NonBlocking;

use crate::{Block, DbLedger, Transaction};

#[derive(Debug, Args, Clone)]
/// Receive inquiries on /mainnet/latest/stateRoot
pub struct LedgerQuery {
#[arg(long, default_value = "3030")]
/// Port to listen on for incoming messages
pub port: u16,

#[arg(long)]
/// When true, the POST /block endpoint will not be available
pub readonly: bool,

#[arg(long)]
/// Receive messages from /mainnet/transaction/broadcast and record them to the output
pub record: bool,

#[arg(long, short, default_value = "transactions.json")]
/// Path to the directory containing the stored data
pub output: PathBuf,
}

struct LedgerState {
readonly: bool,
ledger: DbLedger,
appender: Option<NonBlocking>,
}

type AppState = Arc<LedgerState>;

impl LedgerQuery {
#[tokio::main]
pub async fn parse(self, ledger: &DbLedger) -> Result<()> {
let (appender, _guard) = if self.record {
let (appender, guard) = tracing_appender::non_blocking(
File::options()
.create(true)
.append(true)
.open(self.output.clone())
.expect("Failed to open the file for writing transactions"),
);
(Some(appender), Some(guard))
} else {
(None, None)
};

let state = LedgerState {
readonly: self.readonly,
ledger: ledger.clone(),
appender,
};

let app = Router::new()
.route("/mainnet/latest/stateRoot", get(Self::latest_state_root))
.route("/mainnet/block/height/latest", get(Self::latest_height))
.route("/mainnet/block/hash/latest", get(Self::latest_hash))
.route("/mainnet/transaction/broadcast", post(Self::broadcast_tx))
.route("/block", post(Self::add_block))
.with_state(Arc::new(state));

let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port)).await?;
tracing::info!("listening on: {:?}", listener.local_addr().unwrap());
axum::serve(listener, app).await?;

Ok(())
}

async fn latest_state_root(state: State<AppState>) -> impl IntoResponse {
Json(json!(state.ledger.latest_state_root()))
}

async fn latest_height(state: State<AppState>) -> impl IntoResponse {
Json(json!(state.ledger.latest_height()))
}

async fn latest_hash(state: State<AppState>) -> impl IntoResponse {
Json(json!(state.ledger.latest_hash()))
}

async fn broadcast_tx(
state: State<AppState>,
payload: extract::Json<Transaction>,
) -> impl IntoResponse {
let Ok(tx_json) = serde_json::to_string(payload.deref()) else {
return StatusCode::BAD_REQUEST;
};

if let Some(mut a) = state.appender.clone() {
match write!(a, "{}", tx_json) {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
} else {
println!("{}", tx_json);
StatusCode::OK
}
}

async fn add_block(state: State<AppState>, payload: extract::Json<Block>) -> impl IntoResponse {
if state.readonly {
return (StatusCode::FORBIDDEN, Json(json!({"error": "readonly"})));
}

if state.ledger.latest_hash() != payload.previous_hash()
|| state.ledger.latest_state_root() != payload.previous_state_root()
|| state.ledger.latest_height() + 1 != payload.height()
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "invalid block"})),
);
}

if let Err(e) = state
.ledger
.check_next_block(&payload, &mut rand::thread_rng())
{
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": format!("failed to validate block: {e}")})),
);
}

match state.ledger.advance_to_next_block(&payload) {
Ok(_) => (StatusCode::OK, Json(json!({"status": "ok"}))),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": format!("failed to advance block: {e}")})),
),
}
}
}
4 changes: 2 additions & 2 deletions crates/aot/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ pub struct Runner {
pub rest: u16,

/// Specify the IP address and port of the peer(s) to connect to
#[clap(long = "peers")]
#[clap(long = "peers", default_value = "")]
pub peers: Addrs,
/// Specify the IP address and port of the validator(s) to connect to
#[clap(long = "validators")]
#[clap(long = "validators", default_value = "")]
pub validators: Addrs,
/// Specify the requests per second (RPS) rate limit per IP for the REST
/// server
Expand Down
1 change: 1 addition & 0 deletions crates/snot-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ snot-common = { path = "../snot-common" }
tarpc.workspace = true
tokio.workspace = true
tokio-tungstenite = "0.21.0"
tracing-appender.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
Loading