diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs
index b47aeb6508..521f458d6c 100644
--- a/sled-agent/src/instance.rs
+++ b/sled-agent/src/instance.rs
@@ -232,10 +232,6 @@ enum InstanceRequest {
         state: VmmStateRequested,
         tx: oneshot::Sender<Result<VmmPutStateResponse, ManagerError>>,
     },
-    Terminate {
-        mark_failed: bool,
-        tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
-    },
     IssueSnapshotRequest {
         disk_id: Uuid,
         snapshot_id: Uuid,
@@ -293,9 +289,6 @@ impl InstanceRequest {
             Self::PutState { tx, .. } => tx
                 .send(Err(error.into()))
                 .map_err(|_| Error::FailedSendClientClosed),
-            Self::Terminate { tx, .. } => tx
-                .send(Err(error.into()))
-                .map_err(|_| Error::FailedSendClientClosed),
             Self::IssueSnapshotRequest { tx, .. }
             | Self::AddExternalIp { tx, .. }
             | Self::DeleteExternalIp { tx, .. }
@@ -306,6 +299,11 @@ impl InstanceRequest {
     }
 }
 
+struct TerminateRequest {
+    mark_failed: bool,
+    tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
+}
+
 // A small task which tracks the state of the instance, by constantly querying
 // the state of Propolis for updates.
 //
@@ -469,7 +467,7 @@ struct InstanceRunner {
 }
 
 impl InstanceRunner {
-    async fn run(mut self) {
+    async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
         use InstanceRequest::*;
         while !self.should_terminate {
             tokio::select! {
@@ -535,74 +533,103 @@ impl InstanceRunner {
                             self.terminate(mark_failed).await;
                         },
                     }
-                },
+                }
+                // Requests to terminate the instance take priority over any
+                // other request to the instance.
+                request = terminate_rx.recv() => {
+                    self.handle_termination_request(request, None).await;
+                    break;
+                }
+
                 // Handle external requests to act upon the instance.
                 request = self.rx.recv() => {
-                    let request_variant = request.as_ref().map(|r| r.to_string());
-                    let result = match request {
-                        Some(RequestZoneBundle { tx }) => {
-                            tx.send(self.request_zone_bundle().await)
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(GetFilesystemPool { tx } ) => {
-                            tx.send(Ok(self.get_filesystem_zpool()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(CurrentState{ tx }) => {
-                            tx.send(Ok(self.current_state()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(PutState{ state, tx }) => {
-                            tx.send(self.put_state(state).await
-                                .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
-                                .map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(Terminate { mark_failed, tx }) => {
-                            tx.send(Ok(VmmUnregisterResponse {
-                                updated_runtime: Some(self.terminate(mark_failed).await)
-                            }))
-                            .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
-                            tx.send(
-                                self.issue_snapshot_request(
-                                    disk_id,
-                                    snapshot_id
-                                ).await.map_err(|e| e.into())
-                            )
-                            .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(AddExternalIp { ip, tx }) => {
-                            tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(DeleteExternalIp { ip, tx }) => {
-                            tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(RefreshExternalIps { tx }) => {
-                            tx.send(self.refresh_external_ips().map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        }
+                    let request = match request {
+                        Some(r) => r,
                         None => {
                             warn!(self.log, "Instance request channel closed; shutting down");
                             let mark_failed = false;
                             self.terminate(mark_failed).await;
                             break;
-                        },
+                        }
                     };
+                    let request_variant = request.to_string();
+                    // Okay, this is a little bit wacky: if we are waiting for
+                    // one of the instance operations we run here to come back,
+                    // and a termination request comes in, we want to give up on
+                    // the outstanding operation and honor the termination
+                    // request immediately. This is in case the instance
+                    // operation has gotten stuck: we don't want it to prevent
+                    // the instance from terminating because something else is
+                    // wedged.
+                    //
+                    // Therefore, we're going to select between the future that
+                    // actually performs the instance op and receiving another
+                    // request from the termination channel.
+                    let op = async {
+                        match request {
+                            RequestZoneBundle { tx } => {
+                                tx.send(self.request_zone_bundle().await)
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            GetFilesystemPool { tx } => {
+                                tx.send(Ok(self.get_filesystem_zpool()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            CurrentState{ tx } => {
+                                tx.send(Ok(self.current_state()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            PutState{ state, tx } => {
+                                tx.send(self.put_state(state).await
+                                    .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
+                                    .map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            IssueSnapshotRequest { disk_id, snapshot_id, tx } => {
+                                tx.send(
+                                    self.issue_snapshot_request(
+                                        disk_id,
+                                        snapshot_id
+                                    ).await.map_err(|e| e.into())
+                                )
+                                .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            AddExternalIp { ip, tx } => {
+                                tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            DeleteExternalIp { ip, tx } => {
+                                tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            RefreshExternalIps { tx } => {
+                                tx.send(self.refresh_external_ips().map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            }
+                        }
+                    };
+                    tokio::select! {
+                        biased;
+
+                        request = terminate_rx.recv() => {
+                            self.handle_termination_request(
+                                request,
+                                Some(&request_variant),
+                            ).await;
+                            break;
+                        }
 
-                    if let Err(err) = result {
-                        warn!(
-                            self.log,
-                            "Error handling request";
-                            "request" => request_variant.unwrap(),
-                            "err" => ?err,
-
-                        );
-                    }
+                        result = op => {
+                            if let Err(err) = result {
+                                warn!(
+                                    self.log,
+                                    "Error handling request";
+                                    "request" => request_variant,
+                                    "err" => ?err,
+                                );
+                            }
+                        }
+                    };
                 }
             }
@@ -627,9 +654,6 @@ impl InstanceRunner {
                 PutState { tx, .. } => {
                     tx.send(Err(Error::Terminating.into())).map_err(|_| ())
                 }
-                Terminate { tx, .. } => {
-                    tx.send(Err(Error::Terminating.into())).map_err(|_| ())
-                }
                 IssueSnapshotRequest { tx, .. } => {
                     tx.send(Err(Error::Terminating.into())).map_err(|_| ())
                 }
@@ -644,6 +668,15 @@ impl InstanceRunner {
                 }
             };
         }
+
+        // Anyone else who was trying to ask us to go die will be happy to learn
+        // that we have now done so!
+        while let Some(TerminateRequest { tx, .. }) = terminate_rx.recv().await
+        {
+            let _ = tx.send(Ok(VmmUnregisterResponse {
+                updated_runtime: Some(self.current_state()),
+            }));
+        }
     }
 
     /// Yields this instance's ID.
@@ -1189,6 +1222,12 @@ pub struct Instance {
     /// loop.
     tx: mpsc::Sender<InstanceRequest>,
 
+    /// Sender for requests to terminate the instance.
+    ///
+    /// These are sent over a separate channel so that they can be prioritized
+    /// over all other requests to the instance.
+    terminate_tx: mpsc::Sender<TerminateRequest>,
+
     /// This is reference-counted so that the `Instance` struct may be cloned.
     #[allow(dead_code)]
     runner_handle: Arc<tokio::task::JoinHandle<()>>,
@@ -1287,6 +1326,19 @@ impl Instance {
         let (tx, rx) = mpsc::channel(QUEUE_SIZE);
         let (tx_monitor, rx_monitor) = mpsc::channel(1);
 
+        // Request channel for terminating the instance.
+        //
+        // This is a separate channel from the main request channel (`self.rx`)
+        // because we would like to be able to prioritize requests to terminate, and
+        // handle them even when the instance's main request channel may have filled
+        // up.
+        //
+        // Note also that this is *not* part of the `InstanceRunner` struct,
+        // because it's necessary to split mutable borrows in order to allow
+        // selecting between the actual instance operation (which must mutate
+        // the `InstanceRunner`) and awaiting a termination request.
+        let (terminate_tx, terminate_rx) = mpsc::channel(QUEUE_SIZE);
+
         let metadata = propolis_client::types::InstanceMetadata {
             project_id: metadata.project_id,
             silo_id: metadata.silo_id,
@@ -1341,9 +1393,14 @@ impl Instance {
         };
 
         let runner_handle =
-            tokio::task::spawn(async move { runner.run().await });
+            tokio::task::spawn(async move { runner.run(terminate_rx).await });
 
-        Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
+        Ok(Instance {
+            id,
+            tx,
+            runner_handle: Arc::new(runner_handle),
+            terminate_tx,
+        })
     }
 
     pub fn id(&self) -> InstanceUuid {
@@ -1406,9 +1463,19 @@ impl Instance {
         tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
         mark_failed: bool,
     ) -> Result<(), Error> {
-        self.tx
-            .try_send(InstanceRequest::Terminate { mark_failed, tx })
-            .or_else(InstanceRequest::fail_try_send)
+        self.terminate_tx
+            .try_send(TerminateRequest { mark_failed, tx })
+            .or_else(|err| match err {
+                mpsc::error::TrySendError::Closed(TerminateRequest {
+                    tx,
+                    ..
+                }) => tx.send(Err(Error::FailedSendChannelClosed.into())),
+                mpsc::error::TrySendError::Full(TerminateRequest {
+                    tx,
+                    ..
+                }) => tx.send(Err(Error::FailedSendChannelFull.into())),
+            })
+            .map_err(|_| Error::FailedSendClientClosed)
     }
 
     pub fn issue_snapshot_request(
@@ -1745,6 +1812,64 @@ impl InstanceRunner {
         Ok(PropolisSetup { client, running_zone })
     }
 
+    async fn handle_termination_request(
+        &mut self,
+        req: Option<TerminateRequest>,
+        current_req: Option<&str>,
+    ) {
+        match req {
+            Some(TerminateRequest { tx, mark_failed }) => {
+                if let Some(request) = current_req {
+                    info!(
+                        self.log,
+                        "Received request to terminate instance while waiting \
+                         on an ongoing request";
+                        "request" => %request,
+                        "mark_failed" => mark_failed,
+                    );
+                } else {
+                    info!(
+                        self.log,
+                        "Received request to terminate instance";
+                        "mark_failed" => mark_failed,
+                    );
+                }
+
+                let result = tx
+                    .send(Ok(VmmUnregisterResponse {
+                        updated_runtime: Some(
+                            self.terminate(mark_failed).await,
+                        ),
+                    }))
+                    .map_err(|_| Error::FailedSendClientClosed);
+                if let Err(err) = result {
+                    warn!(
+                        self.log,
+                        "Error handling request to terminate instance";
+                        "err" => ?err,
+                    );
+                }
+            }
+            None => {
+                if let Some(request) = current_req {
+                    warn!(
+                        self.log,
+                        "Instance termination request channel closed while \
+                         waiting on an ongoing request; shutting down";
+                        "request" => %request,
+                    );
+                } else {
+                    warn!(
+                        self.log,
+                        "Instance termination request channel closed; \
+                         shutting down";
+                    );
+                }
+                self.terminate(false).await;
+            }
+        };
+    }
+
     async fn terminate(&mut self, mark_failed: bool) -> SledVmmState {
         self.terminate_inner().await;
         self.state.terminate_rudely(mark_failed);
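
A minimal, self-contained sketch of the pattern this patch introduces, not part of the patch itself: a dedicated termination channel is polled with a biased tokio::select! both while the worker is idle and while an operation is in flight, so a wedged operation cannot block termination. The types and names here (Op, Terminate, run_worker) are hypothetical stand-ins, not the real sled-agent types, and the sketch assumes tokio with the "full" feature set.

use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

struct Op {
    // Stands in for a potentially slow (or wedged) instance operation.
    work: Duration,
    done: oneshot::Sender<&'static str>,
}

struct Terminate {
    done: oneshot::Sender<&'static str>,
}

async fn run_worker(
    mut ops: mpsc::Receiver<Op>,
    mut terminate_rx: mpsc::Receiver<Terminate>,
) {
    loop {
        tokio::select! {
            // Termination requests always get first claim on the worker.
            biased;

            Some(t) = terminate_rx.recv() => {
                let _ = t.done.send("terminated while idle");
                break;
            }

            maybe_op = ops.recv() => {
                let Some(op) = maybe_op else { break };
                let op_future = async {
                    tokio::time::sleep(op.work).await;
                    let _ = op.done.send("operation finished");
                };
                // Race the in-flight operation against termination, so a
                // stuck operation cannot prevent shutdown.
                tokio::select! {
                    biased;

                    Some(t) = terminate_rx.recv() => {
                        let _ = t.done.send("terminated mid-operation");
                        break;
                    }
                    _ = op_future => {}
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (op_tx, op_rx) = mpsc::channel(8);
    let (term_tx, term_rx) = mpsc::channel(8);
    let worker = tokio::spawn(run_worker(op_rx, term_rx));

    // Queue an operation that would otherwise run for a minute...
    let (op_done, _op_result) = oneshot::channel();
    op_tx.send(Op { work: Duration::from_secs(60), done: op_done }).await.unwrap();

    // ...then ask the worker to stop; the biased selects let this win promptly.
    let (term_done, term_result) = oneshot::channel();
    term_tx.send(Terminate { done: term_done }).await.unwrap();

    println!("{}", term_result.await.unwrap());
    worker.await.unwrap();
}

The biased; keyword makes the polling order deterministic: without it, select! picks among ready branches pseudo-randomly, and a steady stream of ready operations could starve the termination branch.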