diff --git a/.github/workflows/main-ci.yml b/.github/workflows/main-ci.yml index 3eb3fb8..9b0b457 100644 --- a/.github/workflows/main-ci.yml +++ b/.github/workflows/main-ci.yml @@ -59,6 +59,11 @@ jobs: with: command: test args: --all --no-default-features --features async-std-runtime,all-transport + - name: Test smol version + uses: actions-rs/cargo@v1 + with: + command: test + args: --all --no-default-features --features smol-runtime,all-transport fmt: name: Formatting diff --git a/Cargo.toml b/Cargo.toml index bf82a17..514273e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,14 +9,17 @@ repository = "https://github.com/zeromq/zmq.rs" rust-version = "1.62.0" [features] -default = ["tokio-runtime", "all-transport"] +default = ["smol-runtime", "tcp-transport"] tokio-runtime = ["tokio", "tokio-util"] async-std-runtime = ["async-std"] +smol-runtime = ["smol", "async-std", "async-net", "async-io"] all-transport = ["ipc-transport", "tcp-transport"] ipc-transport = [] tcp-transport = [] [dependencies] +async-io = { version = "2.3.2", optional = true } +async-net = { version = "2.0.0", optional = true } thiserror = "1" futures-channel = { version = "0.3", features = ["sink"] } futures-io = "0.3" @@ -26,13 +29,17 @@ async-trait = "0.1" parking_lot = "0.12" rand = "0.8" bytes = "1" +smol = { version = "0.3.2", optional = true } tokio = { version = "1", features = ["full"], optional = true } tokio-util = { version = "0.7", features = ["compat"], optional = true } num-traits = "0.2" dashmap = "5" crossbeam-queue = "0.3" uuid = { version = "1", features = ["v4"] } -regex = { version = "1", default-features = false, features = ["std", "unicode-perl"] } +regex = { version = "1", default-features = false, features = [ + "std", + "unicode-perl", +] } once_cell = "1" log = "0.4" asynchronous-codec = "0.7" diff --git a/benches/req_rep.rs b/benches/req_rep.rs index e03dae7..e330bb6 100644 --- a/benches/req_rep.rs +++ b/benches/req_rep.rs @@ -27,11 +27,15 @@ fn criterion_benchmark(c: &mut Criterion) { type Runtime = tokio::runtime::Runtime; #[cfg(feature = "async-std-runtime")] type Runtime = (); + #[cfg(feature = "smol-runtime")] + type Runtime = (); #[cfg(feature = "tokio-runtime")] let mut rt = tokio::runtime::Runtime::new().unwrap(); #[cfg(feature = "async-std-runtime")] let mut rt = (); + #[cfg(feature = "smol-runtime")] + let mut rt = (); const N_MSG: u32 = 512; @@ -48,6 +52,8 @@ fn criterion_benchmark(c: &mut Criterion) { let (req, rep) = rt.block_on(setup(endpoint)); #[cfg(feature = "async-std-runtime")] let (req, rep) = async_std::task::block_on(setup(endpoint)); + #[cfg(feature = "smol-runtime")] + let (req, rep) = smol::block_on(setup(endpoint)); let (mut req, mut rep) = (Some(req), Some(rep)); @@ -57,6 +63,8 @@ fn criterion_benchmark(c: &mut Criterion) { rt.block_on(iter_fn(&mut req, &mut rep)); #[cfg(feature = "async-std-runtime")] async_std::task::block_on(iter_fn(&mut req, &mut rep)); + #[cfg(feature = "smol-runtime")] + smol::block_on(iter_fn(&mut req, &mut rep)); }) }); } diff --git a/src/async_rt/mod.rs b/src/async_rt/mod.rs index ac897c0..76cea7a 100644 --- a/src/async_rt/mod.rs +++ b/src/async_rt/mod.rs @@ -11,3 +11,8 @@ pub use tokio::{main, test}; extern crate async_std; #[cfg(feature = "async-std-runtime")] pub use async_std::{main, test}; + +#[cfg(feature = "smol-runtime")] +extern crate smol; +#[cfg(feature = "smol-runtime")] +pub use async_std::{main, test}; diff --git a/src/async_rt/task/join_handle.rs b/src/async_rt/task/join_handle.rs index 693ac31..48cd663 100644 --- a/src/async_rt/task/join_handle.rs +++ b/src/async_rt/task/join_handle.rs @@ -1,30 +1,31 @@ #[cfg(feature = "async-std-runtime")] use async_std::task as rt_task; +#[cfg(feature = "smol-runtime")] +use smol::Task as SmolTask; #[cfg(feature = "tokio-runtime")] use tokio::task as rt_task; +pub struct JoinHandle(pub(crate) JoinHandleImpl); + use super::JoinError; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct JoinHandle(rt_task::JoinHandle); -impl Future for JoinHandle { - type Output = Result; +#[cfg(feature = "smol-runtime")] +type JoinHandleImpl = SmolTask; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // In async-std, the program aborts on panic so results arent returned. To - // unify with tokio, we simply make an `Ok` result. - let result = rt_task::JoinHandle::poll(Pin::new(&mut self.0), cx); - #[cfg(feature = "async-std-runtime")] - return result.map(Ok); - #[cfg(feature = "tokio-runtime")] - return result.map_err(|e| e.into()); +impl JoinHandle { + pub(crate) fn new(handle: JoinHandleImpl) -> Self { + JoinHandle(handle) } } -impl From> for JoinHandle { - fn from(h: rt_task::JoinHandle) -> Self { - Self(h) + +impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll(cx) } } diff --git a/src/async_rt/task/mod.rs b/src/async_rt/task/mod.rs index bc001e4..05c573a 100644 --- a/src/async_rt/task/mod.rs +++ b/src/async_rt/task/mod.rs @@ -5,6 +5,9 @@ pub use join_handle::JoinHandle; use std::any::Any; use std::future::Future; +#[cfg(feature = "smol-runtime")] +use smol::Task; + #[track_caller] pub fn spawn(task: T) -> JoinHandle where @@ -15,6 +18,8 @@ where let result = tokio::task::spawn(task).into(); #[cfg(feature = "async-std-runtime")] let result = async_std::task::spawn(task).into(); + #[cfg(feature = "smol-runtime")] + let result = JoinHandle::new(Task::spawn(task).into()); result } @@ -54,7 +59,9 @@ pub async fn sleep(duration: std::time::Duration) { #[cfg(feature = "tokio-runtime")] ::tokio::time::sleep(duration).await; #[cfg(feature = "async-std-runtime")] - ::async_std::task::sleep(duration).await + ::async_std::task::sleep(duration).await; + #[cfg(feature = "smol-runtime")] + ::async_io::Timer::after(duration).await; } pub async fn timeout( @@ -68,6 +75,8 @@ where let result = ::tokio::time::timeout(duration, f).await?; #[cfg(feature = "async-std-runtime")] let result = ::async_std::future::timeout(duration, f).await?; + #[cfg(feature = "smol-runtime")] + let result = ::smol::future::timeout(duration, f).await?; Ok(result) } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index d8c67e4..500774c 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -110,3 +110,13 @@ where let (read, write) = stream.split(); FramedIo::new(Box::new(read), Box::new(write)) } + +#[cfg(feature = "smol-runtime")] +fn make_framed(stream: T) -> FramedIo +where + T: smol::io::AsyncRead + smol::io::AsyncWrite + Send + Sync + 'static, +{ + use futures_util::AsyncReadExt; + let (read, write) = stream.split(); + FramedIo::new(Box::new(read), Box::new(write)) +} diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index e676e58..b192040 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -4,6 +4,9 @@ use tokio::net::{TcpListener, TcpStream}; #[cfg(feature = "async-std-runtime")] use async_std::net::{TcpListener, TcpStream}; +#[cfg(feature = "smol-runtime")] +use async_net::{TcpListener, TcpStream}; + use super::make_framed; use super::AcceptStopHandle; use crate::async_rt;