Skip to content

Commit

Permalink
feat: Reuse existing tokio runtime if one exists
Browse files Browse the repository at this point in the history
Fixes #29
  • Loading branch information
bonsairobo authored May 31, 2024
1 parent 34f48a2 commit bbf268f
Showing 1 changed file with 68 additions and 18 deletions.
86 changes: 68 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
//!
//! There are two kinds of compatibility issues between [tokio] and [futures]:
//!
//! 1. Tokio's types cannot be used outside tokio context, so any attempt to use
//! them will panic.
//! - Solution: If you apply the [`Compat`] adapter to a future, the future will enter the
//! context of a global single-threaded tokio runtime started by this crate. That does
//! *not* mean the future runs on the tokio runtime - it only means the future sets a
//! thread-local variable pointing to the global tokio runtime so that tokio's types can be
//! used inside it.
//! 1. Tokio's types cannot be used outside tokio context, so any attempt to use them will panic.
//! - Solution: If you apply the [`Compat`] adapter to a future, the future will manually
//! enter the context of a global tokio runtime. If a runtime is already available via tokio
//! thread-locals, then it will be used. Otherwise, a new single-threaded runtime will be
//! created on demand. That does *not* mean the future is polled by the tokio runtime - it
//! only means the future sets a thread-local variable pointing to the global tokio runtime so
//! that tokio's types can be used inside it.
//! 2. Tokio and futures have similar but different I/O traits `AsyncRead`, `AsyncWrite`,
//! `AsyncBufRead`, and `AsyncSeek`.
//! - Solution: When the [`Compat`] adapter is applied to an I/O type, it will implement traits
Expand Down Expand Up @@ -207,7 +207,7 @@ pin_project! {
if this.inner.is_some() {
// If the inner future wasn't moved out using into_inner,
// enter the tokio context while the inner value is dropped.
let _guard = TOKIO1.enter();
let _guard = TOKIO1.handle.enter();
this.project().inner.set(None);
}
}
Expand Down Expand Up @@ -325,7 +325,7 @@ impl<T: Future> Future for Compat<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let _guard = TOKIO1.enter();
let _guard = TOKIO1.handle.enter();
self.get_pin_mut().poll(cx)
}
}
Expand Down Expand Up @@ -452,17 +452,38 @@ impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
}
}

static TOKIO1: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
thread::Builder::new()
.name("async-compat/tokio-1".to_string())
.spawn(move || TOKIO1.block_on(Pending))
.unwrap();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("cannot start tokio-1 runtime")
static TOKIO1: Lazy<GlobalRuntime> = Lazy::new(|| {
let mut fallback_rt = None;
let handle = tokio::runtime::Handle::try_current().unwrap_or_else(|_| {
thread::Builder::new()
.name("async-compat/tokio-1".into())
.spawn(move || TOKIO1.fallback_rt.as_ref().unwrap().block_on(Pending))
.unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("cannot start tokio-1 runtime");

let handle = rt.handle().clone();

fallback_rt = Some(rt);

handle
});

GlobalRuntime {
handle,
fallback_rt,
}
});

struct GlobalRuntime {
/// The handle used for all `Compat` futures.
handle: tokio::runtime::Handle,
/// Only used if we couldn't acquire a handle to a runtime on creation.
fallback_rt: Option<tokio::runtime::Runtime>,
}

struct Pending;

impl Future for Pending {
Expand All @@ -472,3 +493,32 @@ impl Future for Pending {
Poll::Pending
}
}

#[cfg(test)]
mod tests {
use crate::{CompatExt, TOKIO1};

#[test]
fn existing_tokio_runtime_is_reused_by_compat() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { println!("foo") }.compat());

assert!(TOKIO1.fallback_rt.is_none());
}

#[test]
fn tokio_runtime_is_reused_even_after_it_exits() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { println!("foo") });

futures::executor::block_on(async { println!("foo") }.compat());

assert!(TOKIO1.fallback_rt.is_none());
}
}

0 comments on commit bbf268f

Please sign in to comment.