Skip to content

Commit

Permalink
refactor(env): merge cleanup and force_reconcile, fix offline agents not reconciling
Browse files Browse the repository at this point in the history
  • Loading branch information
Meshiest committed Apr 19, 2024
1 parent 1beb4e3 commit c9f12ee
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 135 deletions.
158 changes: 34 additions & 124 deletions crates/snops/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::{

use bimap::BiMap;
use dashmap::DashMap;
use futures_util::future::join_all;
use indexmap::{map::Entry, IndexMap, IndexSet};
use serde::{Deserialize, Serialize};
use snops_common::state::{
Expand Down Expand Up @@ -452,7 +451,7 @@ impl Environment {
}

// reconcile the nodes
initial_reconcile(env_id, &state).await?;
initial_reconcile(env_id, &state, prev_env.is_none()).await?;

// instance cannons that are marked for immediate use
for (name, count) in immediate_cannons {
Expand Down Expand Up @@ -506,15 +505,15 @@ impl Environment {
Ok(())
}

pub async fn cleanup(id: &EnvId, state: &GlobalState) -> Result<(), EnvError> {
pub async fn cleanup(id: EnvId, state: &GlobalState) -> Result<(), EnvError> {
// clear the env state
info!("clearing env {id} state...");

let (_, env) = state
.envs
.remove(id)
.ok_or(CleanupError::EnvNotFound(*id))?;
if let Err(e) = PersistEnv::delete(&state.db, *id) {
.remove(&id)
.ok_or(CleanupError::EnvNotFound(id))?;
if let Err(e) = PersistEnv::delete(&state.db, id) {
error!("failed to save delete {id} to persistence: {e}");
}

Expand All @@ -525,130 +524,32 @@ impl Environment {
handle.abort();
}

// reconcile all online agents
let (ids, handles): (Vec<_>, Vec<_>) = {
if let Err(e) = reconcile_agents(
state,
env.node_peers
.right_values()
// find all agents associated with the env
.filter_map(|peer| match peer {
EnvPeer::Internal(id) => state.pool.get(id),
EnvPeer::Internal(id) => Some(*id),
_ => None,
})
// map the agents to rpc clients
.filter_map(|agent| agent.client_owned().map(|client| (agent.id(), client)))
// inventory reconcile the agents
.map(|(id, client)| {
(
// this collect is necessary because the iter sent to reconcile_agents
// must be owned by this thread. Without this, the iter would hold a reference
// to the env.node_peers.right_values(), which is NOT Send
.collect::<Vec<_>>()
.into_iter()
.filter_map(|id| {
Some((
id,
tokio::spawn(async move { client.reconcile(AgentState::Inventory).await }),
)
})
.unzip()
};

info!("inventorying {} agents...", ids.len());
let reconciliations = join_all(handles).await;
info!("reconcile done, updating agent states...");

let mut success = 0;
let num_reconciles = ids.len();
for (id, result) in ids.into_iter().zip(reconciliations) {
match result {
// oh god
Ok(Ok(Ok(agent_state))) => {
if let Some(mut agent) = state.pool.get_mut(&id) {
agent.set_state(agent_state);
if let Err(e) = agent.save(&state.db, id) {
error!("failed to save agent {id} to the database: {e}");
}
success += 1;
} else {
error!("agent {id} not found in pool after successful reconcile")
}
}

// reconcile error
Ok(Ok(Err(e))) => error!("agent {id} experienced a reconcilation error: {e}"),
Ok(Err(e)) => error!("agent {id} experienced a rpc error: {e}"),
Err(e) => error!("agent {id} experienced a join error: {e}"),
}
}
info!("cleanup result: {success}/{num_reconciles} agents inventoried");

Ok(())
}

// TODO: this is almost exactly the same as `cleanup`, maybe we can merge it
// later
pub async fn forcefully_inventory(id: EnvId, state: &GlobalState) -> Result<(), EnvError> {
let env = state
.get_env(id)
.ok_or(CleanupError::EnvNotFound(id))?
.clone();

// stop the timeline if it's running
if let Some(handle) = &*env.timeline_handle.lock().await {
handle.abort();
}

// reconcile all online agents
let (ids, handles): (Vec<_>, Vec<_>) = {
let mut ids = vec![];
let mut handles = vec![];
for peer in env.node_peers.right_values() {
let Some(mut agent) = (match peer {
EnvPeer::Internal(id) => state.pool.get_mut(id),
_ => continue,
}) else {
continue;
};

let Some(client) = agent.client_owned() else {
// forcibly set the agent state if it is offline
agent.set_state(AgentState::Inventory);
if let Err(e) = agent.save(&state.db, id) {
error!("failed to save agent {id} to the database: {e}");
}
continue;
};

ids.push(agent.id());
handles.push(tokio::spawn(async move {
client.reconcile(AgentState::Inventory).await
}));
}

(ids, handles)
};

info!("inventorying {} agents...", ids.len());
let reconciliations = join_all(handles).await;
info!("reconcile done, updating agent states...");

let mut success = 0;
let num_reconciles = ids.len();
for (id, result) in ids.into_iter().zip(reconciliations) {
match result {
// oh god
Ok(Ok(Ok(agent_state))) => {
if let Some(mut agent) = state.pool.get_mut(&id) {
agent.set_state(agent_state);
if let Err(e) = agent.save(&state.db, id) {
error!("failed to save agent {id} to the database: {e}");
}
success += 1;
} else {
error!("agent {id} not found in pool after successful reconcile")
}
}

// reconcile error
Ok(Ok(Err(e))) => error!("agent {id} experienced a reconcilation error: {e}"),
Ok(Err(e)) => error!("agent {id} experienced a rpc error: {e}"),
Err(e) => error!("agent {id} experienced a join error: {e}"),
}
state.pool.get(&id)?.client_owned(),
AgentState::Inventory,
))
}),
)
.await
{
error!("an error occurred while attempting to inventory newly freed agents: {e}");
}
info!("inventory result: {success}/{num_reconciles} agents inventoried");

Ok(())
}
Expand Down Expand Up @@ -717,7 +618,11 @@ impl Environment {
}

/// Reconcile all associated nodes with their initial state.
pub async fn initial_reconcile(env_id: EnvId, state: &GlobalState) -> Result<(), EnvError> {
pub async fn initial_reconcile(
env_id: EnvId,
state: &GlobalState,
is_new_env: bool,
) -> Result<(), EnvError> {
let mut pending_reconciliations = vec![];
{
let env = state
Expand Down Expand Up @@ -763,8 +668,13 @@ pub async fn initial_reconcile(env_id: EnvId, state: &GlobalState) -> Result<(),
}

if let Err(e) = reconcile_agents(state, pending_reconciliations.into_iter()).await {
// if this is a patch to an existing environment, avoid inventorying the agents
if !is_new_env {
return Err(ReconcileError::Batch(e).into());
}

error!("an error occurred on initial reconciliation, inventorying all agents: {e}");
if let Err(e) = Environment::forcefully_inventory(env_id, state).await {
if let Err(e) = Environment::cleanup(env_id, state).await {
error!("an error occurred inventorying agents: {e}");
}

Expand Down
12 changes: 7 additions & 5 deletions crates/snops/src/env/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,14 @@ impl Environment {
reconcile_agents(&task_state, pending_reconciliations.into_values())
.await
{
error!("failed to reconcile agents in timeline: {e}");
if let Err(e) =
Environment::forcefully_inventory(env_id, &task_state).await
{
// TODO: add a timeline setting to enable cleanup on error.
// In many cases, keeping the failure state in place makes it easier to
// troubleshoot. Alerts could also be sent from here.

/* error!("failed to reconcile agents in timeline: {e}");
if let Err(e) = Environment::cleanup(env_id, &task_state).await {
error!("failed to inventory agents: {e}");
}
} */

return Err(e.into());
};
Expand Down
9 changes: 3 additions & 6 deletions crates/snops/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async fn post_timeline(
async fn delete_timeline(
Path((env_id, t_id)): Path<(String, String)>,
State(state): State<AppState>,
) -> impl IntoResponse {
) -> Response {
let Some(env_id) = id_or_none(&env_id) else {
return StatusCode::NOT_FOUND.into_response();
};
Expand All @@ -221,15 +221,12 @@ async fn delete_timeline(
}
}

async fn delete_env(
Path(env_id): Path<String>,
State(state): State<AppState>,
) -> impl IntoResponse {
async fn delete_env(Path(env_id): Path<String>, State(state): State<AppState>) -> Response {
let Some(env_id) = id_or_none(&env_id) else {
return StatusCode::NOT_FOUND.into_response();
};

match Environment::cleanup(&env_id, &state).await {
match Environment::cleanup(env_id, &state).await {
Ok(_) => status_ok(),
Err(e) => ServerError::from(e).into_response(),
}
Expand Down

0 comments on commit c9f12ee

Please sign in to comment.