From e3cee3801869a28d2112642a8d8dbbc94b9e2a82 Mon Sep 17 00:00:00 2001 From: Falco Hirschenberger Date: Sun, 24 Nov 2024 07:57:35 +0100 Subject: [PATCH] Add tokio taskname registration for use in tokio-console (#89) This introduces the `tracing` feature that has to be enabled and also the environment variable `RUSTFLAGS="--cfg tokio_unstable"` must be set to enable tokio taskname registration. See example program `tokio-console.rs` for usage. --------- Co-authored-by: Finomnis --- .github/workflows/ci.yml | 9 +-- .github/workflows/coverage.yml | 3 +- .github/workflows/rust-clippy.yml | 1 - Cargo.toml | 18 ++++++ examples/tokio_console.rs | 66 +++++++++++++++++++ src/lib.rs | 1 + src/runner.rs | 101 ++++++++++++++++-------------- src/subsystem/subsystem_handle.rs | 8 ++- src/tokio_task.rs | 23 +++++++ src/toplevel.rs | 15 +++-- 10 files changed, 183 insertions(+), 62 deletions(-) create mode 100644 examples/tokio_console.rs create mode 100644 src/tokio_task.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 22ab910..5842f9f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,14 +30,14 @@ jobs: uses: taiki-e/install-action@cross - name: Build - run: cross build --all-features --release --target=${{ matrix.target }} + run: cross build --release --target=${{ matrix.target }} build-examples: name: Build Examples runs-on: ubuntu-latest needs: [lints, docs] env: - RUSTFLAGS: "-D warnings" + RUSTFLAGS: "-D warnings --cfg tokio_unstable" steps: - name: Checkout sources uses: actions/checkout@v4 @@ -112,9 +112,6 @@ jobs: - name: Check with minimal versions run: cargo minimal-versions check --workspace --ignore-private - - name: Test with minimal versions - run: cargo minimal-versions test -- --test-threads 1 - min-versions-msrv: name: Minimal Dependency Versions (MSRV) runs-on: ubuntu-latest @@ -165,7 +162,7 @@ jobs: run: cargo fmt --all -- --check - name: Run cargo clippy - run: cargo clippy --all-features --all-targets -- -D warnings + run: cargo clippy --all-targets -- -D warnings docs: name: Documentation diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index efe80ea..fd8ad51 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -23,10 +23,9 @@ jobs: uses: actions/checkout@v4 - name: Install llvm-cov uses: taiki-e/install-action@cargo-llvm-cov - #- uses: Swatinem/rust-cache@v1 - name: Compute Coverage run: - cargo llvm-cov --all-features --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json + cargo llvm-cov --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 env: diff --git a/.github/workflows/rust-clippy.yml b/.github/workflows/rust-clippy.yml index 5faca56..b244db5 100644 --- a/.github/workflows/rust-clippy.yml +++ b/.github/workflows/rust-clippy.yml @@ -43,7 +43,6 @@ jobs: - name: Run rust-clippy run: cargo clippy - --all-features --all-targets --message-format=json | clippy-sarif | tee rust-clippy-results.sarif | sarif-fmt continue-on-error: true diff --git a/Cargo.toml b/Cargo.toml index dbe911a..01d8810 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,15 @@ exclude = [ "/UPCOMING_VERSION_CHANGES.txt", ] +[features] +# Enable task naming and task caller location. +tracing = ["tokio/tracing"] + +[[example]] +name = "tokio_console" +required-features = ["tracing"] + + [dependencies] tracing = { version = "0.1.37", default-features = false } @@ -67,6 +76,11 @@ headers = ">= 0.3.5" # Required to fix minimal-versions serde_urlencoded = ">= 0.7.1" # Required to fix minimal-versions unicode-linebreak = ">= 0.1.5" # Required to fix minimal-versions +gcc = ">= 0.3.4" # Required to fix minimal-versions + +# tokio-console +console-subscriber = "0.2.0" + # For testing unix signals [target.'cfg(unix)'.dev-dependencies] nix = { version = "0.29.0", default-features = false, features = ["signal"] } @@ -74,3 +88,7 @@ nix = { version = "0.29.0", default-features = false, features = ["signal"] } # Make leak sanitizer more reliable [profile.dev] opt-level = 1 + +# Define `tokio_unstable` config for linter +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] } diff --git a/examples/tokio_console.rs b/examples/tokio_console.rs new file mode 100644 index 0000000..b5c8570 --- /dev/null +++ b/examples/tokio_console.rs @@ -0,0 +1,66 @@ +//! This example demonstrates how to use the tokio-console application for tracing tokio tasks's +//! runtime behaviour. Subsystems will appear under their registration names. +//! +//! Run this example with: +//! +//! ``` +//! RUSTFLAGS="--cfg tokio_unstable" cargo run --features "tracing" --example tokio_console +//! ``` +//! +//! Then, open the `tokio-console` application (see https://crates.io/crates/tokio-console) to +//! follow the subsystem tasks live. + +use miette::Result; +use tokio::time::{sleep, Duration}; +use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel}; +use tracing::Level; +use tracing_subscriber::{fmt::writer::MakeWriterExt, prelude::*}; + +async fn child(subsys: SubsystemHandle) -> Result<()> { + sleep(Duration::from_millis(3000)) + .cancel_on_shutdown(&subsys) + .await + .ok(); + Ok(()) +} + +async fn parent(subsys: SubsystemHandle) -> Result<()> { + tracing::info!("Parent started."); + + let mut iteration = 0; + while !subsys.is_shutdown_requested() { + subsys.start(SubsystemBuilder::new(format!("child{iteration}"), child)); + iteration += 1; + + sleep(Duration::from_millis(1000)) + .cancel_on_shutdown(&subsys) + .await + .ok(); + } + + tracing::info!("Parent stopped."); + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<()> { + // Init tokio-console server and tracing + let console_layer = console_subscriber::spawn(); + tracing_subscriber::registry() + .with(console_layer) + .with( + tracing_subscriber::fmt::layer() + .with_writer(std::io::stdout.with_max_level(Level::DEBUG)) + .compact(), + ) + .init(); + + // Setup and execute subsystem tree + Toplevel::new(|s| async move { + s.start(SubsystemBuilder::new("parent", parent)); + }) + .catch_signals() + .handle_shutdown_requests(Duration::from_millis(1000)) + .await + .map_err(Into::into) +} diff --git a/src/lib.rs b/src/lib.rs index 9ecc147..3a745be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -113,6 +113,7 @@ mod into_subsystem; mod runner; mod signal_handling; mod subsystem; +mod tokio_task; mod toplevel; mod utils; diff --git a/src/runner.rs b/src/runner.rs index 837465c..c7f7dc8 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -21,6 +21,7 @@ pub(crate) struct SubsystemRunner { } impl SubsystemRunner { + #[track_caller] pub(crate) fn new( name: Arc, subsystem: Subsys, @@ -32,8 +33,8 @@ impl SubsystemRunner { Fut: 'static + Future> + Send, Err: Into, { - let future = async { run_subsystem(name, subsystem, subsystem_handle, guard).await }; - let aborthandle = tokio::spawn(future).abort_handle(); + let future = run_subsystem(name, subsystem, subsystem_handle, guard); + let aborthandle = crate::tokio_task::spawn(future, "subsystem_runner").abort_handle(); SubsystemRunner { aborthandle } } } @@ -44,12 +45,14 @@ impl Drop for SubsystemRunner { } } -async fn run_subsystem( +#[track_caller] +fn run_subsystem( name: Arc, subsystem: Subsys, mut subsystem_handle: SubsystemHandle, guard: AliveGuard, -) where +) -> impl Future + 'static +where Subsys: 'static + FnOnce(SubsystemHandle) -> Fut + Send, Fut: 'static + Future> + Send, Err: Into, @@ -57,54 +60,58 @@ async fn run_subsystem( let mut redirected_subsystem_handle = subsystem_handle.delayed_clone(); let future = async { subsystem(subsystem_handle).await.map_err(|e| e.into()) }; - let join_handle = tokio::spawn(future); + let join_handle = crate::tokio_task::spawn(future, &name); - // Abort on drop - guard.on_cancel({ - let abort_handle = join_handle.abort_handle(); - let name = Arc::clone(&name); - move || { - if !abort_handle.is_finished() { - tracing::warn!("Subsystem cancelled: '{}'", name); + async move { + // Abort on drop + guard.on_cancel({ + let abort_handle = join_handle.abort_handle(); + let name = Arc::clone(&name); + move || { + if !abort_handle.is_finished() { + tracing::warn!("Subsystem cancelled: '{}'", name); + } + abort_handle.abort(); } - abort_handle.abort(); - } - }); + }); - let failure = match join_handle.await { - Ok(Ok(())) => None, - Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))), - Err(e) => { - // We can assume that this is a panic, because a cancellation - // can never happen as long as we still hold `guard`. - assert!(e.is_panic()); - Some(SubsystemError::Panicked(name)) - } - }; + let failure = match join_handle.await { + Ok(Ok(())) => None, + Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))), + Err(e) => { + // We can assume that this is a panic, because a cancellation + // can never happen as long as we still hold `guard`. + assert!(e.is_panic()); + Some(SubsystemError::Panicked(name)) + } + }; - // Retrieve the handle that was passed into the subsystem. - // Originally it was intended to pass the handle as reference, but due - // to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters) - // it was decided to pass ownership instead. - // - // It is still important that the handle does not leak out of the subsystem. - let subsystem_handle = match redirected_subsystem_handle.try_recv() { - Ok(s) => s, - Err(_) => { - tracing::error!("The SubsystemHandle object must not be leaked out of the subsystem!"); - panic!("The SubsystemHandle object must not be leaked out of the subsystem!"); + // Retrieve the handle that was passed into the subsystem. + // Originally it was intended to pass the handle as reference, but due + // to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters) + // it was decided to pass ownership instead. + // + // It is still important that the handle does not leak out of the subsystem. + let subsystem_handle = match redirected_subsystem_handle.try_recv() { + Ok(s) => s, + Err(_) => { + tracing::error!( + "The SubsystemHandle object must not be leaked out of the subsystem!" + ); + panic!("The SubsystemHandle object must not be leaked out of the subsystem!"); + } + }; + + // Raise potential errors + let joiner_token = subsystem_handle.joiner_token; + if let Some(failure) = failure { + joiner_token.raise_failure(failure); } - }; - // Raise potential errors - let joiner_token = subsystem_handle.joiner_token; - if let Some(failure) = failure { - joiner_token.raise_failure(failure); + // Wait for children to finish before we destroy the `SubsystemHandle` object. + // Otherwise the children would be cancelled immediately. + // + // This is the main mechanism that forwards a cancellation to all the children. + joiner_token.downgrade().join().await; } - - // Wait for children to finish before we destroy the `SubsystemHandle` object. - // Otherwise the children would be cancelled immediately. - // - // This is the main mechanism that forwards a cancellation to all the children. - joiner_token.downgrade().join().await; } diff --git a/src/subsystem/subsystem_handle.rs b/src/subsystem/subsystem_handle.rs index 08dd32b..1547029 100644 --- a/src/subsystem/subsystem_handle.rs +++ b/src/subsystem/subsystem_handle.rs @@ -72,6 +72,7 @@ impl SubsystemHandle { /// Ok(()) /// } /// ``` + #[track_caller] pub fn start( &self, builder: SubsystemBuilder, @@ -82,7 +83,11 @@ impl SubsystemHandle { Err: Into, { self.start_with_abs_name( - Arc::from(format!("{}/{}", self.inner.name, builder.name)), + if self.inner.name.as_ref() == "/" { + Arc::from(format!("/{}", builder.name)) + } else { + Arc::from(format!("{}/{}", self.inner.name, builder.name)) + }, builder.subsystem, ErrorActions { on_failure: Atomic::new(builder.failure_action), @@ -92,6 +97,7 @@ impl SubsystemHandle { ) } + #[track_caller] pub(crate) fn start_with_abs_name( &self, name: Arc, diff --git a/src/tokio_task.rs b/src/tokio_task.rs new file mode 100644 index 0000000..75ed1eb --- /dev/null +++ b/src/tokio_task.rs @@ -0,0 +1,23 @@ +use std::future::Future; +use tokio::task::JoinHandle; + +#[cfg(not(all(tokio_unstable, feature = "tracing")))] +#[track_caller] +pub(crate) fn spawn(f: F, _name: &str) -> JoinHandle +where + ::Output: Send + 'static, +{ + tokio::spawn(f) +} + +#[cfg(all(tokio_unstable, feature = "tracing"))] +#[track_caller] +pub(crate) fn spawn(f: F, name: &str) -> JoinHandle +where + ::Output: Send + 'static, +{ + tokio::task::Builder::new() + .name(name) + .spawn(f) + .expect("a task should be spawned") +} diff --git a/src/toplevel.rs b/src/toplevel.rs index 9e7b4c1..d252bc3 100644 --- a/src/toplevel.rs +++ b/src/toplevel.rs @@ -57,6 +57,7 @@ impl Toplevel { /// * `subsystem` - The subsystem that should be spawned as the root node. /// Usually the job of this subsystem is to spawn further subsystems. #[allow(clippy::new_without_default)] + #[track_caller] pub fn new(subsystem: Subsys) -> Self where Subsys: 'static + FnOnce(SubsystemHandle) -> Fut + Send, @@ -78,7 +79,7 @@ impl Toplevel { }); let toplevel_subsys = root_handle.start_with_abs_name( - Arc::from(""), + Arc::from("/"), move |s| async move { subsystem(s).await; Result::<(), ErrType>::Ok(()) @@ -118,13 +119,17 @@ impl Toplevel { /// /// Especially the caveats from [tokio::signal::unix::Signal] are important for Unix targets. /// + #[track_caller] pub fn catch_signals(self) -> Self { let shutdown_token = self.root_handle.get_cancellation_token().clone(); - tokio::spawn(async move { - wait_for_signal().await; - shutdown_token.cancel(); - }); + crate::tokio_task::spawn( + async move { + wait_for_signal().await; + shutdown_token.cancel(); + }, + "catch_signals", + ); self }