Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement row group skipping for the default engine parquet readers #362

Merged
merged 33 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
715f233
WIP - first pass at the code
ryan-johnson-databricks Sep 25, 2024
ef71f1a
split out a trait, add more type support
ryan-johnson-databricks Sep 25, 2024
39b8927
support short circuit junction eval
ryan-johnson-databricks Sep 25, 2024
b5c3a52
Merge remote-tracking branch 'oss/main' into row-group-skipping
scovich Sep 25, 2024
e71571e
add tests, fix bugs
scovich Sep 26, 2024
cbca3b3
support SQL WHERE semantics, finished adding tests for skipping logic
scovich Sep 27, 2024
e7d87eb
Mark block text as not rust code doctest should run
scovich Sep 27, 2024
beeb6e8
add missing tests identified by codecov
scovich Sep 27, 2024
519acbd
Wire up row group skipping
scovich Sep 27, 2024
18b33cf
delete for split - parquet reader uses row group skipping
scovich Sep 27, 2024
6c98441
parquet reader now uses row group skipping
scovich Sep 27, 2024
0fdaf0a
add stats-getter test; review comments
scovich Oct 3, 2024
8ac33f8
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 3, 2024
1cf03dc
improve test coverage; clippy
scovich Oct 3, 2024
bc8b344
yet more test coverage
scovich Oct 3, 2024
0971002
improve test coverage even more
scovich Oct 4, 2024
375a380
Add a query level test as well
scovich Oct 4, 2024
6236874
Fix broken sync json parsing and harmonize file reading
scovich Oct 4, 2024
9efcbf7
fmt
scovich Oct 4, 2024
46d19e3
remove spurious TODO
scovich Oct 7, 2024
7666512
Revert "Fix broken sync json parsing and harmonize file reading"
scovich Oct 7, 2024
f3865d0
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 7, 2024
a4dc3da
review comments
scovich Oct 7, 2024
40131db
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 8, 2024
bf65904
Infer null count stat for missing columns; add more tests
scovich Oct 8, 2024
cce762d
One last test
scovich Oct 8, 2024
c7d6bb0
test cleanup
scovich Oct 8, 2024
4f92ed7
code comment tweak
scovich Oct 8, 2024
08a305b
remove unneeded test
scovich Oct 8, 2024
e8a947e
Merge remote-tracking branch 'oss' into use-row-group-skipping
scovich Oct 9, 2024
bf1e3a8
fix two nullcount stat bugs
scovich Oct 9, 2024
9d632e7
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 9, 2024
4a77f3a
review nits
scovich Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ffi/src/engine_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ fn read_parquet_file_impl(
last_modified: file.last_modified,
size: file.size,
};
// TODO: Plumb the predicate through the FFI?
scovich marked this conversation as resolved.
Show resolved Hide resolved
let data = parquet_handler.read_parquet_files(&[delta_fm], physical_schema, None)?;
let res = Box::new(FileReadResultIterator {
data,
Expand Down
28 changes: 24 additions & 4 deletions kernel/src/engine/default/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStream
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};

Expand Down Expand Up @@ -47,7 +48,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
_predicate: Option<Expression>,
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator> {
if files.is_empty() {
return Ok(Box::new(std::iter::empty()));
Expand All @@ -62,10 +63,15 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
// -> parse to parquet
// SAFETY: we did is_empty check above, this is ok.
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
"http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
"http" | "https" => Box::new(PresignedUrlOpener::new(
1024,
physical_schema.clone(),
predicate,
)),
_ => Box::new(ParquetOpener::new(
1024,
physical_schema.clone(),
predicate,
self.store.clone(),
)),
};
Expand All @@ -83,20 +89,23 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
struct ParquetOpener {
// projection: Arc<[usize]>,
batch_size: usize,
limit: Option<usize>,
table_schema: SchemaRef,
predicate: Option<Expression>,
limit: Option<usize>,
store: Arc<DynObjectStore>,
}

impl ParquetOpener {
pub(crate) fn new(
batch_size: usize,
table_schema: SchemaRef,
predicate: Option<Expression>,
store: Arc<DynObjectStore>,
) -> Self {
Self {
batch_size,
table_schema,
predicate,
limit: None,
store,
}
Expand All @@ -111,6 +120,7 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
// let projection = self.projection.clone();
let table_schema = self.table_schema.clone();
let predicate = self.predicate.clone();
let limit = self.limit;

Ok(Box::pin(async move {
Expand All @@ -133,6 +143,9 @@ impl FileOpener for ParquetOpener {
builder = builder.with_projection(mask)
}

if let Some(ref predicate) = predicate {
builder = builder.with_row_group_filter(predicate);
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
Expand All @@ -153,16 +166,18 @@ impl FileOpener for ParquetOpener {
/// Implements [`FileOpener`] for a opening a parquet file from a presigned URL
struct PresignedUrlOpener {
batch_size: usize,
predicate: Option<Expression>,
limit: Option<usize>,
table_schema: SchemaRef,
client: reqwest::Client,
}

impl PresignedUrlOpener {
pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
Self {
batch_size,
table_schema: schema,
predicate,
limit: None,
client: reqwest::Client::new(),
}
Expand All @@ -173,6 +188,7 @@ impl FileOpener for PresignedUrlOpener {
fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
let batch_size = self.batch_size;
let table_schema = self.table_schema.clone();
let predicate = self.predicate.clone();
let limit = self.limit;
let client = self.client.clone(); // uses Arc internally according to reqwest docs

Expand All @@ -196,6 +212,9 @@ impl FileOpener for PresignedUrlOpener {
builder = builder.with_projection(mask)
}

if let Some(ref predicate) = predicate {
builder = builder.with_row_group_filter(predicate);
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
Expand Down Expand Up @@ -261,6 +280,7 @@ mod tests {
size: meta.size,
}];

// TODO: add a test that uses predicate skipping?
scovich marked this conversation as resolved.
Show resolved Hide resolved
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let data: Vec<RecordBatch> = handler
.read_parquet_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
Expand Down
3 changes: 3 additions & 0 deletions kernel/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub mod arrow_expression;
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod arrow_data;

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod parquet_row_group_skipping;

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod parquet_stats_skipping;

Expand Down
207 changes: 207 additions & 0 deletions kernel/src/engine/parquet_row_group_skipping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
//! An implementation of parquet row group skipping using data skipping predicates over footer stats.
use crate::engine::parquet_stats_skipping::{col_name_to_path, ParquetStatsSkippingFilter};
use crate::expressions::{Expression, Scalar};
use crate::schema::{DataType, PrimitiveType};
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;
use parquet::schema::types::{ColumnDescPtr, ColumnPath};
use std::collections::{HashMap, HashSet};

#[cfg(test)]
mod tests;

/// An extension trait for [`ArrowReaderBuilder`] that injects row group skipping capability.
pub(crate) trait ParquetRowGroupSkipping {
/// Instructs the parquet reader to perform row group skipping, eliminating any row group whose
/// stats prove that none of the group's rows can satisfy the given `predicate`.
fn with_row_group_filter(self, predicate: &Expression) -> Self;
}
impl<T> ParquetRowGroupSkipping for ArrowReaderBuilder<T> {
fn with_row_group_filter(self, predicate: &Expression) -> Self {
let indices = self
.metadata()
.row_groups()
.iter()
.enumerate()
.filter_map(|(index, row_group)| {
// If the group survives the filter, return Some(index) so filter_map keeps it.
RowGroupFilter::apply(row_group, predicate).then_some(index)
})
.collect();
self.with_row_groups(indices)
}
}

/// A ParquetStatsSkippingFilter for row group skipping. It obtains stats from a parquet
/// [`RowGroupMetaData`] and pre-computes the mapping of each referenced column path to its
/// corresponding field index, for O(1) stats lookups.
struct RowGroupFilter<'a> {
row_group: &'a RowGroupMetaData,
field_indices: HashMap<ColumnPath, usize>,
}

impl<'a> RowGroupFilter<'a> {
/// Creates a new row group filter for the given row group and predicate.
fn new(row_group: &'a RowGroupMetaData, predicate: &Expression) -> Self {
Self {
row_group,
field_indices: compute_field_indices(row_group.schema_descr().columns(), predicate),
}
}

/// Applies a filtering predicate to a row group. Return value false means to skip it.
fn apply(row_group: &'a RowGroupMetaData, predicate: &Expression) -> bool {
let result = RowGroupFilter::new(row_group, predicate).apply_sql_where(predicate);
!matches!(result, Some(false))
scovich marked this conversation as resolved.
Show resolved Hide resolved
}

fn get_stats(&self, col: &ColumnPath) -> Option<&Statistics> {
let field_index = self.field_indices.get(col)?;
self.row_group.column(*field_index).statistics()
}
fn decimal_from_bytes(bytes: Option<&[u8]>, p: u8, s: u8) -> Option<Scalar> {
nicklan marked this conversation as resolved.
Show resolved Hide resolved
// WARNING: The bytes are stored in big-endian order; reverse and then 0-pad them.
let bytes = bytes.filter(|b| b.len() <= 16)?;
let mut bytes = Vec::from(bytes);
bytes.reverse();
bytes.resize(16, 0u8);
let bytes: [u8; 16] = bytes.try_into().ok()?;
Some(Scalar::Decimal(i128::from_le_bytes(bytes), p, s))
}
}

impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> {
// Extracts a stat value, converting from its physical type to the requested logical type.
//
// NOTE: This code is highly redundant with [`get_min_stat_value`], but parquet
scovich marked this conversation as resolved.
Show resolved Hide resolved
// ValueStatistics<T> requires T to impl a private trait, so we can't factor out any kind of
// helper method. And macros are hard enough to read that it's not worth defining one.
fn get_min_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> {
use PrimitiveType::*;
let value = match (data_type.as_primitive_opt()?, self.get_stats(col)?) {
(String, Statistics::ByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(),
(String, Statistics::FixedLenByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(),
(String, _) => return None,
(Long, Statistics::Int64(s)) => s.min_opt()?.into(),
(Long, Statistics::Int32(s)) => (*s.min_opt()? as i64).into(),
(Long, _) => return None,
(Integer, Statistics::Int32(s)) => s.min_opt()?.into(),
(Integer, _) => return None,
(Short, Statistics::Int32(s)) => (*s.min_opt()? as i16).into(),
(Short, _) => return None,
(Byte, Statistics::Int32(s)) => (*s.min_opt()? as i8).into(),
(Byte, _) => return None,
(Float, Statistics::Float(s)) => s.min_opt()?.into(),
(Float, _) => return None,
(Double, Statistics::Double(s)) => s.min_opt()?.into(),
(Double, Statistics::Float(s)) => (*s.min_opt()? as f64).into(),
(Double, _) => return None,
(Boolean, Statistics::Boolean(s)) => s.min_opt()?.into(),
(Boolean, _) => return None,
(Binary, Statistics::ByteArray(s)) => s.min_opt()?.data().into(),
(Binary, Statistics::FixedLenByteArray(s)) => s.min_opt()?.data().into(),
(Binary, _) => return None,
(Date, Statistics::Int32(s)) => Scalar::Date(*s.min_opt()?),
(Date, _) => return None,
(Timestamp, Statistics::Int64(s)) => Scalar::Timestamp(*s.min_opt()?),
(Timestamp, _) => return None, // TODO: Int96 timestamps
(TimestampNtz, Statistics::Int64(s)) => Scalar::TimestampNtz(*s.min_opt()?),
(TimestampNtz, Statistics::Int32(_)) => return None, // TODO: widen from DATE
scovich marked this conversation as resolved.
Show resolved Hide resolved
(TimestampNtz, _) => return None, // TODO: Int96 timestamps
(Decimal(p, s), Statistics::Int32(i)) => Scalar::Decimal(*i.min_opt()? as i128, *p, *s),
(Decimal(p, s), Statistics::Int64(i)) => Scalar::Decimal(*i.min_opt()? as i128, *p, *s),
(Decimal(p, s), Statistics::FixedLenByteArray(b)) => {
Self::decimal_from_bytes(b.min_bytes_opt(), *p, *s)?
}
(Decimal(..), _) => return None,
};
Some(value)
}

fn get_max_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> {
use PrimitiveType::*;
let value = match (data_type.as_primitive_opt()?, self.get_stats(col)?) {
(String, Statistics::ByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(),
(String, Statistics::FixedLenByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(),
(String, _) => return None,
(Long, Statistics::Int64(s)) => s.max_opt()?.into(),
(Long, Statistics::Int32(s)) => (*s.max_opt()? as i64).into(),
(Long, _) => return None,
(Integer, Statistics::Int32(s)) => s.max_opt()?.into(),
(Integer, _) => return None,
(Short, Statistics::Int32(s)) => (*s.max_opt()? as i16).into(),
(Short, _) => return None,
(Byte, Statistics::Int32(s)) => (*s.max_opt()? as i8).into(),
(Byte, _) => return None,
(Float, Statistics::Float(s)) => s.max_opt()?.into(),
(Float, _) => return None,
(Double, Statistics::Double(s)) => s.max_opt()?.into(),
(Double, Statistics::Float(s)) => (*s.max_opt()? as f64).into(),
(Double, _) => return None,
(Boolean, Statistics::Boolean(s)) => s.max_opt()?.into(),
(Boolean, _) => return None,
(Binary, Statistics::ByteArray(s)) => s.max_opt()?.data().into(),
(Binary, Statistics::FixedLenByteArray(s)) => s.max_opt()?.data().into(),
(Binary, _) => return None,
(Date, Statistics::Int32(s)) => Scalar::Date(*s.max_opt()?),
(Date, _) => return None,
(Timestamp, Statistics::Int64(s)) => Scalar::Timestamp(*s.max_opt()?),
(Timestamp, _) => return None, // TODO: Int96 timestamps
(TimestampNtz, Statistics::Int64(s)) => Scalar::TimestampNtz(*s.max_opt()?),
(TimestampNtz, Statistics::Int32(_)) => return None, // TODO: widen from DATE
(TimestampNtz, _) => return None, // TODO: Int96 timestamps
(Decimal(p, s), Statistics::Int32(i)) => Scalar::Decimal(*i.max_opt()? as i128, *p, *s),
(Decimal(p, s), Statistics::Int64(i)) => Scalar::Decimal(*i.max_opt()? as i128, *p, *s),
(Decimal(p, s), Statistics::FixedLenByteArray(b)) => {
Self::decimal_from_bytes(b.max_bytes_opt(), *p, *s)?
}
(Decimal(..), _) => return None,
};
Some(value)
}

// Parquet nullcount stats always have the same type (u64), so we can directly return the value
// instead of wrapping it in a Scalar. We can safely cast it from u64 to i64, because the
// nullcount can never be larger than the rowcount, and the parquet rowcount stat is i64.
fn get_nullcount_stat_value(&self, col: &ColumnPath) -> Option<i64> {
Some(self.get_stats(col)?.null_count_opt()? as i64)
}

fn get_rowcount_stat_value(&self) -> i64 {
self.row_group.num_rows()
}
}

/// Given a filter expression of interest and a set of parquet column descriptors, build a column ->
/// index mapping for columns the expression references. This ensures O(1) lookup times, for an
/// overall O(n) cost to evaluate an expression tree with n nodes.
pub(crate) fn compute_field_indices(
fields: &[ColumnDescPtr],
expression: &Expression,
) -> HashMap<ColumnPath, usize> {
fn do_recurse(expression: &Expression, cols: &mut HashSet<ColumnPath>) {
use Expression::*;
let mut recurse = |expr| do_recurse(expr, cols); // less arg passing below
match expression {
Literal(_) => {}
Column(name) => drop(cols.insert(col_name_to_path(name))),
OussamaSaoudi-db marked this conversation as resolved.
Show resolved Hide resolved
scovich marked this conversation as resolved.
Show resolved Hide resolved
Struct(fields) => fields.iter().for_each(recurse),
UnaryOperation { expr, .. } => recurse(expr),
BinaryOperation { left, right, .. } => [left, right].iter().for_each(|e| recurse(e)),
VariadicOperation { exprs, .. } => exprs.iter().for_each(recurse),
}
}

// Build up a set of requested column paths, then take each found path as the corresponding map
// key (avoids unnecessary cloning).
//
// NOTE: If a requested column was not available, it is silently ignored.
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
let mut requested_columns = HashSet::new();
do_recurse(expression, &mut requested_columns);
fields
.iter()
.enumerate()
.filter_map(|(i, f)| requested_columns.take(f.path()).map(|path| (path, i)))
.collect()
}
Loading
Loading