parquet: Add support for user-provided metadata loaders
This allows users to, for example, cache the Page Index so it does not
need to be parsed every time we open the file.
progval committed Sep 23, 2024
1 parent a35d007 commit 0ca234b
Showing 6 changed files with 97 additions and 23 deletions.
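
For instance, a factory can hand back a reader whose `load_metadata` consults an in-memory cache before parsing anything. The sketch below is illustrative only and not part of this commit: `MetadataCachingReader` and its single-slot cache are invented names, and a production cache would key a shared map by file path and validate that the cached entry matches the requested `ArrowReaderOptions`.

    use std::ops::Range;
    use std::sync::{Arc, Mutex};

    use bytes::Bytes;
    use datafusion::datasource::physical_plan::parquet::ParquetFileReader;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use parquet::file::metadata::ParquetMetaData;

    /// Hypothetical reader: parses the footer and Page Index at most once,
    /// then serves the parsed metadata from memory on every later open.
    struct MetadataCachingReader {
        inner: ParquetObjectReader,
        cached: Arc<Mutex<Option<ArrowReaderMetadata>>>,
    }

    impl AsyncFileReader for MetadataCachingReader {
        // Byte-level I/O is simply delegated to the wrapped reader.
        fn get_bytes(
            &mut self,
            range: Range<usize>,
        ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
            self.inner.get_bytes(range)
        }

        fn get_metadata(
            &mut self,
        ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
            self.inner.get_metadata()
        }
    }

    impl ParquetFileReader for MetadataCachingReader {
        fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
            Box::new(*self)
        }

        fn load_metadata(
            &mut self,
            options: ArrowReaderOptions,
        ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>> {
            async move {
                if let Some(metadata) = self.cached.lock().unwrap().clone() {
                    return Ok(metadata); // cache hit: nothing to re-parse
                }
                let metadata =
                    ArrowReaderMetadata::load_async(&mut self.inner, options).await?;
                *self.cached.lock().unwrap() = Some(metadata.clone());
                Ok(metadata)
            }
            .boxed()
        }
    }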
24 changes: 20 additions & 4 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -24,13 +24,15 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::{
     ParquetAccessPlan, ParquetExecBuilder,
 };
-use datafusion::datasource::physical_plan::{
-    parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
+use datafusion::datasource::physical_plan::parquet::{
+    ParquetFileReader, ParquetFileReaderFactory,
 };
+use datafusion::datasource::physical_plan::{FileMeta, FileScanConfig};
 use datafusion::datasource::TableProvider;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::arrow_reader::{
-    ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
+    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
+    RowSelection, RowSelector,
 };
 use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use datafusion::parquet::arrow::ArrowWriter;
@@ -552,7 +554,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         _metrics: &ExecutionPlanMetricsSet,
-    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+    ) -> Result<Box<dyn ParquetFileReader>> {
        // for this example we ignore the partition index and metrics
        // but in a real system you would likely use them to report details on
        // the performance of the reader.
@@ -621,6 +623,20 @@ impl AsyncFileReader for ParquetReaderWithCache {
     }
 }
 
+impl ParquetFileReader for ParquetReaderWithCache {
+    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
+        Box::new(*self)
+    }
+
+    fn load_metadata(
+        &mut self,
+        options: ArrowReaderOptions,
+    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<ArrowReaderMetadata>> {
+        // This could be cached too, if CPU time is a concern in addition to storage latency
+        Box::pin(ArrowReaderMetadata::load_async(self, options))
+    }
+}
+
 /// Creates a new parquet file at the specified path.
 ///
 /// * id: Int32
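
How a custom factory reaches the scan is unchanged; the new trait only alters what the factory returns. A hypothetical wiring sketch (`scan_with_factory` is an invented helper; the builder methods are the ones this example file already uses):

    use std::sync::Arc;

    use datafusion::datasource::physical_plan::parquet::{
        ParquetExecBuilder, ParquetFileReaderFactory,
    };
    use datafusion::datasource::physical_plan::FileScanConfig;
    use datafusion::physical_plan::ExecutionPlan;

    /// Invented helper: every file opened by the returned plan goes through
    /// `factory.create_reader`, and therefore through its `load_metadata` hook.
    fn scan_with_factory(
        config: FileScanConfig,
        factory: Arc<dyn ParquetFileReaderFactory>,
    ) -> Arc<dyn ExecutionPlan> {
        ParquetExecBuilder::new(config)
            .with_parquet_file_reader_factory(factory)
            .build_arc()
    }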
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -60,7 +60,9 @@ use crate::datasource::schema_adapter::{
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
-pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
+pub use reader::{
+    DefaultParquetFileReaderFactory, ParquetFileReader, ParquetFileReaderFactory,
+};
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use writer::plan_to_parquet;
 
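
With the re-export above, downstream crates can name both traits from a single public path. A minimal usage line (assuming the `datafusion` crate as a dependency):

    use datafusion::datasource::physical_plan::parquet::{
        ParquetFileReader, ParquetFileReaderFactory,
    };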
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -21,7 +21,7 @@ use crate::datasource::file_format::coerce_file_schema_to_view_type;
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{
-    row_filter, should_enable_page_index, ParquetAccessPlan,
+    row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileReader,
 };
 use crate::datasource::physical_plan::{
     FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -35,7 +35,6 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::{StreamExt, TryStreamExt};
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
-use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use std::sync::Arc;
@@ -87,7 +86,7 @@ impl FileOpener for ParquetOpener {
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
 
-        let mut reader: Box<dyn AsyncFileReader> =
+        let mut reader: Box<dyn ParquetFileReader> =
             self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
                 file_meta,
@@ -118,8 +117,7 @@
         Ok(Box::pin(async move {
             let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
 
-            let metadata =
-                ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
+            let metadata = reader.load_metadata(options.clone()).await?;
             let mut schema = metadata.schema().clone();
             // read with view types
             if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
@@ -133,8 +131,10 @@
             let metadata =
                 ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;
 
-            let mut builder =
-                ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
+            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+                reader.upcast(),
+                metadata,
+            );
 
             let file_schema = builder.schema().clone();
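
Condensed, the reworked open path looks like the sketch below (an illustrative `open_sketch` function, with pruning, schema coercion, and error handling omitted): metadata is now requested from the reader itself, so a caching reader can skip re-parsing, and `upcast` hands the stream builder the plain `AsyncFileReader` it expects.

    use datafusion::datasource::physical_plan::parquet::ParquetFileReader;
    use parquet::arrow::arrow_reader::ArrowReaderOptions;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    /// Illustrative condensation of the new `ParquetOpener` flow.
    async fn open_sketch(
        mut reader: Box<dyn ParquetFileReader>,
    ) -> parquet::errors::Result<()> {
        let options = ArrowReaderOptions::new().with_page_index(true);
        // Metadata comes from the reader itself, so a custom reader may
        // return a cached copy instead of re-parsing the file.
        let metadata = reader.load_metadata(options).await?;
        // The stream builder still takes a plain `AsyncFileReader`,
        // hence the explicit `upcast`.
        let _builder =
            ParquetRecordBatchStreamBuilder::new_with_metadata(reader.upcast(), metadata);
        Ok(())
    }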
51 changes: 46 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/reader.rs
@@ -23,6 +23,7 @@ use bytes::Bytes;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::future::BoxFuture;
 use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use parquet::file::metadata::ParquetMetaData;
 use std::fmt::Debug;
@@ -57,9 +58,49 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
-    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
+    ) -> datafusion_common::Result<Box<dyn ParquetFileReader>>;
 }
 
+/// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded.
+pub trait ParquetFileReader: AsyncFileReader + Send + 'static {
+    /// Returns an [`AsyncFileReader`] trait object
+    ///
+    /// This can usually be implemented as `Box::new(*self)`
+    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static>;
+
+    /// Parses the file's metadata
+    ///
+    /// A typical implementation is:
+    ///
+    /// ```ignore
+    /// Box::pin(ArrowReaderMetadata::load_async(self, options))
+    /// ```
+    fn load_metadata(
+        &mut self,
+        options: ArrowReaderOptions,
+    ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>>;
+}
+
+macro_rules! impl_ParquetFileReader {
+    ($type:ty) => {
+        impl ParquetFileReader for $type {
+            fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
+                Box::new(*self)
+            }
+
+            fn load_metadata(
+                &mut self,
+                options: ArrowReaderOptions,
+            ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>> {
+                Box::pin(ArrowReaderMetadata::load_async(self, options))
+            }
+        }
+    };
+}
+
+impl_ParquetFileReader!(ParquetObjectReader);
+impl_ParquetFileReader!(DefaultParquetFileReader);
+
 /// Default implementation of [`ParquetFileReaderFactory`]
 ///
 /// This implementation:
@@ -86,12 +127,12 @@ impl DefaultParquetFileReaderFactory {
 /// This implementation does not coalesce I/O operations or cache bytes. Such
 /// optimizations can be done either at the object store level or by providing a
 /// custom implementation of [`ParquetFileReaderFactory`].
-pub(crate) struct ParquetFileReader {
+pub(crate) struct DefaultParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     pub inner: ParquetObjectReader,
 }
 
-impl AsyncFileReader for ParquetFileReader {
+impl AsyncFileReader for DefaultParquetFileReader {
     fn get_bytes(
         &mut self,
         range: Range<usize>,
@@ -126,7 +167,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
-    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+    ) -> datafusion_common::Result<Box<dyn ParquetFileReader>> {
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
             file_meta.location().as_ref(),
@@ -139,7 +180,7 @@
            inner = inner.with_footer_size_hint(hint)
         };
 
-        Ok(Box::new(ParquetFileReader {
+        Ok(Box::new(DefaultParquetFileReader {
             inner,
             file_metrics,
         }))
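
A note on the design: `upcast` exists because coercing `Box<dyn ParquetFileReader>` into `Box<dyn AsyncFileReader>` (trait-object upcasting) was not yet available on stable Rust when this commit was written, so each implementation must spell out the conversion. A minimal illustration:

    use datafusion::datasource::physical_plan::parquet::ParquetFileReader;
    use parquet::arrow::async_reader::AsyncFileReader;

    /// A plain `reader as Box<dyn AsyncFileReader>` cast would not compile
    /// here, so the trait carries the conversion explicitly.
    fn into_async_reader(reader: Box<dyn ParquetFileReader>) -> Box<dyn AsyncFileReader> {
        reader.upcast()
    }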
@@ -416,7 +416,7 @@ mod tests {
     use std::sync::Arc;
 
     use super::*;
-    use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
+    use crate::datasource::physical_plan::parquet::reader::DefaultParquetFileReader;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
 
     use arrow::datatypes::DataType::Decimal128;
@@ -1516,7 +1516,7 @@
         let metrics = ExecutionPlanMetricsSet::new();
         let file_metrics =
             ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
-        let reader = ParquetFileReader {
+        let reader = DefaultParquetFileReader {
             inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta),
             file_metrics: file_metrics.clone(),
         };
23 changes: 19 additions & 4 deletions datafusion/core/tests/parquet/custom_reader.rs
@@ -27,6 +27,7 @@ use datafusion::assert_batches_sorted_eq;
 use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::parquet::ParquetFileReader;
 use datafusion::datasource::physical_plan::{
     FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
 };
@@ -41,6 +42,7 @@ use futures::{FutureExt, TryFutureExt};
 use object_store::memory::InMemory;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::ArrowWriter;
 use parquet::errors::ParquetError;
@@ -115,7 +117,7 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
-    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+    ) -> Result<Box<dyn ParquetFileReader>> {
         let metadata = file_meta
             .extensions
             .as_ref()
@@ -132,7 +134,7 @@
             metrics,
         );
 
-        Ok(Box::new(ParquetFileReader {
+        Ok(Box::new(CustomParquetFileReader {
             store: Arc::clone(&self.0),
             meta: file_meta.object_meta,
             metrics: parquet_file_metrics,
@@ -202,14 +204,14 @@ async fn store_parquet_in_memory(
 }
 
 /// Implements [`AsyncFileReader`] for a parquet file in object storage
-struct ParquetFileReader {
+struct CustomParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
     metrics: ParquetFileMetrics,
     metadata_size_hint: Option<usize>,
 }
 
-impl AsyncFileReader for ParquetFileReader {
+impl AsyncFileReader for CustomParquetFileReader {
     fn get_bytes(
         &mut self,
         range: Range<usize>,
@@ -243,3 +245,16 @@
         })
     }
 }
+
+impl ParquetFileReader for CustomParquetFileReader {
+    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
+        Box::new(*self)
+    }
+
+    fn load_metadata(
+        &mut self,
+        options: ArrowReaderOptions,
+    ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>> {
+        Box::pin(ArrowReaderMetadata::load_async(self, options))
+    }
+}
