From 6eabdbb81a8628f83b15622cd59249f557373cf1 Mon Sep 17 00:00:00 2001 From: DmitryAstafyev Date: Tue, 14 Jun 2022 15:54:54 +0200 Subject: [PATCH] Reading cancelation --- src/lib.rs | 20 +++++++++++++++++++- src/policy.rs | 17 +++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 6a5365a..440db8b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -203,6 +203,9 @@ pub struct BufReader{ buf: Buffer, inner: R, policy: P, + // We need field "empty" to return empty &[u8] in case of reader is paused + // This field is never used, never filled. + empty: Vec, } impl BufReader { @@ -262,7 +265,7 @@ impl BufReader { /// then it will be returned in `read()` and `fill_buf()` ahead of any data from `inner`. pub fn with_buffer(buf: Buffer, inner: R) -> Self { BufReader { - buf, inner, policy: StdPolicy + buf, inner, policy: StdPolicy, empty: vec![] } } } @@ -273,6 +276,7 @@ impl BufReader { BufReader { inner: self.inner, buf: self.buf, + empty: self.empty, policy } } @@ -357,6 +361,11 @@ impl BufReader { fn should_read(&mut self) -> bool { self.policy.before_read(&mut self.buf).0 } + + #[inline] + fn is_paused(&mut self) -> bool { + self.policy.is_paused() + } } impl BufReader { @@ -377,12 +386,17 @@ impl BufReader { inner, buf: self.buf, policy: self.policy, + empty: self.empty, } } } impl Read for BufReader { fn read(&mut self, out: &mut [u8]) -> io::Result { + // If reading is paused, returning 0 to send end reading signal + if self.is_paused() { + return Ok(0); + } // If we don't have any buffered data and we're doing a read matching // or exceeding the internal buffer's capacity, bypass the buffer. if self.buf.is_empty() && out.len() >= self.buf.capacity() { @@ -397,6 +411,10 @@ impl Read for BufReader { impl BufRead for BufReader { fn fill_buf(&mut self) -> io::Result<&[u8]> { + // If reading is paused, we are sending empty buffer to send signal - "no data any more" + if self.is_paused() { + return Ok(&self.empty); + } // If we've reached the end of our internal buffer then we need to fetch // some more data from the underlying reader. // This execution order is important; the policy may want to resize the buffer or move data diff --git a/src/policy.rs b/src/policy.rs index c2ab54b..64818f9 100644 --- a/src/policy.rs +++ b/src/policy.rs @@ -64,6 +64,23 @@ pub trait ReaderPolicy { /// /// This is a no-op by default. fn after_consume(&mut self, _buffer: &mut Buffer, _amt: usize) {} + + /// Consulted with `read` and `fill_buf` methods. + /// Return `bool` to continue (`true`) reading or pause it (`false`). + /// + /// ### Note + /// As soon as it was paused, the current position in the buffer isn't dropped. + /// The reader still can continue reading with the next iteration if the flag will + /// be changed again to `true`. + /// + /// Possible use-case. For example, we have a huge file, which we would like to + /// read and somehow manipulate with content (search in it for example). If we + /// are passing the reader into the searcher, we are losing control of it. Withing + /// `pausing` policy we can pass to a reader some kind of token and keep control + /// of the reading process. + fn is_paused(&mut self) -> bool { + false + } } /// Behavior of `std::io::BufReader`: the buffer will only be read into if it is empty.