diff --git a/examples/chat.rs b/examples/chat.rs index 91589072bbf..e1da5f325a9 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -27,10 +27,11 @@ #![warn(rust_2018_idioms)] use tokio::net::{TcpListener, TcpStream}; +use tokio::stream::{Stream, StreamExt}; use tokio::sync::{mpsc, Mutex}; use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; -use futures::{SinkExt, Stream, StreamExt}; +use futures::SinkExt; use std::collections::HashMap; use std::env; use std::error::Error; @@ -163,12 +164,12 @@ impl Stream for Peer { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // First poll the `UnboundedReceiver`. - if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) { + if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) { return Poll::Ready(Some(Ok(Message::Received(v)))); } // Secondly poll the `Framed` stream. - let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx)); + let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx)); Poll::Ready(match result { // We've received a message we should broadcast to others. diff --git a/examples/connect.rs b/examples/connect.rs index d51af88c970..75640c6270f 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -55,19 +55,21 @@ async fn main() -> Result<(), Box> { mod tcp { use super::codec; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::StreamExt; + use futures::{future, Sink, SinkExt}; use std::{error::Error, io, net::SocketAddr}; use tokio::net::TcpStream; + use tokio::stream::Stream; use tokio_util::codec::{FramedRead, FramedWrite}; pub async fn connect( addr: &SocketAddr, - stdin: impl Stream, io::Error>> + Unpin, + mut stdin: impl Stream, io::Error>> + Unpin, mut stdout: impl Sink, Error = io::Error> + Unpin, ) -> Result<(), Box> { let mut stream = TcpStream::connect(addr).await?; let (r, w) = stream.split(); - let sink = FramedWrite::new(w, codec::Bytes); + let mut sink = FramedWrite::new(w, codec::Bytes); let mut stream = FramedRead::new(r, codec::Bytes) .filter_map(|i| match i { Ok(i) => future::ready(Some(i)), @@ -78,7 +80,7 @@ mod tcp { }) .map(Ok); - match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await { + match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await { (Err(e), _) | (_, Err(e)) => Err(e.into()), _ => Ok(()), } @@ -88,8 +90,9 @@ mod tcp { mod udp { use tokio::net::udp::{RecvHalf, SendHalf}; use tokio::net::UdpSocket; + use tokio::stream::{Stream, StreamExt}; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::{future, Sink, SinkExt}; use std::error::Error; use std::io; use std::net::SocketAddr; diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs index 4604139b4b8..d650b5bd0de 100644 --- a/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -55,9 +55,9 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; +use tokio::stream::StreamExt; use tokio_util::codec::{BytesCodec, Decoder}; -use futures::StreamExt; use std::env; #[tokio::main] diff --git a/examples/tinydb.rs b/examples/tinydb.rs index cf867a0a6e6..7c71dedf62f 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -42,9 +42,10 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; +use tokio::stream::StreamExt; use tokio_util::codec::{Framed, LinesCodec}; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use std::collections::HashMap; use std::env; use std::error::Error; diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 5ddf0d486d6..9ac2806e947 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -14,13 +14,14 @@ #![warn(rust_2018_idioms)] use bytes::BytesMut; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use http::{header::HeaderValue, Request, Response, StatusCode}; #[macro_use] extern crate serde_derive; use serde_json; use std::{env, error::Error, fmt, io}; use tokio::net::{TcpListener, TcpStream}; +use tokio::stream::StreamExt; use tokio_util::codec::{Decoder, Encoder, Framed}; #[tokio::main] diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 6b3f84a0f05..dc30394fd6a 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -9,12 +9,13 @@ #![warn(rust_2018_idioms)] use tokio::net::UdpSocket; +use tokio::stream::StreamExt; use tokio::{io, time}; use tokio_util::codec::BytesCodec; use tokio_util::udp::UdpFramed; use bytes::Bytes; -use futures::{FutureExt, SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt}; use std::env; use std::error::Error; use std::net::SocketAddr; diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index a2902dc53b4..a1e60500db2 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -20,7 +20,7 @@ Testing utilities for Tokio- and futures-based code categories = ["asynchronous", "testing"] [dependencies] -tokio = { version = "0.2.0", path = "../tokio", features = ["rt-core", "sync", "time", "test-util"] } +tokio = { version = "0.2.0", path = "../tokio", features = ["rt-core", "stream", "sync", "time", "test-util"] } bytes = "0.5.0" futures-core = "0.3.0" diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs index 4790de54282..71ebe7b41a6 100644 --- a/tokio-test/src/task.rs +++ b/tokio-test/src/task.rs @@ -2,7 +2,6 @@ #![allow(clippy::mutex_atomic)] -use futures_core::Stream; use std::future::Future; use std::mem; use std::ops; @@ -10,6 +9,8 @@ use std::pin::Pin; use std::sync::{Arc, Condvar, Mutex}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use tokio::stream::Stream; + /// TOOD: dox pub fn spawn(task: T) -> Spawn { Spawn { diff --git a/tokio-tls/tests/smoke.rs b/tokio-tls/tests/smoke.rs index 64dda6ab0b6..a575ba859ab 100644 --- a/tokio-tls/tests/smoke.rs +++ b/tokio-tls/tests/smoke.rs @@ -3,7 +3,6 @@ use cfg_if::cfg_if; use env_logger; use futures::join; -use futures::stream::StreamExt; use native_tls; use native_tls::{Identity, TlsAcceptor, TlsConnector}; use std::io::Write; @@ -12,6 +11,7 @@ use std::process::Command; use std::ptr; use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, Error, ErrorKind}; use tokio::net::{TcpListener, TcpStream}; +use tokio::stream::StreamExt; use tokio_tls; macro_rules! t { diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 7f40660ac80..c2b83565b0c 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -26,7 +26,7 @@ default = [] # Shorthand for enabling everything full = ["codec", "udp"] -codec = [] +codec = ["tokio/stream"] udp = ["tokio/udp"] [dependencies] diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index 0c5ef9f6fab..a3715c2445b 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -3,10 +3,9 @@ use crate::codec::encoder::Encoder; use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use tokio::{io::{AsyncBufRead, AsyncRead, AsyncWrite}, stream::Stream}; use bytes::BytesMut; -use futures_core::Stream; use futures_sink::Sink; use pin_project_lite::pin_project; use std::fmt; diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs index bd1f625b0c2..ca9464b50e6 100644 --- a/tokio-util/src/codec/framed_read.rs +++ b/tokio-util/src/codec/framed_read.rs @@ -1,10 +1,9 @@ use crate::codec::framed::{Fuse, ProjectFuse}; use crate::codec::Decoder; -use tokio::io::AsyncRead; +use tokio::{io::AsyncRead, stream::Stream}; use bytes::BytesMut; -use futures_core::Stream; use futures_sink::Sink; use log::trace; use pin_project_lite::pin_project; diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index 9aed7ea3ce8..2ef91c7c5ca 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -2,10 +2,10 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; use crate::codec::framed::{Fuse, ProjectFuse}; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use tokio::{io::{AsyncBufRead, AsyncRead, AsyncWrite}, stream::Stream}; use bytes::BytesMut; -use futures_core::{ready, Stream}; +use futures_core::ready; use futures_sink::Sink; use log::trace; use pin_project_lite::pin_project; diff --git a/tokio-util/src/codec/mod.rs b/tokio-util/src/codec/mod.rs index b162dd3a788..4b1b86fbb4d 100644 --- a/tokio-util/src/codec/mod.rs +++ b/tokio-util/src/codec/mod.rs @@ -6,8 +6,8 @@ //! //! [`AsyncRead`]: https://docs.rs/tokio/*/tokio/io/trait.AsyncRead.html //! [`AsyncWrite`]: https://docs.rs/tokio/*/tokio/io/trait.AsyncWrite.html +//! [`Stream`]: https://docs.rs/tokio/*/tokio/stream/trait.Stream.html //! [`Sink`]: https://docs.rs/futures-sink/*/futures_sink/trait.Sink.html -//! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html mod bytes_codec; pub use self::bytes_codec::BytesCodec; diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs index a6c6f220704..18ee106e59c 100644 --- a/tokio-util/src/udp/frame.rs +++ b/tokio-util/src/udp/frame.rs @@ -1,9 +1,9 @@ use crate::codec::{Decoder, Encoder}; -use tokio::net::UdpSocket; +use tokio::{net::UdpSocket, stream::Stream}; use bytes::{BufMut, BytesMut}; -use futures_core::{ready, Stream}; +use futures_core::ready; use futures_sink::Sink; use std::io; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; diff --git a/tokio-util/tests/framed.rs b/tokio-util/tests/framed.rs index b98df7368d7..cb82f8df890 100644 --- a/tokio-util/tests/framed.rs +++ b/tokio-util/tests/framed.rs @@ -1,11 +1,10 @@ #![warn(rust_2018_idioms)] -use tokio::prelude::*; +use tokio::{prelude::*, stream::StreamExt}; use tokio_test::assert_ok; use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts}; use bytes::{Buf, BufMut, BytesMut}; -use futures::StreamExt; use std::io::{self, Read}; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index af8002bd800..51c65c630d0 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -1,4 +1,4 @@ -use tokio::net::UdpSocket; +use tokio::{net::UdpSocket, stream::StreamExt}; use tokio_util::codec::{Decoder, Encoder}; use tokio_util::udp::UdpFramed; @@ -6,7 +6,6 @@ use bytes::{BufMut, BytesMut}; use futures::future::try_join; use futures::future::FutureExt; use futures::sink::SinkExt; -use futures::stream::StreamExt; use std::io; #[tokio::test] diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index d252eabb999..06eed384beb 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -36,7 +36,7 @@ pub async fn read_dir(path: impl AsRef) -> io::Result { /// /// [`read_dir`]: read_dir /// [`DirEntry`]: DirEntry -/// [`Stream`]: futures_core::Stream +/// [`Stream`]: crate::stream::Stream /// [`Err`]: std::result::Result::Err #[derive(Debug)] #[must_use = "streams do nothing unless polled"] @@ -85,7 +85,7 @@ impl ReadDir { } #[cfg(feature = "stream")] -impl futures_core::Stream for ReadDir { +impl crate::stream::Stream for ReadDir { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/io/util/async_buf_read_ext.rs b/tokio/src/io/util/async_buf_read_ext.rs index 26ce28cad84..b2930652aba 100644 --- a/tokio/src/io/util/async_buf_read_ext.rs +++ b/tokio/src/io/util/async_buf_read_ext.rs @@ -226,8 +226,8 @@ cfg_io_util! { /// /// ``` /// use tokio::io::AsyncBufReadExt; + /// use tokio::stream::StreamExt; /// - /// use futures::{StreamExt}; /// use std::io::Cursor; /// /// #[tokio::main] diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index 0f1d946a805..f0e75de4b18 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -91,7 +91,7 @@ where } #[cfg(feature = "stream")] -impl futures_core::Stream for Lines { +impl crate::stream::Stream for Lines { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index a2e168de12d..f1ed2fd89d3 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -89,7 +89,7 @@ where } #[cfg(feature = "stream")] -impl futures_core::Stream for Split { +impl crate::stream::Stream for Split { type Item = io::Result>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 66fd79759bb..831a5241d7e 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -241,6 +241,10 @@ cfg_signal! { pub mod signal; } +cfg_stream! { + pub mod stream; +} + cfg_sync! { pub mod sync; } diff --git a/tokio/src/net/tcp/incoming.rs b/tokio/src/net/tcp/incoming.rs index 3033aefa33c..0abe047d05f 100644 --- a/tokio/src/net/tcp/incoming.rs +++ b/tokio/src/net/tcp/incoming.rs @@ -28,7 +28,7 @@ impl Incoming<'_> { } #[cfg(feature = "stream")] -impl futures_core::Stream for Incoming<'_> { +impl crate::stream::Stream for Incoming<'_> { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 2cddf24b9f7..3e0e0f7d6c2 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -250,9 +250,7 @@ impl TcpListener { /// # Examples /// /// ```no_run - /// use tokio::net::TcpListener; - /// - /// use futures::StreamExt; + /// use tokio::{net::TcpListener, stream::StreamExt}; /// /// #[tokio::main] /// async fn main() { diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs index dbe964a8d29..bede96dd4ba 100644 --- a/tokio/src/net/unix/incoming.rs +++ b/tokio/src/net/unix/incoming.rs @@ -27,7 +27,7 @@ impl Incoming<'_> { } #[cfg(feature = "stream")] -impl futures_core::Stream for Incoming<'_> { +impl crate::stream::Stream for Incoming<'_> { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 311bae2c70d..4b93bf827cb 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -104,8 +104,7 @@ impl UnixListener { /// /// ```no_run /// use tokio::net::UnixListener; - /// - /// use futures::StreamExt; + /// use tokio::stream::StreamExt; /// /// #[tokio::main] /// async fn main() { diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index cd326424bdb..bfa2e3097be 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -482,7 +482,7 @@ impl Signal { } cfg_stream! { - impl futures_core::Stream for Signal { + impl crate::stream::Stream for Signal { type Item = (); fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index de35643e5b6..def1a1d742b 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -209,7 +209,7 @@ impl CtrlBreak { } cfg_stream! { - impl futures_core::Stream for CtrlBreak { + impl crate::stream::Stream for CtrlBreak { type Item = (); fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -246,9 +246,9 @@ pub fn ctrl_break() -> io::Result { mod tests { use super::*; use crate::runtime::Runtime; - use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; + use crate::stream::StreamExt; - use futures::stream::StreamExt; + use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; #[test] fn ctrl_c() { diff --git a/tokio/src/stream/iter.rs b/tokio/src/stream/iter.rs new file mode 100644 index 00000000000..dc06495ea02 --- /dev/null +++ b/tokio/src/stream/iter.rs @@ -0,0 +1,52 @@ +use crate::stream::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`iter`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Iter { + iter: I, +} + +impl Unpin for Iter {} + +/// Converts an `Iterator` into a `Stream` which is always ready +/// to yield the next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter +/// simply always calls `iter.next()` and returns that. +/// +/// ``` +/// # async fn dox() { +/// use tokio::stream::{self, StreamExt}; +/// +/// let mut stream = stream::iter(vec![17, 19]); +/// +/// assert_eq!(stream.next().await, Some(17)); +/// assert_eq!(stream.next().await, Some(19)); +/// assert_eq!(stream.next().await, None); +/// # } +/// ``` +pub fn iter(i: I) -> Iter + where I: IntoIterator, +{ + Iter { + iter: i.into_iter(), + } +} + +impl Stream for Iter + where I: Iterator, +{ + type Item = I::Item; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.iter.next()) + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } +} diff --git a/tokio/src/stream/map.rs b/tokio/src/stream/map.rs new file mode 100644 index 00000000000..a89769decec --- /dev/null +++ b/tokio/src/stream/map.rs @@ -0,0 +1,57 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map`](super::StreamExt::map) method. + #[must_use = "streams do nothing unless polled"] + pub struct Map { + #[pin] + stream: St, + f: F, + } +} + +impl fmt::Debug for Map +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Map") + .field("stream", &self.stream) + .finish() + } +} + +impl Map + where St: Stream, + F: FnMut(St::Item) -> T, +{ + pub(super) fn new(stream: St, f: F) -> Map { + Map { stream, f } + } +} + +impl Stream for Map + where St: Stream, + F: FnMut(St::Item) -> T, +{ + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.as_mut() + .project().stream + .poll_next(cx) + .map(|opt| opt.map(|x| (self.as_mut().project().f)(x))) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs new file mode 100644 index 00000000000..04e6dc06b25 --- /dev/null +++ b/tokio/src/stream/mod.rs @@ -0,0 +1,93 @@ +//! Stream utilities for Tokio. +//! +//! `Stream`s are an asynchoronous version of standard library's Iterator. +//! +//! This module provides helpers to work with them. + +mod iter; +pub use iter::{iter, Iter}; + +mod map; +use map::Map; + +mod next; +use next::Next; + +pub use futures_core::Stream; + +/// An extension trait for `Stream`s that provides a variety of convenient +/// combinator functions. +pub trait StreamExt: Stream { + /// Creates a future that resolves to the next item in the stream. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn next(&mut self) -> Option; + /// ``` + /// + /// Note that because `next` doesn't take ownership over the stream, + /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a + /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can + /// be done by boxing the stream using [`Box::pin`] or + /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` + /// crate. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=3); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } + + /// Maps this stream's items to a different type, returning a new stream of + /// the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available. It is executed inline with calls to + /// [`poll_next`](Stream::poll_next). + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let mut stream = stream.map(|x| x + 3); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn map(self, f: F) -> Map + where + F: FnMut(Self::Item) -> T, + Self: Sized, + { + Map::new(self, f) + } +} + +impl StreamExt for T where T: Stream {} diff --git a/tokio/src/stream/next.rs b/tokio/src/stream/next.rs new file mode 100644 index 00000000000..3b0a1dd8f64 --- /dev/null +++ b/tokio/src/stream/next.rs @@ -0,0 +1,31 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Future for the [`next`](super::StreamExt::next) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Next<'a, St: ?Sized> { + stream: &'a mut St, +} + +impl Unpin for Next<'_, St> {} + +impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Next { stream } + } +} + +impl Future for Next<'_, St> { + type Output = Option; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { + Pin::new(&mut self.stream).poll_next(cx) + } +} diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 7294e4d5622..7c7a5abb87b 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -177,7 +177,7 @@ impl Receiver { impl Unpin for Receiver {} cfg_stream! { - impl futures_core::Stream for Receiver { + impl crate::stream::Stream for Receiver { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 4a6ba8eee8e..63d04370d40 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -148,7 +148,7 @@ impl UnboundedReceiver { } #[cfg(feature = "stream")] -impl futures_core::Stream for UnboundedReceiver { +impl crate::stream::Stream for UnboundedReceiver { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 124027f9b54..d6b829829a0 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -268,7 +268,7 @@ impl Receiver { } #[cfg(feature = "stream")] -impl futures_core::Stream for Receiver { +impl crate::stream::Stream for Receiver { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 70daf47daf7..b1c3ee55f73 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -130,7 +130,7 @@ impl Interval { } #[cfg(feature = "stream")] -impl futures_core::Stream for Interval { +impl crate::stream::Stream for Interval { type Item = Instant; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/time/throttle.rs b/tokio/src/time/throttle.rs index 2daa30fcb25..ccb28ad8cdf 100644 --- a/tokio/src/time/throttle.rs +++ b/tokio/src/time/throttle.rs @@ -1,5 +1,6 @@ //! Slow down a stream by enforcing a delay between items. +use crate::stream::Stream; use crate::time::{Delay, Duration, Instant}; use std::future::Future; @@ -7,7 +8,6 @@ use std::marker::Unpin; use std::pin::Pin; use std::task::{self, Poll}; -use futures_core::Stream; use pin_project_lite::pin_project; /// Slow down a stream by enforcing a delay between items. @@ -17,8 +17,8 @@ use pin_project_lite::pin_project; /// /// Create a throttled stream. /// ```rust,norun -/// use futures::stream::StreamExt; /// use std::time::Duration; +/// use tokio::stream::StreamExt; /// use tokio::time::throttle; /// /// # async fn dox() { diff --git a/tokio/tests/fs_dir.rs b/tokio/tests/fs_dir.rs index c8b32fc6b0e..eaff59da4f9 100644 --- a/tokio/tests/fs_dir.rs +++ b/tokio/tests/fs_dir.rs @@ -71,7 +71,7 @@ async fn read_inherent() { #[tokio::test] async fn read_stream() { - use futures::StreamExt; + use tokio::stream::StreamExt; let base_dir = tempdir().unwrap(); diff --git a/tokio/tests/io_lines.rs b/tokio/tests/io_lines.rs index 3775cae05de..2f6b3393b99 100644 --- a/tokio/tests/io_lines.rs +++ b/tokio/tests/io_lines.rs @@ -20,7 +20,7 @@ async fn lines_inherent() { #[tokio::test] async fn lines_stream() { - use futures::StreamExt; + use tokio::stream::StreamExt; let rd: &[u8] = b"hello\r\nworld\n\n"; let mut st = rd.lines(); diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index e1f99595da5..7e5c60e2e2f 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -42,7 +42,7 @@ fn send_recv_with_buffer() { #[tokio::test] async fn send_recv_stream_with_buffer() { - use futures::StreamExt; + use tokio::stream::StreamExt; let (mut tx, mut rx) = mpsc::channel::(16); @@ -147,7 +147,7 @@ async fn async_send_recv_unbounded() { #[tokio::test] async fn send_recv_stream_unbounded() { - use futures::StreamExt; + use tokio::stream::StreamExt; let (tx, mut rx) = mpsc::unbounded_channel::(); diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 409615d45cb..2bc5bb2a85a 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -195,7 +195,7 @@ fn poll_close() { #[test] fn stream_impl() { - use futures::StreamExt; + use tokio::stream::StreamExt; let (tx, mut rx) = watch::channel("one"); diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs index 6a1c756fe0d..1123681f492 100644 --- a/tokio/tests/time_interval.rs +++ b/tokio/tests/time_interval.rs @@ -45,7 +45,7 @@ async fn usage() { #[tokio::test] async fn usage_stream() { - use futures::StreamExt; + use tokio::stream::StreamExt; let start = Instant::now(); let mut interval = time::interval(ms(10));