Skip to content

Commit

Permalink
Merge pull request #73 from monadicus/feat-cannon
Browse files Browse the repository at this point in the history
cannon wip code, breaking rename changes
  • Loading branch information
voximity authored Mar 26, 2024
2 parents f2f3cc7 + 025762f commit e4c38c2
Show file tree
Hide file tree
Showing 25 changed files with 738 additions and 145 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/aot/src/ledger/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl LedgerQuery {
.route("/mainnet/block/hash/latest", get(Self::latest_hash))
.route("/mainnet/transaction/broadcast", post(Self::broadcast_tx))
.route("/block", post(Self::add_block))
// TODO: for ahead of time ledger generation, support a /beacon_block endpoint to write beacon block
// TODO: api to get and decrypt records for a private key
.with_state(Arc::new(state));

let listener = tokio::net::TcpListener::bind(SocketAddr::new(self.bind, self.port)).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/snot-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ futures-util.workspace = true
http.workspace = true
httpdate = "1.0.3"
local-ip-address = "0.6.1"
reqwest = { version = "0.12.0", features = ["stream"] }
reqwest = { workspace = true, features = ["stream", "json"] }
snot-common = { path = "../snot-common" }
tarpc.workspace = true
tokio.workspace = true
Expand Down
23 changes: 22 additions & 1 deletion crates/snot-agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashSet, net::IpAddr, ops::Deref, process::Stdio, sync::A
use snot_common::{
rpc::{
agent::{
AgentMetric, AgentService, AgentServiceRequest, AgentServiceResponse, ReconcileError,
AgentError, AgentMetric, AgentService, AgentServiceRequest, AgentServiceResponse, ReconcileError,
},
control::{ControlServiceRequest, ControlServiceResponse},
MuxMessage,
Expand Down Expand Up @@ -369,6 +369,27 @@ impl AgentService for AgentRpcServer {
)
}

async fn get_state_root(self, _: context::Context) -> Result<String, AgentError> {
if !matches!(
self.state.agent_state.read().await.deref(),
AgentState::Node(_, _)
) {
return Err(AgentError::InvalidState);
}

let url = format!(
"http://127.0.0.1:{}/mainnet/latest/stateRoot",
self.state.cli.rest
);
let response = reqwest::get(&url)
.await
.map_err(|_| AgentError::FailedToMakeRequest)?;
response
.json()
.await
.map_err(|_| AgentError::FailedToParseJson)
}

async fn get_metric(self, _: context::Context, metric: AgentMetric) -> f64 {
let metrics = self.state.metrics.read().await;

Expand Down
11 changes: 11 additions & 0 deletions crates/snot-common/src/rpc/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub trait AgentService {
/// state.
async fn reconcile(to: AgentState) -> Result<(), ReconcileError>;

/// Get the state root from the running node
async fn get_state_root() -> Result<String, AgentError>;
async fn get_metric(metric: AgentMetric) -> f64;
}

Expand All @@ -35,6 +37,15 @@ pub enum ReconcileError {
Unknown,
}

#[derive(Debug, Error, Serialize, Deserialize)]
pub enum AgentError {
#[error("invalid agent state")]
InvalidState,
#[error("failed to parse json")]
FailedToParseJson,
#[error("failed to make a request")]
FailedToMakeRequest,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AgentMetric {
Tps,
Expand Down
3 changes: 2 additions & 1 deletion crates/snot-common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub enum AgentState {
#[default]
// A node in the inventory can function as a transaction cannon
Inventory,
Node(StorageId, NodeState),
/// Test id mapping to node state
Node(usize, NodeState),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
2 changes: 2 additions & 0 deletions crates/snot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy_static.workspace = true
rand.workspace = true
rand_chacha.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["stream"] }
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
Expand All @@ -32,4 +33,5 @@ tower-http.workspace = true
tracing-appender.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
url = { workspace = true, features = ["serde"] }
wildmatch = "2.3.3"
199 changes: 199 additions & 0 deletions crates/snot/src/cannon/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
mod net;
pub mod router;
pub mod sink;
pub mod source;

use std::{
collections::HashSet,
sync::{atomic::AtomicU32, Arc, Weak},
};

use anyhow::{bail, Result};

use tokio::{
sync::{mpsc::UnboundedSender, Mutex as AsyncMutex},
task::AbortHandle,
};
use tracing::warn;

use crate::{cannon::source::LedgerQueryService, state::GlobalState, testing::Environment};

use self::{sink::TxSink, source::TxSource};

/*
STEP ONE
cannon transaction source: (GEN OR PLAYBACK)
- AOT: storage file
- REALTIME: generate executions from available agents?? via rpc
STEP 2
cannon query source:
/cannon/<id>/mainnet/latest/stateRoot forwards to one of the following:
- REALTIME-(GEN|PLAYBACK): (test_id, node-key) with a rest ports Client/Validator only
- AOT-GEN: ledger service locally (file mode)
- AOT-PLAYBACK: n/a
STEP 3
cannon broadcast ALWAYS HITS control plane at
/cannon/<id>/mainnet/transaction/broadcast
cannon TX OUTPUT pointing at
- REALTIME: (test_id, node-key)
- AOT: file
cannon rate
cannon buffer size
burst mode??
*/

/// Transaction cannon state
/// using the `TxSource` and `TxSink` for configuration.
#[derive(Debug)]
pub struct CannonInstance {
// a copy of the global state
global_state: Arc<GlobalState>,

source: TxSource,
sink: TxSink,

/// The test_id/storage associated with this cannon.
/// To point at an external node, create a topology with external node
/// To generate ahead-of-time, upload a test with a timeline referencing a
/// cannon pointing at a file
env: Weak<Environment>,

/// Local query service port. Only present if the TxSource uses a local query source.
query_port: Option<u16>,

// TODO: run the actual cannon in this task
task: AsyncMutex<AbortHandle>,

/// channel to send transactions to the the task
tx_sender: UnboundedSender<String>,
fired_txs: AtomicU32,
}

impl CannonInstance {
/// Create a new active transaction cannon
/// with the given source and sink.
///
/// Locks the global state's tests and storage for reading.
pub async fn new(
global_state: Arc<GlobalState>,
source: TxSource,
sink: TxSink,
test_id: usize,
) -> Result<Self> {
// mapping with async is ugly and blocking_read is scary
let env = {
let Some(env) = global_state.envs.read().await.get(&test_id).cloned() else {
bail!("test {test_id} not found")
};

env
};
let env2 = env.clone();

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let tx_sender = tx.clone();

let query_port = source.get_query_port()?;

let fired_txs = AtomicU32::new(0);

let handle = tokio::spawn(async move {
// TODO: write tx to sink at desired rate
let _tx = rx.recv().await;

// TODO: if a sink or a source uses node_keys or storage
// env will be used
println!("{}", env2.storage.id);

// compare the tx id to an authorization id
let _pending_txs = HashSet::<String>::new();

// TODO: if a local query service exists, spawn it here
// kill on drop

// TODO: determine the rate that transactions need to be created
// based on the sink

// TODO: if source is realtime, generate authorizations and
// send them to any available agent

std::future::pending::<()>().await
});

Ok(Self {
global_state,
source,
sink,
env: Arc::downgrade(&env),
tx_sender,
query_port,
task: AsyncMutex::new(handle.abort_handle()),
fired_txs,
})
}

/// Called by axum to forward /cannon/<id>/mainnet/latest/stateRoot
/// to the ledger query service's /mainnet/latest/stateRoot
pub async fn proxy_state_root(&self) -> Result<String> {
match &self.source {
TxSource::RealTime { query, .. } => match query {
LedgerQueryService::Local(qs) => {
if let Some(port) = self.query_port {
qs.get_state_root(port).await
} else {
bail!("cannon is missing a query port")
}
}
LedgerQueryService::Node(key) => {
let Some(env) = self.env.upgrade() else {
unreachable!("called from a place where env is present")
};

// env_id must be Some because LedgerQueryService::Node requires it
let Some(agent_id) = env.get_agent_by_key(key) else {
bail!("cannon target agent not found")
};

let Some(client) = self.global_state.get_client(agent_id).await else {
bail!("cannon target agent is offline")
};

// call client's rpc method to get the state root
// this will fail if the client is not running a node
client.get_state_root().await
}
},
TxSource::Playback { .. } => {
bail!("cannon is configured to playback from file.")
}
}
}

/// Called by axum to forward /cannon/<id>/mainnet/transaction/broadcast
/// to the desired sink
pub fn proxy_broadcast(&self, body: String) -> Result<()> {
match &self.source {
TxSource::RealTime { .. } => {
self.tx_sender.send(body)?;
}
TxSource::Playback { .. } => {
warn!("cannon received broadcasted transaction in playback mode. ignoring.")
}
}
Ok(())
}
}

impl Drop for CannonInstance {
fn drop(&mut self) {
// cancel the task on drop
self.task.blocking_lock().abort();
}
}
7 changes: 7 additions & 0 deletions crates/snot/src/cannon/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener};

/// Get an available port on the local machine.
pub fn get_available_port() -> Option<u16> {
let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
Some(TcpListener::bind(addr).ok()?.local_addr().ok()?.port())
}
Loading

0 comments on commit e4c38c2

Please sign in to comment.