parquet: Add support for user-provided metadata loaders #12593

Open · wants to merge 3 commits into main · showing changes from 2 commits
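This PR extends `ParquetFileReaderFactory` to return a new `ParquetFileReader` trait object whose `load_metadata` method controls how the Parquet footer metadata is obtained, so callers can supply metadata from somewhere other than the file itself. As a rough sketch of the kind of reader this enables — `PrecachedMetadataReader` and its fields are hypothetical, not part of this PR:

```rust
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use datafusion::datasource::physical_plan::parquet::ParquetFileReader;
use datafusion::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use datafusion::parquet::errors::Result as ParquetResult;
use datafusion::parquet::file::metadata::ParquetMetaData;
use futures::future::BoxFuture;

/// Hypothetical reader that serves file metadata from a pre-parsed cache,
/// delegating all byte-range I/O to the underlying object store reader.
struct PrecachedMetadataReader {
    inner: ParquetObjectReader,
    /// Parsed once (e.g. when the table is registered) and shared thereafter.
    metadata: Arc<ParquetMetaData>,
}

impl AsyncFileReader for PrecachedMetadataReader {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        self.inner.get_bytes(range)
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
        let metadata = Arc::clone(&self.metadata);
        Box::pin(async move { Ok(metadata) })
    }
}

impl ParquetFileReader for PrecachedMetadataReader {
    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
        Box::new(*self)
    }

    fn load_metadata(
        &mut self,
        options: ArrowReaderOptions,
    ) -> BoxFuture<'_, ParquetResult<ArrowReaderMetadata>> {
        // No footer I/O at all: build the Arrow-level metadata directly from
        // the cached ParquetMetaData. Assumes the cached metadata already
        // contains whatever `options` ask for (e.g. page indexes).
        let metadata = Arc::clone(&self.metadata);
        Box::pin(async move { ArrowReaderMetadata::try_new(metadata, options) })
    }
}
```

With such a reader, opening a file performs no footer fetch or parse at all; the scan goes straight to reading row groups.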
24 changes: 20 additions & 4 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -24,13 +24,15 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::{
     ParquetAccessPlan, ParquetExecBuilder,
 };
-use datafusion::datasource::physical_plan::{
-    parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
+use datafusion::datasource::physical_plan::parquet::{
+    ParquetFileReader, ParquetFileReaderFactory,
 };
+use datafusion::datasource::physical_plan::{FileMeta, FileScanConfig};
 use datafusion::datasource::TableProvider;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::arrow_reader::{
-    ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
+    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
+    RowSelection, RowSelector,
 };
 use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use datafusion::parquet::arrow::ArrowWriter;
@@ -552,7 +554,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         _metrics: &ExecutionPlanMetricsSet,
-    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+    ) -> Result<Box<dyn ParquetFileReader>> {
         // for this example we ignore the partition index and metrics
         // but in a real system you would likely use them to report details on
         // the performance of the reader.
@@ -621,6 +623,20 @@ impl AsyncFileReader for ParquetReaderWithCache {
     }
 }

+impl ParquetFileReader for ParquetReaderWithCache {
+    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
+        Box::new(*self)
+    }
+
+    fn load_metadata(
+        &mut self,
+        options: ArrowReaderOptions,
+    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<ArrowReaderMetadata>> {
+        // This could be cached too, if CPU time is a concern in addition to storage latency
+        Box::pin(ArrowReaderMetadata::load_async(self, options))
+    }
+}
+
 /// Creates a new parquet file at the specified path.
 ///
 /// * id: Int32

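The comment inside `load_metadata` above notes that the parsed metadata itself could be cached when CPU time matters as well as storage latency. A minimal sketch of that idea, using a hypothetical memoizing wrapper (the `AsyncFileReader` forwarding impls and the `ParquetFileReader` impl are omitted for brevity):

```rust
use datafusion::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::errors::Result as ParquetResult;
use futures::future::BoxFuture;

/// Hypothetical wrapper (not part of this PR) that memoizes the parsed
/// ArrowReaderMetadata of any inner AsyncFileReader implementation.
struct MemoizedMetadataReader<R> {
    inner: R,
    cached: Option<ArrowReaderMetadata>,
}

impl<R: AsyncFileReader> MemoizedMetadataReader<R> {
    fn load_metadata(
        &mut self,
        options: ArrowReaderOptions,
    ) -> BoxFuture<'_, ParquetResult<ArrowReaderMetadata>> {
        Box::pin(async move {
            // Reuse the previously parsed metadata if we have it. This assumes
            // `options` are identical on every call; a real cache would key on them.
            if let Some(cached) = &self.cached {
                return Ok(cached.clone());
            }
            let metadata =
                ArrowReaderMetadata::load_async(&mut self.inner, options).await?;
            self.cached = Some(metadata.clone());
            Ok(metadata)
        })
    }
}
```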
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -60,7 +60,9 @@ use crate::datasource::schema_adapter::{
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
-pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
+pub use reader::{
+    DefaultParquetFileReaderFactory, ParquetFileReader, ParquetFileReaderFactory,
+};
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use writer::plan_to_parquet;

14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -21,7 +21,7 @@ use crate::datasource::file_format::coerce_file_schema_to_view_type;
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{
-    row_filter, should_enable_page_index, ParquetAccessPlan,
+    row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileReader,
 };
 use crate::datasource::physical_plan::{
     FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -35,7 +35,6 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::{StreamExt, TryStreamExt};
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
-use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use std::sync::Arc;

@@ -87,7 +86,7 @@ impl FileOpener for ParquetOpener {
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

-        let mut reader: Box<dyn AsyncFileReader> =
+        let mut reader: Box<dyn ParquetFileReader> =
             self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
                 file_meta,
@@ -118,8 +117,7 @@
         Ok(Box::pin(async move {
             let options = ArrowReaderOptions::new().with_page_index(enable_page_index);

-            let metadata =
-                ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
+            let metadata = reader.load_metadata(options.clone()).await?;
             let mut schema = metadata.schema().clone();
             // read with view types
             if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
@@ -133,8 +131,10 @@
             let metadata =
                 ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;

-            let mut builder =
-                ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
+            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+                reader.upcast(),
+                metadata,
+            );

             let file_schema = builder.schema().clone();

51 changes: 46 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/reader.rs
@@ -23,6 +23,7 @@ use bytes::Bytes;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::future::BoxFuture;
 use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use parquet::file::metadata::ParquetMetaData;
 use std::fmt::Debug;
@@ -57,9 +58,49 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
-    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
+    ) -> datafusion_common::Result<Box<dyn ParquetFileReader>>;
 }

+/// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded.

> Contributor: I wonder if we can use one of the (many!) already existing APIs/traits for loading the metadata. The number of layers of abstraction is already pretty mind-boggling, and adding yet another one seems unwieldy. Or maybe we could just change the code not to call the loader if it already has the metadata 🤔

+pub trait ParquetFileReader: AsyncFileReader + Send + 'static {
+    /// Returns an [`AsyncFileReader`] trait object
+    ///
+    /// This can usually be implemented as `Box::new(*self)`
+    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static>;

> Contributor: This method is unfortunate, but I also could not figure out a way without it.

+
+    /// Parses the file's metadata
+    ///
+    /// The default implementation is:
+    ///
+    /// ```ignore
+    /// Box::pin(ArrowReaderMetadata::load_async(self, options))
+    /// ```
+    fn load_metadata(
+        &mut self,
+        options: ArrowReaderOptions,
+    ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>>;
+}
+
+macro_rules! impl_ParquetFileReader {
+    ($type:ty) => {
+        impl ParquetFileReader for $type {
+            fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
+                Box::new(*self)
+            }
+
+            fn load_metadata(
+                &mut self,
+                options: ArrowReaderOptions,
+            ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>> {
+                Box::pin(ArrowReaderMetadata::load_async(self, options))
+            }
+        }
+    };
+}
+
+impl_ParquetFileReader!(ParquetObjectReader);
+impl_ParquetFileReader!(DefaultParquetFileReader);
+
 /// Default implementation of [`ParquetFileReaderFactory`]
 ///
 /// This implementation:
@@ -86,12 +127,12 @@ impl DefaultParquetFileReaderFactory {
 /// This implementation does not coalesce I/O operations or cache bytes. Such
 /// optimizations can be done either at the object store level or by providing a
 /// custom implementation of [`ParquetFileReaderFactory`].
-pub(crate) struct ParquetFileReader {
+pub(crate) struct DefaultParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     pub inner: ParquetObjectReader,
 }

-impl AsyncFileReader for ParquetFileReader {
+impl AsyncFileReader for DefaultParquetFileReader {
     fn get_bytes(
         &mut self,
         range: Range<usize>,
@@ -126,7 +167,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
-    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+    ) -> datafusion_common::Result<Box<dyn ParquetFileReader>> {
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
             file_meta.location().as_ref(),
@@ -139,7 +180,7 @@
             inner = inner.with_footer_size_hint(hint)
         };

-        Ok(Box::new(ParquetFileReader {
+        Ok(Box::new(DefaultParquetFileReader {
             inner,
             file_metrics,
         }))

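A note on `upcast`, inferred from the code rather than stated in the PR: `ParquetRecordBatchStreamBuilder::new_with_metadata` takes ownership of the reader, and stable Rust at this time could not coerce `Box<dyn ParquetFileReader>` into `Box<dyn AsyncFileReader>` despite the supertrait relationship (trait-object upcasting was not yet stabilized), so each implementor re-boxes its concrete `Self` instead. A sketch of the hand-off as the opener performs it:

```rust
use datafusion::datasource::physical_plan::parquet::ParquetFileReader;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;

// Sketch of the hand-off the opener performs after this change. A direct
// `Box<dyn ParquetFileReader>` -> `Box<dyn AsyncFileReader>` coercion is
// rejected by the compiler here, so `upcast` re-boxes the concrete reader.
fn hand_off_to_builder(
    reader: Box<dyn ParquetFileReader>,
    metadata: ArrowReaderMetadata,
) -> ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>> {
    let reader: Box<dyn AsyncFileReader> = reader.upcast();
    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata)
}
```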
@@ -416,7 +416,7 @@ mod tests {
     use std::sync::Arc;

     use super::*;
-    use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
+    use crate::datasource::physical_plan::parquet::reader::DefaultParquetFileReader;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;

     use arrow::datatypes::DataType::Decimal128;
@@ -1516,7 +1516,7 @@ mod tests {
         let metrics = ExecutionPlanMetricsSet::new();
         let file_metrics =
             ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
-        let reader = ParquetFileReader {
+        let reader = DefaultParquetFileReader {
            inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta),
            file_metrics: file_metrics.clone(),
         };

23 changes: 19 additions & 4 deletions datafusion/core/tests/parquet/custom_reader.rs
@@ -27,6 +27,7 @@ use datafusion::assert_batches_sorted_eq;
 use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::parquet::ParquetFileReader;
 use datafusion::datasource::physical_plan::{
     FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
 };
@@ -41,6 +42,7 @@ use futures::{FutureExt, TryFutureExt};
 use object_store::memory::InMemory;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::ArrowWriter;
 use parquet::errors::ParquetError;
@@ -115,7 +117,7 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
-    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+    ) -> Result<Box<dyn ParquetFileReader>> {
         let metadata = file_meta
             .extensions
             .as_ref()
@@ -132,7 +134,7 @@
             metrics,
         );

-        Ok(Box::new(ParquetFileReader {
+        Ok(Box::new(CustomParquetFileReader {
             store: Arc::clone(&self.0),
             meta: file_meta.object_meta,
             metrics: parquet_file_metrics,
@@ -202,14 +204,14 @@ async fn store_parquet_in_memory(
 }

 /// Implements [`AsyncFileReader`] for a parquet file in object storage
-struct ParquetFileReader {
+struct CustomParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
     metrics: ParquetFileMetrics,
     metadata_size_hint: Option<usize>,
 }

-impl AsyncFileReader for ParquetFileReader {
+impl AsyncFileReader for CustomParquetFileReader {
     fn get_bytes(
         &mut self,
         range: Range<usize>,
@@ -243,3 +245,16 @@
         })
     }
 }
+
+impl ParquetFileReader for CustomParquetFileReader {
+    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
+        Box::new(*self)
+    }
+
+    fn load_metadata(
+        &mut self,
+        options: ArrowReaderOptions,
+    ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>> {
+        Box::pin(ArrowReaderMetadata::load_async(self, options))
+    }
+}
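
For completeness, this is roughly how a custom factory like the one in this test gets attached to a scan, mirroring the `advanced_parquet_index` example; `file_scan_config` and `store` stand in for values constructed elsewhere:

```rust
use std::sync::Arc;

use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use object_store::ObjectStore;

/// Sketch: route every file opened by this scan through the custom factory,
/// so CustomParquetFileReader::load_metadata replaces the default footer read.
fn build_exec(
    file_scan_config: FileScanConfig,
    store: Arc<dyn ObjectStore>,
) -> ParquetExec {
    ParquetExecBuilder::new(file_scan_config)
        .with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory(
            store,
        )))
        .build()
}
```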