diff --git a/crates/snops/src/env/mod.rs b/crates/snops/src/env/mod.rs index db1cf9fa..956bb16d 100644 --- a/crates/snops/src/env/mod.rs +++ b/crates/snops/src/env/mod.rs @@ -12,7 +12,6 @@ use std::{ use bimap::BiMap; use dashmap::DashMap; -use futures_util::future::join_all; use indexmap::{map::Entry, IndexMap, IndexSet}; use serde::{Deserialize, Serialize}; use snops_common::state::{ @@ -452,7 +451,7 @@ impl Environment { } // reconcile the nodes - initial_reconcile(env_id, &state).await?; + initial_reconcile(env_id, &state, prev_env.is_none()).await?; // instance cannons that are marked for immediate use for (name, count) in immediate_cannons { @@ -506,15 +505,15 @@ impl Environment { Ok(()) } - pub async fn cleanup(id: &EnvId, state: &GlobalState) -> Result<(), EnvError> { + pub async fn cleanup(id: EnvId, state: &GlobalState) -> Result<(), EnvError> { // clear the env state info!("clearing env {id} state..."); let (_, env) = state .envs - .remove(id) - .ok_or(CleanupError::EnvNotFound(*id))?; - if let Err(e) = PersistEnv::delete(&state.db, *id) { + .remove(&id) + .ok_or(CleanupError::EnvNotFound(id))?; + if let Err(e) = PersistEnv::delete(&state.db, id) { error!("failed to save delete {id} to persistence: {e}"); } @@ -525,130 +524,32 @@ impl Environment { handle.abort(); } - // reconcile all online agents - let (ids, handles): (Vec<_>, Vec<_>) = { + if let Err(e) = reconcile_agents( + state, env.node_peers .right_values() // find all agents associated with the env .filter_map(|peer| match peer { - EnvPeer::Internal(id) => state.pool.get(id), + EnvPeer::Internal(id) => Some(*id), _ => None, }) - // map the agents to rpc clients - .filter_map(|agent| agent.client_owned().map(|client| (agent.id(), client))) - // inventory reconcile the agents - .map(|(id, client)| { - ( + // this collect is necessary because the iter sent to reconcile_agents + // must be owned by this thread. Without this, the iter would hold a reference + // to the env.node_peers.right_values(), which is NOT Send + .collect::>() + .into_iter() + .filter_map(|id| { + Some(( id, - tokio::spawn(async move { client.reconcile(AgentState::Inventory).await }), - ) - }) - .unzip() - }; - - info!("inventorying {} agents...", ids.len()); - let reconciliations = join_all(handles).await; - info!("reconcile done, updating agent states..."); - - let mut success = 0; - let num_reconciles = ids.len(); - for (id, result) in ids.into_iter().zip(reconciliations) { - match result { - // oh god - Ok(Ok(Ok(agent_state))) => { - if let Some(mut agent) = state.pool.get_mut(&id) { - agent.set_state(agent_state); - if let Err(e) = agent.save(&state.db, id) { - error!("failed to save agent {id} to the database: {e}"); - } - success += 1; - } else { - error!("agent {id} not found in pool after successful reconcile") - } - } - - // reconcile error - Ok(Ok(Err(e))) => error!("agent {id} experienced a reconcilation error: {e}"), - Ok(Err(e)) => error!("agent {id} experienced a rpc error: {e}"), - Err(e) => error!("agent {id} experienced a join error: {e}"), - } - } - info!("cleanup result: {success}/{num_reconciles} agents inventoried"); - - Ok(()) - } - - // TODO: this is almost exactly the same as `cleanup`, maybe we can merge it - // later - pub async fn forcefully_inventory(id: EnvId, state: &GlobalState) -> Result<(), EnvError> { - let env = state - .get_env(id) - .ok_or(CleanupError::EnvNotFound(id))? - .clone(); - - // stop the timeline if it's running - if let Some(handle) = &*env.timeline_handle.lock().await { - handle.abort(); - } - - // reconcile all online agents - let (ids, handles): (Vec<_>, Vec<_>) = { - let mut ids = vec![]; - let mut handles = vec![]; - for peer in env.node_peers.right_values() { - let Some(mut agent) = (match peer { - EnvPeer::Internal(id) => state.pool.get_mut(id), - _ => continue, - }) else { - continue; - }; - - let Some(client) = agent.client_owned() else { - // forcibly set the agent state if it is offline - agent.set_state(AgentState::Inventory); - if let Err(e) = agent.save(&state.db, id) { - error!("failed to save agent {id} to the database: {e}"); - } - continue; - }; - - ids.push(agent.id()); - handles.push(tokio::spawn(async move { - client.reconcile(AgentState::Inventory).await - })); - } - - (ids, handles) - }; - - info!("inventorying {} agents...", ids.len()); - let reconciliations = join_all(handles).await; - info!("reconcile done, updating agent states..."); - - let mut success = 0; - let num_reconciles = ids.len(); - for (id, result) in ids.into_iter().zip(reconciliations) { - match result { - // oh god - Ok(Ok(Ok(agent_state))) => { - if let Some(mut agent) = state.pool.get_mut(&id) { - agent.set_state(agent_state); - if let Err(e) = agent.save(&state.db, id) { - error!("failed to save agent {id} to the database: {e}"); - } - success += 1; - } else { - error!("agent {id} not found in pool after successful reconcile") - } - } - - // reconcile error - Ok(Ok(Err(e))) => error!("agent {id} experienced a reconcilation error: {e}"), - Ok(Err(e)) => error!("agent {id} experienced a rpc error: {e}"), - Err(e) => error!("agent {id} experienced a join error: {e}"), - } + state.pool.get(&id)?.client_owned(), + AgentState::Inventory, + )) + }), + ) + .await + { + error!("an error occurred while attempting to inventory newly freed agents: {e}"); } - info!("inventory result: {success}/{num_reconciles} agents inventoried"); Ok(()) } @@ -717,7 +618,11 @@ impl Environment { } /// Reconcile all associated nodes with their initial state. -pub async fn initial_reconcile(env_id: EnvId, state: &GlobalState) -> Result<(), EnvError> { +pub async fn initial_reconcile( + env_id: EnvId, + state: &GlobalState, + is_new_env: bool, +) -> Result<(), EnvError> { let mut pending_reconciliations = vec![]; { let env = state @@ -763,8 +668,13 @@ pub async fn initial_reconcile(env_id: EnvId, state: &GlobalState) -> Result<(), } if let Err(e) = reconcile_agents(state, pending_reconciliations.into_iter()).await { + // if this is a patch to an existing environment, avoid inventorying the agents + if !is_new_env { + return Err(ReconcileError::Batch(e).into()); + } + error!("an error occurred on initial reconciliation, inventorying all agents: {e}"); - if let Err(e) = Environment::forcefully_inventory(env_id, state).await { + if let Err(e) = Environment::cleanup(env_id, state).await { error!("an error occurred inventorying agents: {e}"); } diff --git a/crates/snops/src/env/timeline.rs b/crates/snops/src/env/timeline.rs index 1f765a03..f064d9b7 100644 --- a/crates/snops/src/env/timeline.rs +++ b/crates/snops/src/env/timeline.rs @@ -321,12 +321,14 @@ impl Environment { reconcile_agents(&task_state, pending_reconciliations.into_values()) .await { - error!("failed to reconcile agents in timeline: {e}"); - if let Err(e) = - Environment::forcefully_inventory(env_id, &task_state).await - { + // TODO: timeline setting to enable cleanup on error + // in many cases, maintaining the failure state is easier to + // troubleshoot. can shoot alerts here too + + /* error!("failed to reconcile agents in timeline: {e}"); + if let Err(e) = Environment::cleanup(env_id, &task_state).await { error!("failed to inventory agents: {e}"); - } + } */ return Err(e.into()); }; diff --git a/crates/snops/src/server/api.rs b/crates/snops/src/server/api.rs index cc611fcc..507a83f5 100644 --- a/crates/snops/src/server/api.rs +++ b/crates/snops/src/server/api.rs @@ -206,7 +206,7 @@ async fn post_timeline( async fn delete_timeline( Path((env_id, t_id)): Path<(String, String)>, State(state): State, -) -> impl IntoResponse { +) -> Response { let Some(env_id) = id_or_none(&env_id) else { return StatusCode::NOT_FOUND.into_response(); }; @@ -221,15 +221,12 @@ async fn delete_timeline( } } -async fn delete_env( - Path(env_id): Path, - State(state): State, -) -> impl IntoResponse { +async fn delete_env(Path(env_id): Path, State(state): State) -> Response { let Some(env_id) = id_or_none(&env_id) else { return StatusCode::NOT_FOUND.into_response(); }; - match Environment::cleanup(&env_id, &state).await { + match Environment::cleanup(env_id, &state).await { Ok(_) => status_ok(), Err(e) => ServerError::from(e).into_response(), }