Skip to content

Commit

Permalink
allow termination to interrupt stuck instance ops
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Oct 23, 2024
1 parent 068b17a commit 2a331ab
Showing 1 changed file with 122 additions and 65 deletions.
187 changes: 122 additions & 65 deletions sled-agent/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,6 @@ struct InstanceRunner {
// Request channel on which most instance requests are made.
rx: mpsc::Receiver<InstanceRequest>,

// Request channel for terminating the instance.
//
// This is a separate channel from the main request channel (`self.rx`)
// because we would like to be able to prioritize requests to terminate, and
// handle them even when the instance's main request channel may have filled
// up.
terminate_rx: mpsc::Receiver<TerminateRequest>,

// Request channel on which monitor requests are made.
tx_monitor: mpsc::Sender<InstanceMonitorRequest>,
rx_monitor: mpsc::Receiver<InstanceMonitorRequest>,
Expand Down Expand Up @@ -475,7 +467,7 @@ struct InstanceRunner {
}

impl InstanceRunner {
async fn run(mut self) {
async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
use InstanceRequest::*;
while !self.should_terminate {
tokio::select! {
Expand Down Expand Up @@ -544,7 +536,7 @@ impl InstanceRunner {
},
// Requests to terminate the instance take priority over any
// other request to the instance.
request = self.terminate_rx.recv() => {
request = terminate_rx.recv() => {
let Some(TerminateRequest { mark_failed, tx}) = request else {
warn!(
self.log,
Expand All @@ -569,64 +561,119 @@ impl InstanceRunner {

// Handle external requests to act upon the instance.
request = self.rx.recv() => {
let request_variant = request.as_ref().map(|r| r.to_string());
let result = match request {
Some(RequestZoneBundle { tx }) => {
tx.send(self.request_zone_bundle().await)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(GetFilesystemPool { tx } ) => {
tx.send(Ok(self.get_filesystem_zpool()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(CurrentState{ tx }) => {
tx.send(Ok(self.current_state()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(PutState{ state, tx }) => {
tx.send(self.put_state(state).await
.map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
tx.send(
self.issue_snapshot_request(
disk_id,
snapshot_id
).await.map_err(|e| e.into())
)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(AddExternalIp { ip, tx }) => {
tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(DeleteExternalIp { ip, tx }) => {
tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(RefreshExternalIps { tx }) => {
tx.send(self.refresh_external_ips().map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
}
let request = match request {
Some(r) => r,
None => {
warn!(self.log, "Instance request channel closed; shutting down");
let mark_failed = false;
self.terminate(mark_failed).await;
break;
},
}
};
let request_variant = request.to_string();
// Okay, this is a little bit wacky: if we are waiting for
// one of the instance operations we run here to come back,
// and a termination request comes in, we want to give up on
// the outstanding operation and honor the termination
// request immediately. This is in case the instance
// operation has gotten stuck: we don't want it to prevent
// the instance from terminating because something else is
// wedged.
//
// Therefore, we're going to select between the future that
// actually performs the instance op and receiving another
// request from the termination channel.
let op = async {
match request {
RequestZoneBundle { tx } => {
tx.send(self.request_zone_bundle().await)
.map_err(|_| Error::FailedSendClientClosed)
},
GetFilesystemPool { tx } => {
tx.send(Ok(self.get_filesystem_zpool()))
.map_err(|_| Error::FailedSendClientClosed)
},
CurrentState{ tx } => {
tx.send(Ok(self.current_state()))
.map_err(|_| Error::FailedSendClientClosed)
},
PutState{ state, tx } => {
tx.send(self.put_state(state).await
.map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
IssueSnapshotRequest { disk_id, snapshot_id, tx } => {
tx.send(
self.issue_snapshot_request(
disk_id,
snapshot_id
).await.map_err(|e| e.into())
)
.map_err(|_| Error::FailedSendClientClosed)
},
AddExternalIp { ip, tx } => {
tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
DeleteExternalIp { ip, tx } => {
tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
RefreshExternalIps { tx } => {
tx.send(self.refresh_external_ips().map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
}
}
};
tokio::select! {
biased;

request = terminate_rx.recv() => {
match request {
Some(TerminateRequest { tx, mark_failed }) => {
info!(
self.log,
"Received request to terminate instance \
while waiting on an ongoing request";
"request" => request_variant,
);
let result = tx.send(Ok(VmmUnregisterResponse {
updated_runtime: Some(self.terminate(mark_failed).await)
}))
.map_err(|_| Error::FailedSendClientClosed);
if let Err(err) = result {
warn!(
self.log,
"Error handling request to terminate instance";
"err" => ?err,
);
}
break;
},
None => {
warn!(
self.log,
"Instance termination request channel closed; \
shutting down";
);
self.terminate(false).await;
break;
},
};
}

if let Err(err) = result {
warn!(
self.log,
"Error handling request";
"request" => request_variant.unwrap(),
"err" => ?err,

);
}
result = op => {
if let Err(err) = result {
warn!(
self.log,
"Error handling request";
"request" => request_variant,
"err" => ?err,
);
}
}
};
}

}
Expand Down Expand Up @@ -668,8 +715,7 @@ impl InstanceRunner {

// Anyone else who was trying to ask us to go die will be happy to learn
// that we have now done so!
while let Some(TerminateRequest { tx, .. }) =
self.terminate_rx.recv().await
while let Some(TerminateRequest { tx, .. }) = terminate_rx.recv().await
{
let _ = tx.send(Ok(VmmUnregisterResponse {
updated_runtime: Some(self.current_state()),
Expand Down Expand Up @@ -1323,6 +1369,18 @@ impl Instance {

let (tx, rx) = mpsc::channel(QUEUE_SIZE);
let (tx_monitor, rx_monitor) = mpsc::channel(1);

// Request channel for terminating the instance.
//
// This is a separate channel from the main request channel (`rx`, created
// above and stored in the runner as `InstanceRunner::rx`) because we would
// like to be able to prioritize requests to terminate, and handle them even
// when the instance's main request channel may have filled up.
//
// Note also that this is *not* part of the `InstanceRunner` struct,
// because it's necessary to split mutable borrows in order to allow
// selecting between the actual instance operation (which must mutate
// the `InstanceRunner`) and awaiting a termination request.
let (terminate_tx, terminate_rx) = mpsc::channel(QUEUE_SIZE);

let metadata = propolis_client::types::InstanceMetadata {
Expand All @@ -1340,7 +1398,6 @@ impl Instance {
rx,
tx_monitor,
rx_monitor,
terminate_rx,
monitor_handle: None,
// NOTE: Mostly lies.
properties: propolis_client::types::InstanceProperties {
Expand Down Expand Up @@ -1380,7 +1437,7 @@ impl Instance {
};

let runner_handle =
tokio::task::spawn(async move { runner.run().await });
tokio::task::spawn(async move { runner.run(terminate_rx).await });

Ok(Instance {
id,
Expand Down

0 comments on commit 2a331ab

Please sign in to comment.