Skip to content

Commit

Permalink
Allow usage without Seek: RevReader approach
Browse files Browse the repository at this point in the history
Implemented my proposal to resolve RustAudio#12 .
  • Loading branch information
florian1345 committed Aug 2, 2022
1 parent 0910d8d commit 9fce63a
Showing 1 changed file with 192 additions and 28 deletions.
220 changes: 192 additions & 28 deletions src/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,19 +528,167 @@ impl BasePacketReader {
}
}

// TODO replace 'a with GAT as soon as they are stabilized
// (https://github.com/rust-lang/rust/issues/44265)

/// A trait for [Read] implementations that allow reversible reads to a buffer.
/// After such a read, the reader's position can be reverted as with [Seek],
/// however only as long as it is still within the bounds of the previous
/// reversible read call. As such, this trait is to be viewed as a subset of
/// [Seek]. The lifetime `'a` is that of the reference passed to the object in
/// [RevRead::rev_read] in order to support depending [RevRead::Buf] on that
/// lifetime.
///
/// Usually, you will not need to implement this yourself. Instead, you can
/// rely either on the blanket implementation provided for all type that
/// implement [Read] and [Seek] or use the [RevReader] to wrap a type that only
/// implements [Read].
pub trait RevRead<'a>: Read {

/// The type of buffer returned by this reversible reader.
type Buf: AsRef<[u8]>;

/// Executes a reversible read hat returns a buffer with data read from the
/// current position in the reader. The buffer must be larger than 28 bytes
/// in size, but it does not have to be completely filled with data from
/// the reader. The second return value indicates the actual numbe of bytes
/// read from the reader.
///
/// # Arguments
///
/// * `amount`: An upper bound on the number of bytes that should be read
/// from the reader. This does not have to be an upper bound on the size of
/// the returned buffer.
///
/// # Errors
///
/// If reading fails according to [Read::read].
fn rev_read(&'a mut self, amount: usize) -> io::Result<(Self::Buf, usize)>;

/// Reverts the previous execution of [RevRead::rev_read] by some amount,
/// i.e. the next read will yield previous data of that size again as if
/// [Seek::seek] was called with [SeekFrom::Current] of `-amount`. It is a
/// prerequisite to this function that the previous operation was
/// [RevRead::rev_read] and that that returned at least one byte of data,
/// otherwise reverting may or may not work.
///
/// # Arguments
///
/// * `amount`: The number of bytes by which to revert the previous read.
///
/// # Errors
///
/// If reverting by the given amount is unsupported given the current
/// situation.
fn revert(&mut self, amount: usize) -> io::Result<()>;
}

// The array's size is freely choosable, but must be > 27.
const REV_READ_BUF_SIZE: usize = 1024;

impl<'a, R: Read + Seek> RevRead<'a> for R {
type Buf = [u8; REV_READ_BUF_SIZE];

fn rev_read(&'a mut self, amount: usize) -> io::Result<(Self::Buf, usize)> {
let amount = amount.min(REV_READ_BUF_SIZE);
let mut buf = [0; REV_READ_BUF_SIZE];
let count = self.read(&mut buf[..amount])?;

Ok((buf, count))
}

fn revert(&mut self, amount: usize) -> io::Result<()> {
self.seek(SeekFrom::Current(-(amount as i64)))?;

Ok(())
}
}

/// A reversible reader that wraps around a type implementing [Read] and offers
/// a [RevRead] implementation by storing read data in a local buffer. Use this
/// if you want to use the [PacketReader] with a type that does not implement
/// [Seek].
pub struct RevReader<R> {
/// The underlying reader.
read: R,
/// The buffer used to store the result of a [RevRead::rev_read] call.
buf: Box<[u8; REV_READ_BUF_SIZE]>,
/// The number of bytes in the buffer that are valid.
buf_len: usize,
/// The index of the next byte that should be read from the buffer.
buf_idx: usize
}

impl<R> RevReader<R> {

/// Creates a new reversible reader that wraps around the given `read`.
/// Note in order for the [RevRead] implementation to hold, `read` must
/// implement [Read].
pub fn new(read: R) -> RevReader<R> {
RevReader {
read,
buf: Box::new([0; REV_READ_BUF_SIZE]),
buf_len: 0,
buf_idx: 0
}
}
}

impl<R: Read> Read for RevReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.buf_idx < self.buf_len {
// We are still within reverted amount => copy from internal buffer
let read_len = (self.buf_len - self.buf_idx).min(buf.len());
let read_data = &self.buf[self.buf_idx..(self.buf_idx + read_len)];

(&mut buf[..read_len]).copy_from_slice(read_data);
self.buf_idx += read_len;
Ok(read_len)
}
else {
// Invalidate the buffer
self.buf_len = 0;
self.read.read(buf)
}
}
}

impl<'a, R: Read> RevRead<'a> for RevReader<R> {
type Buf = &'a [u8];

fn rev_read(&'a mut self, amount: usize) -> io::Result<(Self::Buf, usize)> {
let amount = amount.min(self.buf.len());
let buf = &mut self.buf[..amount];
let count = self.read.read(buf)?;

self.buf_len = count;
self.buf_idx = count;
Ok((&*self.buf, count))
}

fn revert(&mut self, amount: usize) -> io::Result<()> {
if amount > self.buf_idx || self.buf_len == 0 {
return Err(io::Error::new(ErrorKind::Other, "invalid revert"));
}

self.buf_idx -= amount;
Ok(())
}
}

#[derive(Clone, Copy)]
enum UntilPageHeaderReaderMode {
Searching,
FoundWithNeeded(u8),
SeekNeeded(i32),
RevertNeeded(usize),
Found,
}

enum UntilPageHeaderResult {
Eof,
Found,
ReadNeeded,
SeekNeeded,
RevertNeeded,
}

struct UntilPageHeaderReader {
Expand Down Expand Up @@ -583,27 +731,28 @@ impl UntilPageHeaderReader {
/// return Ok(true) if the full header has been read and can be extracted with
///
/// or return Ok(false) if the
pub fn do_read<R :Read>(&mut self, mut rdr :R)
-> Result<UntilPageHeaderResult, OggReadError> {
pub fn do_read<R>(&mut self, rdr :&mut R)
-> Result<UntilPageHeaderResult, OggReadError>
where
for<'a> R: RevRead<'a>
{
use self::UntilPageHeaderReaderMode::*;
use self::UntilPageHeaderResult as Res;
// The array's size is freely choosable, but must be > 27,
// and must well fit into an i32 (needs to be stored in SeekNeeded)
let mut buf :[u8; 1024] = [0; 1024];

let rd_len = tri!(rdr.read(if self.read_amount < 27 {
let rd_amount = if self.read_amount < 27 {
// This is an optimisation for the most likely case:
// the next page directly follows the current read position.
// Then it would be a waste to read more than the needed amount.
&mut buf[0 .. 27 - self.read_amount]
27 - self.read_amount
} else {
match self.mode {
Searching => &mut buf,
FoundWithNeeded(amount) => &mut buf[0 .. amount as usize],
SeekNeeded(_) => return Ok(Res::SeekNeeded),
Searching => usize::MAX,
FoundWithNeeded(amount) => amount as usize,
RevertNeeded(_) => return Ok(Res::RevertNeeded),
Found => return Ok(Res::Found),
}
}));
};
let (buf, rd_len) = tri!(rdr.rev_read(rd_amount));
let buf = buf.as_ref();

if rd_len == 0 {
// Reached EOF. This means we're in one of these cases:
Expand Down Expand Up @@ -678,18 +827,21 @@ impl UntilPageHeaderReader {
// Seek back so that we are at the position
// right after the header.

self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32);
return Ok(Res::SeekNeeded);
self.mode = RevertNeeded(fnd_buf.len() - needed);
return Ok(Res::RevertNeeded);
}
}
pub fn do_seek<S :Seek>(&mut self, mut skr :S)
-> Result<UntilPageHeaderResult, OggReadError> {
pub fn do_revert<R>(&mut self, rdr :&mut R)
-> Result<UntilPageHeaderResult, OggReadError>
where
for<'a> R: RevRead<'a>
{
use self::UntilPageHeaderReaderMode::*;
use self::UntilPageHeaderResult as Res;
match self.mode {
Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded),
SeekNeeded(offs) => {
tri!(skr.seek(SeekFrom::Current(offs as i64)));
RevertNeeded(amount) => {
tri!(rdr.revert(amount));
self.mode = Found;
Ok(Res::Found)
},
Expand All @@ -715,15 +867,21 @@ consistent when it encounters the `WouldBlock` error kind.
If you desire async functionality, consider enabling the `async` feature
and look into the async module.
*/
pub struct PacketReader<T :io::Read + io::Seek> {
pub struct PacketReader<T>
where
for<'a> T: RevRead<'a>
{
rdr :T,

base_pck_rdr :BasePacketReader,

read_some_pg :bool
}

impl<T :io::Read + io::Seek> PacketReader<T> {
impl<T> PacketReader<T>
where
for<'a> T: RevRead<'a>
{
/// Constructs a new `PacketReader` with a given `Read`.
pub fn new(rdr :T) -> PacketReader<T> {
PacketReader { rdr, base_pck_rdr : BasePacketReader::new(), read_some_pg : false }
Expand Down Expand Up @@ -785,7 +943,7 @@ impl<T :io::Read + io::Seek> PacketReader<T> {
break
},
ReadNeeded => tri!(r.do_read(&mut self.rdr)),
SeekNeeded => tri!(r.do_seek(&mut self.rdr))
RevertNeeded => tri!(r.do_revert(&mut self.rdr))
}
}
Ok(Some(r.into_header()))
Expand Down Expand Up @@ -815,6 +973,17 @@ impl<T :io::Read + io::Seek> PacketReader<T> {
Ok(Some(tri!(pg_prs.parse_packet_data(packet_data))))
}

/// Resets the internal state by deleting all
/// unread packets.
pub fn delete_unread_packets(&mut self) {
self.base_pck_rdr.update_after_seek();
}
}

impl<T> PacketReader<T>
where
for<'a> T: RevRead<'a> + io::Seek
{
/// Seeks the underlying reader
///
/// Seeks the reader that this PacketReader bases on by the specified
Expand Down Expand Up @@ -1015,11 +1184,6 @@ impl<T :io::Read + io::Seek> PacketReader<T> {
}
}
}
/// Resets the internal state by deleting all
/// unread packets.
pub fn delete_unread_packets(&mut self) {
self.base_pck_rdr.update_after_seek();
}
}

// util function
Expand Down

0 comments on commit 9fce63a

Please sign in to comment.