Skip to content

Commit

Permalink
Implement Async{Read,Write} from futures
Browse files Browse the repository at this point in the history
We implement `AsyncRead` and `AsyncWrite` from `futures::io` for
`PipeReader` and `PipeWriter`, respectively. This implementation is
behind a feature flag.
  • Loading branch information
Thomas Scholtes authored and ttiurani committed Oct 25, 2020
1 parent 14aed75 commit 0004c04
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 39 deletions.
11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@ readme = "README.md"
license = "MIT"
edition = "2018"

[features]
default = ["tokio"]

[dependencies]
tokio = { version = "0.2", features= [] }
tokio = { version = "0.2", features= [], optional = true }
log = "0.4"
futures = { version = "0.3", optional = true }

[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }

[package.metadata.docs.rs]
features = ["futures"]


5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[![Documentation](https://docs.rs/async-pipe/badge.svg)](https://docs.rs/async-pipe)
[![MIT](https://img.shields.io/crates/l/async-pipe.svg)](./LICENSE)

Creates an asynchronous piped reader and writer pair using `tokio.rs`.
Creates an asynchronous piped reader and writer pair using `tokio.rs` or
`futures`

[Docs](https://docs.rs/async-pipe)

Expand Down Expand Up @@ -38,4 +39,4 @@ async fn main() {

## Contributing

Your PRs and stars are always welcome.
Your PRs and stars are always welcome.
4 changes: 1 addition & 3 deletions examples/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use tokio::prelude::*;
async fn main() {
let (mut w, mut r) = async_pipe::pipe();

tokio::spawn(async move {
w.write_all(b"hello world").await.unwrap();
});
let _ = w.write_all(b"hello world").await;

let mut v = Vec::new();
r.read_to_end(&mut v).await.unwrap();
Expand Down
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Creates an asynchronous piped reader and writer pair using `tokio.rs`.
//! Creates an asynchronous piped reader and writer pair using `tokio.rs` and `futures`.
//!
//! # Examples
//!
Expand All @@ -21,6 +21,11 @@
//!
//! tokio::runtime::Runtime::new().unwrap().block_on(run());
//! ```
//!
//! # Featues
//!
//! * `tokio` (default) Implement `AsyncWrite` and `AsyncRead` from `tokio::io`.
//! * `futures` Implement `AsyncWrite` and `AsyncRead` from `futures::io`
use state::State;
use std::sync::{Arc, Mutex};
Expand Down
59 changes: 43 additions & 16 deletions src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use crate::state::{Data, State};
use std::io;
use std::pin::Pin;
use std::ptr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{self, AsyncRead};

/// The read half of the pipe which implements [`AsyncRead`](https://docs.rs/tokio/0.2.15/tokio/io/trait.AsyncRead.html).
/// The read half of the pipe
///
/// Implements [`tokio::io::AsyncRead`][tokio-async-read] when feature `tokio` is enabled (the
/// default). Implements [`futures::io::AsyncRead`][futures-async-read] when feature `futures` is
/// enabled.
///
/// [futures-async-read]: https://docs.rs/futures/0.3.5/futures/io/trait.AsyncRead.html
/// [tokio-async-read]: https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncRead.html
pub struct PipeReader {
pub(crate) state: Arc<Mutex<State>>,
}
Expand Down Expand Up @@ -62,21 +69,7 @@ impl PipeReader {
}
len
}
}

impl Drop for PipeReader {
fn drop(&mut self) {
if let Err(err) = self.close() {
log::warn!(
"{}: PipeReader: Failed to close the channel on drop: {}",
env!("CARGO_PKG_NAME"),
err
);
}
}
}

impl AsyncRead for PipeReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
Expand Down Expand Up @@ -123,3 +116,37 @@ impl AsyncRead for PipeReader {
};
}
}

impl Drop for PipeReader {
fn drop(&mut self) {
if let Err(err) = self.close() {
log::warn!(
"{}: PipeReader: Failed to close the channel on drop: {}",
env!("CARGO_PKG_NAME"),
err
);
}
}
}

#[cfg(feature = "tokio")]
impl tokio::io::AsyncRead for PipeReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.poll_read(cx, buf)
}
}

#[cfg(feature = "futures")]
impl futures::io::AsyncRead for PipeReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.poll_read(cx, buf)
}
}
67 changes: 51 additions & 16 deletions src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use crate::state::Data;
use crate::state::State;
use std::io;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{self, AsyncWrite};

/// The write half of the pipe which implements [`AsyncWrite`](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html).
/// The write half of the pipe
///
/// Implements [`tokio::io::AsyncWrite`][tokio-async-write] when feature `tokio` is enabled (the
/// default). Implements [`futures::io::AsyncWrite`][futures-async-write] when feature `futures` is
/// enabled.
///
/// [futures-async-write]: https://docs.rs/futures/0.3.5/futures/io/trait.AsyncWrite.html
/// [tokio-async-write]: https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html
pub struct PipeWriter {
pub(crate) state: Arc<Mutex<State>>,
}
Expand Down Expand Up @@ -54,21 +61,7 @@ impl PipeWriter {
waker.clone().wake();
}
}
}

impl Drop for PipeWriter {
fn drop(&mut self) {
if let Err(err) = self.close() {
log::warn!(
"{}: PipeWriter: Failed to close the channel on drop: {}",
env!("CARGO_PKG_NAME"),
err
);
}
}
}

impl AsyncWrite for PipeWriter {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let mut state;
match self.state.lock() {
Expand Down Expand Up @@ -142,3 +135,45 @@ impl AsyncWrite for PipeWriter {
}
}
}

impl Drop for PipeWriter {
fn drop(&mut self) {
if let Err(err) = self.close() {
log::warn!(
"{}: PipeWriter: Failed to close the channel on drop: {}",
env!("CARGO_PKG_NAME"),
err
);
}
}
}

#[cfg(feature = "tokio")]
impl tokio::io::AsyncWrite for PipeWriter {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
self.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
self.poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
self.poll_shutdown(cx)
}
}

#[cfg(feature = "futures")]
impl futures::io::AsyncWrite for PipeWriter {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
self.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
self.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
self.poll_shutdown(cx)
}
}

0 comments on commit 0004c04

Please sign in to comment.