Skip to content

Commit

Permalink
stream: add next and map utility fn (#1962)
Browse files Browse the repository at this point in the history
Introduces `StreamExt` trait. This trait will be used to add utility functions
to make working with streams easier. This patch includes two functions:

* `next`: a future returning the item in the stream.
* `map`: transform each item in the stream.
  • Loading branch information
vorot93 authored and carllerche committed Dec 18, 2019
1 parent b0836ec commit 4c64586
Show file tree
Hide file tree
Showing 42 changed files with 295 additions and 57 deletions.
7 changes: 4 additions & 3 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
#![warn(rust_2018_idioms)]

use tokio::net::{TcpListener, TcpStream};
use tokio::stream::{Stream, StreamExt};
use tokio::sync::{mpsc, Mutex};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};

use futures::{SinkExt, Stream, StreamExt};
use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
Expand Down Expand Up @@ -163,12 +164,12 @@ impl Stream for Peer {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// First poll the `UnboundedReceiver`.

if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) {
if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) {
return Poll::Ready(Some(Ok(Message::Received(v))));
}

// Secondly poll the `Framed` stream.
let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx));
let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx));

Poll::Ready(match result {
// We've received a message we should broadcast to others.
Expand Down
13 changes: 8 additions & 5 deletions examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,21 @@ async fn main() -> Result<(), Box<dyn Error>> {

mod tcp {
use super::codec;
use futures::{future, Sink, SinkExt, Stream, StreamExt};
use futures::StreamExt;
use futures::{future, Sink, SinkExt};
use std::{error::Error, io, net::SocketAddr};
use tokio::net::TcpStream;
use tokio::stream::Stream;
use tokio_util::codec::{FramedRead, FramedWrite};

pub async fn connect(
addr: &SocketAddr,
stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(addr).await?;
let (r, w) = stream.split();
let sink = FramedWrite::new(w, codec::Bytes);
let mut sink = FramedWrite::new(w, codec::Bytes);
let mut stream = FramedRead::new(r, codec::Bytes)
.filter_map(|i| match i {
Ok(i) => future::ready(Some(i)),
Expand All @@ -78,7 +80,7 @@ mod tcp {
})
.map(Ok);

match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await {
match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await {
(Err(e), _) | (_, Err(e)) => Err(e.into()),
_ => Ok(()),
}
Expand All @@ -88,8 +90,9 @@ mod tcp {
mod udp {
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
use tokio::stream::{Stream, StreamExt};

use futures::{future, Sink, SinkExt, Stream, StreamExt};
use futures::{future, Sink, SinkExt};
use std::error::Error;
use std::io;
use std::net::SocketAddr;
Expand Down
2 changes: 1 addition & 1 deletion examples/print_each_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
#![warn(rust_2018_idioms)]

use tokio::net::TcpListener;
use tokio::stream::StreamExt;
use tokio_util::codec::{BytesCodec, Decoder};

use futures::StreamExt;
use std::env;

#[tokio::main]
Expand Down
3 changes: 2 additions & 1 deletion examples/tinydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@
#![warn(rust_2018_idioms)]

use tokio::net::TcpListener;
use tokio::stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};

use futures::{SinkExt, StreamExt};
use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
Expand Down
3 changes: 2 additions & 1 deletion examples/tinyhttp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
#![warn(rust_2018_idioms)]

use bytes::BytesMut;
use futures::{SinkExt, StreamExt};
use futures::SinkExt;
use http::{header::HeaderValue, Request, Response, StatusCode};
#[macro_use]
extern crate serde_derive;
use serde_json;
use std::{env, error::Error, fmt, io};
use tokio::net::{TcpListener, TcpStream};
use tokio::stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, Framed};

#[tokio::main]
Expand Down
3 changes: 2 additions & 1 deletion examples/udp-codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
#![warn(rust_2018_idioms)]

use tokio::net::UdpSocket;
use tokio::stream::StreamExt;
use tokio::{io, time};
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;

use bytes::Bytes;
use futures::{FutureExt, SinkExt, StreamExt};
use futures::{FutureExt, SinkExt};
use std::env;
use std::error::Error;
use std::net::SocketAddr;
Expand Down
2 changes: 1 addition & 1 deletion tokio-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Testing utilities for Tokio- and futures-based code
categories = ["asynchronous", "testing"]

[dependencies]
tokio = { version = "0.2.0", path = "../tokio", features = ["rt-core", "sync", "time", "test-util"] }
tokio = { version = "0.2.0", path = "../tokio", features = ["rt-core", "stream", "sync", "time", "test-util"] }

bytes = "0.5.0"
futures-core = "0.3.0"
Expand Down
3 changes: 2 additions & 1 deletion tokio-test/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
#![allow(clippy::mutex_atomic)]

use futures_core::Stream;
use std::future::Future;
use std::mem;
use std::ops;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use tokio::stream::Stream;

/// TOOD: dox
pub fn spawn<T>(task: T) -> Spawn<T> {
Spawn {
Expand Down
2 changes: 1 addition & 1 deletion tokio-tls/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use cfg_if::cfg_if;
use env_logger;
use futures::join;
use futures::stream::StreamExt;
use native_tls;
use native_tls::{Identity, TlsAcceptor, TlsConnector};
use std::io::Write;
Expand All @@ -12,6 +11,7 @@ use std::process::Command;
use std::ptr;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, Error, ErrorKind};
use tokio::net::{TcpListener, TcpStream};
use tokio::stream::StreamExt;
use tokio_tls;

macro_rules! t {
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ default = []
# Shorthand for enabling everything
full = ["codec", "udp"]

codec = []
codec = ["tokio/stream"]
udp = ["tokio/udp"]

[dependencies]
Expand Down
3 changes: 1 addition & 2 deletions tokio-util/src/codec/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use crate::codec::encoder::Encoder;
use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};

use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite};
use tokio::{io::{AsyncBufRead, AsyncRead, AsyncWrite}, stream::Stream};

use bytes::BytesMut;
use futures_core::Stream;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::fmt;
Expand Down
3 changes: 1 addition & 2 deletions tokio-util/src/codec/framed_read.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::codec::framed::{Fuse, ProjectFuse};
use crate::codec::Decoder;

use tokio::io::AsyncRead;
use tokio::{io::AsyncRead, stream::Stream};

use bytes::BytesMut;
use futures_core::Stream;
use futures_sink::Sink;
use log::trace;
use pin_project_lite::pin_project;
Expand Down
4 changes: 2 additions & 2 deletions tokio-util/src/codec/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
use crate::codec::framed::{Fuse, ProjectFuse};

use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite};
use tokio::{io::{AsyncBufRead, AsyncRead, AsyncWrite}, stream::Stream};

use bytes::BytesMut;
use futures_core::{ready, Stream};
use futures_core::ready;
use futures_sink::Sink;
use log::trace;
use pin_project_lite::pin_project;
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
//!
//! [`AsyncRead`]: https://docs.rs/tokio/*/tokio/io/trait.AsyncRead.html
//! [`AsyncWrite`]: https://docs.rs/tokio/*/tokio/io/trait.AsyncWrite.html
//! [`Stream`]: https://docs.rs/tokio/*/tokio/stream/trait.Stream.html
//! [`Sink`]: https://docs.rs/futures-sink/*/futures_sink/trait.Sink.html
//! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
mod bytes_codec;
pub use self::bytes_codec::BytesCodec;
Expand Down
4 changes: 2 additions & 2 deletions tokio-util/src/udp/frame.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::codec::{Decoder, Encoder};

use tokio::net::UdpSocket;
use tokio::{net::UdpSocket, stream::Stream};

use bytes::{BufMut, BytesMut};
use futures_core::{ready, Stream};
use futures_core::ready;
use futures_sink::Sink;
use std::io;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
Expand Down
3 changes: 1 addition & 2 deletions tokio-util/tests/framed.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#![warn(rust_2018_idioms)]

use tokio::prelude::*;
use tokio::{prelude::*, stream::StreamExt};
use tokio_test::assert_ok;
use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};

use bytes::{Buf, BufMut, BytesMut};
use futures::StreamExt;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down
3 changes: 1 addition & 2 deletions tokio-util/tests/udp.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use tokio::net::UdpSocket;
use tokio::{net::UdpSocket, stream::StreamExt};
use tokio_util::codec::{Decoder, Encoder};
use tokio_util::udp::UdpFramed;

use bytes::{BufMut, BytesMut};
use futures::future::try_join;
use futures::future::FutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::io;

#[tokio::test]
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/fs/read_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
///
/// [`read_dir`]: read_dir
/// [`DirEntry`]: DirEntry
/// [`Stream`]: futures_core::Stream
/// [`Stream`]: crate::stream::Stream
/// [`Err`]: std::result::Result::Err
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
Expand Down Expand Up @@ -85,7 +85,7 @@ impl ReadDir {
}

#[cfg(feature = "stream")]
impl futures_core::Stream for ReadDir {
impl crate::stream::Stream for ReadDir {
type Item = io::Result<DirEntry>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/async_buf_read_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ cfg_io_util! {
///
/// ```
/// use tokio::io::AsyncBufReadExt;
/// use tokio::stream::StreamExt;
///
/// use futures::{StreamExt};
/// use std::io::Cursor;
///
/// #[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
}

#[cfg(feature = "stream")]
impl<R: AsyncBufRead> futures_core::Stream for Lines<R> {
impl<R: AsyncBufRead> crate::stream::Stream for Lines<R> {
type Item = io::Result<String>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where
}

#[cfg(feature = "stream")]
impl<R: AsyncBufRead> futures_core::Stream for Split<R> {
impl<R: AsyncBufRead> crate::stream::Stream for Split<R> {
type Item = io::Result<Vec<u8>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ cfg_signal! {
pub mod signal;
}

cfg_stream! {
pub mod stream;
}

cfg_sync! {
pub mod sync;
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Incoming<'_> {
}

#[cfg(feature = "stream")]
impl futures_core::Stream for Incoming<'_> {
impl crate::stream::Stream for Incoming<'_> {
type Item = io::Result<TcpStream>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,7 @@ impl TcpListener {
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpListener;
///
/// use futures::StreamExt;
/// use tokio::{net::TcpListener, stream::StreamExt};
///
/// #[tokio::main]
/// async fn main() {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/unix/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Incoming<'_> {
}

#[cfg(feature = "stream")]
impl futures_core::Stream for Incoming<'_> {
impl crate::stream::Stream for Incoming<'_> {
type Item = io::Result<UnixStream>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
3 changes: 1 addition & 2 deletions tokio/src/net/unix/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ impl UnixListener {
///
/// ```no_run
/// use tokio::net::UnixListener;
///
/// use futures::StreamExt;
/// use tokio::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/signal/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ impl Signal {
}

cfg_stream! {
impl futures_core::Stream for Signal {
impl crate::stream::Stream for Signal {
type Item = ();

fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/signal/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl CtrlBreak {
}

cfg_stream! {
impl futures_core::Stream for CtrlBreak {
impl crate::stream::Stream for CtrlBreak {
type Item = ();

fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
Expand Down Expand Up @@ -246,9 +246,9 @@ pub fn ctrl_break() -> io::Result<CtrlBreak> {
mod tests {
use super::*;
use crate::runtime::Runtime;
use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
use crate::stream::StreamExt;

use futures::stream::StreamExt;
use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};

#[test]
fn ctrl_c() {
Expand Down
Loading

0 comments on commit 4c64586

Please sign in to comment.