Skip to content

Commit

Permalink
Support filter out strip by provided range
Browse files Browse the repository at this point in the history
  • Loading branch information
harveyyue committed Sep 29, 2024
1 parent a5c5ee2 commit f01f7c5
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ private/

/perf.*
/flamegraph.svg

# IDEA
.idea/
34 changes: 31 additions & 3 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
Expand All @@ -28,7 +29,7 @@ use crate::projection::ProjectionMask;
use crate::reader::metadata::{read_metadata, FileMetadata};
use crate::reader::ChunkReader;
use crate::schema::RootDataType;
use crate::stripe::Stripe;
use crate::stripe::{Stripe, StripeMetadata};

const DEFAULT_BATCH_SIZE: usize = 8192;

Expand All @@ -38,6 +39,7 @@ pub struct ArrowReaderBuilder<R> {
pub(crate) batch_size: usize,
pub(crate) projection: ProjectionMask,
pub(crate) schema_ref: Option<SchemaRef>,
pub(crate) file_byte_range: Option<Range<usize>>,
}

impl<R> ArrowReaderBuilder<R> {
Expand All @@ -48,6 +50,7 @@ impl<R> ArrowReaderBuilder<R> {
batch_size: DEFAULT_BATCH_SIZE,
projection: ProjectionMask::all(),
schema_ref: None,
file_byte_range: None,
}
}

Expand All @@ -70,6 +73,12 @@ impl<R> ArrowReaderBuilder<R> {
self
}

/// Specifies a range of file bytes that will read the strips offset within this range
pub fn with_file_byte_range(mut self, range: Range<usize>) -> Self {
self.file_byte_range = Some(range);
self
}

/// Returns the currently computed schema
///
/// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
Expand Down Expand Up @@ -108,6 +117,7 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
file_metadata: self.file_metadata,
projected_data_type,
stripe_index: 0,
file_byte_range: self.file_byte_range,
};
ArrowReader {
cursor,
Expand Down Expand Up @@ -176,14 +186,32 @@ pub(crate) struct Cursor<R> {
pub file_metadata: Arc<FileMetadata>,
pub projected_data_type: RootDataType,
pub stripe_index: usize,
pub file_byte_range: Option<Range<usize>>,
}

impl<R: ChunkReader> Cursor<R> {
fn get_stripe_metadatas(&self) -> Vec<StripeMetadata> {
if let Some(range) = self.file_byte_range.clone() {
self.file_metadata
.stripe_metadatas()
.iter()
.filter(|info| {
let offset = info.offset() as usize;
range.contains(&offset)
})
.map(|info| info.to_owned())
.collect::<Vec<_>>()
} else {
self.file_metadata.stripe_metadatas().to_vec()
}
}
}

impl<R: ChunkReader> Iterator for Cursor<R> {
type Item = Result<Stripe>;

fn next(&mut self) -> Option<Self::Item> {
self.file_metadata
.stripe_metadatas()
self.get_stripe_metadatas()
.get(self.stripe_index)
.map(|info| {
let stripe = Stripe::new(
Expand Down
8 changes: 8 additions & 0 deletions src/async_arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ impl<R: AsyncChunkReader + 'static> StripeFactory<R> {
.cloned();

if let Some(info) = info {
if let Some(range) = self.inner.file_byte_range.clone() {
let offset = info.offset() as usize;
if !range.contains(&offset) {
self.inner.stripe_index += 1;
return Ok((self, None));
}
}
match self.read_next_stripe_inner(&info).await {
Ok(stripe) => Ok((self, Some(stripe))),
Err(err) => Err(err),
Expand Down Expand Up @@ -214,6 +221,7 @@ impl<R: AsyncChunkReader + 'static> ArrowReaderBuilder<R> {
file_metadata: self.file_metadata,
projected_data_type,
stripe_index: 0,
file_byte_range: self.file_byte_range,
};
ArrowStreamReader::new(cursor, self.batch_size, schema_ref)
}
Expand Down
6 changes: 5 additions & 1 deletion src/datafusion/physical_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,15 @@ impl FileOpener for OrcOpener {
// Offset by 1 since index 0 is the root
let projection = self.projection.iter().map(|i| i + 1).collect::<Vec<_>>();
Ok(Box::pin(async move {
let builder = ArrowReaderBuilder::try_new_async(reader)
let mut builder = ArrowReaderBuilder::try_new_async(reader)
.await
.map_err(ArrowError::from)?;
let projection_mask =
ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
if let Some(range) = file_meta.range.clone() {
let range = range.start as usize..range.end as usize;
builder = builder.with_range(range);
}
let reader = builder
.with_batch_size(batch_size)
.with_projection(projection_mask)
Expand Down
60 changes: 60 additions & 0 deletions tests/basic/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::fs::File;
use std::ops::Range;
use std::sync::Arc;

use arrow::datatypes::{DataType, Decimal128Type, DecimalType, Field, Schema, TimeUnit};
Expand Down Expand Up @@ -48,11 +49,32 @@ async fn new_arrow_stream_reader_root(path: &str) -> ArrowStreamReader<tokio::fs
.build_async()
}

#[cfg(feature = "async")]
async fn new_arrow_stream_reader_range(
path: &str,
range: Range<usize>,
) -> ArrowStreamReader<tokio::fs::File> {
let f = tokio::fs::File::open(path).await.unwrap();
ArrowReaderBuilder::try_new_async(f)
.await
.unwrap()
.with_file_byte_range(range)
.build_async()
}

fn new_arrow_reader_root(path: &str) -> ArrowReader<File> {
let f = File::open(path).expect("no file found");
ArrowReaderBuilder::try_new(f).unwrap().build()
}

fn new_arrow_reader_range(path: &str, range: Range<usize>) -> ArrowReader<File> {
let f = File::open(path).expect("no file found");
ArrowReaderBuilder::try_new(f)
.unwrap()
.with_file_byte_range(range)
.build()
}

fn basic_path(path: &str) -> String {
let dir = env!("CARGO_MANIFEST_DIR");
format!("{}/tests/basic/data/{}", dir, path)
Expand Down Expand Up @@ -360,6 +382,44 @@ pub fn basic_test_0() {
assert_batches_eq(&batch, &expected);
}

#[test]
pub fn basic_test_with_range() {
let path = basic_path("test.orc");
let reader = new_arrow_reader_range(&path, 0..2000);
let batch = reader.collect::<Result<Vec<_>, _>>().unwrap();

assert_eq!(5, batch[0].column(0).len());
}

#[test]
pub fn basic_test_with_range_without_data() {
let path = basic_path("test.orc");
let reader = new_arrow_reader_range(&path, 100..2000);
let batch = reader.collect::<Result<Vec<_>, _>>().unwrap();

assert_eq!(0, batch.len());
}

#[cfg(feature = "async")]
#[tokio::test]
pub async fn async_basic_test_with_range() {
let path = basic_path("test.orc");
let reader = new_arrow_stream_reader_range(&path, 0..2000).await;
let batch = reader.try_collect::<Vec<_>>().await.unwrap();

assert_eq!(5, batch[0].column(0).len());
}

#[cfg(feature = "async")]
#[tokio::test]
pub async fn async_basic_test_with_range_without_data() {
let path = basic_path("test.orc");
let reader = new_arrow_stream_reader_range(&path, 100..2000).await;
let batch = reader.try_collect::<Vec<_>>().await.unwrap();

assert_eq!(0, batch.len());
}

#[cfg(feature = "async")]
#[tokio::test]
pub async fn async_basic_test_0() {
Expand Down

0 comments on commit f01f7c5

Please sign in to comment.