diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs
index dc4dd1ff54..06019cf810 100644
--- a/sled-agent/src/instance.rs
+++ b/sled-agent/src/instance.rs
@@ -409,14 +409,6 @@ struct InstanceRunner {
     // Request channel on which most instance requests are made.
     rx: mpsc::Receiver<InstanceRequest>,
 
-    // Request channel for terminating the instance.
-    //
-    // This is a separate channel from the main request channel (`self.rx`)
-    // because we would like to be able to prioritize requests to terminate, and
-    // handle them even when the instance's main request channel may have filled
-    // up.
-    terminate_rx: mpsc::Receiver<TerminateRequest>,
-
     // Request channel on which monitor requests are made.
    tx_monitor: mpsc::Sender<InstanceMonitorRequest>,
    rx_monitor: mpsc::Receiver<InstanceMonitorRequest>,
@@ -475,7 +467,7 @@ struct InstanceRunner {
 }
 
 impl InstanceRunner {
-    async fn run(mut self) {
+    async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
         use InstanceRequest::*;
         while !self.should_terminate {
             tokio::select! {
@@ -544,7 +536,7 @@ impl InstanceRunner {
                 },
                 // Requests to terminate the instance take priority over any
                 // other request to the instance.
-                request = self.terminate_rx.recv() => {
+                request = terminate_rx.recv() => {
                     let Some(TerminateRequest { mark_failed, tx}) = request else {
                         warn!(
                             self.log,
@@ -569,64 +561,119 @@
                 // Handle external requests to act upon the instance.
                 request = self.rx.recv() => {
-                    let request_variant = request.as_ref().map(|r| r.to_string());
-                    let result = match request {
-                        Some(RequestZoneBundle { tx }) => {
-                            tx.send(self.request_zone_bundle().await)
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(GetFilesystemPool { tx } ) => {
-                            tx.send(Ok(self.get_filesystem_zpool()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(CurrentState{ tx }) => {
-                            tx.send(Ok(self.current_state()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(PutState{ state, tx }) => {
-                            tx.send(self.put_state(state).await
-                                .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
-                                .map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
-                            tx.send(
-                                self.issue_snapshot_request(
-                                    disk_id,
-                                    snapshot_id
-                                ).await.map_err(|e| e.into())
-                            )
-                            .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(AddExternalIp { ip, tx }) => {
-                            tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(DeleteExternalIp { ip, tx }) => {
-                            tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(RefreshExternalIps { tx }) => {
-                            tx.send(self.refresh_external_ips().map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        }
+                    let request = match request {
+                        Some(r) => r,
                         None => {
                             warn!(self.log, "Instance request channel closed; shutting down");
                             let mark_failed = false;
                             self.terminate(mark_failed).await;
                             break;
-                        },
+                        }
                     };
+                    let request_variant = request.to_string();
+                    // Okay, this is a little bit wacky: if we are waiting for
+                    // one of the instance operations we run here to come back,
+                    // and a termination request comes in, we want to give up on
+                    // the outstanding operation and honor the termination
+                    // request immediately. This is in case the instance
+                    // operation has gotten stuck: we don't want it to prevent
+                    // the instance from terminating because something else is
+                    // wedged.
+                    //
+                    // Therefore, we're going to select between the future that
+                    // actually performs the instance op and receiving another
+                    // request from the termination channel.
+                    let op = async {
+                        match request {
+                            RequestZoneBundle { tx } => {
+                                tx.send(self.request_zone_bundle().await)
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            GetFilesystemPool { tx } => {
+                                tx.send(Ok(self.get_filesystem_zpool()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            CurrentState{ tx } => {
+                                tx.send(Ok(self.current_state()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            PutState{ state, tx } => {
+                                tx.send(self.put_state(state).await
+                                    .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
+                                    .map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            IssueSnapshotRequest { disk_id, snapshot_id, tx } => {
+                                tx.send(
+                                    self.issue_snapshot_request(
+                                        disk_id,
+                                        snapshot_id
+                                    ).await.map_err(|e| e.into())
+                                )
+                                .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            AddExternalIp { ip, tx } => {
+                                tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            DeleteExternalIp { ip, tx } => {
+                                tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            RefreshExternalIps { tx } => {
+                                tx.send(self.refresh_external_ips().map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            }
+                        }
+                    };
+                    tokio::select! {
+                        biased;
+
+                        request = terminate_rx.recv() => {
+                            match request {
+                                Some(TerminateRequest { tx, mark_failed }) => {
+                                    info!(
+                                        self.log,
+                                        "Received request to terminate instance \
+                                         while waiting on an ongoing request";
+                                        "request" => request_variant,
+                                    );
+                                    let result = tx.send(Ok(VmmUnregisterResponse {
+                                        updated_runtime: Some(self.terminate(mark_failed).await)
+                                    }))
+                                    .map_err(|_| Error::FailedSendClientClosed);
+                                    if let Err(err) = result {
+                                        warn!(
+                                            self.log,
+                                            "Error handling request to terminate instance";
+                                            "err" => ?err,
+                                        );
+                                    }
+                                    break;
+                                },
+                                None => {
+                                    warn!(
+                                        self.log,
+                                        "Instance termination request channel closed; \
+                                         shutting down";
+                                    );
+                                    self.terminate(false).await;
+                                    break;
+                                },
+                            };
+                        }
-                    if let Err(err) = result {
-                        warn!(
-                            self.log,
-                            "Error handling request";
-                            "request" => request_variant.unwrap(),
-                            "err" => ?err,
-
-                        );
-                    }
+                        result = op => {
+                            if let Err(err) = result {
+                                warn!(
+                                    self.log,
+                                    "Error handling request";
+                                    "request" => request_variant,
+                                    "err" => ?err,
+                                );
+                            }
+                        }
+                    };
                 }
             }
@@ -668,8 +715,7 @@ impl InstanceRunner {
         // Anyone else who was trying to ask us to go die will be happy to learn
         // that we have now done so!
-        while let Some(TerminateRequest { tx, .. }) =
-            self.terminate_rx.recv().await
+        while let Some(TerminateRequest { tx, .. }) = terminate_rx.recv().await
         {
             let _ = tx.send(Ok(VmmUnregisterResponse {
                 updated_runtime: Some(self.current_state()),
@@ -1323,6 +1369,18 @@ impl Instance {
         let (tx, rx) = mpsc::channel(QUEUE_SIZE);
         let (tx_monitor, rx_monitor) = mpsc::channel(1);
+
+        // Request channel for terminating the instance.
+        //
+        // This is a separate channel from the main request channel (`self.rx`)
+        // because we would like to be able to prioritize requests to terminate, and
+        // handle them even when the instance's main request channel may have filled
+        // up.
+        //
+        // Note also that this is *not* part of the `InstanceRunner` struct,
+        // because it's necessary to split mutable borrows in order to allow
+        // selecting between the actual instance operation (which must mutate
+        // the `InstanceRunner`) and awaiting a termination request.
         let (terminate_tx, terminate_rx) = mpsc::channel(QUEUE_SIZE);
 
         let metadata = propolis_client::types::InstanceMetadata {
@@ -1340,7 +1398,6 @@ impl Instance {
             rx,
             tx_monitor,
             rx_monitor,
-            terminate_rx,
             monitor_handle: None,
             // NOTE: Mostly lies.
             properties: propolis_client::types::InstanceProperties {
@@ -1380,7 +1437,7 @@ impl Instance {
         };
 
         let runner_handle =
-            tokio::task::spawn(async move { runner.run().await });
+            tokio::task::spawn(async move { runner.run(terminate_rx).await });
 
         Ok(Instance {
             id,
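Reviewer note: the two ideas this diff leans on (passing `terminate_rx` into `run()` to split mutable borrows, and a `biased` `tokio::select!` so termination wins over an in-flight operation) are easier to see outside diff form. Below is a minimal, self-contained sketch of the same pattern; `Runner`, `do_work`, and the `u32` "job" channel are hypothetical stand-ins, not code from this change.

```rust
// Sketch only. Cargo.toml would need: tokio = { version = "1", features = ["full"] }
use tokio::sync::mpsc;

struct Runner {
    rx: mpsc::Receiver<u32>,
}

impl Runner {
    // Stand-in for a potentially slow (or wedged) instance operation.
    // Note that it takes `&mut self`.
    async fn do_work(&mut self, job: u32) {
        println!("working on job {job}");
    }

    // `terminate_rx` is a parameter rather than a field: the `op` future
    // below holds a mutable borrow of `self`, so the termination receiver
    // must be a separate binding for the other select arm to poll. If it
    // were `self.terminate_rx`, both arms would need `&mut self` and this
    // would not compile.
    async fn run(mut self, mut terminate_rx: mpsc::Receiver<()>) {
        while let Some(job) = self.rx.recv().await {
            let op = self.do_work(job);
            tokio::select! {
                // `biased` polls arms top to bottom, so a pending
                // termination request is honored even if `op` is also
                // ready (or stuck).
                biased;

                _ = terminate_rx.recv() => {
                    println!("terminating; abandoning job {job}");
                    return;
                }
                _ = op => {}
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (job_tx, job_rx) = mpsc::channel(8);
    let (term_tx, term_rx) = mpsc::channel(1);
    job_tx.send(1).await.unwrap();
    // A termination request queued behind an outstanding job still wins,
    // because the biased select checks the termination channel first.
    term_tx.send(()).await.unwrap();
    Runner { rx: job_rx }.run(term_rx).await;
}
```

This is the same design choice the comment added in `Instance::new` describes: the termination receiver has to live outside the `InstanceRunner` struct for the select to borrow-check.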