[sled-agent] Give InstanceManager a gun #6915

Merged: 3 commits, Oct 30, 2024
273 changes: 199 additions & 74 deletions sled-agent/src/instance.rs
@@ -232,10 +232,6 @@ enum InstanceRequest {
state: VmmStateRequested,
tx: oneshot::Sender<Result<VmmPutStateResponse, ManagerError>>,
},
Terminate {
mark_failed: bool,
tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
},
IssueSnapshotRequest {
disk_id: Uuid,
snapshot_id: Uuid,
@@ -293,9 +289,6 @@ impl InstanceRequest {
Self::PutState { tx, .. } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::Terminate { tx, .. } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::IssueSnapshotRequest { tx, .. }
| Self::AddExternalIp { tx, .. }
| Self::DeleteExternalIp { tx, .. }
@@ -306,6 +299,11 @@
}
}

struct TerminateRequest {
mark_failed: bool,
tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
}

// A small task which tracks the state of the instance, by constantly querying
// the state of Propolis for updates.
//
@@ -469,7 +467,7 @@ struct InstanceRunner {
}

impl InstanceRunner {
async fn run(mut self) {
async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
use InstanceRequest::*;
while !self.should_terminate {
tokio::select! {
@@ -535,74 +533,103 @@ impl InstanceRunner {
self.terminate(mark_failed).await;
},
}

},
// Requests to terminate the instance take priority over any
Contributor:

Maybe a silly question, but: what happens if this task is already stuck awaiting a response to one of the commands below? IIUC that's what we're seeing in #6911 -- the InstanceRunner loop for some instance is stuck waiting on a Propolis request, so it's not looking at any of its message queues.

Member Author:

Ah, yeah, you're right, this will need to also select over any operation we do in any of these branches and the termination channel firing. I'll fix that.

Member Author:

Okay, @gjcolombo, commit 2a331ab changes this so that the termination channel firing will take priority over any in-flight request to the instance, so even if that request gets stuck, we will still pull the plug immediately.

I wondered whether we wanted to add a grace period to allow the in-flight operation to finish, but after talking to @smklein, I don't think that's actually necessary, because terminate is only used by the most forceful attempts to stop an instance (the vmm_unregister API and killing an instance that was using an expunged disk). A "normal" attempt to stop the instance should go through instance_put_state with the Stopped state, which goes in the normal request queue.
// other request to the instance.
request = terminate_rx.recv() => {
self.handle_termination_request(request, None).await;
break;
}

// Handle external requests to act upon the instance.
request = self.rx.recv() => {
let request_variant = request.as_ref().map(|r| r.to_string());
let result = match request {
Some(RequestZoneBundle { tx }) => {
tx.send(self.request_zone_bundle().await)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(GetFilesystemPool { tx } ) => {
tx.send(Ok(self.get_filesystem_zpool()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(CurrentState{ tx }) => {
tx.send(Ok(self.current_state()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(PutState{ state, tx }) => {
tx.send(self.put_state(state).await
.map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(Terminate { mark_failed, tx }) => {
tx.send(Ok(VmmUnregisterResponse {
updated_runtime: Some(self.terminate(mark_failed).await)
}))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
tx.send(
self.issue_snapshot_request(
disk_id,
snapshot_id
).await.map_err(|e| e.into())
)
.map_err(|_| Error::FailedSendClientClosed)
},
Some(AddExternalIp { ip, tx }) => {
tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(DeleteExternalIp { ip, tx }) => {
tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(RefreshExternalIps { tx }) => {
tx.send(self.refresh_external_ips().map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
}
let request = match request {
Some(r) => r,
None => {
warn!(self.log, "Instance request channel closed; shutting down");
let mark_failed = false;
self.terminate(mark_failed).await;
break;
},
}
};
let request_variant = request.to_string();
// Okay, this is a little bit wacky: if we are waiting for
// one of the instance operations we run here to come back,
// and a termination request comes in, we want to give up on
// the outstanding operation and honor the termination
// request immediately. This is in case the instance
// operation has gotten stuck: we don't want it to prevent
// the instance from terminating because something else is
// wedged.
//
// Therefore, we're going to select between the future that
// actually performs the instance op and receiving another
// request from the termination channel.
let op = async {
match request {
RequestZoneBundle { tx } => {
tx.send(self.request_zone_bundle().await)
.map_err(|_| Error::FailedSendClientClosed)
},
GetFilesystemPool { tx } => {
tx.send(Ok(self.get_filesystem_zpool()))
.map_err(|_| Error::FailedSendClientClosed)
},
CurrentState{ tx } => {
tx.send(Ok(self.current_state()))
.map_err(|_| Error::FailedSendClientClosed)
},
PutState{ state, tx } => {
tx.send(self.put_state(state).await
.map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
IssueSnapshotRequest { disk_id, snapshot_id, tx } => {
tx.send(
self.issue_snapshot_request(
disk_id,
snapshot_id
).await.map_err(|e| e.into())
)
.map_err(|_| Error::FailedSendClientClosed)
},
AddExternalIp { ip, tx } => {
tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
DeleteExternalIp { ip, tx } => {
tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
},
RefreshExternalIps { tx } => {
tx.send(self.refresh_external_ips().map_err(|e| e.into()))
.map_err(|_| Error::FailedSendClientClosed)
}
}
};
tokio::select! {
biased;

request = terminate_rx.recv() => {
self.handle_termination_request(
request,
Some(&request_variant),
).await;
break;
}

if let Err(err) = result {
warn!(
self.log,
"Error handling request";
"request" => request_variant.unwrap(),
"err" => ?err,

);
}
result = op => {
if let Err(err) = result {
warn!(
self.log,
"Error handling request";
"request" => request_variant,
"err" => ?err,
);
}
}
};
}

}
@@ -627,9 +654,6 @@ impl InstanceRunner {
PutState { tx, .. } => {
tx.send(Err(Error::Terminating.into())).map_err(|_| ())
}
Terminate { tx, .. } => {
tx.send(Err(Error::Terminating.into())).map_err(|_| ())
}
IssueSnapshotRequest { tx, .. } => {
tx.send(Err(Error::Terminating.into())).map_err(|_| ())
}
@@ -644,6 +668,15 @@
}
};
}

// Anyone else who was trying to ask us to go die will be happy to learn
// that we have now done so!
while let Some(TerminateRequest { tx, .. }) = terminate_rx.recv().await
{
let _ = tx.send(Ok(VmmUnregisterResponse {
updated_runtime: Some(self.current_state()),
}));
}
}

/// Yields this instance's ID.
@@ -1189,6 +1222,12 @@ pub struct Instance {
/// loop.
tx: mpsc::Sender<InstanceRequest>,

/// Sender for requests to terminate the instance.
///
/// These are sent over a separate channel so that they can be prioritized
/// over all other requests to the instance.
terminate_tx: mpsc::Sender<TerminateRequest>,

/// This is reference-counted so that the `Instance` struct may be cloned.
#[allow(dead_code)]
runner_handle: Arc<tokio::task::JoinHandle<()>>,
@@ -1287,6 +1326,19 @@ impl Instance {
let (tx, rx) = mpsc::channel(QUEUE_SIZE);
let (tx_monitor, rx_monitor) = mpsc::channel(1);

// Request channel for terminating the instance.
//
// This is a separate channel from the main request channel (`self.rx`)
// because we would like to be able to prioritize requests to terminate, and
// handle them even when the instance's main request channel may have filled
// up.
//
// Note also that this is *not* part of the `InstanceRunner` struct,
// because it's necessary to split mutable borrows in order to allow
// selecting between the actual instance operation (which must mutate
// the `InstanceRunner`) and awaiting a termination request.
let (terminate_tx, terminate_rx) = mpsc::channel(QUEUE_SIZE);

let metadata = propolis_client::types::InstanceMetadata {
project_id: metadata.project_id,
silo_id: metadata.silo_id,
@@ -1341,9 +1393,14 @@ impl Instance {
};

let runner_handle =
tokio::task::spawn(async move { runner.run().await });
tokio::task::spawn(async move { runner.run(terminate_rx).await });

Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
Ok(Instance {
id,
tx,
runner_handle: Arc::new(runner_handle),
terminate_tx,
})
}

pub fn id(&self) -> InstanceUuid {
@@ -1406,9 +1463,19 @@ impl Instance {
tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
mark_failed: bool,
) -> Result<(), Error> {
self.tx
.try_send(InstanceRequest::Terminate { mark_failed, tx })
.or_else(InstanceRequest::fail_try_send)
self.terminate_tx
.try_send(TerminateRequest { mark_failed, tx })
.or_else(|err| match err {
mpsc::error::TrySendError::Closed(TerminateRequest {
tx,
..
}) => tx.send(Err(Error::FailedSendChannelClosed.into())),
mpsc::error::TrySendError::Full(TerminateRequest {
tx,
..
}) => tx.send(Err(Error::FailedSendChannelFull.into())),
})
.map_err(|_| Error::FailedSendClientClosed)
}

pub fn issue_snapshot_request(
@@ -1745,6 +1812,64 @@ impl InstanceRunner {
Ok(PropolisSetup { client, running_zone })
}

async fn handle_termination_request(
&mut self,
req: Option<TerminateRequest>,
current_req: Option<&str>,
) {
match req {
Some(TerminateRequest { tx, mark_failed }) => {
if let Some(request) = current_req {
info!(
self.log,
"Received request to terminate instance while waiting \
on an ongoing request";
"request" => %request,
"mark_failed" => mark_failed,
);
} else {
info!(
self.log,
"Received request to terminate instance";
"mark_failed" => mark_failed,
);
}

let result = tx
.send(Ok(VmmUnregisterResponse {
updated_runtime: Some(
self.terminate(mark_failed).await,
),
}))
.map_err(|_| Error::FailedSendClientClosed);
if let Err(err) = result {
warn!(
self.log,
"Error handling request to terminate instance";
"err" => ?err,
);
}
}
None => {
if let Some(request) = current_req {
warn!(
self.log,
"Instance termination request channel closed while \
waiting on an ongoing request; shutting down";
"request" => %request,
);
} else {
warn!(
self.log,
"Instance termination request channel closed; \
shutting down";
);
}
self.terminate(false).await;
}
};
}

async fn terminate(&mut self, mark_failed: bool) -> SledVmmState {
self.terminate_inner().await;
self.state.terminate_rudely(mark_failed);