diff --git a/src/lib.rs b/src/lib.rs index 00cdbe2..05c1b51 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -208,7 +208,7 @@ pin_project! { if this.inner.is_some() { // If the inner future wasn't moved out using into_inner, // enter the tokio context while the inner value is dropped. - let _guard = TOKIO1.handle.enter(); + let _guard = get_runtime_handle().enter(); this.project().inner.set(None); } } @@ -326,7 +326,7 @@ impl Future for Compat { type Output = T::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let _guard = TOKIO1.handle.enter(); + let _guard = get_runtime_handle().enter(); self.get_pin_mut().poll(cx) } } @@ -453,38 +453,21 @@ impl tokio::io::AsyncSeek for Compat { } } -static TOKIO1: Lazy = Lazy::new(|| { - let mut fallback_rt = None; - let handle = tokio::runtime::Handle::try_current().unwrap_or_else(|_| { - thread::Builder::new() - .name("async-compat/tokio-1".into()) - .spawn(move || TOKIO1.fallback_rt.as_ref().unwrap().block_on(Pending)) - .unwrap(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("cannot start tokio-1 runtime"); - - let handle = rt.handle().clone(); - - fallback_rt = Some(rt); - - handle - }); +fn get_runtime_handle() -> tokio::runtime::Handle { + tokio::runtime::Handle::try_current().unwrap_or_else(|_| TOKIO1.handle().clone()) +} - GlobalRuntime { - handle, - fallback_rt, - } +static TOKIO1: Lazy = Lazy::new(|| { + thread::Builder::new() + .name("async-compat/tokio-1".into()) + .spawn(|| TOKIO1.block_on(Pending)) + .unwrap(); + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("cannot start tokio-1 runtime") }); -struct GlobalRuntime { - /// The handle used for all `Compat` futures. - handle: tokio::runtime::Handle, - /// Only used if we couldn't acquire a handle to a runtime on creation. - fallback_rt: Option, -} - struct Pending; impl Future for Pending { @@ -497,29 +480,31 @@ impl Future for Pending { #[cfg(test)] mod tests { + use super::Lazy; use crate::{CompatExt, TOKIO1}; #[test] - fn existing_tokio_runtime_is_reused_by_compat() { + fn fallback_runtime_is_created_if_and_only_if_outside_tokio_context() { + // Use compat inside of a tokio context. tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() - .block_on(async { println!("foo") }.compat()); + .block_on(use_tokio().compat()); - assert!(TOKIO1.fallback_rt.is_none()); - } + // We didn't need to create the fallback runtime, because we used compat + // inside of an existing tokio context. + assert!(Lazy::get(&TOKIO1).is_none()); - #[test] - fn tokio_runtime_is_reused_even_after_it_exits() { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { println!("foo") }); + // Use compat outside of a tokio context. + futures::executor::block_on(use_tokio().compat()); - futures::executor::block_on(async { println!("foo") }.compat()); + // We must have created the fallback runtime, because we used compat + // outside of a tokio context. + assert!(Lazy::get(&TOKIO1).is_some()); + } - assert!(TOKIO1.fallback_rt.is_none()); + async fn use_tokio() { + tokio::time::sleep(std::time::Duration::from_micros(1)).await } }