Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): proper label/mode based delegation #89

Merged
merged 3 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/snot-agent/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
use clap::Parser;
use http::Uri;
use snot_common::state::{AgentId, AgentMode, PortConfig};
use tracing::info;

pub const ENV_ENDPOINT: &str = "SNOT_ENDPOINT";
pub const ENV_ENDPOINT_DEFAULT: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234);
Expand Down Expand Up @@ -66,6 +67,7 @@ impl Cli {

// add ?labels= or &labels= if id is present
if let Some(labels) = &self.labels {
info!("using labels: {:?}", labels);
query.push_str(&format!("&labels={}", labels.join(",")));
}

Expand Down
8 changes: 8 additions & 0 deletions crates/snot-common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ impl NodeType {
Self::Prover => "--prover",
}
}

pub fn bit(self) -> usize {
match self {
Self::Validator => 0,
Self::Prover => 1,
Self::Client => 2,
}
}
}

impl Display for NodeType {
Expand Down
2 changes: 2 additions & 0 deletions crates/snot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ chrono = { workspace = true, features = ["serde"] }
clap.workspace = true
duration-str = { version = "0.7", default-features = false }
external-ip.workspace = true
fixedbitset = "0.5.6"
futures-util.workspace = true
hmac = "0.12.1"
indexmap.workspace = true
jwt = "0.16.0"
lazy_static.workspace = true
rand.workspace = true
rand_chacha.workspace = true
rayon.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["stream", "json"] }
serde.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/snot/src/cannon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl ExecutionContext {
let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id);
let query = match compute {
// agents already know the host of the control plane
ComputeTarget::Agent => suffix,
ComputeTarget::Agent { .. } => suffix,
// demox needs to locate it
ComputeTarget::Demox { .. } => {
let Some(host) = get_host(state).await else {
Expand Down
93 changes: 71 additions & 22 deletions crates/snot/src/cannon/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ use std::collections::HashSet;
use anyhow::{anyhow, bail, Result};
use serde::{Deserialize, Serialize};
use serde_json::json;
use snot_common::state::NodeKey;
use snot_common::{lasso::Spur, state::NodeKey, INTERN};

use crate::{env::Environment, schema::nodes::KeySource, state::GlobalState};
use crate::{
env::{set::find_compute_agent, Environment},
schema::nodes::KeySource,
state::GlobalState,
};

use super::{authorized::Authorize, net::get_available_port};

Expand Down Expand Up @@ -56,15 +60,62 @@ pub enum QueryTarget {
Node(NodeKey),
}

impl Default for QueryTarget {
fn default() -> Self {
QueryTarget::Local(LocalService { sync_from: None })
}
}

fn deser_labels<'de, D>(deser: D) -> Result<Option<Vec<Spur>>, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(Option::<Vec<String>>::deserialize(deser)?.map(|s| {
s.into_iter()
.map(|s| INTERN.get_or_intern(s))
.collect::<Vec<Spur>>()
}))
}

fn ser_labels<S>(labels: &Option<Vec<Spur>>, ser: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match labels {
Some(labels) => {
let labels = labels
.iter()
.map(|s| INTERN.resolve(s))
.collect::<Vec<&str>>();
serde::Serialize::serialize(&labels, ser)
}
None => serde::Serialize::serialize(&None::<String>, ser),
}
}

/// Which service is providing the compute power for executing transactions
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", untagged)]
pub enum ComputeTarget {
/// Use the agent pool to generate executions
#[default]
Agent,
Agent {
#[serde(
default,
deserialize_with = "deser_labels",
serialize_with = "ser_labels",
skip_serializing_if = "Option::is_none"
)]
labels: Option<Vec<Spur>>,
},
/// Use demox' API to generate executions
Demox { url: String },
#[serde(rename_all = "kebab-case")]
Demox { demox_api: String },
}

impl Default for ComputeTarget {
fn default() -> Self {
ComputeTarget::Agent { labels: None }
}
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -98,7 +149,9 @@ pub enum TxSource {
/// Generate transactions in real time
#[serde(rename_all = "kebab-case")]
RealTime {
#[serde(default)]
query: QueryTarget,
#[serde(default)]
compute: ComputeTarget,

/// defaults to TransferPublic
Expand Down Expand Up @@ -193,15 +246,12 @@ impl ComputeTarget {
auth: serde_json::Value,
) -> Result<()> {
match self {
ComputeTarget::Agent => {
ComputeTarget::Agent { labels } => {
// find a client, mark it as busy
let Some((client, _busy)) = state.pool.read().await.values().find_map(|a| {
if a.can_compute() {
a.client_owned().map(|c| (c, a.make_busy()))
} else {
None
}
}) else {
let Some((client, _busy)) = find_compute_agent(
state.pool.read().await.values(),
&labels.clone().unwrap_or_default(),
) else {
bail!("no agents available to execute authorization")
};

Expand All @@ -212,7 +262,7 @@ impl ComputeTarget {

Ok(())
}
ComputeTarget::Demox { url } => {
ComputeTarget::Demox { demox_api: url } => {
let _body = json!({
"jsonrpc": "2.0",
"id": 1,
Expand All @@ -236,9 +286,7 @@ impl ComputeTarget {
mod test {
use super::*;
use crate::{
cannon::source::{
ComputeTarget, CreditsTxMode, LedgerQueryService, LocalQueryService, TxMode,
},
cannon::source::{ComputeTarget, CreditsTxMode, LocalService, TxMode},
schema::nodes::KeySource,
};
use std::str::FromStr;
Expand All @@ -255,8 +303,8 @@ mod test {
println!(
"{}",
serde_yaml::to_string(&TxSource::RealTime {
query: LedgerQueryService::Local(LocalQueryService { sync_from: None }),
compute: ComputeTarget::Agent,
query: QueryTarget::Local(LocalService { sync_from: None }),
compute: ComputeTarget::Agent { labels: None },
tx_modes: [TxMode::Credits(CreditsTxMode::TransferPublic)]
.into_iter()
.collect(),
Expand All @@ -266,4 +314,5 @@ mod test {
.unwrap()
);
}
} */
}
*/
61 changes: 34 additions & 27 deletions crates/snot/src/env/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod set;
pub mod timeline;

use std::{
Expand All @@ -10,7 +11,7 @@ use std::{
},
};

use anyhow::{anyhow, bail, ensure};
use anyhow::{anyhow, bail};
use bimap::{BiHashMap, BiMap};
use futures_util::future::join_all;
use indexmap::{map::Entry, IndexMap};
Expand All @@ -20,7 +21,7 @@ use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use tracing::{info, warn};
use tracing::{error, info, warn};

use self::timeline::{reconcile_agents, ExecutionError};
use crate::{
Expand All @@ -30,6 +31,7 @@ use crate::{
source::TxSource,
CannonInstance,
},
env::set::{get_agent_mappings, labels_from_nodes, pair_with_nodes, BusyMode},
schema::{
nodes::{ExternalNode, Node},
storage::LoadedStorage,
Expand Down Expand Up @@ -174,33 +176,38 @@ impl Environment {
}
}

// delegate agents to become nodes
let pool = state.pool.read().await;
let available_agent = pool
.values()
.filter(|a| a.is_node_capable() && a.is_inventory());
let num_available_agents = available_agent.clone().count();

ensure!(
num_available_agents >= initial_nodes.len(),
"not enough available agents to satisfy node topology"
// get a set of all labels the nodes can reference
let labels = labels_from_nodes(&initial_nodes);

// temporarily lock the agent pool for reading to convert them into
// masks against the labels.
//
// this also contains a "busy" that atomically prevents multiple
// environment prepares from delegating the same agents as well
// as preventing two nodes from claiming the same agent
let agents = get_agent_mappings(
BusyMode::Env,
state.pool.read().await.values(),
&labels,
);

// TODO: remove this naive delegation, replace with
// some kind of "pick_agent" function that picks an
// agent best suited to be a node,
// instead of naively picking an agent to fill the needs of
// a node

// TODO: use node.agent and node.labels against the agent's id and labels
// TODO: use node.mode to determine if the agent can be a node

node_map.extend(
initial_nodes
.keys()
.cloned()
.zip(available_agent.map(|agent| EnvPeer::Internal(agent.id()))),
);
// ensure the "busy" is in scope until the initial reconcile completes and
// locks the agents into a non-inventory state
let _busy: Vec<_> = match pair_with_nodes(agents, &initial_nodes, &labels) {
Ok(pairs) => pairs,
Err(errors) => {
for error in &errors {
error!("delegation error: {error}");
}
return Err(anyhow!("{} delegation errors occurred", errors.len()));
}
}
.map(|(key, id, busy)| {
// extend the node map with the newly paired agent
node_map.insert(key, EnvPeer::Internal(id));
busy
})
.collect();

info!("delegated {} nodes to agents", node_map.len());
for (key, node) in &node_map {
Expand Down
Loading