Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improved agent reconciliation behavior #110

Merged
merged 3 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions crates/snops-agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ pub struct AgentRpcServer {
}

impl AgentService for AgentRpcServer {
async fn handshake(self, _: context::Context, handshake: Handshake) {
async fn handshake(
self,
context: context::Context,
handshake: Handshake,
) -> Result<(), ReconcileError> {
if let Some(token) = handshake.jwt {
// cache the JWT in the state JWT mutex
self.state
Expand All @@ -57,6 +61,14 @@ impl AgentService for AgentRpcServer {
.await
.expect("failed to write jwt file");
}

// reconcile if state has changed
let needs_reconcile = &*self.state.agent_state.read().await != &handshake.state;
if needs_reconcile {
Self::reconcile(self, context, handshake.state).await?;
}

Ok(())
}

async fn reconcile(
Expand Down Expand Up @@ -140,7 +152,6 @@ impl AgentService for AgentRpcServer {
// download and decompress the storage
// skip if we don't need storage
let AgentState::Node(env_id, node) = &target else {
info!("agent is not running a node; skipping storage download");
break 'storage;
};
let height = &node.height.1;
Expand Down
3 changes: 2 additions & 1 deletion crates/snops-common/src/rpc/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use crate::state::{AgentState, PortConfig};
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Handshake {
pub jwt: Option<String>,
pub state: AgentState,
}

/// The RPC service that agents implement as a server.
#[tarpc::service]
pub trait AgentService {
/// Handshake with some initial connection details.
async fn handshake(handshake: Handshake);
async fn handshake(handshake: Handshake) -> Result<(), ReconcileError>;

/// Control plane asks the agent for its external network address, along
/// with local addrs.
Expand Down
6 changes: 3 additions & 3 deletions crates/snops-common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct AgentId(Spur);
pub type StorageId = usize;
pub type EnvId = usize;

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentState {
#[default]
// A node in the inventory can function as a transaction cannon
Expand All @@ -42,7 +42,7 @@ impl AgentState {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeState {
pub node_key: NodeKey,
pub ty: NodeType,
Expand All @@ -57,7 +57,7 @@ pub struct NodeState {
}

/// A representation of which key to use for the agent.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum KeyState {
/// No private key provided
#[default]
Expand Down
86 changes: 82 additions & 4 deletions crates/snops/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,78 @@ impl Environment {
Ok(())
}

// TODO: this is almost exactly the same as `cleanup`, maybe we can merge it
// later
pub async fn forcefully_inventory(id: usize, state: &GlobalState) -> Result<(), EnvError> {
let mut envs_lock = state.envs.write().await;
let env = envs_lock
.get_mut(&id)
.ok_or(CleanupError::EnvNotFound(id))?;

// stop the timeline if it's running
if let Some(handle) = &*env.timeline_handle.lock().await {
handle.abort();
}

// reconcile all online agents
let (ids, handles): (Vec<_>, Vec<_>) = {
let mut agents = state.pool.write().await;

let mut ids = vec![];
let mut handles = vec![];
for peer in env.node_map.right_values() {
let Some(agent) = (match peer {
EnvPeer::Internal(id) => agents.get_mut(id),
_ => continue,
}) else {
continue;
};

let Some(client) = agent.client_owned() else {
// forcibly set the agent state if it is offline
agent.set_state(AgentState::Inventory);
continue;
};

ids.push(agent.id());
handles.push(tokio::spawn(async move {
client.reconcile(AgentState::Inventory).await
}));
}

(ids, handles)
};

info!("inventorying {} agents...", ids.len());
let reconciliations = join_all(handles).await;
info!("reconcile done, updating agent states...");

let mut agents = state.pool.write().await;
let mut success = 0;
let num_reconciles = ids.len();
for (id, result) in ids.into_iter().zip(reconciliations) {
match result {
// oh god
Ok(Ok(Ok(state))) => {
if let Some(agent) = agents.get_mut(&id) {
agent.set_state(state);
success += 1;
} else {
error!("agent {id} not found in pool after successful reconcile")
}
}

// reconcile error
Ok(Ok(Err(e))) => error!("agent {id} experienced a reconcilation error: {e}"),
Ok(Err(e)) => error!("agent {id} experienced a rpc error: {e}"),
Err(e) => error!("agent {id} experienced a join error: {e}"),
}
}
info!("inventory result: {success}/{num_reconciles} agents inventoried");

Ok(())
}

/// Lookup a env agent id by node key.
pub fn get_agent_by_key(&self, key: &NodeKey) -> Option<AgentId> {
self.node_map.get_by_left(key).and_then(|id| match id {
Expand Down Expand Up @@ -488,8 +560,14 @@ pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> Result<(),
}
}

reconcile_agents(pending_reconciliations.into_iter(), &state.pool)
.await
.map_err(ReconcileError::Batch)?;
Ok(())
if let Err(e) = reconcile_agents(pending_reconciliations.into_iter(), &state.pool).await {
error!("an error occurred on initial reconciliation, inventorying all agents: {e}");
if let Err(e) = Environment::forcefully_inventory(env_id, state).await {
error!("an error occurred inventorying agents: {e}");
}

Err(ReconcileError::Batch(e).into())
} else {
Ok(())
}
}
17 changes: 15 additions & 2 deletions crates/snops/src/env/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,21 @@ impl Environment {
// reconcile all nodes
let task_state = Arc::clone(&state);
let reconcile_handle = tokio::spawn(async move {
reconcile_agents(pending_reconciliations.into_values(), &task_state.pool)
.await?;
if let Err(e) = reconcile_agents(
pending_reconciliations.into_values(),
&task_state.pool,
)
.await
{
error!("failed to reconcile agents in timeline: {e}");
if let Err(e) =
Environment::forcefully_inventory(env_id, &task_state).await
{
error!("failed to inventory agents: {e}");
}

return Err(e.into());
};
Ok(())
});

Expand Down
21 changes: 15 additions & 6 deletions crates/snops/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,27 @@ async fn handle_socket(
break 'reconnect;
}

// attach the current known agent state to the handshake
handshake.state = agent.state().to_owned();

// mark the agent as connected
agent.mark_connected(client);

let id = agent.id();
info!("agent {id} reconnected");

// TODO: probably want to reconcile with old state?

// handshake with client
// note: this may cause a reconciliation, so this *may* be non-instant
// unwrap safety: this agent was just `mark_connected` with a valid client
let client = agent.rpc().cloned().unwrap();
tokio::spawn(async move {
// we do this in a separate task because we don't want to hold up pool insertion
if let Err(e) = client.handshake(tarpc::context::current(), handshake).await {
error!("failed to perform client handshake: {e}");
match client.handshake(tarpc::context::current(), handshake).await {
Ok(Ok(())) => (),
Ok(Err(e)) => {
error!("failed to perform client handshake reconciliation: {e}")
}
Err(e) => error!("failed to perform client handshake: {e}"),
}
});

Expand Down Expand Up @@ -208,8 +215,10 @@ async fn handle_socket(
// handshake with the client
tokio::spawn(async move {
// we do this in a separate task because we don't want to hold up pool insertion
if let Err(e) = client.handshake(tarpc::context::current(), handshake).await {
error!("failed to perform client handshake: {e}");
match client.handshake(tarpc::context::current(), handshake).await {
Ok(Ok(())) => (),
Ok(Err(e)) => error!("failed to perform client handshake reconciliation: {e}"),
Err(e) => error!("failed to perform client handshake: {e}"),
}
});

Expand Down