From d500961d6d916d6a6e5dfda7ec1279709f3a0571 Mon Sep 17 00:00:00 2001 From: hoslo Date: Sun, 28 Apr 2024 11:31:52 +0800 Subject: [PATCH] feat(core/types): Implement concurrent read for blocking read --- .../types/blocking_read/blocking_reader.rs | 220 +++++++++++------- .../types/blocking_read/buffer_iterator.rs | 115 +++++++++ core/src/types/blocking_read/mod.rs | 1 + .../types/blocking_read/std_bytes_iterator.rs | 5 +- core/src/types/blocking_read/std_reader.rs | 5 +- core/src/types/operator/blocking_operator.rs | 14 +- core/src/types/operator/operator_functions.rs | 57 ++++- 7 files changed, 314 insertions(+), 103 deletions(-) create mode 100644 core/src/types/blocking_read/buffer_iterator.rs diff --git a/core/src/types/blocking_read/blocking_reader.rs b/core/src/types/blocking_read/blocking_reader.rs index a6403f87ab93..54d776a783ca 100644 --- a/core/src/types/blocking_read/blocking_reader.rs +++ b/core/src/types/blocking_read/blocking_reader.rs @@ -18,20 +18,19 @@ use std::collections::Bound; use std::ops::Range; use std::ops::RangeBounds; +use std::sync::Arc; -use bytes::Buf; use bytes::BufMut; -use log::debug; -use rayon::prelude::*; -use crate::raw::oio::BlockingRead; use crate::raw::*; use crate::*; +use super::buffer_iterator::BufferIterator; + /// BlockingReader is designed to read data from given path in an blocking /// manner. pub struct BlockingReader { - pub(crate) inner: oio::BlockingReader, + pub(crate) inner: Arc, options: OpReader, } @@ -51,7 +50,10 @@ impl BlockingReader { ) -> crate::Result { let (_, r) = acc.blocking_read(path, op)?; - Ok(BlockingReader { inner: r, options }) + Ok(BlockingReader { + inner: Arc::new(r), + options, + }) } /// Read give range from reader into [`Buffer`]. @@ -82,72 +84,26 @@ impl BlockingReader { } } + let iter = BufferIterator::new( + self.inner.clone(), + self.options.chunk(), + self.options.concurrent(), + start, + end, + ); + let mut bufs = Vec::new(); - let mut offset = start; - let concurrent = self.options.concurrent() as u64; - - - let (interval_size, mut intervals) = end - .map(|end| { - // let interval_size = (end - start + concurrent - 1) / concurrent; - let interval_size = (end - start) / concurrent; - let remainder = (end - start) % concurrent; - let intervals: Vec<(u64, u64)> = (0..concurrent) - .map(|i| { - let interval_start = start + i * interval_size + remainder.min(i); - let interval_end = - interval_start + interval_size + if i < remainder { 1 } else { 0 }; - (interval_start, interval_end) - }) - .filter(|(interval_start, interval_end)| interval_start != interval_end) - .collect(); - (interval_size, intervals) - }) - .unwrap_or({ - // TODO: use service preferred io size instead. 
- let interval_size = 4 * 1024 * 1024; - let intervals: Vec<(u64, u64)> = (0..concurrent) - .map(|i| { - let current = start + i * interval_size; - (current, current + interval_size) - }) - .collect(); - (interval_size, intervals) - }); - - loop { - - let results: Vec> = intervals - .into_par_iter() - .map(|(start, end)| -> Result<(usize, Buffer)> { - let limit = (end - start) as usize; - - let bs = self.inner.read_at(start as u64, limit)?; - let n = bs.remaining(); - - Ok((n, bs)) - }) - .collect(); - for result in results { - let result = result?; - bufs.push(result.1); - if result.0 < interval_size as usize { - return Ok(bufs.into_iter().flatten().collect()); - } - offset += result.0 as u64; - if Some(offset) == end { - return Ok(bufs.into_iter().flatten().collect()); + for buffer in iter { + match buffer { + Ok(bs) => { + bufs.push(bs); } + Err(err) => return Err(err), } - - intervals = (0..concurrent) - .map(|i| { - let current = offset + i * interval_size; - (current, current + interval_size) - }) - .collect(); } + + Ok(bufs.into_iter().flatten().collect()) } /// @@ -177,25 +133,26 @@ impl BlockingReader { } } - let mut offset = start; - let mut read = 0; - - loop { - // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at(offset, limit)?; - let n = bs.remaining(); - buf.put(bs); - read += n as u64; - if n < limit { - return Ok(read as _); - } + let iter = BufferIterator::new( + self.inner.clone(), + self.options.chunk(), + self.options.concurrent(), + start, + end, + ); - offset += n as u64; - if Some(offset) == end { - return Ok(read as _); + let mut total_len = 0; + for buffer in iter { + match buffer { + Ok(bs) => { + total_len += bs.len(); + buf.put(bs); + } + Err(err) => return Err(err), } } + + Ok(total_len) } /// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`], @@ -203,12 +160,105 @@ impl BlockingReader { #[inline] pub fn into_std_read(self, range: Range) -> StdReader { // TODO: the capacity should be decided by services. - StdReader::new(self.inner, range) + StdReader::new(self.inner.clone(), range) } /// Convert reader into [`StdBytesIterator`] which implements [`Iterator`]. #[inline] pub fn into_bytes_iterator(self, range: Range) -> StdBytesIterator { - StdBytesIterator::new(self.inner, range) + StdBytesIterator::new(self.inner.clone(), range) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::rngs::ThreadRng; + use rand::Rng; + use rand::RngCore; + + fn gen_random_bytes() -> Vec { + let mut rng = ThreadRng::default(); + // Generate size between 1B..16MB. 
+ let size = rng.gen_range(1..16 * 1024 * 1024); + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + content + } + + #[test] + fn test_blocking_reader_read() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader(path).unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_with_chunk() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader_with(path).chunk(16).call().unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_with_concurrent() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op + .reader_with(path) + .chunk(128) + .concurrent(16) + .call() + .unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_into() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader(path).unwrap(); + let mut buf = Vec::new(); + reader + .read_into(&mut buf, ..) 
+ .expect("read to end must succeed"); + + assert_eq!(buf, content); } } diff --git a/core/src/types/blocking_read/buffer_iterator.rs b/core/src/types/blocking_read/buffer_iterator.rs new file mode 100644 index 000000000000..e4b68ac35902 --- /dev/null +++ b/core/src/types/blocking_read/buffer_iterator.rs @@ -0,0 +1,115 @@ +use std::ops::Range; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use bytes::Buf; +use rayon::prelude::*; + +use crate::raw::*; +use crate::*; + +pub(super) struct BufferIterator { + inner: Arc, + it: RangeIterator, + + chunk: usize, + end: Option, + concurrent: usize, + finished: Arc, +} + +impl BufferIterator { + pub fn new( + inner: Arc, + chunk: Option, + concurrent: usize, + offset: u64, + end: Option, + ) -> Self { + let chunk = chunk.unwrap_or(4 * 1024 * 1024); + let it = RangeIterator { + offset, + chunk: chunk as u64, + }; + Self { + inner, + it, + chunk, + end, + concurrent, + finished: Arc::new(AtomicBool::new(false)), + } + } +} + +impl Iterator for BufferIterator { + type Item = Result; + + fn next(&mut self) -> Option { + if self.finished.load(Ordering::Relaxed) { + return None; + } + + let mut bufs = Vec::with_capacity(self.concurrent); + + let intervals: Vec> = (0..self.concurrent) + .map(|_| { + let range = self.it.next().unwrap_or(Range { + start: u64::MAX, + end: u64::MAX, + }); + if let Some(end) = self.end { + if range.start + range.end > end { + return Range { + start: range.start, + end, + }; + } + } + range + }) + .filter(|range| range.start < range.end) + .collect(); + + let results: Vec> = intervals + .into_par_iter() + .map(|range| -> Result<(usize, Buffer)> { + let limit = (range.end - range.start) as usize; + + let bs = self.inner.read_at(range.start, limit)?; + let n = bs.remaining(); + + Ok((n, bs)) + }) + .collect(); + for result in results { + match result { + Ok((n, buf)) => { + bufs.push(buf); + if n < self.chunk { + self.finished.store(true, Ordering::Relaxed); + return Some(Ok(bufs.into_iter().flatten().collect())); + } + } + Err(err) => return Some(Err(err)), + } + } + + Some(Ok(bufs.into_iter().flatten().collect())) + } +} + +pub(super) struct RangeIterator { + offset: u64, + chunk: u64, +} + +impl Iterator for RangeIterator { + type Item = Range; + + fn next(&mut self) -> Option { + let offset = self.offset; + self.offset += self.chunk; + Some(offset..offset + self.chunk) + } +} diff --git a/core/src/types/blocking_read/mod.rs b/core/src/types/blocking_read/mod.rs index ff7631cd99eb..2f9178190375 100644 --- a/core/src/types/blocking_read/mod.rs +++ b/core/src/types/blocking_read/mod.rs @@ -23,3 +23,4 @@ mod std_bytes_iterator; pub use std_bytes_iterator::StdBytesIterator; mod std_reader; pub use std_reader::StdReader; +mod buffer_iterator; diff --git a/core/src/types/blocking_read/std_bytes_iterator.rs b/core/src/types/blocking_read/std_bytes_iterator.rs index dba05ae734f2..dc9e75a5a00a 100644 --- a/core/src/types/blocking_read/std_bytes_iterator.rs +++ b/core/src/types/blocking_read/std_bytes_iterator.rs @@ -16,6 +16,7 @@ // under the License. use std::io; +use std::sync::Arc; use bytes::Buf; use bytes::Bytes; @@ -28,7 +29,7 @@ use crate::raw::*; /// /// StdIterator also implements [`Send`] and [`Sync`]. pub struct StdBytesIterator { - inner: oio::BlockingReader, + inner: Arc, offset: u64, size: u64, cap: usize, @@ -39,7 +40,7 @@ pub struct StdBytesIterator { impl StdBytesIterator { /// NOTE: don't allow users to create StdIterator directly. 
#[inline] - pub(crate) fn new(r: oio::BlockingReader, range: std::ops::Range) -> Self { + pub(crate) fn new(r: Arc, range: std::ops::Range) -> Self { StdBytesIterator { inner: r, offset: range.start, diff --git a/core/src/types/blocking_read/std_reader.rs b/core/src/types/blocking_read/std_reader.rs index 191eeb1fbbc3..b7d4f439b3e5 100644 --- a/core/src/types/blocking_read/std_reader.rs +++ b/core/src/types/blocking_read/std_reader.rs @@ -21,6 +21,7 @@ use std::io::Read; use std::io::Seek; use std::io::SeekFrom; use std::ops::Range; +use std::sync::Arc; use bytes::Buf; @@ -33,7 +34,7 @@ use crate::*; /// /// StdReader also implements [`Send`] and [`Sync`]. pub struct StdReader { - inner: oio::BlockingReader, + inner: Arc, offset: u64, size: u64, cap: usize, @@ -45,7 +46,7 @@ pub struct StdReader { impl StdReader { /// NOTE: don't allow users to create StdReader directly. #[inline] - pub(super) fn new(r: oio::BlockingReader, range: Range) -> Self { + pub(super) fn new(r: Arc, range: Range) -> Self { StdReader { inner: r, offset: range.start, diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 68361fac5416..9d77573d8838 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -385,7 +385,11 @@ impl BlockingOperator { FunctionRead(OperatorFunction::new( self.inner().clone(), path, - (OpRead::default(), BytesRange::default(), OpReader::default()), + ( + OpRead::default(), + BytesRange::default(), + OpReader::default(), + ), |inner, path, (args, range, options)| { if !validate_path(&path, EntryMode::FILE) { return Err( @@ -443,8 +447,12 @@ impl BlockingOperator { FunctionReader(OperatorFunction::new( self.inner().clone(), path, - (OpRead::default(), OpReader::default()), - |inner, path, (args, options)| { + ( + OpRead::default(), + BytesRange::default(), + OpReader::default(), + ), + |inner, path, (args, _, options)| { if !validate_path(&path, EntryMode::FILE) { return Err( Error::new(ErrorKind::IsADirectory, "reader path is a directory") diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs index ef584f884cb9..3a49bf9c6a82 100644 --- a/core/src/types/operator/operator_functions.rs +++ b/core/src/types/operator/operator_functions.rs @@ -344,19 +344,30 @@ impl FunctionRead { .map_args(|(args, range, options)| (args, range, options.with_concurrent(concurrent))); self } + + /// Set the chunk size for this reader. + pub fn chunk(mut self, chunk_size: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_chunk(chunk_size))); + self + } } /// Function that generated by [`BlockingOperator::reader_with`]. /// /// Users can add more options by public functions provided by this struct. -pub struct FunctionReader(pub(crate) OperatorFunction<(OpRead, OpReader), BlockingReader>); +pub struct FunctionReader( + pub(crate) OperatorFunction<(OpRead, BytesRange, OpReader), BlockingReader>, +); impl FunctionReader { /// Sets the content-disposition header that should be send back by the remote read operation. 
pub fn override_content_disposition(mut self, content_disposition: &str) -> Self { - self.0 = self.0.map_args(|(args, options)| { + self.0 = self.0.map_args(|(args, range, options)| { ( args.with_override_content_disposition(content_disposition), + range, options, ) }); @@ -365,17 +376,25 @@ impl FunctionReader { /// Sets the cache-control header that should be send back by the remote read operation. pub fn override_cache_control(mut self, cache_control: &str) -> Self { - self.0 = self - .0 - .map_args(|(args, options)| (args.with_override_cache_control(cache_control), options)); + self.0 = self.0.map_args(|(args, range, options)| { + ( + args.with_override_cache_control(cache_control), + range, + options, + ) + }); self } /// Sets the content-type header that should be send back by the remote read operation. pub fn override_content_type(mut self, content_type: &str) -> Self { - self.0 = self - .0 - .map_args(|(args, options)| (args.with_override_content_type(content_type), options)); + self.0 = self.0.map_args(|(args, range, options)| { + ( + args.with_override_content_type(content_type), + range, + options, + ) + }); self } @@ -383,7 +402,7 @@ impl FunctionReader { pub fn if_match(mut self, v: &str) -> Self { self.0 = self .0 - .map_args(|(args, options)| (args.with_if_match(v), options)); + .map_args(|(args, range, options)| (args.with_if_match(v), range, options)); self } @@ -391,7 +410,7 @@ impl FunctionReader { pub fn if_none_match(mut self, v: &str) -> Self { self.0 = self .0 - .map_args(|(args, options)| (args.with_if_none_match(v), options)); + .map_args(|(args, range, options)| (args.with_if_none_match(v), range, options)); self } @@ -399,7 +418,7 @@ impl FunctionReader { pub fn version(mut self, v: &str) -> Self { self.0 = self .0 - .map_args(|(args, options)| (args.with_version(v), options)); + .map_args(|(args, range, options)| (args.with_version(v), range, options)); self } @@ -408,6 +427,22 @@ impl FunctionReader { pub fn call(self) -> Result { self.0.call() } + + /// Set the concurrent read task amount. + pub fn concurrent(mut self, concurrent: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_concurrent(concurrent))); + self + } + + /// Set the chunk size for this reader. + pub fn chunk(mut self, chunk_size: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_chunk(chunk_size))); + self + } } /// Function that generated by [`BlockingOperator::stat_with`].
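
For reviewers reading the diff without the surrounding crate, the shape of the change is: `BlockingReader::read` and `read_into` no longer loop over `read_at` inline; they drive a `BufferIterator` that, on each `next()`, plans up to `concurrent` chunk-sized ranges (clamped to the requested end), reads them in parallel via rayon, and finishes once a short read signals the end of the data. The snippet below is a minimal, self-contained sketch of that strategy under simplified assumptions, not the PR's code: `read_concurrently` is an illustrative name, a plain closure stands in for `oio::BlockingRead::read_at`, and the range planning and clamping are condensed compared to `BufferIterator`/`RangeIterator`.

```rust
use std::io;
use std::ops::Range;

use rayon::prelude::*;

/// Read `start..end` (or to EOF when `end` is `None`) by issuing up to
/// `concurrent` chunk-sized reads in parallel per round.
fn read_concurrently<F>(
    read_at: F,
    start: u64,
    end: Option<u64>,
    chunk: usize,
    concurrent: usize,
) -> io::Result<Vec<u8>>
where
    F: Fn(u64, usize) -> io::Result<Vec<u8>> + Sync,
{
    assert!(chunk > 0 && concurrent > 0);
    let mut out = Vec::new();
    let mut offset = start;

    loop {
        // Plan the next `concurrent` contiguous ranges, clamping them to `end`.
        let ranges: Vec<Range<u64>> = (0..concurrent)
            .map(|i| {
                let lo = offset + (i * chunk) as u64;
                let hi = lo + chunk as u64;
                lo..end.map_or(hi, |e| hi.min(e))
            })
            .filter(|r| r.start < r.end)
            .collect();
        if ranges.is_empty() {
            return Ok(out);
        }

        // Issue the reads in parallel; any error aborts the whole round.
        let chunks: io::Result<Vec<Vec<u8>>> = ranges
            .par_iter()
            .map(|r| read_at(r.start, (r.end - r.start) as usize))
            .collect();

        for (r, bytes) in ranges.iter().zip(chunks?) {
            let short = (bytes.len() as u64) < r.end - r.start;
            offset = r.start + bytes.len() as u64;
            out.extend_from_slice(&bytes);
            if short {
                // A short read signals EOF: later ranges hold no data.
                return Ok(out);
            }
        }

        if end == Some(offset) {
            return Ok(out);
        }
    }
}

fn main() -> io::Result<()> {
    // Back the reader with an in-memory blob so the example is self-contained.
    let data: Vec<u8> = (0..100_000u32).flat_map(|i| i.to_le_bytes()).collect();
    let read_at = |off: u64, len: usize| -> io::Result<Vec<u8>> {
        let off = off as usize;
        let end = (off + len).min(data.len());
        Ok(data.get(off..end).unwrap_or_default().to_vec())
    };

    let all = read_concurrently(&read_at, 0, None, 4096, 8)?;
    assert_eq!(all, data);

    let mid = read_concurrently(&read_at, 1000, Some(250_000), 4096, 8)?;
    assert_eq!(mid, data[1000..250_000].to_vec());
    Ok(())
}
```

In the patch itself the two knobs come from the reader options: the new `chunk` and `concurrent` builder methods on `FunctionRead`/`FunctionReader` feed `OpReader`, which `BlockingReader` passes to `BufferIterator::new` (defaulting the chunk to 4 MiB when unset), as exercised by `test_reader_read_with_concurrent` above.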