[sled-agent] Don't block InstanceManager on full MPSCs (#6913)
Sled-agent's `InstanceManager` task is responsible for managing the
table of all instances presently running on the sled. When the
sled-agent receives a request relating to an individual instance on the
sled, it's sent to the `InstanceManager` over a `tokio::sync::mpsc`
channel, and is then dispatched by the `InstanceManager` to the
`InstanceRunner` task responsible for that individual instance by
sending it over a *second* `tokio::sync::mpsc` channel. This is where
things start to get interesting.[^1]

`tokio::sync::mpsc` is a *bounded* channel: there is a maximum number of
messages which may be queued by a given MPSC channel at any given time.
The `mpsc::Sender::send` method is an `async fn`, and if the channel is
at capacity, that method will _wait_ until there is once again space in
the channel before sending the message. Presently,
`mpsc::Sender::send` is called by the `InstanceManager`'s main run loop
when dispatching a request to an individual instance. As you may have
already started to piece together, this means that if a given
`InstanceRunner` task is not able to process requests fast enough to
drain its channel, the entire `InstanceManager` loop will wait when
dispatching a request to that instance until space frees up in that
instance's queue. This means that if one instance's runner task has
gotten stuck on something, like waiting for a Crucible flush that will
never complete (as seen in #6911), that instance will prevent requests
from being dispatched to *any other instance* managed by the
sled-agent. This is quite
unfortunate!
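
To make the failure mode concrete, here is a minimal, standalone sketch (not sled-agent code; it assumes a Cargo project with `tokio`'s `full` feature set) of a dispatcher that `.await`s `send` into bounded per-worker queues. Once one worker wedges, a request bound for a perfectly healthy worker is never dispatched:

```rust
use tokio::sync::mpsc;
use tokio::time::{sleep, timeout, Duration};

#[tokio::main]
async fn main() {
    // Tiny capacity so a queue fills up immediately.
    let (stuck_tx, mut stuck_rx) = mpsc::channel::<u32>(1);
    let (healthy_tx, mut healthy_rx) = mpsc::channel::<u32>(1);

    // A worker that wedges before ever draining its queue (think: waiting
    // on a flush that never completes).
    tokio::spawn(async move {
        sleep(Duration::from_secs(3600)).await;
        let _ = stuck_rx.recv().await;
    });

    // A perfectly healthy worker.
    tokio::spawn(async move {
        while let Some(n) = healthy_rx.recv().await {
            println!("healthy worker handled request {n}");
        }
    });

    // Dispatcher loop: request 1 fills the stuck worker's queue, request 2
    // blocks inside `send().await`, and request 3 -- bound for the healthy
    // worker -- is never dispatched at all.
    let dispatch = async {
        for (n, tx) in [(1u32, &stuck_tx), (2, &stuck_tx), (3, &healthy_tx)] {
            println!("dispatching request {n}");
            tx.send(n).await.expect("worker went away");
        }
    };
    if timeout(Duration::from_secs(1), dispatch).await.is_err() {
        println!("dispatcher stalled; the healthy worker never saw request 3");
    }
}
```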

This commit fixes this behavior by changing the functions that send
requests to an individual instance's task to instead *shed load* when
that instance's request queue is full. We now use the
`mpsc::Sender::try_send` method, rather than `mpsc::Sender::send`, which
does not wait and instead immediately returns an error when the channel
is full. This allows the `InstanceManager` to return an error to the
client indicating that the channel is full, and to move on to processing
requests for other instances that may not be stuck. Thus, a single stuck
instance can no longer block requests from being dispatched to other,
perfectly fine instances.
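
The shape of the change, reduced to a toy example: the dispatcher calls `try_send` and converts a full queue into an immediate, per-request error rather than waiting. The `dispatch` function and `DispatchError` type below are illustrative stand-ins, not the actual types touched by this commit (which hands the failed request to `InstanceRequest::fail_try_send` so the error can be sent back over that request's response channel):

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

/// Illustrative error type; the real code distinguishes
/// `Error::FailedSendChannelFull` from `Error::FailedSendChannelClosed`.
#[derive(Debug)]
enum DispatchError {
    /// The worker's queue is full; the caller may retry later.
    QueueFull,
    /// The worker has gone away entirely.
    WorkerGone,
}

/// Hand a request to a worker without ever waiting for queue space.
fn dispatch<T>(tx: &mpsc::Sender<T>, req: T) -> Result<(), DispatchError> {
    tx.try_send(req).map_err(|err| match err {
        // `TrySendError` hands the un-sent request back; the real code uses
        // the oneshot inside it to notify the waiting client of the failure.
        TrySendError::Full(_req) => DispatchError::QueueFull,
        TrySendError::Closed(_req) => DispatchError::WorkerGone,
    })
}

fn main() {
    // Keep `_rx` alive so the channel reports "full" rather than "closed".
    let (tx, _rx) = mpsc::channel::<u32>(1);
    assert!(dispatch(&tx, 1).is_ok());
    // Capacity 1 and nobody draining: the second request is shed instead
    // of blocking the caller.
    assert!(matches!(dispatch(&tx, 2), Err(DispatchError::QueueFull)));
    println!("second request shed with QueueFull");
}
```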

The error returned when the channel is at capacity is converted to an
HTTP 503 Service Unavailable error by the API. This indicates to the
client that their request to that instance could not be processed at
this time, but that it may be processed successfully in the
future.[^2] Now, we can shed load while allowing clients to retry later,
which seems much better than the present situation.
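
As a rough illustration of the status-code choice (using the `http` crate's `StatusCode`; this is not the actual omicron error-conversion code, and the mapping for the closed-channel case is just an assumption made for the sketch):

```rust
use http::StatusCode;

/// Simplified stand-ins for the sled-agent error variants.
enum InstanceError {
    ChannelFull,
    ChannelClosed,
}

fn status_for(err: &InstanceError) -> StatusCode {
    match err {
        // Transient overload: the request was shed, but a retry may succeed.
        // 429 Too Many Requests is deliberately avoided, since that implies
        // a per-client rate limit rather than general overload.
        InstanceError::ChannelFull => StatusCode::SERVICE_UNAVAILABLE,
        // The instance task is gone; treating this as a 500 is an assumption
        // for this sketch, not something specified by the commit.
        InstanceError::ChannelClosed => StatusCode::INTERNAL_SERVER_ERROR,
    }
}

fn main() {
    assert_eq!(
        status_for(&InstanceError::ChannelFull),
        StatusCode::SERVICE_UNAVAILABLE
    );
    assert_eq!(
        status_for(&InstanceError::ChannelClosed),
        StatusCode::INTERNAL_SERVER_ERROR
    );
    println!("channel-full errors surface as 503 Service Unavailable");
}
```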

[^1]: In the sense of "may you live in interesting times", naturally.
[^2]: I also considered returning 429 Too Many Requests here, but my
    understanding is that that status code is supposed to indicate that
    too many requests have been received from *that specific client*. In
    this case, we haven't hit a per-client rate limit; we're just
    overloaded by requests more broadly, so it's not that particular
    client's fault.
hawkw authored Oct 25, 2024
1 parent f498e47 commit 78ee981
Showing 3 changed files with 181 additions and 92 deletions.
194 changes: 134 additions & 60 deletions sled-agent/src/instance.rs
@@ -119,6 +119,12 @@ pub enum Error {
#[error("Failed to send request to Instance: Channel closed")]
FailedSendChannelClosed,

#[error(
"Failed to send request to Instance: channel at capacity \
({QUEUE_SIZE})"
)]
FailedSendChannelFull,

#[error(
"Failed to send request from Instance Runner: Client Channel closed"
)]
@@ -217,10 +223,10 @@ enum InstanceRequest {
tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
},
GetFilesystemPool {
tx: oneshot::Sender<Option<ZpoolName>>,
tx: oneshot::Sender<Result<Option<ZpoolName>, ManagerError>>,
},
CurrentState {
tx: oneshot::Sender<SledVmmState>,
tx: oneshot::Sender<Result<SledVmmState, ManagerError>>,
},
PutState {
state: VmmStateRequested,
@@ -248,6 +254,58 @@ enum InstanceRequest {
},
}

impl InstanceRequest {
/// Handle an error returned by [`mpsc::Sender::try_send`] when attempting
/// to send a request to the instance.
///
/// This is a bit complex: the returned [`mpsc::error::TrySendError`] will
/// contain the [`InstanceRequest`] we were trying to send, and thus the
/// [`oneshot::Sender`] for that request's response. This function handles
/// the `TrySendError` by inspecting the error to determine whether the
/// channel has closed or is full, constructing the relevant [`Error`], and
/// extracting the response oneshot channel from the request, and then
/// sending back the error over that channel.
///
/// If sending the error back to the client fails, this function returns an
/// error, so that the client having given up can be logged; otherwise, it returns `Ok(())`.
fn fail_try_send(
err: mpsc::error::TrySendError<Self>,
) -> Result<(), Error> {
let (error, this) = match err {
mpsc::error::TrySendError::Closed(this) => {
(Error::FailedSendChannelClosed, this)
}
mpsc::error::TrySendError::Full(this) => {
(Error::FailedSendChannelFull, this)
}
};

match this {
Self::RequestZoneBundle { tx } => tx
.send(Err(BundleError::FailedSend(anyhow!(error))))
.map_err(|_| Error::FailedSendClientClosed),
Self::GetFilesystemPool { tx } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::CurrentState { tx } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::PutState { tx, .. } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::Terminate { tx, .. } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::IssueSnapshotRequest { tx, .. }
| Self::AddExternalIp { tx, .. }
| Self::DeleteExternalIp { tx, .. }
| Self::RefreshExternalIps { tx } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
}
}
}

// A small task which tracks the state of the instance, by constantly querying
// the state of Propolis for updates.
//
@@ -488,11 +546,11 @@ impl InstanceRunner {
.map_err(|_| Error::FailedSendClientClosed)
},
Some(GetFilesystemPool { tx } ) => {
tx.send(self.get_filesystem_zpool())
tx.send(Ok(self.get_filesystem_zpool()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(CurrentState{ tx }) => {
tx.send(self.current_state())
tx.send(Ok(self.current_state()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(PutState{ state, tx }) => {
@@ -562,9 +620,9 @@ impl InstanceRunner {
RequestZoneBundle { tx } => tx
.send(Err(BundleError::InstanceTerminating))
.map_err(|_| ()),
GetFilesystemPool { tx } => tx.send(None).map_err(|_| ()),
GetFilesystemPool { tx } => tx.send(Ok(None)).map_err(|_| ()),
CurrentState { tx } => {
tx.send(self.current_state()).map_err(|_| ())
tx.send(Ok(self.current_state())).map_err(|_| ())
}
PutState { tx, .. } => {
tx.send(Err(Error::Terminating.into())).map_err(|_| ())
@@ -1092,13 +1150,48 @@ fn propolis_error_code(
}

/// Describes a single Propolis server that incarnates a specific instance.
#[derive(Clone)]
pub struct Instance {
id: InstanceUuid,

/// Request channel for communicating with the instance task.
///
/// # Extremely Serious Warning
///
/// This channel is used by the `InstanceManager` task to communicate to the
/// instance task corresponding to each instance on this sled. Note that all
/// of the methods on this type which send [`InstanceRequest`]s over this
/// channel use [`mpsc::Sender::try_send`], which fails if the channel is at
/// capacity, and *not* [`mpsc::Sender::send`], which is an async method
/// that *waits* until capacity is available. THIS IS VERY IMPORTANT.
///
/// This is because the `InstanceManager` task will call these methods in
/// its request-processing loop as it receives requests from clients, in
/// order to forward the request to the relevant instance. If the instance's
/// channel has filled up because the instance is currently processing a
/// slow request, `await`ing a call to [`mpsc::Sender::send`] will block the
/// `InstanceManager`'s main loop from proceeding until the instance task
/// has finished what it's doing and drained the next request from channel.
/// Critically, this means that requests to *other, unrelated instances* on
/// this sled would have to wait until this instance has finished what it's
/// doing. That means a single deadlocked instance task, which is waiting
/// for something that never completes, can render *all* instances on this
/// sled inaccessible.
///
/// Therefore, any time we send requests to the `Instance` over this channel
/// from code that's called in the `InstanceManager`'s run loop MUST use
/// [`mpsc::Sender::try_send`] rather than [`mpsc::Sender::send`]. Should
/// the channel be at capacity, we return an
/// [`Error::FailedSendChannelFull`], which eventually becomes a 503 Service
/// Unavailable error when returned to the client. It is acceptable to call
/// [`mpsc::Sender::send`] on this channel ONLY from code which runs
/// exclusively in tasks that are not blocking the `InstanceManager`'s run
/// loop.
tx: mpsc::Sender<InstanceRequest>,

/// This is reference-counted so that the `Instance` struct may be cloned.
#[allow(dead_code)]
runner_handle: tokio::task::JoinHandle<()>,
runner_handle: Arc<tokio::task::JoinHandle<()>>,
}

#[derive(Debug)]
@@ -1250,43 +1343,39 @@ impl Instance {
let runner_handle =
tokio::task::spawn(async move { runner.run().await });

Ok(Instance { id, tx, runner_handle })
Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
}

pub fn id(&self) -> InstanceUuid {
self.id
}

/// Create bundle from an instance zone.
pub async fn request_zone_bundle(
pub fn request_zone_bundle(
&self,
tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
) -> Result<(), BundleError> {
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::RequestZoneBundle { tx })
.await
.map_err(|err| BundleError::FailedSend(anyhow!(err)))?;
Ok(())
.try_send(InstanceRequest::RequestZoneBundle { tx })
.or_else(InstanceRequest::fail_try_send)
}

pub async fn get_filesystem_zpool(
pub fn get_filesystem_zpool(
&self,
) -> Result<Option<ZpoolName>, Error> {
let (tx, rx) = oneshot::channel();
tx: oneshot::Sender<Result<Option<ZpoolName>, ManagerError>>,
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::GetFilesystemPool { tx })
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
Ok(rx.await?)
.try_send(InstanceRequest::GetFilesystemPool { tx })
.or_else(InstanceRequest::fail_try_send)
}

pub async fn current_state(&self) -> Result<SledVmmState, Error> {
let (tx, rx) = oneshot::channel();
pub fn current_state(
&self,
tx: oneshot::Sender<Result<SledVmmState, ManagerError>>,
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::CurrentState { tx })
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
Ok(rx.await?)
.try_send(InstanceRequest::CurrentState { tx })
.or_else(InstanceRequest::fail_try_send)
}

/// Attempts to update the current state of the instance by launching a
@@ -1300,84 +1389,72 @@ impl Instance {
/// instance begins to stop when Propolis has just begun to handle a prior
/// request to reboot, the instance's state may proceed from Stopping to
/// Rebooting to Running to Stopping to Stopped.
pub async fn put_state(
pub fn put_state(
&self,
tx: oneshot::Sender<Result<VmmPutStateResponse, ManagerError>>,
state: VmmStateRequested,
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::PutState { state, tx })
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
Ok(())
.try_send(InstanceRequest::PutState { state, tx })
.or_else(InstanceRequest::fail_try_send)
}

/// Rudely terminates this instance's Propolis (if it has one) and
/// immediately transitions the instance to the Destroyed state.
pub async fn terminate(
pub fn terminate(
&self,
tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
mark_failed: bool,
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::Terminate { mark_failed, tx })
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
Ok(())
.try_send(InstanceRequest::Terminate { mark_failed, tx })
.or_else(InstanceRequest::fail_try_send)
}

pub async fn issue_snapshot_request(
pub fn issue_snapshot_request(
&self,
tx: oneshot::Sender<Result<(), ManagerError>>,
disk_id: Uuid,
snapshot_id: Uuid,
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::IssueSnapshotRequest {
.try_send(InstanceRequest::IssueSnapshotRequest {
disk_id,
snapshot_id,
tx,
})
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
Ok(())
.or_else(InstanceRequest::fail_try_send)
}

pub async fn add_external_ip(
pub fn add_external_ip(
&self,
tx: oneshot::Sender<Result<(), ManagerError>>,
ip: &InstanceExternalIpBody,
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::AddExternalIp { ip: *ip, tx })
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
Ok(())
.try_send(InstanceRequest::AddExternalIp { ip: *ip, tx })
.or_else(InstanceRequest::fail_try_send)
}

pub async fn delete_external_ip(
pub fn delete_external_ip(
&self,
tx: oneshot::Sender<Result<(), ManagerError>>,
ip: &InstanceExternalIpBody,
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
Ok(())
.try_send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
.or_else(InstanceRequest::fail_try_send)
}

/// Reinstalls an instance's set of external IPs within OPTE, using
/// up-to-date IP<->IGW mappings. This will not disrupt existing flows.
pub async fn refresh_external_ips(
pub fn refresh_external_ips(
&self,
tx: oneshot::Sender<Result<(), ManagerError>>,
) -> Result<(), Error> {
self.tx
.send(InstanceRequest::RefreshExternalIps { tx })
.await
.map_err(|_| Error::FailedSendChannelClosed)?;
Ok(())
.try_send(InstanceRequest::RefreshExternalIps { tx })
.or_else(InstanceRequest::fail_try_send)
}
}

@@ -2104,7 +2181,6 @@ mod tests {
// pretending we're InstanceManager::ensure_state, start our "instance"
// (backed by fakes and propolis_mock_server)
inst.put_state(put_tx, VmmStateRequested::Running)
.await
.expect("failed to send Instance::put_state");

// even though we ignore this result at instance creation time in
@@ -2198,7 +2274,6 @@ mod tests {
// pretending we're InstanceManager::ensure_state, try in vain to start
// our "instance", but no propolis server is running
inst.put_state(put_tx, VmmStateRequested::Running)
.await
.expect("failed to send Instance::put_state");

let timeout_fut = timeout(TIMEOUT_DURATION, put_rx);
@@ -2305,7 +2380,6 @@ mod tests {
// pretending we're InstanceManager::ensure_state, try in vain to start
// our "instance", but the zone never finishes installing
inst.put_state(put_tx, VmmStateRequested::Running)
.await
.expect("failed to send Instance::put_state");

// Timeout our future waiting for the instance-state-change at 1s. This