Skip to content

Commit

Permalink
Support user defined ParquetAccessPlan in ParquetExec, validation…
Browse files Browse the repository at this point in the history
… to `ParquetAccessPlan::select` (#10813)

* Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select

* Add test for filtering and user supplied access plan

* fix on windows

* Apply suggestions from code review

Co-authored-by: Jeffrey Vo <[email protected]>

---------

Co-authored-by: Jeffrey Vo <[email protected]>
  • Loading branch information
alamb and Jefffrey authored Jun 9, 2024
1 parent ad0dc2f commit 9503456
Show file tree
Hide file tree
Showing 8 changed files with 692 additions and 38 deletions.
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ impl PartitionedFile {
self.range = Some(FileRange { start, end });
self
}

/// Update the user defined extensions for this file.
///
/// This can be used to pass reader specific information.
pub fn with_extensions(
mut self,
extensions: Arc<dyn std::any::Any + Send + Sync>,
) -> Self {
self.extensions = Some(extensions);
self
}
}

impl From<ObjectMeta> for PartitionedFile {
Expand Down
121 changes: 111 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::{internal_err, Result};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;

Expand Down Expand Up @@ -182,6 +183,11 @@ impl ParquetAccessPlan {
/// is returned for *all* the rows in the row groups that are not skipped.
/// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
///
/// # Errors
///
/// Returns an error if any specified row selection does not specify
/// the same number of rows as in it's corresponding `row_group_metadata`.
///
/// # Example: No Selections
///
/// Given an access plan like this
Expand Down Expand Up @@ -228,7 +234,7 @@ impl ParquetAccessPlan {
pub fn into_overall_row_selection(
self,
row_group_meta_data: &[RowGroupMetaData],
) -> Option<RowSelection> {
) -> Result<Option<RowSelection>> {
assert_eq!(row_group_meta_data.len(), self.row_groups.len());
// Intuition: entire row groups are filtered out using
// `row_group_indexes` which come from Skip and Scan. An overall
Expand All @@ -239,7 +245,32 @@ impl ParquetAccessPlan {
.iter()
.any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
{
return None;
return Ok(None);
}

// validate all Selections
for (idx, (rg, rg_meta)) in self
.row_groups
.iter()
.zip(row_group_meta_data.iter())
.enumerate()
{
let RowGroupAccess::Selection(selection) = rg else {
continue;
};
let rows_in_selection = selection
.iter()
.map(|selection| selection.row_count)
.sum::<usize>();

let row_group_row_count = rg_meta.num_rows();
if rows_in_selection as i64 != row_group_row_count {
return internal_err!(
"Invalid ParquetAccessPlan Selection. Row group {idx} has {row_group_row_count} rows \
but selection only specifies {rows_in_selection} rows. \
Selection: {selection:?}"
);
}
}

let total_selection: RowSelection = self
Expand All @@ -261,7 +292,7 @@ impl ParquetAccessPlan {
})
.collect();

Some(total_selection)
Ok(Some(total_selection))
}

/// Return an iterator over the row group indexes that should be scanned
Expand Down Expand Up @@ -305,6 +336,7 @@ impl ParquetAccessPlan {
#[cfg(test)]
mod test {
use super::*;
use datafusion_common::assert_contains;
use parquet::basic::LogicalType;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
Expand All @@ -320,7 +352,9 @@ mod test {
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

// scan all row groups, no selection
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
Expand All @@ -337,7 +371,9 @@ mod test {
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

// skip all row groups, no selection
assert_eq!(row_group_indexes, vec![] as Vec<usize>);
Expand All @@ -348,14 +384,22 @@ mod test {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// select / skip all 20 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(8),
]
.into(),
),
RowGroupAccess::Skip,
RowGroupAccess::Skip,
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

assert_eq!(row_group_indexes, vec![0, 1]);
assert_eq!(
Expand All @@ -366,7 +410,8 @@ mod test {
RowSelector::select(10),
// selectors from the second row group
RowSelector::select(5),
RowSelector::skip(7)
RowSelector::skip(7),
RowSelector::select(8)
]
.into()
)
Expand All @@ -379,13 +424,21 @@ mod test {
RowGroupAccess::Skip,
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// specify all 30 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(18),
]
.into(),
),
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

assert_eq!(row_group_indexes, vec![1, 2, 3]);
assert_eq!(
Expand All @@ -397,6 +450,7 @@ mod test {
// selectors from the third row group
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(18),
// select the entire fourth row group
RowSelector::select(40),
]
Expand All @@ -405,6 +459,53 @@ mod test {
);
}

#[test]
fn test_invalid_too_few() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 12 rows, but row group 1 has 20
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
),
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let err = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap_err()
.to_string();
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
assert_contains!(err, "Internal error: Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
}

#[test]
fn test_invalid_too_many() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 22 rows, but row group 1 has only 20
RowGroupAccess::Selection(
vec![
RowSelector::select(10),
RowSelector::skip(2),
RowSelector::select(10),
]
.into(),
),
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let err = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap_err()
.to_string();
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
}

static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();

/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
Expand Down
46 changes: 46 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,52 @@ pub use writer::plan_to_parquet;
/// custom reader is used, it supplies the metadata directly and this parameter
/// is ignored. [`ParquetExecBuilder::with_metadata_size_hint`] for more details.
///
/// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages
/// based on external information. See "Implementing External Indexes" below
///
/// # Implementing External Indexes
///
/// It is possible to restrict the row groups and selections within those row
/// groups that the ParquetExec will consider by providing an initial
/// [`ParquetAccessPlan`] as `extensions` on [`PartitionedFile`]. This can be
/// used to implement external indexes on top of parquet files and select only
/// portions of the files.
///
/// The `ParquetExec` will try and further reduce any provided
/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and
/// other settings.
///
/// ## Example of providing a ParquetAccessPlan
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Schema, SchemaRef};
/// # use datafusion::datasource::listing::PartitionedFile;
/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # fn schema() -> SchemaRef {
/// # Arc::new(Schema::empty())
/// # }
/// // create an access plan to scan row group 0, 1 and 3 and skip row groups 2 and 4
/// let mut access_plan = ParquetAccessPlan::new_all(5);
/// access_plan.skip(2);
/// access_plan.skip(4);
/// // provide the plan as extension to the FileScanConfig
/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
/// .with_extensions(Arc::new(access_plan));
/// // create a ParquetExec to scan this file
/// let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema())
/// .with_file(partitioned_file);
/// // this parquet exec will not even try to read row groups 2 and 4. Additional
/// // pruning based on predicates may also happen
/// let exec = ParquetExec::builder(file_scan_config).build();
/// ```
///
/// For a complete example, see the [`parquet_index_advanced` example]).
///
/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs
///
/// # Execution Overview
///
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
Expand Down
46 changes: 39 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::datasource::physical_plan::{
use crate::datasource::schema_adapter::SchemaAdapterFactory;
use crate::physical_optimizer::pruning::PruningPredicate;
use arrow_schema::{ArrowError, SchemaRef};
use datafusion_common::{exec_err, Result};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -60,11 +61,10 @@ pub(super) struct ParquetOpener {
impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
&self.metrics,
);
let extensions = file_meta.extensions.clone();
let file_name = file_meta.location().to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

let reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
Expand Down Expand Up @@ -139,7 +139,8 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
let access_plan =
create_initial_plan(&file_name, extensions, rg_metadata.len())?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
Expand Down Expand Up @@ -186,7 +187,7 @@ impl FileOpener for ParquetOpener {

let row_group_indexes = access_plan.row_group_indexes();
if let Some(row_selection) =
access_plan.into_overall_row_selection(rg_metadata)
access_plan.into_overall_row_selection(rg_metadata)?
{
builder = builder.with_row_selection(row_selection);
}
Expand All @@ -212,3 +213,34 @@ impl FileOpener for ParquetOpener {
}))
}
}

/// Return the initial [`ParquetAccessPlan`]
///
/// If the user has supplied one as an extension, use that
/// otherwise return a plan that scans all row groups
///
/// Returns an error if an invalid `ParquetAccessPlan` is provided
///
/// Note: file_name is only used for error messages
fn create_initial_plan(
file_name: &str,
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
row_group_count: usize,
) -> Result<ParquetAccessPlan> {
if let Some(extensions) = extensions {
if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() {
let plan_len = access_plan.len();
if plan_len != row_group_count {
return exec_err!(
"Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
);
}

// check row group count matches the plan
return Ok(access_plan.clone());
}
}

// default to scanning all row groups
Ok(ParquetAccessPlan::new_all(row_group_count))
}
Loading

0 comments on commit 9503456

Please sign in to comment.