Skip to content

Commit

Permalink
Merge pull request #110 from monadicus/fix-reconciliation-behavior
Browse files Browse the repository at this point in the history
fix: improved agent reconciliation behavior
  • Loading branch information
voximity authored Apr 12, 2024
2 parents 996fccd + 4eb7936 commit 6552277
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 18 deletions.
15 changes: 13 additions & 2 deletions crates/snops-agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ pub struct AgentRpcServer {
}

impl AgentService for AgentRpcServer {
async fn handshake(self, _: context::Context, handshake: Handshake) {
async fn handshake(
self,
context: context::Context,
handshake: Handshake,
) -> Result<(), ReconcileError> {
if let Some(token) = handshake.jwt {
// cache the JWT in the state JWT mutex
self.state
Expand All @@ -57,6 +61,14 @@ impl AgentService for AgentRpcServer {
.await
.expect("failed to write jwt file");
}

// reconcile if state has changed
let needs_reconcile = &*self.state.agent_state.read().await != &handshake.state;
if needs_reconcile {
Self::reconcile(self, context, handshake.state).await?;
}

Ok(())
}

async fn reconcile(
Expand Down Expand Up @@ -140,7 +152,6 @@ impl AgentService for AgentRpcServer {
// download and decompress the storage
// skip if we don't need storage
let AgentState::Node(env_id, node) = &target else {
info!("agent is not running a node; skipping storage download");
break 'storage;
};
let height = &node.height.1;
Expand Down
3 changes: 2 additions & 1 deletion crates/snops-common/src/rpc/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use crate::state::{AgentState, PortConfig};
/// Initial connection details passed to an agent through the `handshake` RPC.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Handshake {
/// Token for the agent to cache and write to its JWT file; `None` when no
/// token is being handed out with this handshake.
pub jwt: Option<String>,
/// The agent state known to the sender of the handshake. The agent compares
/// it with its own current state and reconciles when the two differ.
pub state: AgentState,
}

/// The RPC service that agents implement as a server.
#[tarpc::service]
pub trait AgentService {
/// Handshake with some initial connection details.
async fn handshake(handshake: Handshake);
async fn handshake(handshake: Handshake) -> Result<(), ReconcileError>;

/// Control plane asks the agent for its external network address, along
/// with local addrs.
Expand Down
6 changes: 3 additions & 3 deletions crates/snops-common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct AgentId(Spur);
pub type StorageId = usize;
pub type EnvId = usize;

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentState {
#[default]
// A node in the inventory can function as a transaction cannon
Expand All @@ -42,7 +42,7 @@ impl AgentState {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeState {
pub node_key: NodeKey,
pub ty: NodeType,
Expand All @@ -57,7 +57,7 @@ pub struct NodeState {
}

/// A representation of which key to use for the agent.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum KeyState {
/// No private key provided
#[default]
Expand Down
86 changes: 82 additions & 4 deletions crates/snops/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,78 @@ impl Environment {
Ok(())
}

// TODO: this is almost exactly the same as `cleanup`, maybe we can merge it
// later
pub async fn forcefully_inventory(id: usize, state: &GlobalState) -> Result<(), EnvError> {
let mut envs_lock = state.envs.write().await;
let env = envs_lock
.get_mut(&id)
.ok_or(CleanupError::EnvNotFound(id))?;

// stop the timeline if it's running
if let Some(handle) = &*env.timeline_handle.lock().await {
handle.abort();
}

// reconcile all online agents
let (ids, handles): (Vec<_>, Vec<_>) = {
let mut agents = state.pool.write().await;

let mut ids = vec![];
let mut handles = vec![];
for peer in env.node_map.right_values() {
let Some(agent) = (match peer {
EnvPeer::Internal(id) => agents.get_mut(id),
_ => continue,
}) else {
continue;
};

let Some(client) = agent.client_owned() else {
// forcibly set the agent state if it is offline
agent.set_state(AgentState::Inventory);
continue;
};

ids.push(agent.id());
handles.push(tokio::spawn(async move {
client.reconcile(AgentState::Inventory).await
}));
}

(ids, handles)
};

info!("inventorying {} agents...", ids.len());
let reconciliations = join_all(handles).await;
info!("reconcile done, updating agent states...");

let mut agents = state.pool.write().await;
let mut success = 0;
let num_reconciles = ids.len();
for (id, result) in ids.into_iter().zip(reconciliations) {
match result {
// oh god
Ok(Ok(Ok(state))) => {
if let Some(agent) = agents.get_mut(&id) {
agent.set_state(state);
success += 1;
} else {
error!("agent {id} not found in pool after successful reconcile")
}
}

// reconcile error
Ok(Ok(Err(e))) => error!("agent {id} experienced a reconcilation error: {e}"),
Ok(Err(e)) => error!("agent {id} experienced a rpc error: {e}"),
Err(e) => error!("agent {id} experienced a join error: {e}"),
}
}
info!("inventory result: {success}/{num_reconciles} agents inventoried");

Ok(())
}

/// Lookup a env agent id by node key.
pub fn get_agent_by_key(&self, key: &NodeKey) -> Option<AgentId> {
self.node_map.get_by_left(key).and_then(|id| match id {
Expand Down Expand Up @@ -488,8 +560,14 @@ pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> Result<(),
}
}

reconcile_agents(pending_reconciliations.into_iter(), &state.pool)
.await
.map_err(ReconcileError::Batch)?;
Ok(())
if let Err(e) = reconcile_agents(pending_reconciliations.into_iter(), &state.pool).await {
error!("an error occurred on initial reconciliation, inventorying all agents: {e}");
if let Err(e) = Environment::forcefully_inventory(env_id, state).await {
error!("an error occurred inventorying agents: {e}");
}

Err(ReconcileError::Batch(e).into())
} else {
Ok(())
}
}
17 changes: 15 additions & 2 deletions crates/snops/src/env/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,21 @@ impl Environment {
// reconcile all nodes
let task_state = Arc::clone(&state);
let reconcile_handle = tokio::spawn(async move {
reconcile_agents(pending_reconciliations.into_values(), &task_state.pool)
.await?;
if let Err(e) = reconcile_agents(
pending_reconciliations.into_values(),
&task_state.pool,
)
.await
{
error!("failed to reconcile agents in timeline: {e}");
if let Err(e) =
Environment::forcefully_inventory(env_id, &task_state).await
{
error!("failed to inventory agents: {e}");
}

return Err(e.into());
};
Ok(())
});

Expand Down
21 changes: 15 additions & 6 deletions crates/snops/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,27 @@ async fn handle_socket(
break 'reconnect;
}

// attach the current known agent state to the handshake
handshake.state = agent.state().to_owned();

// mark the agent as connected
agent.mark_connected(client);

let id = agent.id();
info!("agent {id} reconnected");

// TODO: probably want to reconcile with old state?

// handshake with client
// note: this may cause a reconciliation, so this *may* be non-instant
// unwrap safety: this agent was just `mark_connected` with a valid client
let client = agent.rpc().cloned().unwrap();
tokio::spawn(async move {
// we do this in a separate task because we don't want to hold up pool insertion
if let Err(e) = client.handshake(tarpc::context::current(), handshake).await {
error!("failed to perform client handshake: {e}");
match client.handshake(tarpc::context::current(), handshake).await {
Ok(Ok(())) => (),
Ok(Err(e)) => {
error!("failed to perform client handshake reconciliation: {e}")
}
Err(e) => error!("failed to perform client handshake: {e}"),
}
});

Expand Down Expand Up @@ -208,8 +215,10 @@ async fn handle_socket(
// handshake with the client
tokio::spawn(async move {
// we do this in a separate task because we don't want to hold up pool insertion
if let Err(e) = client.handshake(tarpc::context::current(), handshake).await {
error!("failed to perform client handshake: {e}");
match client.handshake(tarpc::context::current(), handshake).await {
Ok(Ok(())) => (),
Ok(Err(e)) => error!("failed to perform client handshake reconciliation: {e}"),
Err(e) => error!("failed to perform client handshake: {e}"),
}
});

Expand Down

0 comments on commit 6552277

Please sign in to comment.