diff --git a/crates/snot/src/env/mod.rs b/crates/snot/src/env/mod.rs index c0320b11..550be138 100644 --- a/crates/snot/src/env/mod.rs +++ b/crates/snot/src/env/mod.rs @@ -18,6 +18,7 @@ use serde::Deserialize; use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey}; use tracing::{info, warn}; +use self::timeline::reconcile_agents; use crate::{ cannon::{sink::TxSink, source::TxSource, CannonInstance}, schema::{ @@ -356,8 +357,7 @@ impl Environment { /// Reconcile all associated nodes with their initial state. pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> anyhow::Result<()> { - let mut handles = vec![]; - let mut agent_ids = vec![]; + let mut pending_reconciliations = vec![]; { let envs_lock = state.envs.read().await; let env = envs_lock @@ -403,51 +403,11 @@ pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> anyhow::Re .collect(); let agent_state = AgentState::Node(env_id, node_state); - agent_ids.push(id); - handles.push(tokio::spawn(async move { - client.reconcile(agent_state.clone()).await - })); + pending_reconciliations.push((id, client, agent_state)); } } - let num_attempted_reconciliations = handles.len(); - - info!("waiting for reconcile..."); - let reconciliations = join_all(handles).await; - info!("reconcile done, updating agent states..."); - - let mut pool_lock = state.pool.write().await; - let mut success = 0; - for (agent_id, result) in agent_ids.into_iter().zip(reconciliations) { - // safety: we acquired this before when building handles, agent_id wouldn't be - // here if the corresponding agent didn't exist - let agent = pool_lock.get_mut(&agent_id).unwrap(); - - match result { - // oh god - Ok(Ok(Ok(state))) => { - agent.set_state(state); - success += 1; - } - - // reconcile error - Ok(Err(e)) => warn!( - "agent {} experienced a reconcilation error: {e}", - agent.id() - ), - - // could be a tokio error or an RPC error - _ => warn!( - "agent {} failed to reconcile for an unknown reason", - agent.id() - ), - } - } - - info!( - "reconciliation result: {success}/{} nodes reconciled", - num_attempted_reconciliations - ); + reconcile_agents(pending_reconciliations.into_iter(), &state.pool).await; Ok(()) } diff --git a/crates/snot/src/env/timeline.rs b/crates/snot/src/env/timeline.rs index efd96f48..ef75b97b 100644 --- a/crates/snot/src/env/timeline.rs +++ b/crates/snot/src/env/timeline.rs @@ -1,14 +1,18 @@ -use std::sync::Arc; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::Arc, +}; use anyhow::bail; use futures_util::future::join_all; +use snot_common::state::AgentState; use thiserror::Error; -use tokio::{select, task::JoinHandle}; +use tokio::{select, sync::RwLock, task::JoinHandle}; use super::Environment; use crate::{ schema::timeline::{Action, ActionInstance, EventDuration}, - state::GlobalState, + state::{Agent, AgentClient, AgentId, GlobalState}, }; #[derive(Debug, Error)] @@ -17,6 +21,60 @@ pub enum ExecutionError { AgentOffline, } +/// The tuple to pass into `reconcile_agents`. +pub type PendingAgentReconcile = (AgentId, AgentClient, AgentState); + +/// Reconcile a bunch of agents at once. +pub async fn reconcile_agents(iter: I, pool_mtx: &RwLock>) +where + I: Iterator, +{ + use tracing::{info, warn}; + + let mut handles = vec![]; + let mut agent_ids = vec![]; + + for (id, client, target) in iter { + agent_ids.push(id); + handles.push(tokio::spawn(async move { client.reconcile(target).await })); + } + + let num_reconciliations = handles.len(); + info!("beginning reconciliation..."); + let reconciliations = join_all(handles).await; + info!("reconciliation complete, updating agent states..."); + + let mut pool_lock = pool_mtx.write().await; + let mut success = 0; + for (agent_id, result) in agent_ids.into_iter().zip(reconciliations) { + let Some(agent) = pool_lock.get_mut(&agent_id) else { + continue; + }; + + match result { + Ok(Ok(Ok(state))) => { + agent.set_state(state); + success += 1; + } + + Ok(Err(e)) => warn!( + "agent {} experienced a reconcilation error: {e}", + agent.id(), + ), + + _ => warn!( + "agent {} failed to reconcile for an unknown reason", + agent.id(), + ), + } + } + + info!( + "reconciliation result: {success}/{} nodes reconciled", + num_reconciliations + ); +} + impl Environment { pub async fn execute(state: Arc, id: usize) -> anyhow::Result<()> { let env = Arc::clone(match state.envs.read().await.get(&id) { @@ -41,44 +99,63 @@ impl Environment { } } + let mut pending_reconciliations: HashMap = + HashMap::new(); + + macro_rules! set_node_field { + ($agent:ident , $($key:ident = $val:expr),* ) => { + match pending_reconciliations.entry($agent.id()) { + Entry::Occupied(mut ent) => { + match ent.get_mut().2 { + AgentState::Inventory => (), + AgentState::Node(_, ref mut state) => { + $(state.$key = $val;)* + } + } + } + Entry::Vacant(ent) => { + ent.insert(( + $agent.id(), + $agent.client_owned().ok_or(ExecutionError::AgentOffline)?, + $agent.state().clone().map_node(|mut n| { + $(n.$key = $val;)* + n + }) + )); + } + } + }; + } + for ActionInstance { action, awaited } in &event.actions.0 { let handle = match action { // toggle online state Action::Online(targets) | Action::Offline(targets) => { let online = matches!(action, Action::Online(_)); - // get target agents - // TODO: this is unbelievably ugly - let agents = env - .matching_agents(targets, &*pool) - .map(|agent| { - Ok(( - agent.client_owned().ok_or(ExecutionError::AgentOffline)?, - agent.state().clone(), - )) - }) - .collect::, _>>()?; - - // reconcile each client agent - tokio::spawn(async move { - let handles = agents - .into_iter() - .map(|(client, state)| { - let target_state = state.map_node(|mut n| { - n.online = online; - n - }); - - tokio::spawn( - async move { client.reconcile(target_state).await }, - ) - }) - .collect::>(); + for agent in env.matching_agents(targets, &*pool) { + set_node_field!(agent, online = online); + } - let _reconciliations = join_all(handles.into_iter()).await; - - // TODO: update agent state in control plane - }) + // get target agents + // let agents = env + // .matching_agents(targets, &*pool) + // .map(|agent| { + // agent.map_to_node_state_reconcile(|mut n| + // { + // n.online = online; + // n + // }) + // }) + // .collect::>>() + // .ok_or(ExecutionError::AgentOffline)?; + + // // reconcile each client agent + // let task_state = Arc::clone(&state); + // tokio::spawn(async move { + // reconcile_agents(agents.into_iter(), + // &task_state.pool).await; + // }) } Action::Cannon(_) => unimplemented!(), @@ -86,7 +163,7 @@ impl Environment { }; if *awaited { - awaiting_handles.push(handle); + // awaiting_handles.push(handle); } } diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs index e636b55a..a8dc6947 100644 --- a/crates/snot/src/state.rs +++ b/crates/snot/src/state.rs @@ -13,7 +13,7 @@ use bimap::BiMap; use jwt::SignWithKey; use snot_common::{ rpc::agent::{AgentServiceClient, ReconcileError}, - state::{AgentState, PortConfig}, + state::{AgentState, NodeState, PortConfig}, }; use surrealdb::{engine::local::Db, Surreal}; use tarpc::{client::RpcError, context}; @@ -176,6 +176,20 @@ impl Agent { pub fn set_addrs(&mut self, external_addr: Option, internal_addrs: Vec) { self.addrs = Some((external_addr, internal_addrs)); } + + pub fn map_to_node_state_reconcile(&self, f: F) -> Option<(usize, AgentClient, AgentState)> + where + F: Fn(NodeState) -> NodeState, + { + Some(( + self.id(), + self.client_owned()?, + match &self.state { + AgentState::Node(id, state) => AgentState::Node(*id, f(state.clone())), + _ => return None, + }, + )) + } } impl AgentClient {