fix: Fix action pipeline not cancelling persistent tasks. #1782

Merged · 4 commits · Jan 6, 2025
3 changes: 2 additions & 1 deletion .moon/workspace.yml
@@ -36,7 +36,8 @@ docker:
   - '*.config.js'
   - '*.json'
 # unstable_remote:
-#   host: 'grpc://0.0.0.0:9092'
+#   host: 'http://0.0.0.0:8080'
+#   # host: 'grpc://0.0.0.0:9092'
 #   cache:
 #     compression: 'zstd'
 #   mtls:
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -50,6 +50,8 @@

 - Fixed a panic that could occur during command argument parsing.
 - Fixed an issue where remote cached blobs would sometimes fail to be created locally.
+- Fixed an issue where pressing `ctrl+c` while multiple persistent tasks were running would
+  sometimes leave them running in the background.
 
 #### ⚙️ Internal
 
58 changes: 3 additions & 55 deletions Cargo.lock

Some generated files are not rendered by default.

122 changes: 81 additions & 41 deletions crates/action-pipeline/src/action_pipeline.rs
@@ -22,6 +22,7 @@
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::{mpsc, RwLock, Semaphore};
 use tokio::task::{JoinHandle, JoinSet};
+use tokio::time::sleep;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, instrument, trace, warn};

@@ -176,22 +177,24 @@ impl ActionPipeline {

             actions.push(action);
 
-            if actions.len() == total_actions {
-                debug!("Finished pipeline, received all results");
-                break;
-            } else if abort_token.is_cancelled() {
+            if abort_token.is_cancelled() {
                 debug!("Aborting pipeline (because something failed)");
                 break;
             } else if cancel_token.is_cancelled() {
                 debug!("Cancelling pipeline (via signal)");
                 break;
+            } else if actions.len() == total_actions {
+                debug!("Finished pipeline, received all results");
+                break;
             }
         }
 
         drop(receiver);
 
-        // Clean up any open handles
-        queue_handle.abort();
+        // Wait for the queue to abort all running tasks
+        let _ = queue_handle.await;
+
+        // Force abort the signal handler
         signal_handle.abort();
 
         self.aborted = abort_token.is_cancelled();
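The behavioral change to note in this hunk is the queue handle: the old code force-aborted it, while the new code awaits it so the queue task can tear down its own children first. A minimal sketch of that cooperative-shutdown pattern (the stand-in queue task and token wiring here are hypothetical, not moon's actual queue):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.child_token();

    // Stand-in for the pipeline's queue task: it owns running jobs
    // and must get a chance to tear them down on shutdown.
    let queue_handle = tokio::spawn(async move {
        child.cancelled().await;
        // ...abort any running jobs here before returning...
        println!("queue cleaned up");
    });

    token.cancel();

    // Awaiting the handle (rather than calling `abort()`) only returns
    // once the task has fully exited its cleanup path; `abort()` would
    // kill it at the next await point and could strand child processes.
    let _ = queue_handle.await;
}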
@@ -284,48 +287,71 @@
                 ));
 
                 // Run this in isolation by exhausting the current list of handles
-                if node.is_interactive() {
-                    exhaust_job_handles(&mut job_handles).await;
+                if node.is_interactive()
+                    && exhaust_job_handles(&mut job_handles, &job_context).await
+                {
+                    return;
                 }
             }
 
             // Ensure all non-persistent actions have finished
-            exhaust_job_handles(&mut job_handles).await;
+            if exhaust_job_handles(&mut job_handles, &job_context).await {
+                return;
+            }
 
             // Then run all persistent actions in parallel
-            if !persistent_indices.is_empty() {
-                debug!(
-                    indices = ?persistent_indices,
-                    "Running {} persistent actions",
-                    persistent_indices.len()
-                );
-
-                persistent_indices
-                    .into_iter()
-                    .flat_map(|node_index| {
-                        let node = action_graph.get_node_from_index(&node_index)?;
-
-                        // Since the task is persistent, set the state early since
-                        // it "never finishes", otherwise the runner will error about
-                        // a missing hash if it's a dependency of another persistent task
-                        if let ActionNode::RunTask(inner) = node {
-                            action_context
-                                .set_target_state(inner.target.clone(), TargetState::Passthrough);
-                        }
-
-                        Some((node.to_owned(), node_index.index()))
-                    })
-                    .for_each(|(node, node_index)| {
-                        job_handles.spawn(dispatch_job(
-                            node,
-                            node_index,
-                            job_context.clone(),
-                            Arc::clone(&app_context),
-                            Arc::clone(&action_context),
-                        ));
-                    });
-
-                exhaust_job_handles(&mut job_handles).await;
+            if persistent_indices.is_empty() {
+                return;
+            }
+
+            debug!(
+                indices = ?persistent_indices,
+                "Running {} persistent actions",
+                persistent_indices.len()
+            );
+
+            persistent_indices
+                .into_iter()
+                .flat_map(|node_index| {
+                    let node = action_graph.get_node_from_index(&node_index)?;
+
+                    // Since the task is persistent, set the state early since
+                    // it "never finishes", otherwise the runner will error about
+                    // a missing hash if it's a dependency of another persistent task
+                    if let ActionNode::RunTask(inner) = node {
+                        action_context
+                            .set_target_state(inner.target.clone(), TargetState::Passthrough);
+                    }
+
+                    Some((node.to_owned(), node_index.index()))
+                })
+                .for_each(|(node, node_index)| {
+                    job_handles.spawn(dispatch_job(
+                        node,
+                        node_index,
+                        job_context.clone(),
+                        Arc::clone(&app_context),
+                        Arc::clone(&action_context),
+                    ));
+                });
+
+            // Since these tasks are persistent and never complete,
+            // we need to continually check if they've been aborted or
+            // cancelled, otherwise we will end up with zombie processes
+            loop {
+                sleep(Duration::from_millis(150)).await;
+
+                // No tasks running, so don't hang forever
+                if job_context.result_sender.is_closed() {
+                    break;
+                }
+
+                if job_context.is_aborted_or_cancelled() {
+                    debug!("Shutting down {} persistent jobs", job_handles.len());
+
+                    job_handles.shutdown().await;
+                    break;
+                }
             }
         }))
     }
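Persistent jobs never resolve on their own, so the pipeline can no longer just drain the JoinSet; instead it wakes every 150ms to check for an abort or cancel. A minimal sketch of that watchdog pattern, assuming a CancellationToken stands in for `job_context.is_aborted_or_cancelled()` and an empty set stands in for the closed result channel:

use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

// Poll for cancellation while persistent jobs run; on cancel, abort
// every job and wait for each to exit so none become zombie processes.
async fn watch_persistent_jobs(mut handles: JoinSet<()>, cancel: CancellationToken) {
    loop {
        sleep(Duration::from_millis(150)).await;

        // All jobs already exited on their own; nothing left to watch.
        if handles.is_empty() {
            return;
        }

        if cancel.is_cancelled() {
            // shutdown() aborts all remaining tasks and awaits them.
            handles.shutdown().await;
            return;
        }
    }
}

The 150ms interval trades a little shutdown latency for fewer wakeups; the important property is that `JoinSet::shutdown` both aborts and awaits every task, so nothing outlives the pipeline.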
@@ -445,7 +471,21 @@ async fn dispatch_job_with_permit(
 }
 
 #[instrument(skip_all)]
-async fn exhaust_job_handles<T: 'static>(set: &mut JoinSet<T>) {
-    while set.join_next().await.is_some() {}
+async fn exhaust_job_handles<T: 'static>(set: &mut JoinSet<T>, job_context: &JobContext) -> bool {
+    while set.join_next().await.is_some() {
+        // If the pipeline was aborted or cancelled (signal),
+        // loop through and abort all currently running handles
+        if job_context.is_aborted_or_cancelled() {
+            set.shutdown().await;
+            set.detach_all();
+
+            // Aborted
+            return true;
+        }
+    }
+
+    set.detach_all();
+
+    // Not aborted
+    false
 }
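The helper now reports whether it bailed out early, which is what lets the dispatch loop above `return` as soon as the pipeline is aborted or cancelled. The fix leans on two `JoinSet` behaviors: `shutdown()` aborts every remaining task and awaits them, while `detach_all()` empties the set but lets its tasks keep running (after `shutdown()` or a full drain it is effectively a no-op, so the calls here read as defensive). A small standalone sketch of the drain-then-detach flow, with illustrative jobs only:

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set: JoinSet<u64> = JoinSet::new();
    for i in 0..4 {
        set.spawn(async move { i });
    }

    // Drain results until the set is empty; the "exhaust" half.
    while let Some(result) = set.join_next().await {
        println!("job finished: {result:?}");
    }

    // detach_all() empties the set without aborting anything, so a
    // JoinSet can be dropped without cancelling tasks it handed off.
    set.detach_all();
}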
1 change: 1 addition & 0 deletions crates/cli/tests/run_test.rs
@@ -580,6 +580,7 @@ mod target_scopes {
.env_remove("CI")
.current_dir(sandbox.path().join("base"));
});

let output = assert.output();

assert!(predicate::str::contains("base:runFromProject").eval(&output));
Expand Down
3 changes: 1 addition & 2 deletions crates/remote/Cargo.toml
@@ -9,10 +9,9 @@
 moon_action = { path = "../action" }
 moon_common = { path = "../common" }
 moon_config = { path = "../config" }
 async-trait = { workspace = true }
-bazel-remote-apis = "0.11.0"
+bazel-remote-apis = { version = "0.12.0", features = ["serde"] }
 chrono = { workspace = true }
 miette = { workspace = true }
-prost-types = "0.13.4"
 reqwest = { workspace = true, features = ["json"] }
 rustc-hash = { workspace = true }
 scc = { workspace = true }
10 changes: 6 additions & 4 deletions crates/remote/src/fs_digest.rs
@@ -3,9 +3,9 @@
 use bazel_remote_apis::build::bazel::remote::execution::v2::{
     Digest, NodeProperties, OutputDirectory, OutputFile, OutputSymlink,
 };
+use bazel_remote_apis::google::protobuf::{Timestamp, UInt32Value};
 use chrono::NaiveDateTime;
 use moon_common::path::{PathExt, WorkspaceRelativePathBuf};
-use prost_types::Timestamp;
 use sha2::{Digest as Sha256Digest, Sha256};
 use starbase_utils::fs::FsError;
 use starbase_utils::glob;
@@ -60,7 +60,7 @@ pub fn create_timestamp_from_naive(time: NaiveDateTime) -> Option<Timestamp> {

 #[cfg(unix)]
 fn is_file_executable(_path: &Path, props: &NodeProperties) -> bool {
-    props.unix_mode.is_some_and(|mode| mode & 0o111 != 0)
+    props.unix_mode.is_some_and(|mode| mode.value & 0o111 != 0)
 }
 
 #[cfg(windows)]
@@ -79,7 +79,9 @@ pub fn compute_node_properties(metadata: &Metadata) -> NodeProperties {
     {
         use std::os::unix::fs::PermissionsExt;
 
-        props.unix_mode = Some(metadata.permissions().mode());
+        props.unix_mode = Some(UInt32Value {
+            value: metadata.permissions().mode(),
+        });
     }
 
     props
@@ -192,7 +194,7 @@ fn apply_node_properties(path: &Path, props: &NodeProperties) -> miette::Result<
     if let Some(mode) = &props.unix_mode {
         use std::os::unix::fs::PermissionsExt;
 
-        fs::set_permissions(path, fs::Permissions::from_mode(*mode)).map_err(|error| {
+        fs::set_permissions(path, fs::Permissions::from_mode(mode.value)).map_err(|error| {
             FsError::Perms {
                 path: path.to_path_buf(),
                 error: Box::new(error),
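All of the fs_digest changes follow from the bazel-remote-apis 0.12 bump: `unix_mode` is now a protobuf `UInt32Value` message rather than a bare `u32`, so reads go through `.value` and writes construct the wrapper. A self-contained sketch of the executable-bit check, assuming the crate's `UInt32Value { value: u32 }` shape:

// Assumed shape of bazel_remote_apis::google::protobuf::UInt32Value.
struct UInt32Value {
    value: u32,
}

// Mirrors is_file_executable: is any execute bit (owner/group/other) set?
fn is_executable(unix_mode: Option<&UInt32Value>) -> bool {
    unix_mode.is_some_and(|mode| mode.value & 0o111 != 0)
}

fn main() {
    assert!(is_executable(Some(&UInt32Value { value: 0o755 })));
    assert!(!is_executable(Some(&UInt32Value { value: 0o644 })));
    assert!(!is_executable(None));
}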