diff --git a/Cargo.lock b/Cargo.lock index 6673139c..5299b91b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1390,6 +1390,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fixedbitset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1a73ce1c73b7c2bae1da2d2881eadd020bafb406731036ad248e8a7c2df392c" + [[package]] name = "flate2" version = "1.0.28" @@ -3080,7 +3086,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ - "fixedbitset", + "fixedbitset 0.4.2", "indexmap 2.2.5", ] @@ -5382,6 +5388,7 @@ dependencies = [ "clap", "duration-str", "external-ip", + "fixedbitset 0.5.6", "futures-util", "hmac", "indexmap 2.2.5", @@ -5389,6 +5396,7 @@ dependencies = [ "lazy_static", "rand", "rand_chacha", + "rayon", "regex", "reqwest 0.12.0", "serde", diff --git a/crates/snot-agent/src/cli.rs b/crates/snot-agent/src/cli.rs index c4581d4d..c531676d 100644 --- a/crates/snot-agent/src/cli.rs +++ b/crates/snot-agent/src/cli.rs @@ -7,6 +7,7 @@ use std::{ use clap::Parser; use http::Uri; use snot_common::state::{AgentId, AgentMode, PortConfig}; +use tracing::info; pub const ENV_ENDPOINT: &str = "SNOT_ENDPOINT"; pub const ENV_ENDPOINT_DEFAULT: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234); @@ -66,6 +67,7 @@ impl Cli { // add ?labels= or &labels= if id is present if let Some(labels) = &self.labels { + info!("using labels: {:?}", labels); query.push_str(&format!("&labels={}", labels.join(","))); } diff --git a/crates/snot-common/src/state.rs b/crates/snot-common/src/state.rs index d1d7ecd0..5ad1b0b1 100644 --- a/crates/snot-common/src/state.rs +++ b/crates/snot-common/src/state.rs @@ -206,6 +206,14 @@ impl NodeType { Self::Prover => "--prover", } } + + pub fn bit(self) -> usize { + match self { + Self::Validator => 0, + Self::Prover => 1, + Self::Client => 2, + } + } } impl Display for NodeType { diff --git a/crates/snot/Cargo.toml b/crates/snot/Cargo.toml index 1925264f..40a32acc 100644 --- a/crates/snot/Cargo.toml +++ b/crates/snot/Cargo.toml @@ -12,6 +12,7 @@ chrono = { workspace = true, features = ["serde"] } clap.workspace = true duration-str = { version = "0.7", default-features = false } external-ip.workspace = true +fixedbitset = "0.5.6" futures-util.workspace = true hmac = "0.12.1" indexmap.workspace = true @@ -19,6 +20,7 @@ jwt = "0.16.0" lazy_static.workspace = true rand.workspace = true rand_chacha.workspace = true +rayon.workspace = true regex.workspace = true reqwest = { workspace = true, features = ["stream", "json"] } serde.workspace = true diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs index bfe83bbe..f26c1067 100644 --- a/crates/snot/src/cannon/mod.rs +++ b/crates/snot/src/cannon/mod.rs @@ -341,7 +341,7 @@ impl ExecutionContext { let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id); let query = match compute { // agents already know the host of the control plane - ComputeTarget::Agent => suffix, + ComputeTarget::Agent { .. } => suffix, // demox needs to locate it ComputeTarget::Demox { .. } => { let Some(host) = get_host(state).await else { diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs index 38bc52dd..e02b0170 100644 --- a/crates/snot/src/cannon/source.rs +++ b/crates/snot/src/cannon/source.rs @@ -3,9 +3,13 @@ use std::collections::HashSet; use anyhow::{anyhow, bail, Result}; use serde::{Deserialize, Serialize}; use serde_json::json; -use snot_common::state::NodeKey; +use snot_common::{lasso::Spur, state::NodeKey, INTERN}; -use crate::{env::Environment, schema::nodes::KeySource, state::GlobalState}; +use crate::{ + env::{set::find_compute_agent, Environment}, + schema::nodes::KeySource, + state::GlobalState, +}; use super::{authorized::Authorize, net::get_available_port}; @@ -56,15 +60,62 @@ pub enum QueryTarget { Node(NodeKey), } +impl Default for QueryTarget { + fn default() -> Self { + QueryTarget::Local(LocalService { sync_from: None }) + } +} + +fn deser_labels<'de, D>(deser: D) -> Result>, D::Error> +where + D: serde::Deserializer<'de>, +{ + Ok(Option::>::deserialize(deser)?.map(|s| { + s.into_iter() + .map(|s| INTERN.get_or_intern(s)) + .collect::>() + })) +} + +fn ser_labels(labels: &Option>, ser: S) -> Result +where + S: serde::Serializer, +{ + match labels { + Some(labels) => { + let labels = labels + .iter() + .map(|s| INTERN.resolve(s)) + .collect::>(); + serde::Serialize::serialize(&labels, ser) + } + None => serde::Serialize::serialize(&None::, ser), + } +} + /// Which service is providing the compute power for executing transactions -#[derive(Default, Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case", untagged)] pub enum ComputeTarget { /// Use the agent pool to generate executions - #[default] - Agent, + Agent { + #[serde( + default, + deserialize_with = "deser_labels", + serialize_with = "ser_labels", + skip_serializing_if = "Option::is_none" + )] + labels: Option>, + }, /// Use demox' API to generate executions - Demox { url: String }, + #[serde(rename_all = "kebab-case")] + Demox { demox_api: String }, +} + +impl Default for ComputeTarget { + fn default() -> Self { + ComputeTarget::Agent { labels: None } + } } #[derive(Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] @@ -98,7 +149,9 @@ pub enum TxSource { /// Generate transactions in real time #[serde(rename_all = "kebab-case")] RealTime { + #[serde(default)] query: QueryTarget, + #[serde(default)] compute: ComputeTarget, /// defaults to TransferPublic @@ -193,15 +246,12 @@ impl ComputeTarget { auth: serde_json::Value, ) -> Result<()> { match self { - ComputeTarget::Agent => { + ComputeTarget::Agent { labels } => { // find a client, mark it as busy - let Some((client, _busy)) = state.pool.read().await.values().find_map(|a| { - if a.can_compute() { - a.client_owned().map(|c| (c, a.make_busy())) - } else { - None - } - }) else { + let Some((client, _busy)) = find_compute_agent( + state.pool.read().await.values(), + &labels.clone().unwrap_or_default(), + ) else { bail!("no agents available to execute authorization") }; @@ -212,7 +262,7 @@ impl ComputeTarget { Ok(()) } - ComputeTarget::Demox { url } => { + ComputeTarget::Demox { demox_api: url } => { let _body = json!({ "jsonrpc": "2.0", "id": 1, @@ -236,9 +286,7 @@ impl ComputeTarget { mod test { use super::*; use crate::{ - cannon::source::{ - ComputeTarget, CreditsTxMode, LedgerQueryService, LocalQueryService, TxMode, - }, + cannon::source::{ComputeTarget, CreditsTxMode, LocalService, TxMode}, schema::nodes::KeySource, }; use std::str::FromStr; @@ -255,8 +303,8 @@ mod test { println!( "{}", serde_yaml::to_string(&TxSource::RealTime { - query: LedgerQueryService::Local(LocalQueryService { sync_from: None }), - compute: ComputeTarget::Agent, + query: QueryTarget::Local(LocalService { sync_from: None }), + compute: ComputeTarget::Agent { labels: None }, tx_modes: [TxMode::Credits(CreditsTxMode::TransferPublic)] .into_iter() .collect(), @@ -266,4 +314,5 @@ mod test { .unwrap() ); } -} */ +} + */ diff --git a/crates/snot/src/env/mod.rs b/crates/snot/src/env/mod.rs index 0b2029b4..553d92bc 100644 --- a/crates/snot/src/env/mod.rs +++ b/crates/snot/src/env/mod.rs @@ -1,3 +1,4 @@ +pub mod set; pub mod timeline; use std::{ @@ -10,7 +11,7 @@ use std::{ }, }; -use anyhow::{anyhow, bail, ensure}; +use anyhow::{anyhow, bail}; use bimap::{BiHashMap, BiMap}; use futures_util::future::join_all; use indexmap::{map::Entry, IndexMap}; @@ -20,7 +21,7 @@ use tokio::{ sync::{Mutex, RwLock}, task::JoinHandle, }; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use self::timeline::{reconcile_agents, ExecutionError}; use crate::{ @@ -30,6 +31,7 @@ use crate::{ source::TxSource, CannonInstance, }, + env::set::{get_agent_mappings, labels_from_nodes, pair_with_nodes, BusyMode}, schema::{ nodes::{ExternalNode, Node}, storage::LoadedStorage, @@ -174,33 +176,38 @@ impl Environment { } } - // delegate agents to become nodes - let pool = state.pool.read().await; - let available_agent = pool - .values() - .filter(|a| a.is_node_capable() && a.is_inventory()); - let num_available_agents = available_agent.clone().count(); - - ensure!( - num_available_agents >= initial_nodes.len(), - "not enough available agents to satisfy node topology" + // get a set of all labels the nodes can reference + let labels = labels_from_nodes(&initial_nodes); + + // temporarily lock the agent pool for reading to convert them into + // masks against the labels. + // + // this also contains a "busy" that atomically prevents multiple + // environment prepares from delegating the same agents as well + // as preventing two nodes from claiming the same agent + let agents = get_agent_mappings( + BusyMode::Env, + state.pool.read().await.values(), + &labels, ); - // TODO: remove this naive delegation, replace with - // some kind of "pick_agent" function that picks an - // agent best suited to be a node, - // instead of naively picking an agent to fill the needs of - // a node - - // TODO: use node.agent and node.labels against the agent's id and labels - // TODO: use node.mode to determine if the agent can be a node - - node_map.extend( - initial_nodes - .keys() - .cloned() - .zip(available_agent.map(|agent| EnvPeer::Internal(agent.id()))), - ); + // ensure the "busy" is in scope until the initial reconcile completes and + // locks the agents into a non-inventory state + let _busy: Vec<_> = match pair_with_nodes(agents, &initial_nodes, &labels) { + Ok(pairs) => pairs, + Err(errors) => { + for error in &errors { + error!("delegation error: {error}"); + } + return Err(anyhow!("{} delegation errors occurred", errors.len())); + } + } + .map(|(key, id, busy)| { + // extend the node map with the newly paired agent + node_map.insert(key, EnvPeer::Internal(id)); + busy + }) + .collect(); info!("delegated {} nodes to agents", node_map.len()); for (key, node) in &node_map { diff --git a/crates/snot/src/env/set.rs b/crates/snot/src/env/set.rs new file mode 100644 index 00000000..b54d6ac2 --- /dev/null +++ b/crates/snot/src/env/set.rs @@ -0,0 +1,257 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::{mpsc, Arc, Weak}, +}; + +use fixedbitset::FixedBitSet; +use indexmap::IndexMap; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use snot_common::{ + lasso::Spur, + state::{AgentId, NodeKey}, +}; +use thiserror::Error; + +use crate::state::{Agent, AgentClient, Busy}; + +use super::EnvNode; + +pub struct AgentMapping { + id: AgentId, + claim: Weak, + mask: FixedBitSet, +} + +/// Ways of describing how an agent can be busy +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BusyMode { + /// The agent is busy with a compute task + Compute, + /// The agent is busy with an env task + Env, +} + +impl AgentMapping { + pub fn new(mode: BusyMode, agent: &Agent, labels: &[Spur]) -> Option { + if !agent.is_inventory() { + return None; + } + + // check if the agent is available in the given mode + let claim = match mode { + BusyMode::Compute => { + if !agent.can_compute() { + return None; + } + agent.get_compute_claim() + } + BusyMode::Env => { + if !(agent.is_node_capable() && agent.is_inventory()) { + return None; + } + agent.get_env_claim() + } + }; + + // check if the agent is already claimed + if claim.strong_count() > 1 { + return None; + } + + Some(Self { + id: agent.id(), + claim, + mask: agent.mask(labels), + }) + } + + /// Attempt to atomically claim the agent + pub fn claim(&self) -> Option> { + // avoid needlessly upgrading the weak pointer + if self.claim.strong_count() > 1 { + return None; + } + + let arc = self.claim.upgrade()?; + // 2 because the agent owns arc, and this would be the second + // there is a slim chance that two nodes could claim the same agent. if we run into this + // we can add an AtomicBool to the mapping to determine if the agent is claimed by + // the node on this thread + (Arc::strong_count(&arc) == 2).then_some(arc) + } + + /// Attempt to atomically claim the agent if there is a mask subset + pub fn claim_if_subset(&self, mask: &FixedBitSet) -> Option> { + if mask.is_subset(&self.mask) { + self.claim() + } else { + None + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Error)] +pub enum DelegationError { + #[error("insufficient number of agents to satisfy the request")] + InsufficientAgentCount, + #[error("agent {0} not found for node {1}")] + AgentNotFound(AgentId, NodeKey), + #[error("agent {0} already claimed for node {1}")] + AgentAlreadyClaimed(AgentId, NodeKey), + #[error("agent {0} does not support the mode needed for {1}")] + AgentMissingMode(AgentId, NodeKey), + #[error("could not find any agents for node {0}")] + NoAvailableAgents(NodeKey), +} + +/// Convert an iterator of agents into a vec of agent mappings +/// This is necessary the so the pool of agents can be dropped for longer running tasks +pub fn get_agent_mappings<'a, I: Iterator>( + mode: BusyMode, + agents: I, + labels: &[Spur], +) -> Vec { + agents + .filter_map(|agent| AgentMapping::new(mode, agent, labels)) + .collect() +} + +/// Get a list of unique labels given a node config +pub fn labels_from_nodes(nodes: &IndexMap) -> Vec { + let mut labels = HashSet::new(); + + for node in nodes.values() { + match node { + EnvNode::Internal(n) => { + labels.extend(&n.labels); + } + EnvNode::External(_) => {} + } + } + + labels.into_iter().collect() +} + +/// Find an agent that can compute and has the given labels using a fixedbitset (SIMD) +/// +/// This approach would make more sense if we had a variety of masks (sets of labels) +/// Rather than checking against a finite mask. +fn _find_compute_agent_by_mask<'a, I: Iterator>( + mut agents: I, + labels: &[Spur], +) -> Option<(&'a Agent, Arc)> { + // replace with + let mut mask = FixedBitSet::with_capacity(labels.len() + 4); + mask.insert_range(4..labels.len() + 4); + + agents.find_map(|agent| { + AgentMapping::new(BusyMode::Compute, agent, labels) + .and_then(|m| m.claim_if_subset(&mask).map(|arc| (agent, arc))) + }) +} + +/// Find an agent that can compute and has the given labels by checking each label individually +pub fn find_compute_agent<'a, I: Iterator>( + mut agents: I, + labels: &[Spur], +) -> Option<(AgentClient, Arc)> { + agents.find_map(|a| { + if !a.can_compute() || a.is_compute_claimed() || !labels.iter().all(|l| a.has_label(*l)) { + return None; + } + let arc = a.make_busy(); + a.client_owned() + .and_then(|c| (Arc::strong_count(&arc) == 2).then_some((c, arc))) + }) +} + +/// Given a map of nodes and list of agent mappings, attempt to pair each node with an agent in parallel +pub fn pair_with_nodes( + agents: Vec, + nodes: &IndexMap, + labels: &[Spur], +) -> Result)>, Vec> { + // errors that occurred while pairing nodes with agents + let (errors_tx, errors_rx) = mpsc::channel(); + // nodes that were successfully claimed. dropping this will automatically unclaim the agents + let (claimed_tx, claimed_rx) = mpsc::channel(); + + let (want_ids, want_labels) = nodes + .iter() + // filter out external nodes + // split into nodes that want specific agents and nodes that want specific labels + .filter_map(|(key, env_node)| match env_node { + EnvNode::Internal(n) => match n.agent { + Some(agent) => Some((Some((key, agent)), None)), + None => Some((None, Some((key, n.mask(key, labels))))), + }, + EnvNode::External(_) => None, + }) + // unzip and filter out the Nones + .fold((vec![], vec![]), |(mut vec_a, mut vec_b), (a, b)| { + if let Some(a) = a { + vec_a.push(a); + } + if let Some(b) = b { + vec_b.push(b); + } + (vec_a, vec_b) + }); + + if agents.len() < want_ids.len() + want_labels.len() { + return Err(vec![DelegationError::InsufficientAgentCount]); + } + + // another optimization that could be made is to sort nodes based on the number of agents with the specific labels. + // this would be useful for when some agents have unique labels as well as other common labels and + // there are nodes asking for agents with either. + + // TODO: potential performance improvement by splitting this agent map up available modes + // eg. client map, prover map, validator map, then pick by the key.ty + + // handle the nodes that want specific agents first + let agent_map = agents.iter().map(|a| (a.id, a)).collect::>(); + + // walk through all the nodes that want specific agents and attempt to pair them with an agent + want_ids.into_par_iter().for_each(|(key, id)| { + // ensure the agent exists + let Some(agent) = agent_map.get(&id) else { + let _ = errors_tx.send(DelegationError::AgentNotFound(id, key.clone())); + return; + }; + + // ensure this agent supports the needed mode + if !agent.mask.contains(key.ty.bit()) { + let _ = errors_tx.send(DelegationError::AgentMissingMode(id, key.clone())); + return; + } + + // attempt to claim the agent + if let Some(claim) = agent.claim() { + let _ = claimed_tx.send((key.clone(), id, claim)); + } else { + let _ = errors_tx.send(DelegationError::AgentAlreadyClaimed(id, key.clone())); + } + }); + + // walk through all the nodes that want specific labels/modes and attempt to pair them with an agent + // that has the matching mask + want_labels.into_par_iter().for_each(|(key, mask)| { + // find the first agent that can be claimed that fits the mask + if let Some((id, claim)) = agents + .iter() + .find_map(|a| a.claim_if_subset(&mask).map(|c| (a.id, c))) + { + let _ = claimed_tx.send((key.clone(), id, claim)); + } else { + let _ = errors_tx.send(DelegationError::NoAvailableAgents(key.clone())); + } + }); + + let errors = errors_rx.try_iter().collect::>(); + if errors.is_empty() { + Ok(claimed_rx.into_iter()) + } else { + Err(errors) + } +} diff --git a/crates/snot/src/schema/nodes.rs b/crates/snot/src/schema/nodes.rs index 49c6eb8f..b2626953 100644 --- a/crates/snot/src/schema/nodes.rs +++ b/crates/snot/src/schema/nodes.rs @@ -1,5 +1,6 @@ use std::{collections::HashSet, fmt::Display, net::SocketAddr, str::FromStr}; +use fixedbitset::FixedBitSet; use indexmap::IndexMap; use lazy_static::lazy_static; use serde::{de::Visitor, Deserialize, Deserializer, Serialize}; @@ -73,7 +74,7 @@ pub struct Node { #[serde(default, deserialize_with = "get_label")] pub labels: HashSet, - /// When specified, an agent must have this id + /// When specified, an agent must have this id. Overrides the labels field. #[serde(default)] pub agent: Option, @@ -102,6 +103,18 @@ impl Node { peers: vec![], } } + + pub fn mask(&self, key: &NodeKey, labels: &[Spur]) -> FixedBitSet { + let mut mask = FixedBitSet::with_capacity(labels.len() + 4); + mask.insert(key.ty.bit()); + + for (i, label) in labels.iter().enumerate() { + if self.labels.contains(label) { + mask.insert(i + 4); + } + } + mask + } } #[derive(Debug, Clone, Eq, PartialEq)] diff --git a/crates/snot/src/server/mod.rs b/crates/snot/src/server/mod.rs index 0043e58c..4316494a 100644 --- a/crates/snot/src/server/mod.rs +++ b/crates/snot/src/server/mod.rs @@ -86,7 +86,7 @@ where Ok(AgentMode::from(u8::deserialize(deser)?)) } -fn deser_labels<'de, D>(deser: D) -> Result>, D::Error> +pub fn deser_labels<'de, D>(deser: D) -> Result>, D::Error> where D: serde::Deserializer<'de>, { diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs index a91e6318..ec078112 100644 --- a/crates/snot/src/state.rs +++ b/crates/snot/src/state.rs @@ -1,17 +1,18 @@ use std::{ collections::{HashMap, HashSet}, net::IpAddr, - sync::Arc, + sync::{Arc, Weak}, time::Instant, }; use anyhow::{anyhow, Result}; use bimap::BiMap; +use fixedbitset::FixedBitSet; use jwt::SignWithKey; use snot_common::{ lasso::Spur, rpc::agent::{AgentServiceClient, ReconcileError}, - state::{AgentId, AgentMode, AgentState, NodeState, PortConfig}, + state::{AgentId, AgentMode, AgentState, NodeState, NodeType, PortConfig}, INTERN, }; use surrealdb::{engine::local::Db, Surreal}; @@ -57,7 +58,9 @@ pub struct Agent { mode: AgentMode, /// Count of how many executions this agent is currently working on - busy: Arc, + compute_claim: Arc, + /// Count of how many environments this agent is currently + env_claim: Arc, /// The external address of the agent, along with its local addresses. ports: Option, @@ -79,7 +82,8 @@ impl Agent { .into_iter() .map(|s| INTERN.get_or_intern(s)) .collect(), - busy: Arc::new(Busy), + compute_claim: Arc::new(Busy), + env_claim: Arc::new(Busy), claims: Claims { id, nonce: *JWT_NONCE, @@ -110,7 +114,12 @@ impl Agent { } /// Check if an agent has a specific label - pub fn has_label(&self, label: &str) -> bool { + pub fn has_label(&self, label: Spur) -> bool { + self.labels.contains(&label) + } + + /// Check if an agent has a specific label + pub fn has_label_str(&self, label: &str) -> bool { INTERN .get(label) .map_or(false, |label| self.labels.contains(&label)) @@ -120,6 +129,30 @@ impl Agent { self.labels.iter().map(|s| INTERN.resolve(s)).collect() } + // Get the mask of this agent + pub fn mask(&self, labels: &[Spur]) -> FixedBitSet { + let mut mask = FixedBitSet::with_capacity(labels.len() + 4); + if self.mode.validator { + mask.insert(NodeType::Validator.bit()); + } + if self.mode.prover { + mask.insert(NodeType::Prover.bit()); + } + if self.mode.client { + mask.insert(NodeType::Client.bit()); + } + if self.mode.compute { + mask.insert(3); + } + + for (i, label) in labels.iter().enumerate() { + if self.labels.contains(label) { + mask.insert(i + 4); + } + } + mask + } + /// Check if an agent is in inventory state pub fn is_inventory(&self) -> bool { matches!(self.state, AgentState::Inventory) @@ -127,17 +160,33 @@ impl Agent { /// Check if an agent is available for compute tasks pub fn can_compute(&self) -> bool { - self.is_inventory() && self.mode.compute && !self.is_busy() + self.is_inventory() && self.mode.compute && !self.is_compute_claimed() } - /// Check if a agent is working on an authorization - pub fn is_busy(&self) -> bool { - Arc::strong_count(&self.busy) > 1 + /// Check if an agent is working on an authorization + pub fn is_compute_claimed(&self) -> bool { + Arc::strong_count(&self.compute_claim) > 1 } /// Mark an agent as busy. This is used to prevent multiple authorizations pub fn make_busy(&self) -> Arc { - Arc::clone(&self.busy) + Arc::clone(&self.compute_claim) + } + + /// Mark an agent as busy. This is used to prevent multiple authorizations + pub fn get_compute_claim(&self) -> Weak { + Arc::downgrade(&self.compute_claim) + } + + /// Check if an agent is owned by an environment + pub fn is_env_claimed(&self) -> bool { + Arc::strong_count(&self.env_claim) > 1 + } + + /// Get a weak reference to the env claim, which can be used to later lock this + /// agent for an environment. + pub fn get_env_claim(&self) -> Weak { + Arc::downgrade(&self.env_claim) } /// The ID of this agent. diff --git a/scripts/agent.sh b/scripts/agent.sh index fe02008b..9e5a515b 100755 --- a/scripts/agent.sh +++ b/scripts/agent.sh @@ -22,6 +22,6 @@ cargo run --release -p snot-agent -- \ --rest "303$INDEX" \ --metrics "900$INDEX" \ --node "413$INDEX" \ - --labels "local" \ + --labels "local,local-$INDEX" \ --client --validator --compute \ $@ diff --git a/specs/test-cannon-record.yaml b/specs/test-cannon-record.yaml index 86d5a41e..62ae02de 100644 --- a/specs/test-cannon-record.yaml +++ b/specs/test-cannon-record.yaml @@ -10,8 +10,8 @@ version: cannon.snarkos.testing.monadic.us/v1 name: committee-tx-public source: - query: { mode: local } - compute: agent + # query: { mode: local } + # compute: { labels: [local] } tx-modes: [transfer-public] private-keys: [committee.$] addresses: [committee.$]