Skip to content

Commit

Permalink
parquet: Add support for user-provided metadata loaders
Browse files Browse the repository at this point in the history
This allows users to, for example, cache the Page Index so it does not
need to be parsed every time we open the file.
  • Loading branch information
progval committed Sep 23, 2024
1 parent a35d007 commit 620f0bb
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 15 deletions.
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ use crate::datasource::schema_adapter::{
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use reader::{
DefaultParquetFileReaderFactory, ParquetFileReader, ParquetFileReaderFactory,
};
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use writer::plan_to_parquet;

Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::datasource::file_format::coerce_file_schema_to_view_type;
use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
use crate::datasource::physical_plan::parquet::{
row_filter, should_enable_page_index, ParquetAccessPlan,
row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileReader,
};
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
Expand All @@ -35,7 +35,6 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use std::sync::Arc;

Expand Down Expand Up @@ -87,7 +86,7 @@ impl FileOpener for ParquetOpener {
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

let mut reader: Box<dyn AsyncFileReader> =
let mut reader: Box<dyn ParquetFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
file_meta,
Expand Down Expand Up @@ -118,8 +117,7 @@ impl FileOpener for ParquetOpener {
Ok(Box::pin(async move {
let options = ArrowReaderOptions::new().with_page_index(enable_page_index);

let metadata =
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
let metadata = reader.load_metadata(options.clone()).await?;
let mut schema = metadata.schema().clone();
// read with view types
if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
Expand All @@ -133,8 +131,10 @@ impl FileOpener for ParquetOpener {
let metadata =
ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;

let mut builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
reader.upcast(),
metadata,
);

let file_schema = builder.schema().clone();

Expand Down
51 changes: 46 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use bytes::Bytes;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::fmt::Debug;
Expand Down Expand Up @@ -57,9 +58,49 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
file_meta: FileMeta,
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
) -> datafusion_common::Result<Box<dyn ParquetFileReader>>;
}

/// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded.
pub trait ParquetFileReader: AsyncFileReader + Send + 'static {
/// Returns a [`AsyncFileReader`] trait object
///
/// This can usually be implemented as `Box::new(*self)`
fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static>;

/// Parses the file's metadata
///
/// The default implementation is:
///
/// ```
/// Box::pin(ArrowReaderMetadata::load_async(self, options))
/// ```
fn load_metadata(
&mut self,
options: ArrowReaderOptions,
) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>>;
}

macro_rules! impl_ParquetFileReader {
($type:ty) => {
impl ParquetFileReader for $type {
fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
Box::new(*self)
}

fn load_metadata(
&mut self,
options: ArrowReaderOptions,
) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>> {
Box::pin(ArrowReaderMetadata::load_async(self, options))
}
}
};
}

impl_ParquetFileReader!(ParquetObjectReader);
impl_ParquetFileReader!(DefaultParquetFileReader);

/// Default implementation of [`ParquetFileReaderFactory`]
///
/// This implementation:
Expand All @@ -86,12 +127,12 @@ impl DefaultParquetFileReaderFactory {
/// This implementation does not coalesce I/O operations or cache bytes. Such
/// optimizations can be done either at the object store level or by providing a
/// custom implementation of [`ParquetFileReaderFactory`].
pub(crate) struct ParquetFileReader {
pub(crate) struct DefaultParquetFileReader {
pub file_metrics: ParquetFileMetrics,
pub inner: ParquetObjectReader,
}

impl AsyncFileReader for ParquetFileReader {
impl AsyncFileReader for DefaultParquetFileReader {
fn get_bytes(
&mut self,
range: Range<usize>,
Expand Down Expand Up @@ -126,7 +167,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
file_meta: FileMeta,
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
) -> datafusion_common::Result<Box<dyn ParquetFileReader>> {
let file_metrics = ParquetFileMetrics::new(
partition_index,
file_meta.location().as_ref(),
Expand All @@ -139,7 +180,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
inner = inner.with_footer_size_hint(hint)
};

Ok(Box::new(ParquetFileReader {
Ok(Box::new(DefaultParquetFileReader {
inner,
file_metrics,
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ mod tests {
use std::sync::Arc;

use super::*;
use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
use crate::datasource::physical_plan::parquet::reader::DefaultParquetFileReader;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;

use arrow::datatypes::DataType::Decimal128;
Expand Down Expand Up @@ -1516,7 +1516,7 @@ mod tests {
let metrics = ExecutionPlanMetricsSet::new();
let file_metrics =
ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
let reader = ParquetFileReader {
let reader = DefaultParquetFileReader {
inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta),
file_metrics: file_metrics.clone(),
};
Expand Down

0 comments on commit 620f0bb

Please sign in to comment.