Skip to content

Commit

Permalink
feat(agent): add support for label and mode flags on client
Browse files Browse the repository at this point in the history
  • Loading branch information
Meshiest committed Mar 30, 2024
1 parent 6b4b497 commit b9733dc
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 44 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ http = "1.1"
indexmap = { version = "2.2", features = ["serde"] }
indicatif = { version = "0.17", features = ["rayon"] }
lazy_static = "1.4"
lasso = { version = "0.7.2", features = ["multi-threaded", "serialize"] }
lasso = { version = "0.7.2", features = ["multi-threaded"] }
rand = "0.8"
rand_chacha = "0.3"
rayon = "1"
Expand Down
44 changes: 42 additions & 2 deletions crates/snot-agent/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::{
env,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
};

use clap::Parser;
use snot_common::state::{AgentId, ModeConfig, PortConfig};
use http::Uri;
use snot_common::state::{AgentId, AgentMode, PortConfig};

pub const ENV_ENDPOINT: &str = "SNOT_ENDPOINT";
pub const ENV_ENDPOINT_DEFAULT: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234);
Expand All @@ -20,6 +22,9 @@ pub struct Cli {
#[arg(long)]
pub id: Option<AgentId>,

#[arg(long, value_delimiter = ',', num_args = 1..)]
pub labels: Option<Vec<String>>,

/// Path to the directory containing the stored data and configuration
#[arg(long, default_value = "./snot-data")]
pub path: PathBuf,
Expand All @@ -37,5 +42,40 @@ pub struct Cli {
pub ports: PortConfig,

#[clap(flatten)]
pub modes: ModeConfig,
pub modes: AgentMode,
}

impl Cli {
pub fn endpoint_and_uri(&self) -> (SocketAddr, Uri) {
// get the endpoint
let endpoint = self
.endpoint
.or_else(|| {
env::var(ENV_ENDPOINT)
.ok()
.and_then(|s| s.as_str().parse().ok())
})
.unwrap_or(ENV_ENDPOINT_DEFAULT);

let mut query = format!("/agent?mode={}", u8::from(self.modes));

// add ?id=
if let Some(id) = self.id {
query.push_str(&format!("&id={}", id));
}

// add ?labels= or &labels= if id is present
if let Some(labels) = &self.labels {
query.push_str(&format!("&labels={}", labels.join(",")));
}

let ws_uri = Uri::builder()
.scheme("ws")
.authority(endpoint.to_string())
.path_and_query(query)
.build()
.unwrap();

(endpoint, ws_uri)
}
}
21 changes: 3 additions & 18 deletions crates/snot-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ mod rpc;
mod state;

use std::{
env,
os::unix::fs::PermissionsExt,
path::Path,
sync::{Arc, Mutex},
time::Duration,
};

use clap::Parser;
use cli::{Cli, ENV_ENDPOINT, ENV_ENDPOINT_DEFAULT};
use cli::Cli;
use futures::{executor::block_on, SinkExt};
use futures_util::stream::{FuturesUnordered, StreamExt};
use http::HeaderValue;
Expand All @@ -27,7 +26,7 @@ use tokio::{
};
use tokio_tungstenite::{
connect_async,
tungstenite::{self, client::IntoClientRequest, http::Uri},
tungstenite::{self, client::IntoClientRequest},
};
use tracing::{error, info, level_filters::LevelFilter, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand Down Expand Up @@ -89,21 +88,7 @@ async fn main() {
}

// get the endpoint
let endpoint = args
.endpoint
.or_else(|| {
env::var(ENV_ENDPOINT)
.ok()
.and_then(|s| s.as_str().parse().ok())
})
.unwrap_or(ENV_ENDPOINT_DEFAULT);

let ws_uri = Uri::builder()
.scheme("ws")
.authority(endpoint.to_string())
.path_and_query("/agent")
.build()
.unwrap();
let (endpoint, ws_uri) = args.endpoint_and_uri();

// create the data directory
tokio::fs::create_dir_all(&args.path)
Expand Down
1 change: 1 addition & 0 deletions crates/snot-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod rpc;
pub mod state;
pub use lasso;

pub mod prelude {
pub use crate::rpc::*;
Expand Down
65 changes: 57 additions & 8 deletions crates/snot-common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,18 @@ pub struct PortConfig {
pub metrics: u16,
}

#[derive(Debug, Serialize, Deserialize, Parser)]
pub struct ModeConfig {
impl Display for PortConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"bft: {}, node: {}, rest: {}",
self.bft, self.node, self.rest
)
}
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize, Parser)]
pub struct AgentMode {
/// Enable running a validator node
#[arg(long)]
pub validator: bool,
Expand All @@ -89,13 +99,52 @@ pub struct ModeConfig {
pub compute: bool,
}

impl Display for PortConfig {
impl From<AgentMode> for u8 {
fn from(mode: AgentMode) -> u8 {
(mode.validator as u8)
| (mode.prover as u8) << 1
| (mode.client as u8) << 2
| (mode.compute as u8) << 3
}
}

impl From<u8> for AgentMode {
fn from(mode: u8) -> Self {
Self {
validator: mode & 1 != 0,
prover: mode & 1 << 1 != 0,
client: mode & 1 << 2 != 0,
compute: mode & 1 << 3 != 0,
}
}
}

impl Display for AgentMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"bft: {}, node: {}, rest: {}",
self.bft, self.node, self.rest
)
let mut s = String::new();
if self.validator {
s.push_str("validator");
}
if self.prover {
if !s.is_empty() {
s.push_str(", ");
}
s.push_str("prover");
}
if self.client {
if !s.is_empty() {
s.push_str(", ");
}
s.push_str("client");
}
if self.compute {
if !s.is_empty() {
s.push_str(", ");
}
s.push_str("compute");
}

f.write_str(&s)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/snot/src/cannon/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl ComputeTarget {
ComputeTarget::Agent => {
// find a client, mark it as busy
let Some((client, _busy)) = state.pool.read().await.values().find_map(|a| {
if !a.is_busy() && a.is_inventory() {
if a.can_compute() {
a.client_owned().map(|c| (c, a.make_busy()))
} else {
None
Expand Down
4 changes: 4 additions & 0 deletions crates/snot/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl Environment {
// agent best suited to be a node,
// instead of naively picking an agent to fill the needs of
// a node

// TODO: use node.agent and node.labels against the agent's id and labels
// TODO: use node.mode to determine if the agent can be a node

node_map.extend(
initial_nodes
.keys()
Expand Down
31 changes: 29 additions & 2 deletions crates/snot/src/schema/nodes.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::{fmt::Display, net::SocketAddr, str::FromStr};
use std::{collections::HashSet, fmt::Display, net::SocketAddr, str::FromStr};

use indexmap::IndexMap;
use lazy_static::lazy_static;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize};
use snot_common::state::{HeightRequest, NodeState, NodeType};
use snot_common::{
lasso::Spur,
state::{AgentId, HeightRequest, NodeState, NodeType},
INTERN,
};

use super::{NodeKey, NodeTargets};

Expand Down Expand Up @@ -35,6 +39,18 @@ fn please_be_online() -> bool {
true
}

/// Parse the labels as strings, but intern them on load
fn get_label<'de, D>(deserializer: D) -> Result<HashSet<Spur>, D::Error>
where
D: Deserializer<'de>,
{
let labels = Vec::<String>::deserialize(deserializer)?;
Ok(labels
.into_iter()
.map(|label| INTERN.get_or_intern(label))
.collect())
}

// TODO: could use some more clarification on some of these fields
/// A node in the testing infrastructure.
#[derive(Deserialize, Debug, Clone)]
Expand All @@ -53,8 +69,19 @@ pub struct Node {
/// inherited.
pub height: Option<usize>,

/// When specified, agents must have these labels
#[serde(default, deserialize_with = "get_label")]
pub labels: HashSet<Spur>,

/// When specified, an agent must have this id
#[serde(default)]
pub agent: Option<AgentId>,

/// List of validators for the node to connect to
#[serde(default)]
pub validators: NodeTargets,

/// List of peers for the node to connect to
#[serde(default)]
pub peers: NodeTargets,
}
Expand Down
Loading

0 comments on commit b9733dc

Please sign in to comment.