diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs
index 8a4c5cf669..b47aeb6508 100644
--- a/sled-agent/src/instance.rs
+++ b/sled-agent/src/instance.rs
@@ -119,6 +119,12 @@ pub enum Error {
     #[error("Failed to send request to Instance: Channel closed")]
     FailedSendChannelClosed,
 
+    #[error(
+        "Failed to send request to Instance: channel at capacity \
+         ({QUEUE_SIZE})"
+    )]
+    FailedSendChannelFull,
+
     #[error(
         "Failed to send request from Instance Runner: Client Channel closed"
     )]
@@ -217,10 +223,10 @@ enum InstanceRequest {
         tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
     },
     GetFilesystemPool {
-        tx: oneshot::Sender<Option<ZpoolName>>,
+        tx: oneshot::Sender<Result<Option<ZpoolName>, ManagerError>>,
     },
     CurrentState {
-        tx: oneshot::Sender<SledVmmState>,
+        tx: oneshot::Sender<Result<SledVmmState, ManagerError>>,
     },
     PutState {
         state: VmmStateRequested,
@@ -248,6 +254,58 @@ enum InstanceRequest {
     },
 }
 
+impl InstanceRequest {
+    /// Handle an error returned by [`mpsc::Sender::try_send`] when attempting
+    /// to send a request to the instance.
+    ///
+    /// This is a bit complex: the returned [`mpsc::error::TrySendError`] will
+    /// contain the [`InstanceRequest`] we were trying to send, and thus the
+    /// [`oneshot::Sender`] for that request's response. This function handles
+    /// the `TrySendError` by inspecting the error to determine whether the
+    /// channel has closed or is full, constructing the relevant [`Error`],
+    /// extracting the response oneshot channel from the request, and sending
+    /// the error back over that channel.
+    ///
+    /// If the response channel has also been dropped, this function returns
+    /// an error so that the caller can log that the client gave up;
+    /// otherwise, it returns `Ok(())`.
+    fn fail_try_send(
+        err: mpsc::error::TrySendError<Self>,
+    ) -> Result<(), Error> {
+        let (error, this) = match err {
+            mpsc::error::TrySendError::Closed(this) => {
+                (Error::FailedSendChannelClosed, this)
+            }
+            mpsc::error::TrySendError::Full(this) => {
+                (Error::FailedSendChannelFull, this)
+            }
+        };
+
+        match this {
+            Self::RequestZoneBundle { tx } => tx
+                .send(Err(BundleError::FailedSend(anyhow!(error))))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::GetFilesystemPool { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::CurrentState { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::PutState { tx, .. } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::Terminate { tx, .. } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::IssueSnapshotRequest { tx, .. }
+            | Self::AddExternalIp { tx, .. }
+            | Self::DeleteExternalIp { tx, .. }
+            | Self::RefreshExternalIps { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+        }
+    }
+}
+
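The `fail_try_send` helper above hinges on the fact that `try_send` hands the rejected message back inside the `TrySendError`, so the request's own response channel can still be answered instead of leaving the caller waiting forever. For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the same idea using only `tokio`; the `Request` and `WorkerError` names are illustrative and not part of omicron:

```rust
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
enum WorkerError {
    QueueFull,
    WorkerGone,
}

enum Request {
    Ping { tx: oneshot::Sender<Result<(), WorkerError>> },
}

/// Enqueue a request without awaiting; on failure, answer the request's own
/// oneshot so the caller learns why instead of hanging.
fn send_request(
    queue: &mpsc::Sender<Request>,
    req: Request,
) -> Result<(), &'static str> {
    queue.try_send(req).or_else(|err| {
        // `TrySendError` gives the request back, so its response channel is
        // still reachable.
        let (error, req) = match err {
            mpsc::error::TrySendError::Full(req) => (WorkerError::QueueFull, req),
            mpsc::error::TrySendError::Closed(req) => (WorkerError::WorkerGone, req),
        };
        match req {
            // If even this send fails, the caller dropped its receiver;
            // report that so it can be logged.
            Request::Ping { tx } => {
                tx.send(Err(error)).map_err(|_| "caller gave up")
            }
        }
    })
}

#[tokio::main]
async fn main() {
    // A queue of capacity 1 with no consumer: the first request fills it,
    // the second is rejected, and its oneshot still receives an answer.
    let (queue, _rx) = mpsc::channel(1);
    let (tx1, _rx1) = oneshot::channel();
    let (tx2, mut rx2) = oneshot::channel();
    send_request(&queue, Request::Ping { tx: tx1 }).unwrap();
    send_request(&queue, Request::Ping { tx: tx2 }).unwrap();
    assert!(matches!(rx2.try_recv(), Ok(Err(WorkerError::QueueFull))));
}
```

This shape is what lets the `Instance` methods below become synchronous: the enqueue is never awaited, yet every caller still gets a definitive answer on its oneshot.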
 // A small task which tracks the state of the instance, by constantly querying
 // the state of Propolis for updates.
 //
@@ -488,11 +546,11 @@ impl InstanceRunner {
                             .map_err(|_| Error::FailedSendClientClosed)
                     },
                     Some(GetFilesystemPool { tx } ) => {
-                        tx.send(self.get_filesystem_zpool())
+                        tx.send(Ok(self.get_filesystem_zpool()))
                             .map_err(|_| Error::FailedSendClientClosed)
                     },
                     Some(CurrentState{ tx }) => {
-                        tx.send(self.current_state())
+                        tx.send(Ok(self.current_state()))
                             .map_err(|_| Error::FailedSendClientClosed)
                     },
                     Some(PutState{ state, tx }) => {
@@ -562,9 +620,9 @@ impl InstanceRunner {
             RequestZoneBundle { tx } => tx
                 .send(Err(BundleError::InstanceTerminating))
                 .map_err(|_| ()),
-            GetFilesystemPool { tx } => tx.send(None).map_err(|_| ()),
+            GetFilesystemPool { tx } => tx.send(Ok(None)).map_err(|_| ()),
             CurrentState { tx } => {
-                tx.send(self.current_state()).map_err(|_| ())
+                tx.send(Ok(self.current_state())).map_err(|_| ())
             }
             PutState { tx, .. } => {
                 tx.send(Err(Error::Terminating.into())).map_err(|_| ())
@@ -1092,13 +1150,48 @@ fn propolis_error_code(
 }
 
 /// Describes a single Propolis server that incarnates a specific instance.
+#[derive(Clone)]
 pub struct Instance {
     id: InstanceUuid,
 
+    /// Request channel for communicating with the instance task.
+    ///
+    /// # Extremely Serious Warning
+    ///
+    /// This channel is used by the `InstanceManager` task to communicate to
+    /// the instance task corresponding to each instance on this sled. Note
+    /// that all of the methods on this type which send [`InstanceRequest`]s
+    /// over this channel use [`mpsc::Sender::try_send`], which fails if the
+    /// channel is at capacity, and *not* [`mpsc::Sender::send`], which is an
+    /// async method that *waits* until capacity is available. THIS IS VERY
+    /// IMPORTANT.
+    ///
+    /// This is because the `InstanceManager` task will call these methods in
+    /// its request-processing loop as it receives requests from clients, in
+    /// order to forward the request to the relevant instance. If the
+    /// instance's channel has filled up because the instance is currently
+    /// processing a slow request, `await`ing a call to [`mpsc::Sender::send`]
+    /// will block the `InstanceManager`'s main loop from proceeding until the
+    /// instance task has finished what it's doing and drained the next
+    /// request from the channel. Critically, this means that requests to
+    /// *other, unrelated instances* on this sled would have to wait until
+    /// this instance has finished what it's doing. That means a single
+    /// deadlocked instance task, which is waiting for something that never
+    /// completes, can render *all* instances on this sled inaccessible.
+    ///
+    /// Therefore, any time we send requests to the `Instance` over this
+    /// channel from code that's called in the `InstanceManager`'s run loop,
+    /// we MUST use [`mpsc::Sender::try_send`] rather than
+    /// [`mpsc::Sender::send`]. Should the channel be at capacity, we return
+    /// an [`Error::FailedSendChannelFull`], which eventually becomes a 503
+    /// Service Unavailable error when returned to the client. It is
+    /// acceptable to call [`mpsc::Sender::send`] on this channel ONLY from
+    /// code which runs exclusively in tasks that are not blocking the
+    /// `InstanceManager`'s run loop.
     tx: mpsc::Sender<InstanceRequest>,
 
+    /// This is reference-counted so that the `Instance` struct may be cloned.
     #[allow(dead_code)]
-    runner_handle: tokio::task::JoinHandle<()>,
+    runner_handle: Arc<tokio::task::JoinHandle<()>>,
 }
 
 #[derive(Debug)]
@@ -1250,7 +1343,7 @@ impl Instance {
         let runner_handle =
             tokio::task::spawn(async move { runner.run().await });
 
-        Ok(Instance { id, tx, runner_handle })
+        Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
     }
 
     pub fn id(&self) -> InstanceUuid {
@@ -1258,35 +1351,31 @@
     }
 
     /// Create bundle from an instance zone.
-    pub async fn request_zone_bundle(
+    pub fn request_zone_bundle(
         &self,
         tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
-    ) -> Result<(), BundleError> {
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::RequestZoneBundle { tx })
-            .await
-            .map_err(|err| BundleError::FailedSend(anyhow!(err)))?;
-        Ok(())
+            .try_send(InstanceRequest::RequestZoneBundle { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn get_filesystem_zpool(
+    pub fn get_filesystem_zpool(
         &self,
-    ) -> Result<Option<ZpoolName>, Error> {
-        let (tx, rx) = oneshot::channel();
+        tx: oneshot::Sender<Result<Option<ZpoolName>, ManagerError>>,
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::GetFilesystemPool { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(rx.await?)
+            .try_send(InstanceRequest::GetFilesystemPool { tx })
+            .or_else(InstanceRequest::fail_try_send)
    }
 
-    pub async fn current_state(&self) -> Result<SledVmmState, Error> {
-        let (tx, rx) = oneshot::channel();
+    pub fn current_state(
+        &self,
+        tx: oneshot::Sender<Result<SledVmmState, ManagerError>>,
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::CurrentState { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(rx.await?)
+            .try_send(InstanceRequest::CurrentState { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
     /// Attempts to update the current state of the instance by launching a
@@ -1300,84 +1389,72 @@
     /// instance begins to stop when Propolis has just begun to handle a prior
     /// request to reboot, the instance's state may proceed from Stopping to
     /// Rebooting to Running to Stopping to Stopped.
-    pub async fn put_state(
+    pub fn put_state(
         &self,
         tx: oneshot::Sender<Result<VmmPutStateResponse, ManagerError>>,
         state: VmmStateRequested,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::PutState { state, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::PutState { state, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
     /// Rudely terminates this instance's Propolis (if it has one) and
     /// immediately transitions the instance to the Destroyed state.
-    pub async fn terminate(
+    pub fn terminate(
         &self,
         tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
         mark_failed: bool,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::Terminate { mark_failed, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::Terminate { mark_failed, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn issue_snapshot_request(
+    pub fn issue_snapshot_request(
         &self,
         tx: oneshot::Sender<Result<(), ManagerError>>,
         disk_id: Uuid,
         snapshot_id: Uuid,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::IssueSnapshotRequest {
+            .try_send(InstanceRequest::IssueSnapshotRequest {
                 disk_id,
                 snapshot_id,
                 tx,
             })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn add_external_ip(
+    pub fn add_external_ip(
         &self,
         tx: oneshot::Sender<Result<(), ManagerError>>,
         ip: &InstanceExternalIpBody,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::AddExternalIp { ip: *ip, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::AddExternalIp { ip: *ip, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn delete_external_ip(
+    pub fn delete_external_ip(
        &self,
         tx: oneshot::Sender<Result<(), ManagerError>>,
         ip: &InstanceExternalIpBody,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
     /// Reinstalls an instance's set of external IPs within OPTE, using
     /// up-to-date IP<->IGW mappings. This will not disrupt existing flows.
-    pub async fn refresh_external_ips(
+    pub fn refresh_external_ips(
         &self,
         tx: oneshot::Sender<Result<(), ManagerError>>,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::RefreshExternalIps { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::RefreshExternalIps { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 }
@@ -2104,7 +2181,6 @@ mod tests {
         // pretending we're InstanceManager::ensure_state, start our "instance"
         // (backed by fakes and propolis_mock_server)
         inst.put_state(put_tx, VmmStateRequested::Running)
-            .await
             .expect("failed to send Instance::put_state");
 
         // even though we ignore this result at instance creation time in
@@ -2198,7 +2274,6 @@
         // pretending we're InstanceManager::ensure_state, try in vain to start
         // our "instance", but no propolis server is running
         inst.put_state(put_tx, VmmStateRequested::Running)
-            .await
             .expect("failed to send Instance::put_state");
 
         let timeout_fut = timeout(TIMEOUT_DURATION, put_rx);
@@ -2305,7 +2380,6 @@
         // pretending we're InstanceManager::ensure_state, try in vain to start
         // our "instance", but the zone never finishes installing
         inst.put_state(put_tx, VmmStateRequested::Running)
-            .await
             .expect("failed to send Instance::put_state");
 
         // Timeout our future waiting for the instance-state-change at 1s. This
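The instance_manager.rs changes that follow apply the warning documented on `Instance::tx`: the manager's run loop must never `await` capacity on a per-instance queue. The difference is easy to demonstrate in isolation — with a bounded channel whose consumer never drains it, `send().await` parks the caller indefinitely, while `try_send` fails immediately so the loop can shed load. A hypothetical standalone demo (assumes only `tokio` with the `sync`, `time`, `rt`, and `macros` features enabled):

```rust
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() {
    // Capacity 1, and nobody is draining the receiver (a "stuck" worker).
    let (tx, _rx) = mpsc::channel::<u32>(1);
    tx.send(1).await.unwrap(); // fills the only slot

    // try_send fails fast with `Full`, so the caller can return an error
    // (e.g. a 503) instead of stalling.
    assert!(matches!(
        tx.try_send(2),
        Err(mpsc::error::TrySendError::Full(2))
    ));

    // send().await would wait for capacity that never arrives; bound the
    // wait to show that it does not complete.
    let blocked = timeout(Duration::from_millis(100), tx.send(2)).await;
    assert!(blocked.is_err(), "send().await is still parked after 100ms");
    println!("try_send failed fast; send().await blocked as expected");
}
```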
diff --git a/sled-agent/src/instance_manager.rs b/sled-agent/src/instance_manager.rs
index af09def0c1..34fb8e493d 100644
--- a/sled-agent/src/instance_manager.rs
+++ b/sled-agent/src/instance_manager.rs
@@ -465,25 +465,25 @@ impl InstanceManagerRunner {
                        tx.send(self.ensure_registered(propolis_id, instance, *sled_identifiers).await).map_err(|_| Error::FailedSendClientClosed)
                    },
                    Some(EnsureUnregistered { propolis_id, tx }) => {
-                        self.ensure_unregistered(tx, propolis_id).await
+                        self.ensure_unregistered(tx, propolis_id)
                    },
                    Some(EnsureState { propolis_id, target, tx }) => {
-                        self.ensure_state(tx, propolis_id, target).await
+                        self.ensure_state(tx, propolis_id, target)
                    },
                    Some(IssueDiskSnapshot { propolis_id, disk_id, snapshot_id, tx }) => {
-                        self.issue_disk_snapshot_request(tx, propolis_id, disk_id, snapshot_id).await
+                        self.issue_disk_snapshot_request(tx, propolis_id, disk_id, snapshot_id)
                    },
                    Some(CreateZoneBundle { name, tx }) => {
-                        self.create_zone_bundle(tx, &name).await.map_err(Error::from)
+                        self.create_zone_bundle(tx, &name).map_err(Error::from)
                    },
                    Some(AddExternalIp { propolis_id, ip, tx }) => {
-                        self.add_external_ip(tx, propolis_id, &ip).await
+                        self.add_external_ip(tx, propolis_id, &ip)
                    },
                    Some(DeleteExternalIp { propolis_id, ip, tx }) => {
-                        self.delete_external_ip(tx, propolis_id, &ip).await
+                        self.delete_external_ip(tx, propolis_id, &ip)
                    },
                    Some(RefreshExternalIps { tx }) => {
-                        self.refresh_external_ips(tx).await
+                        self.refresh_external_ips(tx)
                    }
                    Some(GetState { propolis_id, tx }) => {
                        // TODO(eliza): it could potentially be nice to
@@ -491,7 +491,7 @@ impl InstanceManagerRunner {
                        // than having to force `GetState` requests to
                        // serialize with the requests that actually update
                        // the state...
-                        self.get_instance_state(tx, propolis_id).await
+                        self.get_instance_state(tx, propolis_id)
                    },
                    Some(OnlyUseDisks { disks, tx } ) => {
                        self.use_only_these_disks(disks).await;
@@ -631,14 +631,15 @@ impl InstanceManagerRunner {
                &self.jobs.get(&propolis_id).unwrap()
            }
        };
-
-        Ok(instance.current_state().await?)
+        let (tx, rx) = oneshot::channel();
+        instance.current_state(tx)?;
+        rx.await?
     }
 
     /// Idempotently ensures this VM is not registered with this instance
     /// manager. If this Propolis job is registered and has a running zone, the
     /// zone is rudely terminated.
-    async fn ensure_unregistered(
+    fn ensure_unregistered(
         &mut self,
         tx: oneshot::Sender<Result<VmmUnregisterResponse, Error>>,
         propolis_id: PropolisUuid,
@@ -653,13 +654,13 @@
         // Otherwise, we pipeline the request, and send it to the instance,
         // where it can receive an appropriate response.
         let mark_failed = false;
-        instance.terminate(tx, mark_failed).await?;
+        instance.terminate(tx, mark_failed)?;
         Ok(())
     }
 
     /// Idempotently attempts to drive the supplied Propolis into the supplied
     /// runtime state.
-    async fn ensure_state(
+    fn ensure_state(
         &mut self,
         tx: oneshot::Sender<Result<VmmPutStateResponse, Error>>,
         propolis_id: PropolisUuid,
@@ -670,11 +671,11 @@
                 .map_err(|_| Error::FailedSendClientClosed)?;
             return Ok(());
         };
-        instance.put_state(tx, target).await?;
+        instance.put_state(tx, target)?;
         Ok(())
     }
 
-    async fn issue_disk_snapshot_request(
+    fn issue_disk_snapshot_request(
         &self,
         tx: oneshot::Sender<Result<(), Error>>,
         propolis_id: PropolisUuid,
@@ -686,12 +687,11 @@
 
         instance
             .issue_snapshot_request(tx, disk_id, snapshot_id)
-            .await
             .map_err(Error::from)
     }
 
     /// Create a zone bundle from a named instance zone, if it exists.
-    async fn create_zone_bundle(
+    fn create_zone_bundle(
         &self,
         tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
         name: &str,
@@ -711,10 +711,13 @@
         let Some(instance) = self.jobs.get(&vmm_id) else {
             return Err(BundleError::NoSuchZone { name: name.to_string() });
         };
-        instance.request_zone_bundle(tx).await
+        instance
+            .request_zone_bundle(tx)
+            .map_err(|e| BundleError::FailedSend(anyhow!(e)))?;
+        Ok(())
     }
 
-    async fn add_external_ip(
+    fn add_external_ip(
         &self,
         tx: oneshot::Sender<Result<(), Error>>,
         propolis_id: PropolisUuid,
@@ -723,11 +726,11 @@
         let Some(instance) = self.get_propolis(propolis_id) else {
             return Err(Error::NoSuchVmm(propolis_id));
         };
-        instance.add_external_ip(tx, ip).await?;
+        instance.add_external_ip(tx, ip)?;
         Ok(())
     }
 
-    async fn delete_external_ip(
+    fn delete_external_ip(
         &self,
         tx: oneshot::Sender<Result<(), Error>>,
         propolis_id: PropolisUuid,
@@ -737,18 +740,18 @@
             return Err(Error::NoSuchVmm(propolis_id));
         };
 
-        instance.delete_external_ip(tx, ip).await?;
+        instance.delete_external_ip(tx, ip)?;
         Ok(())
     }
 
-    async fn refresh_external_ips(
+    fn refresh_external_ips(
         &self,
         tx: oneshot::Sender<Result<(), Error>>,
     ) -> Result<(), Error> {
         let mut channels = vec![];
         for (_, instance) in &self.jobs {
             let (tx, rx_new) = oneshot::channel();
-            instance.refresh_external_ips(tx).await?;
+            instance.refresh_external_ips(tx)?;
             channels.push(rx_new);
         }
 
@@ -766,7 +769,7 @@
         Ok(())
     }
 
-    async fn get_instance_state(
+    fn get_instance_state(
         &self,
         tx: oneshot::Sender<Result<SledVmmState, Error>>,
         propolis_id: PropolisUuid,
@@ -776,9 +779,7 @@
                 .send(Err(Error::NoSuchVmm(propolis_id)))
                 .map_err(|_| Error::FailedSendClientClosed);
         };
-
-        let state = instance.current_state().await?;
-        tx.send(Ok(state)).map_err(|_| Error::FailedSendClientClosed)?;
+        instance.current_state(tx)?;
         Ok(())
     }
 
@@ -804,9 +805,13 @@
         for (id, instance) in self.jobs.iter() {
             // If we can read the filesystem pool, consider it. Otherwise, move
             // on, to prevent blocking the cleanup of other instances.
-            let Ok(Some(filesystem_pool)) =
-                instance.get_filesystem_zpool().await
-            else {
+            // TODO(eliza): clone each instance and spawn a task to handle it,
+            // so that a single misbehaving instance cannot block the instance
+            // manager's run loop...
+            let (tx, rx) = oneshot::channel();
+            // This will fail if the tx has been dropped, which we just...don't do.
+            let _ = instance.get_filesystem_zpool(tx);
+            let Ok(Ok(Some(filesystem_pool))) = rx.await else {
                 info!(self.log, "use_only_these_disks: Cannot read filesystem pool"; "instance_id" => ?id);
                 continue;
             };
@@ -820,7 +825,7 @@
             if let Some(instance) = self.jobs.remove(&id) {
                 let (tx, rx) = oneshot::channel();
                 let mark_failed = true;
-                if let Err(e) = instance.terminate(tx, mark_failed).await {
+                if let Err(e) = instance.terminate(tx, mark_failed) {
                     warn!(self.log, "use_only_these_disks: Failed to request instance removal"; "err" => ?e);
                     continue;
                 }
diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs
index 4a4be08f76..8a5b15aaaf 100644
--- a/sled-agent/src/sled_agent.rs
+++ b/sled-agent/src/sled_agent.rs
@@ -186,11 +186,21 @@ impl From<Error> for omicron_common::api::external::Error {
 impl From<Error> for dropshot::HttpError {
     fn from(err: Error) -> Self {
         const NO_SUCH_INSTANCE: &str = "NO_SUCH_INSTANCE";
+        const INSTANCE_CHANNEL_FULL: &str = "INSTANCE_CHANNEL_FULL";
         match err {
             Error::Instance(crate::instance_manager::Error::Instance(
                 instance_error,
             )) => {
                 match instance_error {
+                    // The instance's request channel is full, so it cannot
+                    // currently process this request. Shed load, but indicate
+                    // to the client that it can try again later.
+                    err @ crate::instance::Error::FailedSendChannelFull => {
+                        HttpError::for_unavail(
+                            Some(INSTANCE_CHANNEL_FULL.to_string()),
+                            err.to_string(),
+                        )
+                    }
                     crate::instance::Error::Propolis(propolis_error) => {
                         // Work around dropshot#693: HttpError::for_status
                         // only accepts client errors and asserts on server
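The sled_agent.rs hunk above turns `FailedSendChannelFull` into a 503 so that a momentarily full queue reads as retryable rather than fatal. Reduced to a hypothetical standalone error type (illustrative names; assumes the `dropshot` and `thiserror` crates), the mapping looks like this:

```rust
use dropshot::HttpError;

#[derive(Debug, thiserror::Error)]
enum WorkerError {
    #[error("worker queue is at capacity")]
    QueueFull,
    #[error("worker task has gone away")]
    Closed,
}

fn to_http_error(err: WorkerError) -> HttpError {
    match &err {
        // 503 Service Unavailable: the condition is temporary, so clients
        // may retry after a backoff.
        WorkerError::QueueFull => HttpError::for_unavail(
            Some("WORKER_QUEUE_FULL".to_string()),
            err.to_string(),
        ),
        // A closed channel means the worker is gone entirely; report that as
        // a server error instead.
        WorkerError::Closed => HttpError::for_internal_error(err.to_string()),
    }
}
```

Clients that treat 503 as retryable will back off and resubmit, which matches the intent of shedding load while an instance's queue is briefly full.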