Skip to content

Commit

Permalink
Introduce async-dispatcher runtime feature
Browse files Browse the repository at this point in the history
This allows us to use ZeroMQ with native schedulers instead of relying directly on
Tokio or async-std's `spawn`.

Co-authored-by: Kyle <[email protected]>
  • Loading branch information
maxbrunsfeld and rgbkrk committed May 31, 2024
1 parent d3ed228 commit 47c5960
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/main-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ jobs:
with:
command: test
args: --all --no-default-features --features async-std-runtime,all-transport
- name: Test Async-dispatcher version
uses: actions-rs/cargo@v1
with:
command: test
args: --all --no-default-features --features async-dispatcher-runtime,all-transport

fmt:
name: Formatting
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ rust-version = "1.62.0"
default = ["tokio-runtime", "all-transport"]
tokio-runtime = ["tokio", "tokio-util"]
async-std-runtime = ["async-std"]
async-dispatcher-runtime = ["async-std", "async-dispatcher"]
all-transport = ["ipc-transport", "tcp-transport"]
ipc-transport = []
tcp-transport = []

[dependencies]
async-dispatcher = { version = "0.1", optional = true }
thiserror = "1"
futures-channel = { version = "0.3", features = ["sink"] }
futures-io = "0.3"
Expand Down
8 changes: 6 additions & 2 deletions benches/req_rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ async fn setup(endpoint: &str) -> (ReqSocket, RepSocket) {
fn criterion_benchmark(c: &mut Criterion) {
#[cfg(feature = "tokio-runtime")]
type Runtime = tokio::runtime::Runtime;
#[cfg(feature = "async-std-runtime")]
#[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))]
type Runtime = ();

#[cfg(feature = "tokio-runtime")]
let mut rt = tokio::runtime::Runtime::new().unwrap();
#[cfg(feature = "async-std-runtime")]
#[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))]
let mut rt = ();

const N_MSG: u32 = 512;
Expand All @@ -48,6 +48,8 @@ fn criterion_benchmark(c: &mut Criterion) {
let (req, rep) = rt.block_on(setup(endpoint));
#[cfg(feature = "async-std-runtime")]
let (req, rep) = async_std::task::block_on(setup(endpoint));
#[cfg(feature = "async-dispatcher-runtime")]
let (req, rep) = async_dispatcher::block_on(setup(endpoint));

let (mut req, mut rep) = (Some(req), Some(rep));

Expand All @@ -57,6 +59,8 @@ fn criterion_benchmark(c: &mut Criterion) {
rt.block_on(iter_fn(&mut req, &mut rep));
#[cfg(feature = "async-std-runtime")]
async_std::task::block_on(iter_fn(&mut req, &mut rep));
#[cfg(feature = "async-dispatcher-runtime")]
async_dispatcher::block_on(iter_fn(&mut req, &mut rep));
})
});
}
Expand Down
8 changes: 8 additions & 0 deletions examples/async_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,11 @@ pub async fn sleep(duration: std::time::Duration) {
pub async fn sleep(duration: std::time::Duration) {
async_std::task::sleep(duration).await
}

#[allow(unused_imports)]
#[cfg(feature = "async-dispatcher-runtime")]
pub use async_dispatcher::{main, test};

#[allow(unused)]
#[cfg(feature = "async-dispatcher-runtime")]
pub use async_dispatcher::sleep;
3 changes: 3 additions & 0 deletions src/async_rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ pub use tokio::{main, test};
extern crate async_std;
#[cfg(feature = "async-std-runtime")]
pub use async_std::{main, test};

#[cfg(feature = "async-dispatcher-runtime")]
pub use async_dispatcher::{main, test};

Check failure on line 16 in src/async_rt/mod.rs

View workflow job for this annotation

GitHub Actions / Test

unresolved imports `async_dispatcher::main`, `async_dispatcher::test`

Check failure on line 16 in src/async_rt/mod.rs

View workflow job for this annotation

GitHub Actions / Test

unresolved imports `async_dispatcher::main`, `async_dispatcher::test`
4 changes: 3 additions & 1 deletion src/async_rt/task/join_handle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "async-dispatcher-runtime")]
use async_dispatcher as rt_task;
#[cfg(feature = "async-std-runtime")]
use async_std::task as rt_task;
#[cfg(feature = "tokio-runtime")]
Expand All @@ -17,7 +19,7 @@ impl<T> Future for JoinHandle<T> {
// In async-std, the program aborts on panic so results arent returned. To
// unify with tokio, we simply make an `Ok` result.
let result = rt_task::JoinHandle::poll(Pin::new(&mut self.0), cx);
#[cfg(feature = "async-std-runtime")]
#[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))]
return result.map(Ok);
#[cfg(feature = "tokio-runtime")]
return result.map_err(|e| e.into());
Expand Down
8 changes: 7 additions & 1 deletion src/async_rt/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ where
let result = tokio::task::spawn(task).into();
#[cfg(feature = "async-std-runtime")]
let result = async_std::task::spawn(task).into();
#[cfg(feature = "async-dispatcher-runtime")]
let result = async_dispatcher::spawn(task).into();

result
}
Expand Down Expand Up @@ -54,7 +56,9 @@ pub async fn sleep(duration: std::time::Duration) {
#[cfg(feature = "tokio-runtime")]
::tokio::time::sleep(duration).await;
#[cfg(feature = "async-std-runtime")]
::async_std::task::sleep(duration).await
::async_std::task::sleep(duration).await;
#[cfg(feature = "async-dispatcher-runtime")]
::async_dispatcher::sleep(duration).await;
}

pub async fn timeout<F, T>(
Expand All @@ -68,6 +72,8 @@ where
let result = ::tokio::time::timeout(duration, f).await?;
#[cfg(feature = "async-std-runtime")]
let result = ::async_std::future::timeout(duration, f).await?;
#[cfg(feature = "async-dispatcher-runtime")]
let result = ::async_dispatcher::timeout(duration, f).await?;

Ok(result)
}
6 changes: 3 additions & 3 deletions src/transport/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "tokio-runtime")]
use tokio::net::{UnixListener, UnixStream};

#[cfg(feature = "async-std-runtime")]
#[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))]
use async_std::os::unix::net::{UnixListener, UnixStream};

use super::make_framed;
Expand Down Expand Up @@ -39,7 +39,7 @@ where

#[cfg(feature = "tokio-runtime")]
let listener = UnixListener::bind(path)?;
#[cfg(feature = "async-std-runtime")]
#[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))]
let listener = UnixListener::bind(path).await?;

let resolved_addr = listener.local_addr()?;
Expand All @@ -65,7 +65,7 @@ where
}
drop(listener);
if let Some(listener_addr) = listener_addr {
#[cfg(feature = "async-std-runtime")]
#[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))]
use async_std::fs::remove_file;
#[cfg(feature = "tokio-runtime")]
use tokio::fs::remove_file;
Expand Down
2 changes: 1 addition & 1 deletion src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
}

#[allow(unused)]
#[cfg(feature = "async-std-runtime")]
#[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))]
fn make_framed<T>(stream: T) -> FramedIo
where
T: futures_io::AsyncRead + futures_io::AsyncWrite + Send + Sync + 'static,
Expand Down
2 changes: 1 addition & 1 deletion src/transport/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "tokio-runtime")]
use tokio::net::{TcpListener, TcpStream};

#[cfg(feature = "async-std-runtime")]
#[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))]
use async_std::net::{TcpListener, TcpStream};

use super::make_framed;
Expand Down

0 comments on commit 47c5960

Please sign in to comment.