Commit

feat(core/types): Implement concurrent read for blocking read
hoslo committed May 6, 2024
1 parent 71c8df1 commit d500961
Showing 7 changed files with 314 additions and 103 deletions.
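This commit replaces the hand-rolled interval splitting inside `BlockingReader` with a reusable `BufferIterator`, so a blocking read can fetch several chunks in parallel. A minimal usage sketch, based on the new tests at the bottom of `blocking_reader.rs` (the in-memory backend and the `chunk`/`concurrent` builder options are the ones those tests use; the exact sizes here are illustrative):

use opendal::{services, Operator, Result};

fn main() -> Result<()> {
    // Build a blocking operator on the in-memory backend, as the new tests do.
    let op = Operator::new(services::Memory::default())?.finish().blocking();
    op.write("test_file", vec![0u8; 1 << 20])?;

    // Ask for a reader that splits the range into 128-byte chunks and issues
    // up to 16 chunk reads concurrently via the new BufferIterator path.
    let reader = op.reader_with("test_file").chunk(128).concurrent(16).call()?;
    let buf = reader.read(..)?;
    assert_eq!(buf.len(), 1 << 20);
    Ok(())
}
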
220 changes: 135 additions & 85 deletions core/src/types/blocking_read/blocking_reader.rs
@@ -18,20 +18,19 @@
use std::collections::Bound;
use std::ops::Range;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Buf;
use bytes::BufMut;
use log::debug;
use rayon::prelude::*;

use crate::raw::oio::BlockingRead;
use crate::raw::*;
use crate::*;

use super::buffer_iterator::BufferIterator;

/// BlockingReader is designed to read data from a given path in a blocking
/// manner.
pub struct BlockingReader {
-pub(crate) inner: oio::BlockingReader,
+pub(crate) inner: Arc<dyn oio::BlockingRead>,
options: OpReader,
}

@@ -51,7 +50,10 @@ impl BlockingReader {
) -> crate::Result<Self> {
let (_, r) = acc.blocking_read(path, op)?;

-Ok(BlockingReader { inner: r, options })
+Ok(BlockingReader {
+    inner: Arc::new(r),
+    options,
+})
}

/// Read the given range from reader into [`Buffer`].
@@ -82,72 +84,26 @@
}
}

+let iter = BufferIterator::new(
+    self.inner.clone(),
+    self.options.chunk(),
+    self.options.concurrent(),
+    start,
+    end,
+);

let mut bufs = Vec::new();

-let mut offset = start;
-let concurrent = self.options.concurrent() as u64;
-
-let (interval_size, mut intervals) = end
-    .map(|end| {
-        // let interval_size = (end - start + concurrent - 1) / concurrent;
-        let interval_size = (end - start) / concurrent;
-        let remainder = (end - start) % concurrent;
-        let intervals: Vec<(u64, u64)> = (0..concurrent)
-            .map(|i| {
-                let interval_start = start + i * interval_size + remainder.min(i);
-                let interval_end =
-                    interval_start + interval_size + if i < remainder { 1 } else { 0 };
-                (interval_start, interval_end)
-            })
-            .filter(|(interval_start, interval_end)| interval_start != interval_end)
-            .collect();
-        (interval_size, intervals)
-    })
-    .unwrap_or({
-        // TODO: use service preferred io size instead.
-        let interval_size = 4 * 1024 * 1024;
-        let intervals: Vec<(u64, u64)> = (0..concurrent)
-            .map(|i| {
-                let current = start + i * interval_size;
-                (current, current + interval_size)
-            })
-            .collect();
-        (interval_size, intervals)
-    });
-
-loop {
-    let results: Vec<Result<(usize, Buffer)>> = intervals
-        .into_par_iter()
-        .map(|(start, end)| -> Result<(usize, Buffer)> {
-            let limit = (end - start) as usize;
-
-            let bs = self.inner.read_at(start as u64, limit)?;
-            let n = bs.remaining();
-
-            Ok((n, bs))
-        })
-        .collect();
-    for result in results {
-        let result = result?;
-        bufs.push(result.1);
-        if result.0 < interval_size as usize {
-            return Ok(bufs.into_iter().flatten().collect());
-        }
-
-        offset += result.0 as u64;
-        if Some(offset) == end {
-            return Ok(bufs.into_iter().flatten().collect());
-        }
-    }
-
-    intervals = (0..concurrent)
-        .map(|i| {
-            let current = offset + i * interval_size;
-            (current, current + interval_size)
-        })
-        .collect();
-}

+for buffer in iter {
+    match buffer {
+        Ok(bs) => {
+            bufs.push(bs);
+        }
+        Err(err) => return Err(err),
+    }
+}
+
+Ok(bufs.into_iter().flatten().collect())
}

///
@@ -177,38 +133,132 @@ impl BlockingReader {
}
}

-let mut offset = start;
-let mut read = 0;
-
-loop {
-    // TODO: use service preferred io size instead.
-    let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
-    let bs = self.inner.read_at(offset, limit)?;
-    let n = bs.remaining();
-    buf.put(bs);
-    read += n as u64;
-    if n < limit {
-        return Ok(read as _);
-    }
-
-    offset += n as u64;
-    if Some(offset) == end {
-        return Ok(read as _);
-    }
-}

+let iter = BufferIterator::new(
+    self.inner.clone(),
+    self.options.chunk(),
+    self.options.concurrent(),
+    start,
+    end,
+);
+
+let mut total_len = 0;
+for buffer in iter {
+    match buffer {
+        Ok(bs) => {
+            total_len += bs.len();
+            buf.put(bs);
+        }
+        Err(err) => return Err(err),
+    }
+}
+
+Ok(total_len)
}

/// Convert reader into [`StdReader`] which implements [`std::io::Read`],
/// [`std::io::Seek`] and [`std::io::BufRead`].
#[inline]
pub fn into_std_read(self, range: Range<u64>) -> StdReader {
// TODO: the capacity should be decided by services.
-StdReader::new(self.inner, range)
+StdReader::new(self.inner.clone(), range)
}

/// Convert reader into [`StdBytesIterator`] which implements [`Iterator`].
#[inline]
pub fn into_bytes_iterator(self, range: Range<u64>) -> StdBytesIterator {
-StdBytesIterator::new(self.inner, range)
+StdBytesIterator::new(self.inner.clone(), range)
}
}

#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::ThreadRng;
use rand::Rng;
use rand::RngCore;

fn gen_random_bytes() -> Vec<u8> {
let mut rng = ThreadRng::default();
// Generate size between 1B..16MB.
let size = rng.gen_range(1..16 * 1024 * 1024);
let mut content = vec![0; size];
rng.fill_bytes(&mut content);
content
}

#[test]
fn test_blocking_reader_read() {
let op = Operator::new(services::Memory::default())
.unwrap()
.finish()
.blocking();
let path = "test_file";

let content = gen_random_bytes();
op.write(path, content.clone()).expect("write must succeed");

let reader = op.reader(path).unwrap();
let buf = reader.read(..).expect("read to end must succeed");

assert_eq!(buf.to_bytes(), content);
}

#[test]
fn test_reader_read_with_chunk() {
let op = Operator::new(services::Memory::default())
.unwrap()
.finish()
.blocking();
let path = "test_file";

let content = gen_random_bytes();
op.write(path, content.clone()).expect("write must succeed");

let reader = op.reader_with(path).chunk(16).call().unwrap();
let buf = reader.read(..).expect("read to end must succeed");

assert_eq!(buf.to_bytes(), content);
}

#[test]
fn test_reader_read_with_concurrent() {
let op = Operator::new(services::Memory::default())
.unwrap()
.finish()
.blocking();
let path = "test_file";

let content = gen_random_bytes();
op.write(path, content.clone()).expect("write must succeed");

let reader = op
.reader_with(path)
.chunk(128)
.concurrent(16)
.call()
.unwrap();
let buf = reader.read(..).expect("read to end must succeed");

assert_eq!(buf.to_bytes(), content);
}

#[test]
fn test_reader_read_into() {
let op = Operator::new(services::Memory::default())
.unwrap()
.finish()
.blocking();
let path = "test_file";

let content = gen_random_bytes();
op.write(path, content.clone()).expect("write must succeed");

let reader = op.reader(path).unwrap();
let mut buf = Vec::new();
reader
.read_into(&mut buf, ..)
.expect("read to end must succeed");

assert_eq!(buf, content);
}
}
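The collapsed lines near the top of `read` and `read_into` (the regions summarized by the @@ hunk headers) convert the caller's range into a starting offset plus an optional exclusive end before the `BufferIterator` is built. A minimal sketch of that kind of conversion, assuming only the `Bound` and `RangeBounds` imports at the top of the file; `into_offsets` is a hypothetical helper, not part of this commit:

use std::collections::Bound;
use std::ops::RangeBounds;

// Hypothetical helper: map any RangeBounds<u64> to (start, Option<end>),
// where `end` is exclusive and `None` means "read until EOF".
fn into_offsets(range: impl RangeBounds<u64>) -> (u64, Option<u64>) {
    let start = match range.start_bound() {
        Bound::Included(&n) => n,
        Bound::Excluded(&n) => n + 1,
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(&n) => Some(n + 1),
        Bound::Excluded(&n) => Some(n),
        Bound::Unbounded => None,
    };
    (start, end)
}

fn main() {
    assert_eq!(into_offsets(..), (0, None));
    assert_eq!(into_offsets(4..10), (4, Some(10)));
    assert_eq!(into_offsets(..=7), (0, Some(8)));
}
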
115 changes: 115 additions & 0 deletions core/src/types/blocking_read/buffer_iterator.rs
@@ -0,0 +1,115 @@
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use bytes::Buf;
use rayon::prelude::*;

use crate::raw::*;
use crate::*;

pub(super) struct BufferIterator {
inner: Arc<dyn oio::BlockingRead>,
it: RangeIterator,

chunk: usize,
end: Option<u64>,
concurrent: usize,
finished: Arc<AtomicBool>,
}

impl BufferIterator {
pub fn new(
inner: Arc<dyn oio::BlockingRead>,
chunk: Option<usize>,
concurrent: usize,
offset: u64,
end: Option<u64>,
) -> Self {
let chunk = chunk.unwrap_or(4 * 1024 * 1024);
let it = RangeIterator {
offset,
chunk: chunk as u64,
};
Self {
inner,
it,
chunk,
end,
concurrent,
finished: Arc::new(AtomicBool::new(false)),
}
}
}

impl Iterator for BufferIterator {
type Item = Result<Buffer>;

fn next(&mut self) -> Option<Self::Item> {
if self.finished.load(Ordering::Relaxed) {
return None;
}

let mut bufs = Vec::with_capacity(self.concurrent);

let intervals: Vec<Range<u64>> = (0..self.concurrent)
.map(|_| {
let range = self.it.next().unwrap_or(Range {
start: u64::MAX,
end: u64::MAX,
});
if let Some(end) = self.end {
if range.end > end {
return Range {
start: range.start,
end,
};
}
}
range
})
.filter(|range| range.start < range.end)
.collect();

let results: Vec<Result<(usize, Buffer)>> = intervals
.into_par_iter()
.map(|range| -> Result<(usize, Buffer)> {
let limit = (range.end - range.start) as usize;

let bs = self.inner.read_at(range.start, limit)?;
let n = bs.remaining();

Ok((n, bs))
})
.collect();
for result in results {
match result {
Ok((n, buf)) => {
bufs.push(buf);
if n < self.chunk {
self.finished.store(true, Ordering::Relaxed);
return Some(Ok(bufs.into_iter().flatten().collect()));
}
}
Err(err) => return Some(Err(err)),
}
}

Some(Ok(bufs.into_iter().flatten().collect()))
}
}

pub(super) struct RangeIterator {
offset: u64,
chunk: u64,
}

impl Iterator for RangeIterator {
type Item = Range<u64>;

fn next(&mut self) -> Option<Self::Item> {
let offset = self.offset;
self.offset += self.chunk;
Some(offset..offset + self.chunk)
}
}
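Taken together, `RangeIterator` and `BufferIterator` implement a simple pattern: carve the requested range into fixed-size chunks, read one batch of `concurrent` chunks in parallel with rayon, reassemble them in input order, and stop after the first short read. A self-contained sketch of that pattern over an in-memory source; `ChunkSource` and `read_chunked` are illustrative names, not part of this commit:

use rayon::prelude::*;

// Illustrative stand-in for a blocking, positional reader.
struct ChunkSource {
    data: Vec<u8>,
}

impl ChunkSource {
    // Read up to `limit` bytes starting at `offset`, like `read_at`.
    fn read_at(&self, offset: u64, limit: usize) -> Vec<u8> {
        let start = (offset as usize).min(self.data.len());
        let end = (start + limit).min(self.data.len());
        self.data[start..end].to_vec()
    }
}

// Read the whole source by issuing `concurrent` chunk reads per batch in parallel.
fn read_chunked(src: &ChunkSource, chunk: usize, concurrent: usize) -> Vec<u8> {
    let mut out = Vec::new();
    let mut offset = 0u64;
    loop {
        // One batch of ranges: [offset, offset+chunk), [offset+chunk, offset+2*chunk), ...
        let parts: Vec<Vec<u8>> = (0..concurrent)
            .into_par_iter()
            .map(|i| src.read_at(offset + (i * chunk) as u64, chunk))
            .collect();
        // `collect` on an indexed parallel iterator preserves input order,
        // so the chunks are reassembled in the right sequence.
        let mut short_read = false;
        for part in parts {
            let n = part.len();
            out.extend_from_slice(&part);
            if n < chunk {
                short_read = true; // past EOF: this was the last useful chunk.
                break;
            }
        }
        if short_read {
            return out;
        }
        offset += (chunk * concurrent) as u64;
    }
}

fn main() {
    let src = ChunkSource { data: (0..1000u32).flat_map(|v| v.to_le_bytes()).collect() };
    let got = read_chunked(&src, 64, 4);
    assert_eq!(got, src.data);
}
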
1 change: 1 addition & 0 deletions core/src/types/blocking_read/mod.rs
@@ -23,3 +23,4 @@ mod std_bytes_iterator;
pub use std_bytes_iterator::StdBytesIterator;
mod std_reader;
pub use std_reader::StdReader;
+mod buffer_iterator;