From 76f47c59c66412b9ac4c8a91dc8d21a292a7a67f Mon Sep 17 00:00:00 2001 From: Duncan Fairbanks Date: Thu, 5 Dec 2024 19:37:43 -0800 Subject: [PATCH] fix: don't create static variables containing a tokio::runtime::Handle There was a bug in the approach implemented in #30. The issue could manifest as a panic with message: "A Tokio 1.x context was found, but it is being shutdown." This would occur in the following scenario: 1. `.compat()` was used within a tokio context other than the fallback runtime. 2. That runtime was shutdown. 3. `.compat()` was used again. The root cause being that we had a `Handle` stored in a `static` global. `Handle`s are weak by design, and cannot prevent the runtime from shutting down. I reverted most of the changes in #30 and instead added a `get_runtime_handle` function. This function attempts to acquire an existing runtime handle with `try_current` before creating a fallback runtime. This should be safe because `try_current` will only succeed within the scope of a tokio runtime context, and it cannot outlive that scope. --- src/lib.rs | 73 ++++++++++++++++++++++-------------------------------- 1 file changed, 29 insertions(+), 44 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 00cdbe2..05c1b51 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -208,7 +208,7 @@ pin_project! { if this.inner.is_some() { // If the inner future wasn't moved out using into_inner, // enter the tokio context while the inner value is dropped. - let _guard = TOKIO1.handle.enter(); + let _guard = get_runtime_handle().enter(); this.project().inner.set(None); } } @@ -326,7 +326,7 @@ impl Future for Compat { type Output = T::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let _guard = TOKIO1.handle.enter(); + let _guard = get_runtime_handle().enter(); self.get_pin_mut().poll(cx) } } @@ -453,38 +453,21 @@ impl tokio::io::AsyncSeek for Compat { } } -static TOKIO1: Lazy = Lazy::new(|| { - let mut fallback_rt = None; - let handle = tokio::runtime::Handle::try_current().unwrap_or_else(|_| { - thread::Builder::new() - .name("async-compat/tokio-1".into()) - .spawn(move || TOKIO1.fallback_rt.as_ref().unwrap().block_on(Pending)) - .unwrap(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("cannot start tokio-1 runtime"); - - let handle = rt.handle().clone(); - - fallback_rt = Some(rt); - - handle - }); +fn get_runtime_handle() -> tokio::runtime::Handle { + tokio::runtime::Handle::try_current().unwrap_or_else(|_| TOKIO1.handle().clone()) +} - GlobalRuntime { - handle, - fallback_rt, - } +static TOKIO1: Lazy = Lazy::new(|| { + thread::Builder::new() + .name("async-compat/tokio-1".into()) + .spawn(|| TOKIO1.block_on(Pending)) + .unwrap(); + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("cannot start tokio-1 runtime") }); -struct GlobalRuntime { - /// The handle used for all `Compat` futures. - handle: tokio::runtime::Handle, - /// Only used if we couldn't acquire a handle to a runtime on creation. - fallback_rt: Option, -} - struct Pending; impl Future for Pending { @@ -497,29 +480,31 @@ impl Future for Pending { #[cfg(test)] mod tests { + use super::Lazy; use crate::{CompatExt, TOKIO1}; #[test] - fn existing_tokio_runtime_is_reused_by_compat() { + fn fallback_runtime_is_created_if_and_only_if_outside_tokio_context() { + // Use compat inside of a tokio context. tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() - .block_on(async { println!("foo") }.compat()); + .block_on(use_tokio().compat()); - assert!(TOKIO1.fallback_rt.is_none()); - } + // We didn't need to create the fallback runtime, because we used compat + // inside of an existing tokio context. + assert!(Lazy::get(&TOKIO1).is_none()); - #[test] - fn tokio_runtime_is_reused_even_after_it_exits() { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { println!("foo") }); + // Use compat outside of a tokio context. + futures::executor::block_on(use_tokio().compat()); - futures::executor::block_on(async { println!("foo") }.compat()); + // We must have created the fallback runtime, because we used compat + // outside of a tokio context. + assert!(Lazy::get(&TOKIO1).is_some()); + } - assert!(TOKIO1.fallback_rt.is_none()); + async fn use_tokio() { + tokio::time::sleep(std::time::Duration::from_micros(1)).await } }