Skip to content

Commit

Permalink
refactor(agent): convert agent ids into interned strings
Browse files Browse the repository at this point in the history
  • Loading branch information
Meshiest committed Mar 30, 2024
1 parent 029c2de commit 6b4b497
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 54 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ http = "1.1"
indexmap = { version = "2.2", features = ["serde"] }
indicatif = { version = "0.17", features = ["rayon"] }
lazy_static = "1.4"
lasso = { version = "0.7.2", features = ["multi-threaded", "serialize"] }
rand = "0.8"
rand_chacha = "0.3"
rayon = "1"
Expand Down
27 changes: 12 additions & 15 deletions crates/snot-agent/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use clap::Parser;
use snot_common::state::{AgentId, ModeConfig, PortConfig};

pub const ENV_ENDPOINT: &str = "SNOT_ENDPOINT";
pub const ENV_ENDPOINT_DEFAULT: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234);
Expand All @@ -16,29 +17,25 @@ pub struct Cli {
/// Control plane endpoint address
pub endpoint: Option<SocketAddr>,

#[arg(long, default_value = "./snot-data")]
#[arg(long)]
pub id: Option<AgentId>,

/// Path to the directory containing the stored data and configuration
#[arg(long, default_value = "./snot-data")]
pub path: PathBuf,

#[arg(long, default_value_t = false)]
/// Enable the agent to fetch its external address. Necessary to determine
/// which agents are on shared networks, and for
/// external-to-external connections
#[arg(long)]
pub external: bool,

#[clap(long = "bind", default_value_t = IpAddr::V4(Ipv4Addr::UNSPECIFIED))]
pub bind_addr: IpAddr,
/// Specify the IP address and port for the node server
#[clap(long = "node", default_value_t = 4130)]
pub node: u16,
/// Specify the IP address and port for the BFT
#[clap(long = "bft", default_value = "5000")]
pub bft: u16,
/// Specify the IP address and port for the REST server
#[clap(long = "rest", default_value = "3030")]
pub rest: u16,
/// Specify the port for the metrics
#[clap(long = "metrics", default_value_t = 9000)]
pub metrics: u16,
// TODO: specify allowed modes (--validator --client --tx-gen)

#[clap(flatten)]
pub ports: PortConfig,

#[clap(flatten)]
pub modes: ModeConfig,
}
2 changes: 1 addition & 1 deletion crates/snot-agent/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn init(state: Arc<GlobalState>) {
let response = match client
.get(format!(
"http://{}/",
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), state.cli.metrics,)
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), state.cli.ports.metrics)
))
.send()
.await
Expand Down
18 changes: 7 additions & 11 deletions crates/snot-agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,13 @@ impl AgentService for AgentRpcServer {
.arg("--bind")
.arg(state.cli.bind_addr.to_string())
.arg("--bft")
.arg(state.cli.bft.to_string())
.arg(state.cli.ports.bft.to_string())
.arg("--rest")
.arg(state.cli.rest.to_string())
.arg(state.cli.ports.rest.to_string())
.arg("--metrics")
.arg(state.cli.metrics.to_string())
.arg(state.cli.ports.metrics.to_string())
.arg("--node")
.arg(state.cli.node.to_string());
.arg(state.cli.ports.node.to_string());

if let Some(pk) = node.private_key {
command.arg("--private-key").arg(pk);
Expand Down Expand Up @@ -374,11 +374,7 @@ impl AgentService for AgentRpcServer {

async fn get_addrs(self, _: context::Context) -> (PortConfig, Option<IpAddr>, Vec<IpAddr>) {
(
PortConfig {
bft: self.state.cli.bft,
node: self.state.cli.node,
rest: self.state.cli.rest,
},
self.state.cli.ports.clone(),
self.state.external_addr,
self.state.internal_addrs.clone(),
)
Expand All @@ -394,7 +390,7 @@ impl AgentService for AgentRpcServer {

let url = format!(
"http://127.0.0.1:{}/mainnet/latest/stateRoot",
self.state.cli.rest
self.state.cli.ports.rest
);
let response = reqwest::get(&url)
.await
Expand All @@ -415,7 +411,7 @@ impl AgentService for AgentRpcServer {

let url = format!(
"http://127.0.0.1:{}/mainnet/transaction/broadcast",
self.state.cli.rest
self.state.cli.ports.rest
);
let response = reqwest::Client::new()
.post(url)
Expand Down
2 changes: 2 additions & 0 deletions crates/snot-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ version = "0.1.0"
edition = "2021"

[dependencies]
clap.workspace = true
futures.workspace = true
lasso.workspace = true
lazy_static.workspace = true
regex.workspace = true
serde.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions crates/snot-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ pub mod prelude {
pub use crate::rpc::*;
pub use crate::state::*;
}

lazy_static::lazy_static! {
pub static ref INTERN: lasso::ThreadedRodeo = lasso::ThreadedRodeo::default();
}
93 changes: 89 additions & 4 deletions crates/snot-common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,30 @@ use std::{
fmt::{Display, Write},
net::SocketAddr,
str::FromStr,
sync::atomic::{AtomicUsize, Ordering},
};

use clap::Parser;
use lasso::Spur;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{de::Error, Deserialize, Serialize};

pub type AgentId = usize;
use crate::INTERN;

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AgentId(Spur);

pub type StorageId = usize;
pub type EnvId = usize;

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub enum AgentState {
#[default]
// A node in the inventory can function as a transaction cannon
Inventory,
/// Test id mapping to node state
Node(usize, NodeState),
Node(EnvId, NodeState),
}

impl AgentState {
Expand All @@ -43,11 +51,42 @@ pub struct NodeState {
pub validators: Vec<AgentPeer>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Parser)]
pub struct PortConfig {
pub bft: u16,
/// Specify the IP address and port for the node server
#[clap(long = "node", default_value_t = 4130)]
pub node: u16,

/// Specify the IP address and port for the BFT
#[clap(long = "bft", default_value_t = 5000)]
pub bft: u16,

/// Specify the IP address and port for the REST server
#[clap(long = "rest", default_value_t = 3030)]
pub rest: u16,

/// Specify the port for the metrics
#[clap(long = "metrics", default_value_t = 9000)]
pub metrics: u16,
}

#[derive(Debug, Serialize, Deserialize, Parser)]
pub struct ModeConfig {
/// Enable running a validator node
#[arg(long)]
pub validator: bool,

/// Enable running a prover node
#[arg(long)]
pub prover: bool,

/// Enable running a client node
#[arg(long)]
pub client: bool,

/// Enable functioning as a compute target when inventoried
#[arg(long)]
pub compute: bool,
}

impl Display for PortConfig {
Expand Down Expand Up @@ -148,6 +187,7 @@ lazy_static! {
r"^(?P<ty>client|validator|prover)\/(?P<id>[A-Za-z0-9\-]+)(?:@(?P<ns>[A-Za-z0-9\-]+))?$"
)
.unwrap();
static ref AGENT_ID_REGEX: Regex = Regex::new(r"^[A-Za-z0-9][A-Za-z0-9\-_.]{0,63}$").unwrap();
}

impl FromStr for NodeKey {
Expand Down Expand Up @@ -208,3 +248,48 @@ impl Serialize for NodeKey {
serializer.serialize_str(&self.to_string())
}
}

impl Default for AgentId {
fn default() -> Self {
static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
let id = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
Self(INTERN.get_or_intern(format!("agent-{}", id)))
}
}

impl FromStr for AgentId {
type Err = &'static str;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if !AGENT_ID_REGEX.is_match(s) {
return Err("invalid agent id: expected pattern [A-Za-z0-9][A-Za-z0-9\\-_.]{{,63}}");
}

Ok(AgentId(INTERN.get_or_intern(s)))
}
}

impl<'de> Deserialize<'de> for AgentId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = <&str>::deserialize(deserializer)?;
Self::from_str(s).map_err(D::Error::custom)
}
}

impl Display for AgentId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", INTERN.resolve(&self.0))
}
}

impl Serialize for AgentId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
4 changes: 2 additions & 2 deletions crates/snot/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl Environment {
pub fn matching_nodes<'a>(
&'a self,
targets: &'a NodeTargets,
pool: &'a HashMap<usize, Agent>,
pool: &'a HashMap<AgentId, Agent>,
port_type: PortType,
) -> impl Iterator<Item = AgentPeer> + 'a {
self.node_map
Expand Down Expand Up @@ -391,7 +391,7 @@ impl Environment {
pub fn matching_agents<'a>(
&'a self,
targets: &'a NodeTargets,
pool: &'a HashMap<usize, Agent>,
pool: &'a HashMap<AgentId, Agent>,
) -> impl Iterator<Item = &'a Agent> + 'a {
self.matching_nodes(targets, pool, PortType::Node) // ignore node type
.filter_map(|agent_peer| match agent_peer {
Expand Down
6 changes: 3 additions & 3 deletions crates/snot/src/env/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use anyhow::bail;
use futures_util::future::join_all;
use snot_common::state::AgentState;
use snot_common::state::{AgentId, AgentState};
use thiserror::Error;
use tokio::{select, sync::RwLock, task::JoinError};
use tracing::{debug, info, warn};
Expand All @@ -18,7 +18,7 @@ use crate::{
CannonInstance,
},
schema::timeline::{Action, ActionInstance, EventDuration},
state::{Agent, AgentClient, AgentId, GlobalState},
state::{Agent, AgentClient, GlobalState},
};

#[derive(Debug, Error)]
Expand Down Expand Up @@ -156,7 +156,7 @@ impl Environment {
let mut reconcile_async = false;

// the pending reconciliations
let mut pending_reconciliations: HashMap<usize, PendingAgentReconcile> =
let mut pending_reconciliations: HashMap<AgentId, PendingAgentReconcile> =
HashMap::new();

macro_rules! set_node_field {
Expand Down
4 changes: 2 additions & 2 deletions crates/snot/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::{
};
use serde::Deserialize;
use serde_json::json;
use snot_common::rpc::agent::AgentMetric;
use snot_common::{rpc::agent::AgentMetric, state::AgentId};

use super::AppState;
use crate::cannon::router::redirect_cannon_routes;
Expand Down Expand Up @@ -53,7 +53,7 @@ async fn get_agents(state: State<AppState>) -> impl IntoResponse {
Json(json!({ "count": state.pool.read().await.len() }))
}

async fn get_agent_tps(state: State<AppState>, Path(id): Path<usize>) -> Response {
async fn get_agent_tps(state: State<AppState>, Path(id): Path<AgentId>) -> Response {
let pool = state.pool.read().await;
let Some(agent) = pool.get(&id) else {
return StatusCode::NOT_FOUND.into_response();
Expand Down
3 changes: 1 addition & 2 deletions crates/snot/src/server/jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use serde::{Deserialize, Serialize};
use sha2::Sha256;

use crate::state::AgentId;
use snot_common::state::AgentId;

lazy_static! {
pub static ref JWT_NONCE: u16 = ChaChaRng::from_entropy().gen();
Expand Down
2 changes: 1 addition & 1 deletion crates/snot/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async fn handle_socket(mut socket: WebSocket, headers: HeaderMap, state: AppStat
let client =
AgentServiceClient::new(tarpc::client::Config::default(), client_transport).spawn();

let id: usize = 'insertion: {
let id: AgentId = 'insertion: {
let client = client.clone();
let mut pool = state.pool.write().await;

Expand Down
Loading

0 comments on commit 6b4b497

Please sign in to comment.