Skip to content

Commit

Permalink
feat(snot): WIP timeline execution and reconciliation changes
Browse files Browse the repository at this point in the history
Signed-off-by: Zander Franks <[email protected]>
  • Loading branch information
voximity committed Mar 27, 2024
1 parent 3327d7a commit e72c2d3
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 80 deletions.
48 changes: 4 additions & 44 deletions crates/snot/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use serde::Deserialize;
use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey};
use tracing::{info, warn};

use self::timeline::reconcile_agents;
use crate::{
cannon::{sink::TxSink, source::TxSource, CannonInstance},
schema::{
Expand Down Expand Up @@ -356,8 +357,7 @@ impl Environment {

/// Reconcile all associated nodes with their initial state.
pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> anyhow::Result<()> {
let mut handles = vec![];
let mut agent_ids = vec![];
let mut pending_reconciliations = vec![];
{
let envs_lock = state.envs.read().await;
let env = envs_lock
Expand Down Expand Up @@ -403,51 +403,11 @@ pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> anyhow::Re
.collect();

let agent_state = AgentState::Node(env_id, node_state);
agent_ids.push(id);
handles.push(tokio::spawn(async move {
client.reconcile(agent_state.clone()).await
}));
pending_reconciliations.push((id, client, agent_state));
}
}

let num_attempted_reconciliations = handles.len();

info!("waiting for reconcile...");
let reconciliations = join_all(handles).await;
info!("reconcile done, updating agent states...");

let mut pool_lock = state.pool.write().await;
let mut success = 0;
for (agent_id, result) in agent_ids.into_iter().zip(reconciliations) {
// safety: we acquired this before when building handles, agent_id wouldn't be
// here if the corresponding agent didn't exist
let agent = pool_lock.get_mut(&agent_id).unwrap();

match result {
// oh god
Ok(Ok(Ok(state))) => {
agent.set_state(state);
success += 1;
}

// reconcile error
Ok(Err(e)) => warn!(
"agent {} experienced a reconcilation error: {e}",
agent.id()
),

// could be a tokio error or an RPC error
_ => warn!(
"agent {} failed to reconcile for an unknown reason",
agent.id()
),
}
}

info!(
"reconciliation result: {success}/{} nodes reconciled",
num_attempted_reconciliations
);
reconcile_agents(pending_reconciliations.into_iter(), &state.pool).await;

Ok(())
}
147 changes: 112 additions & 35 deletions crates/snot/src/env/timeline.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::sync::Arc;
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};

use anyhow::bail;
use futures_util::future::join_all;
use snot_common::state::AgentState;
use thiserror::Error;
use tokio::{select, task::JoinHandle};
use tokio::{select, sync::RwLock, task::JoinHandle};

use super::Environment;
use crate::{
schema::timeline::{Action, ActionInstance, EventDuration},
state::GlobalState,
state::{Agent, AgentClient, AgentId, GlobalState},
};

#[derive(Debug, Error)]
Expand All @@ -17,6 +21,60 @@ pub enum ExecutionError {
AgentOffline,
}

/// The tuple to pass into `reconcile_agents`.
pub type PendingAgentReconcile = (AgentId, AgentClient, AgentState);

/// Reconcile a bunch of agents at once.
pub async fn reconcile_agents<I>(iter: I, pool_mtx: &RwLock<HashMap<AgentId, Agent>>)
where
I: Iterator<Item = PendingAgentReconcile>,
{
use tracing::{info, warn};

let mut handles = vec![];
let mut agent_ids = vec![];

for (id, client, target) in iter {
agent_ids.push(id);
handles.push(tokio::spawn(async move { client.reconcile(target).await }));
}

let num_reconciliations = handles.len();
info!("beginning reconciliation...");
let reconciliations = join_all(handles).await;
info!("reconciliation complete, updating agent states...");

let mut pool_lock = pool_mtx.write().await;
let mut success = 0;
for (agent_id, result) in agent_ids.into_iter().zip(reconciliations) {
let Some(agent) = pool_lock.get_mut(&agent_id) else {
continue;
};

match result {
Ok(Ok(Ok(state))) => {
agent.set_state(state);
success += 1;
}

Ok(Err(e)) => warn!(
"agent {} experienced a reconcilation error: {e}",
agent.id(),
),

_ => warn!(
"agent {} failed to reconcile for an unknown reason",
agent.id(),
),
}
}

info!(
"reconciliation result: {success}/{} nodes reconciled",
num_reconciliations
);
}

impl Environment {
pub async fn execute(state: Arc<GlobalState>, id: usize) -> anyhow::Result<()> {
let env = Arc::clone(match state.envs.read().await.get(&id) {
Expand All @@ -41,52 +99,71 @@ impl Environment {
}
}

let mut pending_reconciliations: HashMap<usize, PendingAgentReconcile> =
HashMap::new();

macro_rules! set_node_field {
($agent:ident , $($key:ident = $val:expr),* ) => {
match pending_reconciliations.entry($agent.id()) {
Entry::Occupied(mut ent) => {
match ent.get_mut().2 {
AgentState::Inventory => (),
AgentState::Node(_, ref mut state) => {
$(state.$key = $val;)*
}
}
}
Entry::Vacant(ent) => {
ent.insert((
$agent.id(),
$agent.client_owned().ok_or(ExecutionError::AgentOffline)?,
$agent.state().clone().map_node(|mut n| {
$(n.$key = $val;)*
n
})
));
}
}
};
}

for ActionInstance { action, awaited } in &event.actions.0 {
let handle = match action {
// toggle online state
Action::Online(targets) | Action::Offline(targets) => {
let online = matches!(action, Action::Online(_));

// get target agents
// TODO: this is unbelievably ugly
let agents = env
.matching_agents(targets, &*pool)
.map(|agent| {
Ok((
agent.client_owned().ok_or(ExecutionError::AgentOffline)?,
agent.state().clone(),
))
})
.collect::<Result<Vec<_>, _>>()?;

// reconcile each client agent
tokio::spawn(async move {
let handles = agents
.into_iter()
.map(|(client, state)| {
let target_state = state.map_node(|mut n| {
n.online = online;
n
});

tokio::spawn(
async move { client.reconcile(target_state).await },
)
})
.collect::<Vec<_>>();
for agent in env.matching_agents(targets, &*pool) {
set_node_field!(agent, online = online);
}

let _reconciliations = join_all(handles.into_iter()).await;

// TODO: update agent state in control plane
})
// get target agents
// let agents = env
// .matching_agents(targets, &*pool)
// .map(|agent| {
// agent.map_to_node_state_reconcile(|mut n|
// {
// n.online = online;
// n
// })
// })
// .collect::<Option<Vec<_>>>()
// .ok_or(ExecutionError::AgentOffline)?;

// // reconcile each client agent
// let task_state = Arc::clone(&state);
// tokio::spawn(async move {
// reconcile_agents(agents.into_iter(),
// &task_state.pool).await;
// })
}

Action::Cannon(_) => unimplemented!(),
Action::Height(_) => unimplemented!(),
};

if *awaited {
awaiting_handles.push(handle);
// awaiting_handles.push(handle);
}
}

Expand Down
16 changes: 15 additions & 1 deletion crates/snot/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use bimap::BiMap;
use jwt::SignWithKey;
use snot_common::{
rpc::agent::{AgentServiceClient, ReconcileError},
state::{AgentState, PortConfig},
state::{AgentState, NodeState, PortConfig},
};
use surrealdb::{engine::local::Db, Surreal};
use tarpc::{client::RpcError, context};
Expand Down Expand Up @@ -176,6 +176,20 @@ impl Agent {
pub fn set_addrs(&mut self, external_addr: Option<IpAddr>, internal_addrs: Vec<IpAddr>) {
self.addrs = Some((external_addr, internal_addrs));
}

pub fn map_to_node_state_reconcile<F>(&self, f: F) -> Option<(usize, AgentClient, AgentState)>
where
F: Fn(NodeState) -> NodeState,
{
Some((
self.id(),
self.client_owned()?,
match &self.state {
AgentState::Node(id, state) => AgentState::Node(*id, f(state.clone())),
_ => return None,
},
))
}
}

impl AgentClient {
Expand Down

0 comments on commit e72c2d3

Please sign in to comment.