From faf7acb5a3f3d4976711f6faf76c7750b22b0eda Mon Sep 17 00:00:00 2001
From: Sam Hughes
Date: Mon, 9 Sep 2024 00:28:48 -0700
Subject: [PATCH] feat: Add MetadataCacheFactory interface

---
 benchmarks/src/bin/tpch.rs                    | 11 +++-
 datafusion-examples/examples/flight_server.rs |  9 +++-
 datafusion/src/datasource/parquet.rs          | 31 ++++++++++--
 datafusion/src/execution/context.rs           | 27 +++++++++-
 datafusion/src/logical_plan/builder.rs        | 21 ++++++--
 datafusion/src/physical_optimizer/pruning.rs  |  4 ++
 datafusion/src/physical_plan/mod.rs           |  1 -
 datafusion/src/physical_plan/parquet.rs       | 50 +++++++++++++++++--
 8 files changed, 136 insertions(+), 18 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 03eebf5c208c..cbddae567ba5 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -30,7 +30,6 @@
 use ballista::context::BallistaContext;
 #[cfg(any())] // Ballista disabled in CubeStore.
 use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::arrow::util::pretty;
 use datafusion::datasource::parquet::ParquetTable;
@@ -42,6 +41,10 @@
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
+use datafusion::{
+    arrow::datatypes::{DataType, Field, Schema},
+    physical_plan::parquet::BasicMetadataCacheFactory,
+};
 use structopt::StructOpt;
 
@@ -482,7 +485,11 @@ fn get_table(
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
-            Ok(Arc::new(ParquetTable::try_new(&path, max_concurrency)?))
+            Ok(Arc::new(ParquetTable::try_new(
+                &path,
+                Arc::new(BasicMetadataCacheFactory::new()),
+                max_concurrency,
+            )?))
         }
         other => {
             unimplemented!("Invalid file format '{}'", other);
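Reviewer note: after this patch, every direct caller of ParquetTable::try_new passes a
metadata cache factory explicitly, as in the call site above. A minimal sketch of the
new call shape, assuming an illustrative file path and using only APIs touched by this
diff (num_cpus is already a dependency of the examples):

    use std::sync::Arc;

    use datafusion::datasource::parquet::ParquetTable;
    use datafusion::error::Result;
    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;

    fn open_table() -> Result<ParquetTable> {
        // BasicMetadataCacheFactory preserves the old (uncached) metadata
        // behavior, so migrated call sites keep their current semantics.
        ParquetTable::try_new(
            "/data/tpch/lineitem.parquet", // illustrative path
            Arc::new(BasicMetadataCacheFactory::new()),
            num_cpus::get(), // max_concurrency, as in the flight_server example
        )
    }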
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 138434ea2482..9d97ae3d3164 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -16,8 +16,10 @@
 // under the License.
 
 use std::pin::Pin;
+use std::sync::Arc;
 
 use arrow_flight::SchemaAsIpc;
+use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
 use futures::Stream;
 use tonic::transport::Server;
 use tonic::{Request, Response, Status, Streaming};
@@ -65,7 +67,12 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<Self::DoGetStream>, Status> {
         let request = request.into_inner();
 
-        let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
+        let table = ParquetTable::try_new(
+            &request.path[0],
+            Arc::new(BasicMetadataCacheFactory::new()),
+            num_cpus::get(),
+        )
+        .unwrap();
 
         let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
         let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();

diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index 5e1a34aff004..d47da5a5d43f 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -27,7 +27,7 @@ use crate::datasource::datasource::Statistics;
 use crate::datasource::TableProvider;
 use crate::error::Result;
 use crate::logical_plan::{combine_filters, Expr};
-use crate::physical_plan::parquet::ParquetExec;
+use crate::physical_plan::parquet::{MetadataCacheFactory, ParquetExec};
 use crate::physical_plan::ExecutionPlan;
 
 use super::datasource::TableProviderFilterPushDown;
@@ -35,6 +35,7 @@ use super::datasource::TableProviderFilterPushDown;
 /// Table-based representation of a `ParquetFile`.
 pub struct ParquetTable {
     path: String,
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     schema: SchemaRef,
     statistics: Statistics,
     max_concurrency: usize,
@@ -43,12 +44,25 @@ pub struct ParquetTable {
 
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
-    pub fn try_new(path: impl Into<String>, max_concurrency: usize) -> Result<Self> {
+    pub fn try_new(
+        path: impl Into<String>,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+        max_concurrency: usize,
+    ) -> Result<Self> {
         let path = path.into();
-        let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
+        let parquet_exec = ParquetExec::try_from_path_with_cache(
+            &path,
+            None,
+            None,
+            0,
+            1,
+            None,
+            metadata_cache_factory.make_noop_cache(),
+        )?;
         let schema = parquet_exec.schema();
         Ok(Self {
             path,
+            metadata_cache_factory,
             schema,
             statistics: parquet_exec.statistics().to_owned(),
             max_concurrency,
@@ -107,7 +121,7 @@ impl TableProvider for ParquetTable {
         } else {
             None
         };
-        Ok(Arc::new(ParquetExec::try_from_path(
+        Ok(Arc::new(ParquetExec::try_from_path_with_cache(
             &self.path,
             projection.clone(),
             predicate,
@@ -116,6 +130,7 @@ impl TableProvider for ParquetTable {
                 .unwrap_or(batch_size),
             self.max_concurrency,
             limit,
+            self.metadata_cache_factory.make_noop_cache(),
         )?))
     }
 
@@ -130,6 +145,8 @@ impl TableProvider for ParquetTable {
 
 #[cfg(test)]
 mod tests {
+    use crate::physical_plan::parquet::BasicMetadataCacheFactory;
+
     use super::*;
     use arrow::array::{
         BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -355,7 +372,11 @@ mod tests {
     fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, name);
-        let table = ParquetTable::try_new(&filename, 2)?;
+        let table = ParquetTable::try_new(
+            &filename,
+            Arc::new(BasicMetadataCacheFactory::new()),
+            2,
+        )?;
         Ok(Arc::new(table))
     }
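Reviewer note: ParquetExec::try_from_path is superseded at these call sites by
try_from_path_with_cache, which differs only in the trailing cache argument. A sketch
under that assumption, with placeholder values for the path and batch size:

    use datafusion::physical_plan::parquet::{NoopParquetMetadataCache, ParquetExec};

    // Placeholder arguments mirror the (&path, None, None, 0, 1, None) call
    // replaced above; NoopParquetMetadataCache reproduces the old behavior of
    // parsing the footer on every access.
    let exec = ParquetExec::try_from_path_with_cache(
        "/tmp/example.parquet",          // path (placeholder)
        None,                            // projection: read all columns
        None,                            // predicate: no row-group pruning
        8192,                            // batch size (placeholder)
        1,                               // max_concurrency
        None,                            // limit
        NoopParquetMetadataCache::new(), // the new, explicit cache handle
    )?;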
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index f6ad9fe5f4cc..dab909a2d77e 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -27,6 +27,7 @@ use crate::{
         hash_build_probe_order::HashBuildProbeOrder,
     },
     physical_optimizer::optimizer::PhysicalOptimizerRule,
+    physical_plan::parquet::{BasicMetadataCacheFactory, MetadataCacheFactory},
 };
 use log::debug;
 use std::fs;
@@ -290,6 +291,7 @@ impl ExecutionContext {
             self.state.clone(),
             &LogicalPlanBuilder::scan_parquet(
                 filename,
+                self.state.lock().unwrap().metadata_cache_factory().clone(),
                 None,
                 self.state.lock().unwrap().config.concurrency,
             )?
@@ -325,8 +327,12 @@ impl ExecutionContext {
     pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
         let table = {
             let m = self.state.lock().unwrap();
-            ParquetTable::try_new(filename, m.config.concurrency)?
-                .with_enable_pruning(m.config.parquet_pruning)
+            ParquetTable::try_new(
+                filename,
+                m.metadata_cache_factory().clone(),
+                m.config.concurrency,
+            )?
+            .with_enable_pruning(m.config.parquet_pruning)
         };
         self.register_table(name, Arc::new(table))?;
         Ok(())
@@ -655,6 +661,8 @@ pub struct ExecutionConfig {
     pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
     /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
     query_planner: Arc<dyn QueryPlanner + Send + Sync>,
+    /// Responsible for constructing ParquetMetadataCaches.
+    pub metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     /// Default catalog name for table resolution
     default_catalog: String,
     /// Default schema name for table resolution
@@ -700,6 +708,7 @@ impl Default for ExecutionConfig {
                 Arc::new(AddCoalescePartitionsExec::new()),
             ],
             query_planner: Arc::new(DefaultQueryPlanner {}),
+            metadata_cache_factory: Arc::new(BasicMetadataCacheFactory::new()),
             default_catalog: "datafusion".to_owned(),
             default_schema: "public".to_owned(),
             create_default_catalog_and_schema: true,
@@ -743,6 +752,15 @@ impl ExecutionConfig {
         self
     }
 
+    /// Replace the default metadata cache factory
+    pub fn with_metadata_cache_factory(
+        mut self,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+    ) -> Self {
+        self.metadata_cache_factory = metadata_cache_factory;
+        self
+    }
+
     /// Replace the physical optimizer rules
     pub fn with_physical_optimizer_rules(
         mut self,
@@ -903,6 +921,11 @@ impl ExecutionContextState {
             ))
         })
     }
+
+    /// Returns the MetadataCacheFactory
+    pub fn metadata_cache_factory(&self) -> &Arc<dyn MetadataCacheFactory> {
+        &self.config.metadata_cache_factory
+    }
 }
 
 impl ContextProvider for ExecutionContextState {

diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 83188f1cfabc..6876783ec702 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -27,12 +27,15 @@ use arrow::{
     record_batch::RecordBatch,
 };
 
-use crate::error::{DataFusionError, Result};
 use crate::{datasource::TableProvider, logical_plan::plan::ToStringifiedPlan};
 use crate::{
     datasource::{empty::EmptyTable, parquet::ParquetTable, CsvFile, MemTable},
     prelude::CsvReadOptions,
 };
+use crate::{
+    error::{DataFusionError, Result},
+    physical_plan::parquet::MetadataCacheFactory,
+};
 
 use super::dfschema::ToDFSchema;
 use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
@@ -140,21 +143,33 @@ impl LogicalPlanBuilder {
     /// Scan a Parquet data source
     pub fn scan_parquet(
         path: impl Into<String>,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
         projection: Option<Vec<usize>>,
         max_concurrency: usize,
     ) -> Result<Self> {
         let path = path.into();
-        Self::scan_parquet_with_name(path.clone(), projection, max_concurrency, path)
+        Self::scan_parquet_with_name(
+            path.clone(),
+            metadata_cache_factory,
+            projection,
+            max_concurrency,
+            path,
+        )
     }
 
     /// Scan a Parquet data source and register it with a given table name
     pub fn scan_parquet_with_name(
         path: impl Into<String>,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
        projection: Option<Vec<usize>>,
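Reviewer note: with the factory stored on ExecutionConfig, swapping implementations is
a one-line config change; none of the plumbing above needs to be touched again. A
minimal sketch, assuming only this patch's APIs plus the existing ExecutionConfig::new
and ExecutionContext::with_config constructors (the table name and path are
illustrative):

    use std::sync::Arc;

    use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;

    // BasicMetadataCacheFactory is already the default; it stands in here for
    // a custom MetadataCacheFactory implementation.
    let config = ExecutionConfig::new()
        .with_metadata_cache_factory(Arc::new(BasicMetadataCacheFactory::new()));
    let mut ctx = ExecutionContext::with_config(config);
    ctx.register_parquet("lineitem", "/data/tpch/lineitem.parquet")?;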
         projection: Option<Vec<usize>>,
         max_concurrency: usize,
         table_name: impl Into<String>,
     ) -> Result<Self> {
-        let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
+        let provider = Arc::new(ParquetTable::try_new(
+            path,
+            metadata_cache_factory,
+            max_concurrency,
+        )?);
         Self::scan(table_name, provider, projection)
     }

diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index f394b721e8b8..ea1ce784fcdc 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -127,6 +127,10 @@ impl PruningPredicate {
             .collect::<Vec<_>>();
         let stat_schema = Schema::new(stat_fields);
         let stat_dfschema = DFSchema::try_from(stat_schema.clone())?;
+        // TODO: By inspection, the metadata_cache_factory field -- improperly initialized
+        // here -- is unused by DefaultPhysicalPlanner::default().create_physical_expr, so
+        // this code works but is fragile. Maybe ExecutionContextState::metadata_cache_factory
+        // should be an Option<_>, accessed with .unwrap or .expect instead.
         let execution_context_state = ExecutionContextState::new();
         let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr(
             &logical_predicate_expr,

diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 05a2f2e1dc96..0c76b6e55fa7 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -20,7 +20,6 @@
 use self::{
     coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
 };
-use crate::cube_ext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::{
     error::{DataFusionError, Result},

diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index 08080c8ef5c3..da098f1e50b8 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -43,6 +43,7 @@ use arrow::{
 };
 use hashbrown::HashMap;
 use log::debug;
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::file::{
     footer,
     metadata::RowGroupMetaData,
@@ -126,7 +127,7 @@ struct ParquetPartitionMetrics {
 
 /// Cache for Parquet Metadata
 pub trait ParquetMetadataCache: Debug + Sync + Send {
-    /// Returns the metadata for the given file, possibly cached on key
+    /// Returns the metadata for the given file, possibly cached on key.
    fn metadata(&self, key: &str, file: &File) -> Result<Arc<ParquetMetaData>>;
 
     /// Creates a FileReader for the given filename
@@ -140,6 +141,23 @@ pub trait ParquetMetadataCache: Debug + Sync + Send {
     }
 }
 
+// TODO: Rename to ParquetMetadataCacheFactory? Or rename build_writer_props, which is not about caching?
+/// Constructs the desired types of caches for Parquet Metadata.
+pub trait MetadataCacheFactory: Sync + Send {
+    /// Makes a no-op cache (one that does not actually cache).
+    fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache>;
+    /// Makes an LRU-based cache.
+    fn make_lru_cache(
+        &self,
+        max_capacity: u64,
+        time_to_idle: Duration,
+    ) -> Arc<dyn ParquetMetadataCache>;
+    /// Modifies and builds writer properties.
+    fn build_writer_props(&self, builder: WriterPropertiesBuilder) -> WriterProperties {
+        builder.build()
+    }
+}
+
 /// Default MetadataCache, does not cache anything
 #[derive(Debug)]
 pub struct NoopParquetMetadataCache;
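Reviewer note: build_writer_props has a default body, so implementors only override it
when they want to adjust writer settings. A sketch of a custom factory, reusing the
Noop and LRU caches from this file; the struct name and the Snappy override are
illustrative, and set_compression is the stock parquet WriterPropertiesBuilder method:

    use std::sync::Arc;
    use std::time::Duration;

    use parquet::basic::Compression;
    use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

    // Hypothetical factory: same caches as BasicMetadataCacheFactory below, but
    // it forces a compression codec on every writer it configures.
    struct SnappyWriterCacheFactory;

    impl MetadataCacheFactory for SnappyWriterCacheFactory {
        fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache> {
            NoopParquetMetadataCache::new()
        }
        fn make_lru_cache(
            &self,
            max_capacity: u64,
            time_to_idle: Duration,
        ) -> Arc<dyn ParquetMetadataCache> {
            LruParquetMetadataCache::new(max_capacity, time_to_idle)
        }
        fn build_writer_props(&self, builder: WriterPropertiesBuilder) -> WriterProperties {
            // Override the default pass-through build.
            builder.set_compression(Compression::SNAPPY).build()
        }
    }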
@@ -153,7 +171,7 @@ impl NoopParquetMetadataCache {
 
 impl ParquetMetadataCache for NoopParquetMetadataCache {
     fn metadata(&self, _key: &str, file: &File) -> Result<Arc<ParquetMetaData>> {
-        Ok(Arc::new(footer::parse_metadata(file)?))
+        Ok(Arc::new(footer::parse_metadata(file, &None)?.0))
     }
 }
 
@@ -189,7 +207,7 @@ impl ParquetMetadataCache for LruParquetMetadataCache {
         match self.cache.get(&k) {
             Some(metadata) => Ok(metadata),
             None => {
-                let metadata = Arc::new(footer::parse_metadata(file)?);
+                let metadata = Arc::new(footer::parse_metadata(file, &None)?.0);
                 self.cache.insert(k, metadata.clone());
                 Ok(metadata)
             }
@@ -197,6 +215,29 @@ impl ParquetMetadataCache for LruParquetMetadataCache {
     }
 }
 
+/// Basic factory that constructs the regular Noop and LRU metadata caches.
+pub struct BasicMetadataCacheFactory {}
+
+impl BasicMetadataCacheFactory {
+    /// Constructor
+    pub fn new() -> BasicMetadataCacheFactory {
+        BasicMetadataCacheFactory {}
+    }
+}
+
+impl MetadataCacheFactory for BasicMetadataCacheFactory {
+    fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache> {
+        NoopParquetMetadataCache::new()
+    }
+    fn make_lru_cache(
+        &self,
+        max_capacity: u64,
+        time_to_idle: Duration,
+    ) -> Arc<dyn ParquetMetadataCache> {
+        LruParquetMetadataCache::new(max_capacity, time_to_idle)
+    }
+}
+
 impl ParquetExec {
     /// Create a new Parquet reader execution plan based on the specified Parquet filename or
     /// directory containing Parquet files
@@ -1191,7 +1232,8 @@ mod tests {
             .unwrap();
             columns.push(column);
         }
-        RowGroupMetaData::builder(schema_descr.clone())
+        let ordinal: i16 = 0;
+        RowGroupMetaData::builder(schema_descr.clone(), ordinal)
             .set_num_rows(1000)
             .set_total_byte_size(2000)
             .set_column_metadata(columns)
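Reviewer note: nothing in this patch switches callers to the LRU cache automatically;
it stays opt-in through the factory. A sketch of constructing one, with placeholder
capacity and idle values (their exact units are defined by
LruParquetMetadataCache::new, which this diff does not show):

    use std::time::Duration;

    use datafusion::physical_plan::parquet::{
        BasicMetadataCacheFactory, MetadataCacheFactory,
    };

    // The returned Arc<dyn ParquetMetadataCache> can be handed to
    // ParquetExec::try_from_path_with_cache in place of the noop cache.
    let factory = BasicMetadataCacheFactory::new();
    let lru = factory.make_lru_cache(10_000, Duration::from_secs(600));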