diff --git a/futures-util/src/stream/stream/ready_chunks.rs b/futures-util/src/stream/stream/ready_chunks.rs index 49116d49e..ab8637f27 100644 --- a/futures-util/src/stream/stream/ready_chunks.rs +++ b/futures-util/src/stream/stream/ready_chunks.rs @@ -1,6 +1,5 @@ use crate::stream::Fuse; use alloc::vec::Vec; -use core::mem; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -15,7 +14,6 @@ pin_project! { pub struct ReadyChunks { #[pin] stream: Fuse, - items: Vec, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } } @@ -24,11 +22,7 @@ impl ReadyChunks { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Self { - stream: super::Fuse::new(stream), - items: Vec::with_capacity(capacity), - cap: capacity, - } + Self { stream: super::Fuse::new(stream), cap: capacity } } delegate_access_inner!(stream, St, (.)); @@ -40,40 +34,33 @@ impl Stream for ReadyChunks { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); + let mut items: Vec = Vec::new(); + loop { match this.stream.as_mut().poll_next(cx) { // Flush all collected data if underlying stream doesn't contain // more ready values Poll::Pending => { - return if this.items.is_empty() { - Poll::Pending - } else { - Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap)))) - } + return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) } } // Push the ready item into the buffer and check whether it is full. // If so, replace our buffer with a new and empty one and return // the full one. Poll::Ready(Some(item)) => { - this.items.push(item); - if this.items.len() >= *this.cap { - return Poll::Ready(Some(mem::replace( - this.items, - Vec::with_capacity(*this.cap), - ))); + if items.is_empty() { + items.reserve(*this.cap); + } + items.push(item); + if items.len() >= *this.cap { + return Poll::Ready(Some(items)); } } // Since the underlying stream ran out of values, return what we // have buffered, if we have anything. Poll::Ready(None) => { - let last = if this.items.is_empty() { - None - } else { - let full_buf = mem::take(this.items); - Some(full_buf) - }; + let last = if items.is_empty() { None } else { Some(items) }; return Poll::Ready(last); } @@ -82,20 +69,15 @@ impl Stream for ReadyChunks { } fn size_hint(&self) -> (usize, Option) { - let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = (lower / self.cap).saturating_add(chunk_len); - let upper = match upper { - Some(x) => x.checked_add(chunk_len), - None => None, - }; + let lower = lower / self.cap; (lower, upper) } } impl FusedStream for ReadyChunks { fn is_terminated(&self) -> bool { - self.stream.is_terminated() && self.items.is_empty() + self.stream.is_terminated() } } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 6fa7adef6..2a0d624de 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1492,10 +1492,10 @@ pub mod stream { assert_not_impl!(PollImmediate: Unpin); assert_impl!(ReadyChunks>: Send); - assert_not_impl!(ReadyChunks: Send); + assert_impl!(ReadyChunks: Send); assert_not_impl!(ReadyChunks: Send); assert_impl!(ReadyChunks>: Sync); - assert_not_impl!(ReadyChunks: Sync); + assert_impl!(ReadyChunks: Sync); assert_not_impl!(ReadyChunks: Sync); assert_impl!(ReadyChunks: Unpin); assert_not_impl!(ReadyChunks: Unpin);