Skip to content
This repository has been archived by the owner on Feb 14, 2023. It is now read-only.

Commit

Permalink
Reading cancelation
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryAstafyev committed Jun 14, 2022
1 parent 4c59a0f commit 6eabdbb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
20 changes: 19 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ pub struct BufReader<R, P = StdPolicy>{
buf: Buffer,
inner: R,
policy: P,
// We need field "empty" to return empty &[u8] in case of reader is paused
// This field is never used, never filled.
empty: Vec<u8>,
}

impl<R> BufReader<R, StdPolicy> {
Expand Down Expand Up @@ -262,7 +265,7 @@ impl<R> BufReader<R, StdPolicy> {
/// then it will be returned in `read()` and `fill_buf()` ahead of any data from `inner`.
pub fn with_buffer(buf: Buffer, inner: R) -> Self {
BufReader {
buf, inner, policy: StdPolicy
buf, inner, policy: StdPolicy, empty: vec![]
}
}
}
Expand All @@ -273,6 +276,7 @@ impl<R, P> BufReader<R, P> {
BufReader {
inner: self.inner,
buf: self.buf,
empty: self.empty,
policy
}
}
Expand Down Expand Up @@ -357,6 +361,11 @@ impl<R, P: ReaderPolicy> BufReader<R, P> {
fn should_read(&mut self) -> bool {
self.policy.before_read(&mut self.buf).0
}

#[inline]
fn is_paused(&mut self) -> bool {
self.policy.is_paused()
}
}

impl<R: Read, P> BufReader<R, P> {
Expand All @@ -377,12 +386,17 @@ impl<R: Read, P> BufReader<R, P> {
inner,
buf: self.buf,
policy: self.policy,
empty: self.empty,
}
}
}

impl<R: Read, P: ReaderPolicy> Read for BufReader<R, P> {
fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
// If reading is paused, returning 0 to send end reading signal
if self.is_paused() {
return Ok(0);
}
// If we don't have any buffered data and we're doing a read matching
// or exceeding the internal buffer's capacity, bypass the buffer.
if self.buf.is_empty() && out.len() >= self.buf.capacity() {
Expand All @@ -397,6 +411,10 @@ impl<R: Read, P: ReaderPolicy> Read for BufReader<R, P> {

impl<R: Read, P: ReaderPolicy> BufRead for BufReader<R, P> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
// If reading is paused, we are sending empty buffer to send signal - "no data any more"
if self.is_paused() {
return Ok(&self.empty);
}
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
// This execution order is important; the policy may want to resize the buffer or move data
Expand Down
17 changes: 17 additions & 0 deletions src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ pub trait ReaderPolicy {
///
/// This is a no-op by default.
fn after_consume(&mut self, _buffer: &mut Buffer, _amt: usize) {}

/// Consulted with `read` and `fill_buf` methods.
/// Return `bool` to continue (`true`) reading or pause it (`false`).
///
/// ### Note
/// As soon as it was paused, the current position in the buffer isn't dropped.
/// The reader still can continue reading with the next iteration if the flag will
/// be changed again to `true`.
///
/// Possible use-case. For example, we have a huge file, which we would like to
/// read and somehow manipulate with content (search in it for example). If we
/// are passing the reader into the searcher, we are losing control of it. Withing
/// `pausing` policy we can pass to a reader some kind of token and keep control
/// of the reading process.
fn is_paused(&mut self) -> bool {
false
}
}

/// Behavior of `std::io::BufReader`: the buffer will only be read into if it is empty.
Expand Down

0 comments on commit 6eabdbb

Please sign in to comment.