From f80d1910797475ca91c2e338502ffc2e6ed22e6f Mon Sep 17 00:00:00 2001 From: Timo Glane Date: Mon, 13 May 2024 18:33:50 +0200 Subject: [PATCH] task: Drop the join waker of a task eagerly when the JoinHandle gets dropped or the task completes Currently, the waker registered with a JoinHandle is not dropped until the task allocation is dropped. This behaviour may cause the memory allocated by a task to not be freed when in the case of two tasks awaiting each others JoinHandle. This commit changes the behaviour by actively dropping the waker when the JoinHandle gets dropped (or the task completes in some cases). Closes #6505. --- tokio/src/runtime/task/harness.rs | 48 ++++++++++++++++++-- tokio/src/runtime/task/mod.rs | 5 ++ tokio/src/runtime/task/state.rs | 45 ++++++++++++------ tokio/src/runtime/tests/loom_multi_thread.rs | 30 ++++++++++++ 4 files changed, 111 insertions(+), 17 deletions(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 996f0f2d9b4..bf74d48dada 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -284,9 +284,11 @@ where } pub(super) fn drop_join_handle_slow(self) { - // Try to unset `JOIN_INTEREST`. This must be done as a first step in + // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in // case the task concurrently completed. - if self.state().unset_join_interested().is_err() { + let snapshot = self.state().transition_to_join_handle_dropped(); + + if snapshot.is_complete() { // It is our responsibility to drop the output. This is critical as // the task output may not be `Send` and as such must remain with // the scheduler or `JoinHandle`. i.e. if the output remains in the @@ -301,6 +303,25 @@ where })); } + if !snapshot.is_join_waker_set() { + // If the JOIN_WAKER flag is unset at this point, the task is either + // already terminal or not complete so the `JoinHandle` is responsible + // for dropping the waker. + // Safety: + // If the JOIN_WAKER bit is not set the join handle has exclusive + // access to the waker as per rule 2 in task/mod.rs. + // This can only be the case at this point in two scenarios: + // 1. The task completed and the runtime unset `JOIN_WAKER` flag + // after accessing the waker during task completion. So the + // `JoinHandle` is the only one to access the join waker here. + // 2. The task is not completed so the `JoinHandle` was able to unset + // `JOIN_WAKER` bit itself to get mutable access to the waker. + // The runtime will not access the waker when this flag is unset. + unsafe { + self.trailer().set_waker(None); + } + } + // Drop the `JoinHandle` reference, possibly deallocating the task self.drop_reference(); } @@ -311,7 +332,6 @@ where fn complete(self) { // The future has completed and its output has been written to the task // stage. We transition from running to complete. - let snapshot = self.state().transition_to_complete(); // We catch panics here in case dropping the future or waking the @@ -320,13 +340,33 @@ where if !snapshot.is_join_interested() { // The `JoinHandle` is not interested in the output of // this task. It is our responsibility to drop the - // output. + // output. The join waker was already dropped by the + // `JoinHandle` before. self.core().drop_future_or_output(); } else if snapshot.is_join_waker_set() { // Notify the waker. Reading the waker field is safe per rule 4 // in task/mod.rs, since the JOIN_WAKER bit is set and the call // to transition_to_complete() above set the COMPLETE bit. self.trailer().wake_join(); + + // If JOIN_INTEREST is still set at this point the `JoinHandle` + // was not dropped since setting COMPLETE so we unset JOIN_WAKER + // to give the responsibility of dropping the join waker back to + // the `JoinHandle`. `JoinHandle` is able to drop the waker when + // itself gets dropped. + if self.state().unset_waker_if_join_interested().is_err() { + // Unsetting JOIN_WAKER flag will fail if JOIN_INTERESTED is + // not set to indicate that the runtime has the responsibility + // to drop the join waker here as per rule 7 in task/mod.rs. + // Safety: + // If JOIN_INTEREST got unset since setting COMPLETE we are + // the only ones to have access to the join waker and need + // to drop it here because the `JoinHandle` of the task + // already got dropped. + unsafe { + self.trailer().set_waker(None); + } + } } })); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 33f54003d38..3de0e8d7d33 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -94,10 +94,15 @@ //! `JoinHandle` needs to (i) successfully set `JOIN_WAKER` to zero if it is //! not already zero to gain exclusive access to the waker field per rule //! 2, (ii) write a waker, and (iii) successfully set `JOIN_WAKER` to one. +//! If the `JoinHandle` unsets `JOIN_WAKER` in the process of being dropped +//! to clear the waker field, only steps (i) and (ii) are relevant. //! //! 6. The `JoinHandle` can change `JOIN_WAKER` only if COMPLETE is zero (i.e. //! the task hasn't yet completed). //! +//! 7. If JOIN_INTEREST is zero and COMPLETE is one, then the runtime has +//! exclusive (mutable) access to the waker field. +//! //! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a //! race. If step (i) fails, then the attempt to write a waker is aborted. If //! step (iii) fails because COMPLETE is set to one by another thread after diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 0fc7bb0329b..35db3e8e453 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -371,22 +371,21 @@ impl State { .map_err(|_| ()) } - /// Tries to unset the `JOIN_INTEREST` flag. - /// - /// Returns `Ok` if the operation happens before the task transitions to a - /// completed state, `Err` otherwise. - pub(super) fn unset_join_interested(&self) -> UpdateResult { - self.fetch_update(|curr| { - assert!(curr.is_join_interested()); + /// Unsets the `JOIN_INTEREST` flag. If `COMPLETE` is not set, the `JOIN_WAKER` + /// flag is also unset. + pub(super) fn transition_to_join_handle_dropped(&self) -> Snapshot { + self.fetch_update_action(|mut snapshot| { + assert!(snapshot.is_join_interested()); - if curr.is_complete() { - return None; - } + snapshot.unset_join_interested(); - let mut next = curr; - next.unset_join_interested(); + if !snapshot.is_complete() { + // If `COMPLETE` is unset we also unset `JOIN_WAKER` to give the + // `JoinHandle` exclusive access to the waker to drop it. + snapshot.unset_join_waker(); + } - Some(next) + (snapshot, Some(snapshot)) }) } @@ -430,6 +429,26 @@ impl State { }) } + /// Unsets the `JOIN_WAKER` bit only if the `JOIN_INTEREST` is still set. + /// + /// Returns `Ok` has been unset, `Err` otherwise. This operation requires + /// the task to be completed. + pub(super) fn unset_waker_if_join_interested(&self) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_complete()); + assert!(curr.is_join_waker_set()); + + if !curr.is_join_interested() { + return None; + } + + let mut next = curr; + next.unset_join_waker(); + + Some(next) + }) + } + pub(super) fn ref_inc(&self) { use std::process; use std::sync::atomic::Ordering::Relaxed; diff --git a/tokio/src/runtime/tests/loom_multi_thread.rs b/tokio/src/runtime/tests/loom_multi_thread.rs index ddd14b7fb3f..e2706e65c65 100644 --- a/tokio/src/runtime/tests/loom_multi_thread.rs +++ b/tokio/src/runtime/tests/loom_multi_thread.rs @@ -10,6 +10,7 @@ mod yield_now; /// In order to speed up the C use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; +use crate::sync::mpsc::channel; use crate::{spawn, task}; use tokio_test::assert_ok; @@ -459,3 +460,32 @@ impl Future for Track { }) } } + +#[test] +fn drop_tasks_with_reference_cycle() { + loom::model(|| { + let pool = mk_pool(2); + + pool.block_on(async move { + let (tx, mut rx) = channel(1); + + let (a_closer, mut wait_for_close_a) = channel::<()>(1); + let (b_closer, mut wait_for_close_b) = channel::<()>(1); + + let a = spawn(async move { + let b = rx.recv().await.unwrap(); + + futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await; + }); + + let b = spawn(async move { + let _ = a.await; + let _ = b_closer.send(()).await; + }); + + tx.send(b).await.unwrap(); + + futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await; + }); + }); +}