[sled-agent] Give InstanceManager a gun (#6915)
Stacked on top of #6913 

Presently, sled-agent sends requests to terminate an instance to the
`InstanceRunner` task over the same `tokio::sync::mpsc` request channel
as all other requests sent to that instance. This means that the
`InstanceRunner` will attempt to terminate the instance only once other
requests received before the termination request have been processed,
and an instance cannot be terminated if its request channel has filled
up. Similarly, if an instance's `InstanceRunner` task is waiting for an
in-flight request to the VMM to complete, the request to terminate the
instance will not be seen until the current request to Propolis has
returned. This means that if the instance has gotten stuck for some
reason --- e.g., because it is attempting a Crucible snapshot that
cannot complete because a physical disk has gone missing, as seen in
#6911 --- the instance cannot be terminated. Sadly, in this case, the
only way to resolve the stuck request is to terminate the instance, but
we cannot do so *because* the instance is stuck.
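
For illustration, here is a minimal sketch of that pre-change arrangement, assuming a single shared request channel: every request, including termination, is handled strictly in order, so a terminate request queued behind a wedged operation is never even seen. The `Request` enum, `run_loop`, and `do_work` names are illustrative placeholders, not the actual sled-agent types.

```rust
use tokio::sync::mpsc;

enum Request {
    DoWork,
    Terminate,
}

async fn run_loop(mut rx: mpsc::Receiver<Request>) {
    while let Some(req) = rx.recv().await {
        match req {
            // If this operation never completes (e.g. a wedged Crucible
            // snapshot), the `Terminate` request sitting behind it in the
            // queue is never dequeued, so the instance can never be killed.
            Request::DoWork => do_work().await,
            Request::Terminate => break,
        }
    }
}

async fn do_work() {
    // Placeholder for an instance operation that might hang forever.
}
```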

This seems unfortunate: if we try to kill an instance because it's doing
something that it will never be able to finish, it shouldn't be able to
say "no, you can't kill me, I'm too *busy* to die!". Instead, requests
to terminate the instance should be prioritized over other requests.
This commit does that.

Rather than sending termination requests to the `InstanceRunner` over
the same channel as all other requests, we instead introduce a separate
channel that's *just* for termination requests, which is preferred over
the request channel in the biased `tokio::select!` in the
`InstanceRunner` run loop. This means that a full request channel cannot
stop a termination request from being sent. When a request to the VMM is
in flight, the future that awaits that request's completion is now one
branch of a similar `tokio::select!` with the termination channel. This
way, if a termination request comes in while the `InstanceRunner` is
awaiting an in-flight instance operation, it will still be notified
of the termination request immediately, cancel whatever operation it's
waiting for, and terminate the VMM right away. This is the
correct behavior here, since the terminate operation is intended to
forcefully terminate the VMM *now*, and is used internally for purposes
such as `use_only_these_disks` killing instances that are using a
no-longer-extant disk, or the control plane requesting that the
sled-agent forcibly unregister the instance. "Normal" requests to stop
the instance gracefully will go through the `instance_put_state` API
instead, sending requests through the normal request channel and
allowing in-flight operations to complete.
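
For illustration, here is a minimal, self-contained sketch of the pattern this change adopts, assuming placeholder types and functions (`Request`, `Terminate`, `handle`, `terminate`) rather than the real sled-agent ones: a dedicated termination channel is polled first by a `biased` `tokio::select!`, both in the run loop and while an instance operation is in flight, so a stuck operation cannot block termination. Error handling and response channels are elided; the diff below shows the real implementation.

```rust
use tokio::sync::mpsc;

enum Request {
    DoWork,
}

struct Terminate {
    mark_failed: bool,
}

async fn run_loop(
    mut rx: mpsc::Receiver<Request>,
    mut terminate_rx: mpsc::Receiver<Terminate>,
) {
    loop {
        tokio::select! {
            // `biased` polls branches in order, so a pending termination
            // request always wins over a pending ordinary request.
            biased;

            term = terminate_rx.recv() => {
                let mark_failed = term.map(|t| t.mark_failed).unwrap_or(false);
                terminate(mark_failed).await;
                break;
            }

            req = rx.recv() => {
                let Some(req) = req else {
                    // The request channel closed; shut down.
                    terminate(false).await;
                    break;
                };
                // While the operation is in flight, keep listening on the
                // termination channel so a wedged operation can be abandoned.
                tokio::select! {
                    biased;
                    term = terminate_rx.recv() => {
                        let mark_failed =
                            term.map(|t| t.mark_failed).unwrap_or(false);
                        terminate(mark_failed).await;
                        break;
                    }
                    _ = handle(req) => {}
                }
            }
        }
    }
}

async fn handle(_req: Request) {
    // Placeholder for an instance operation that might hang.
}

async fn terminate(_mark_failed: bool) {
    // Placeholder for forcefully tearing down the VMM.
}
```
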
hawkw authored Oct 30, 2024
1 parent bc41f54 commit e313b65
Showing 1 changed file with 199 additions and 74 deletions.
273 changes: 199 additions & 74 deletions sled-agent/src/instance.rs
@@ -232,10 +232,6 @@ enum InstanceRequest {
state: VmmStateRequested,
tx: oneshot::Sender<Result<VmmPutStateResponse, ManagerError>>,
},
Terminate {
mark_failed: bool,
tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
},
IssueSnapshotRequest {
disk_id: Uuid,
snapshot_id: Uuid,
@@ -293,9 +289,6 @@ impl InstanceRequest {
Self::PutState { tx, .. } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::Terminate { tx, .. } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::IssueSnapshotRequest { tx, .. }
| Self::AddExternalIp { tx, .. }
| Self::DeleteExternalIp { tx, .. }
@@ -306,6 +299,11 @@
}
}

struct TerminateRequest {
mark_failed: bool,
tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
}

// A small task which tracks the state of the instance, by constantly querying
// the state of Propolis for updates.
//
@@ -469,7 +467,7 @@ struct InstanceRunner {
}

impl InstanceRunner {
async fn run(mut self) {
async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
use InstanceRequest::*;
while !self.should_terminate {
tokio::select! {
@@ -535,74 +533,103 @@ impl InstanceRunner {
self.terminate(mark_failed).await;
},
}

},
// Requests to terminate the instance take priority over any
// other request to the instance.
request = terminate_rx.recv() => {
self.handle_termination_request(request, None).await;
break;
}

// Handle external requests to act upon the instance.
request = self.rx.recv() => {
let request_variant = request.as_ref().map(|r| r.to_string());
let result = match request {
Some(RequestZoneBundle { tx }) => {
tx.send(self.request_zone_bundle().await)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(GetFilesystemPool { tx } ) => {
tx.send(Ok(self.get_filesystem_zpool()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(CurrentState{ tx }) => {
tx.send(Ok(self.current_state()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(PutState{ state, tx }) => {
tx.send(self.put_state(state).await
.map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(Terminate { mark_failed, tx }) => {
tx.send(Ok(VmmUnregisterResponse {
updated_runtime: Some(self.terminate(mark_failed).await)
}))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
tx.send(
self.issue_snapshot_request(
disk_id,
snapshot_id
).await.map_err(|e| e.into())
)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(AddExternalIp { ip, tx }) => {
tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(DeleteExternalIp { ip, tx }) => {
tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(RefreshExternalIps { tx }) => {
tx.send(self.refresh_external_ips().map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
}
let request = match request {
Some(r) => r,
None => {
warn!(self.log, "Instance request channel closed; shutting down");
let mark_failed = false;
self.terminate(mark_failed).await;
break;
},
}
};
let request_variant = request.to_string();
// Okay, this is a little bit wacky: if we are waiting for
// one of the instance operations we run here to come back,
// and a termination request comes in, we want to give up on
// the outstanding operation and honor the termination
// request immediately. This is in case the instance
// operation has gotten stuck: we don't want it to prevent
// the instance from terminating because something else is
// wedged.
//
// Therefore, we're going to select between the future that
// actually performs the instance op and receiving another
// request from the termination channel.
let op = async {
match request {
RequestZoneBundle { tx } => {
tx.send(self.request_zone_bundle().await)
.map_err(|_| Error::FailedSendClientClosed)
},
GetFilesystemPool { tx } => {
tx.send(Ok(self.get_filesystem_zpool()))
.map_err(|_| Error::FailedSendClientClosed)
},
CurrentState{ tx } => {
tx.send(Ok(self.current_state()))
.map_err(|_| Error::FailedSendClientClosed)
},
PutState{ state, tx } => {
tx.send(self.put_state(state).await
.map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
IssueSnapshotRequest { disk_id, snapshot_id, tx } => {
tx.send(
self.issue_snapshot_request(
disk_id,
snapshot_id
).await.map_err(|e| e.into())
)
.map_err(|_| Error::FailedSendClientClosed)
},
AddExternalIp { ip, tx } => {
tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
DeleteExternalIp { ip, tx } => {
tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
RefreshExternalIps { tx } => {
tx.send(self.refresh_external_ips().map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
}
}
};
tokio::select! {
biased;

request = terminate_rx.recv() => {
self.handle_termination_request(
request,
Some(&request_variant),
).await;
break;
}

if let Err(err) = result {
warn!(
self.log,
"Error handling request";
"request" => request_variant.unwrap(),
"err" => ?err,

);
}
result = op => {
if let Err(err) = result {
warn!(
self.log,
"Error handling request";
"request" => request_variant,
"err" => ?err,
);
}
}
};
}

}
@@ -627,9 +654,6 @@ impl InstanceRunner {
PutState { tx, .. } => {
tx.send(Err(Error::Terminating.into())).map_err(|_| ())
}
Terminate { tx, .. } => {
tx.send(Err(Error::Terminating.into())).map_err(|_| ())
}
IssueSnapshotRequest { tx, .. } => {
tx.send(Err(Error::Terminating.into())).map_err(|_| ())
}
@@ -644,6 +668,15 @@
}
};
}

// Anyone else who was trying to ask us to go die will be happy to learn
// that we have now done so!
while let Some(TerminateRequest { tx, .. }) = terminate_rx.recv().await
{
let _ = tx.send(Ok(VmmUnregisterResponse {
updated_runtime: Some(self.current_state()),
}));
}
}

/// Yields this instance's ID.
@@ -1193,6 +1226,12 @@ pub struct Instance {
/// loop.
tx: mpsc::Sender<InstanceRequest>,

/// Sender for requests to terminate the instance.
///
/// These are sent over a separate channel so that they can be prioritized
/// over all other requests to the instance.
terminate_tx: mpsc::Sender<TerminateRequest>,

/// This is reference-counted so that the `Instance` struct may be cloned.
#[allow(dead_code)]
runner_handle: Arc<tokio::task::JoinHandle<()>>,
@@ -1291,6 +1330,19 @@ impl Instance {
let (tx, rx) = mpsc::channel(QUEUE_SIZE);
let (tx_monitor, rx_monitor) = mpsc::channel(1);

// Request channel for terminating the instance.
//
// This is a separate channel from the main request channel (`self.rx`)
// because we would like to be able to prioritize requests to terminate, and
// handle them even when the instance's main request channel may have filled
// up.
//
// Note also that this is *not* part of the `InstanceRunner` struct,
// because it's necessary to split mutable borrows in order to allow
// selecting between the actual instance operation (which must mutate
// the `InstanceRunner`) and awaiting a termination request.
let (terminate_tx, terminate_rx) = mpsc::channel(QUEUE_SIZE);

let metadata = propolis_client::types::InstanceMetadata {
project_id: metadata.project_id,
silo_id: metadata.silo_id,
@@ -1345,9 +1397,14 @@ impl Instance {
};

let runner_handle =
tokio::task::spawn(async move { runner.run().await });
tokio::task::spawn(async move { runner.run(terminate_rx).await });

Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
Ok(Instance {
id,
tx,
runner_handle: Arc::new(runner_handle),
terminate_tx,
})
}

pub fn id(&self) -> InstanceUuid {
@@ -1410,9 +1467,19 @@ impl Instance {
tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
mark_failed: bool,
) -> Result<(), Error> {
self.tx
.try_send(InstanceRequest::Terminate { mark_failed, tx })
.or_else(InstanceRequest::fail_try_send)
self.terminate_tx
.try_send(TerminateRequest { mark_failed, tx })
.or_else(|err| match err {
mpsc::error::TrySendError::Closed(TerminateRequest {
tx,
..
}) => tx.send(Err(Error::FailedSendChannelClosed.into())),
mpsc::error::TrySendError::Full(TerminateRequest {
tx,
..
}) => tx.send(Err(Error::FailedSendChannelFull.into())),
})
.map_err(|_| Error::FailedSendClientClosed)
}

pub fn issue_snapshot_request(
@@ -1749,6 +1816,64 @@ impl InstanceRunner {
Ok(PropolisSetup { client, running_zone })
}

async fn handle_termination_request(
&mut self,
req: Option<TerminateRequest>,
current_req: Option<&str>,
) {
match req {
Some(TerminateRequest { tx, mark_failed }) => {
if let Some(request) = current_req {
info!(
self.log,
"Received request to terminate instance while waiting \
on an ongoing request";
"request" => %request,
"mark_failed" => mark_failed,
);
} else {
info!(
self.log,
"Received request to terminate instance";
"mark_failed" => mark_failed,
);
}

let result = tx
.send(Ok(VmmUnregisterResponse {
updated_runtime: Some(
self.terminate(mark_failed).await,
),
}))
.map_err(|_| Error::FailedSendClientClosed);
if let Err(err) = result {
warn!(
self.log,
"Error handling request to terminate instance";
"err" => ?err,
);
}
}
None => {
if let Some(request) = current_req {
warn!(
self.log,
"Instance termination request channel closed while \
waiting on an ongoing request; shutting down";
"request" => %request,
);
} else {
warn!(
self.log,
"Instance termination request channel closed; \
shutting down";
);
}
self.terminate(false).await;
}
};
}

async fn terminate(&mut self, mark_failed: bool) -> SledVmmState {
self.terminate_inner().await;
self.state.terminate_rudely(mark_failed);