feat: Add MetadataCacheFactory interface
srh authored Sep 9, 2024
1 parent 11027d5 commit faf7acb
Showing 8 changed files with 136 additions and 18 deletions.
11 changes: 9 additions & 2 deletions benchmarks/src/bin/tpch.rs
@@ -30,7 +30,6 @@ use ballista::context::BallistaContext;
#[cfg(any())] // Ballista disabled in CubeStore.
use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty;
use datafusion::datasource::parquet::ParquetTable;
@@ -42,6 +41,10 @@ use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
physical_plan::parquet::BasicMetadataCacheFactory,
};

use structopt::StructOpt;

@@ -482,7 +485,11 @@ fn get_table(
}
"parquet" => {
let path = format!("{}/{}", path, table);
Ok(Arc::new(ParquetTable::try_new(&path, max_concurrency)?))
Ok(Arc::new(ParquetTable::try_new(
&path,
Arc::new(BasicMetadataCacheFactory::new()),
max_concurrency,
)?))
}
other => {
unimplemented!("Invalid file format '{}'", other);
9 changes: 8 additions & 1 deletion datafusion-examples/examples/flight_server.rs
@@ -16,8 +16,10 @@
// under the License.

use std::pin::Pin;
use std::sync::Arc;

use arrow_flight::SchemaAsIpc;
use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
use futures::Stream;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
@@ -65,7 +67,12 @@ impl FlightService for FlightServiceImpl {
) -> Result<Response<SchemaResult>, Status> {
let request = request.into_inner();

let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
let table = ParquetTable::try_new(
&request.path[0],
Arc::new(BasicMetadataCacheFactory::new()),
num_cpus::get(),
)
.unwrap();

let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();
31 changes: 26 additions & 5 deletions datafusion/src/datasource/parquet.rs
@@ -27,14 +27,15 @@ use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
use crate::error::Result;
use crate::logical_plan::{combine_filters, Expr};
use crate::physical_plan::parquet::ParquetExec;
use crate::physical_plan::parquet::{MetadataCacheFactory, ParquetExec};
use crate::physical_plan::ExecutionPlan;

use super::datasource::TableProviderFilterPushDown;

/// Table-based representation of a `ParquetFile`.
pub struct ParquetTable {
path: String,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
schema: SchemaRef,
statistics: Statistics,
max_concurrency: usize,
@@ -43,12 +44,25 @@ pub struct ParquetTable {

impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
pub fn try_new(path: impl Into<String>, max_concurrency: usize) -> Result<Self> {
pub fn try_new(
path: impl Into<String>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
max_concurrency: usize,
) -> Result<Self> {
let path = path.into();
let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
let parquet_exec = ParquetExec::try_from_path_with_cache(
&path,
None,
None,
0,
1,
None,
metadata_cache_factory.make_noop_cache(),
)?;
let schema = parquet_exec.schema();
Ok(Self {
path,
metadata_cache_factory,
schema,
statistics: parquet_exec.statistics().to_owned(),
max_concurrency,
@@ -107,7 +121,7 @@ impl TableProvider for ParquetTable {
} else {
None
};
Ok(Arc::new(ParquetExec::try_from_path(
Ok(Arc::new(ParquetExec::try_from_path_with_cache(
&self.path,
projection.clone(),
predicate,
@@ -116,6 +130,7 @@
.unwrap_or(batch_size),
self.max_concurrency,
limit,
self.metadata_cache_factory.make_noop_cache(),
)?))
}

@@ -130,6 +145,8 @@

#[cfg(test)]
mod tests {
use crate::physical_plan::parquet::BasicMetadataCacheFactory;

use super::*;
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -355,7 +372,11 @@
fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let table = ParquetTable::try_new(&filename, 2)?;
let table = ParquetTable::try_new(
&filename,
Arc::new(BasicMetadataCacheFactory::new()),
2,
)?;
Ok(Arc::new(table))
}

27 changes: 25 additions & 2 deletions datafusion/src/execution/context.rs
@@ -27,6 +27,7 @@ use crate::{
hash_build_probe_order::HashBuildProbeOrder,
},
physical_optimizer::optimizer::PhysicalOptimizerRule,
physical_plan::parquet::{BasicMetadataCacheFactory, MetadataCacheFactory},
};
use log::debug;
use std::fs;
@@ -290,6 +291,7 @@ impl ExecutionContext {
self.state.clone(),
&LogicalPlanBuilder::scan_parquet(
filename,
self.state.lock().unwrap().metadata_cache_factory().clone(),
None,
self.state.lock().unwrap().config.concurrency,
)?
@@ -325,8 +327,12 @@
pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
let table = {
let m = self.state.lock().unwrap();
ParquetTable::try_new(filename, m.config.concurrency)?
.with_enable_pruning(m.config.parquet_pruning)
ParquetTable::try_new(
filename,
m.metadata_cache_factory().clone(),
m.config.concurrency,
)?
.with_enable_pruning(m.config.parquet_pruning)
};
self.register_table(name, Arc::new(table))?;
Ok(())
@@ -655,6 +661,8 @@ pub struct ExecutionConfig {
pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
/// Responsible for constructing ParquetMetadataCaches.
pub metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
/// Default catalog name for table resolution
default_catalog: String,
/// Default schema name for table resolution
@@ -700,6 +708,7 @@ impl Default for ExecutionConfig {
Arc::new(AddCoalescePartitionsExec::new()),
],
query_planner: Arc::new(DefaultQueryPlanner {}),
metadata_cache_factory: Arc::new(BasicMetadataCacheFactory::new()),
default_catalog: "datafusion".to_owned(),
default_schema: "public".to_owned(),
create_default_catalog_and_schema: true,
@@ -743,6 +752,15 @@ impl ExecutionConfig {
self
}

/// Replace the default metadata cache factory
pub fn with_metadata_cache_factory(
mut self,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
) -> Self {
self.metadata_cache_factory = metadata_cache_factory;
self
}

/// Replace the physical optimizer rules
pub fn with_physical_optimizer_rules(
mut self,
@@ -903,6 +921,11 @@ impl ExecutionContextState {
))
})
}

/// Returns the MetadataCacheFactory
pub fn metadata_cache_factory(&self) -> &Arc<dyn MetadataCacheFactory> {
&self.config.metadata_cache_factory
}
}

impl ContextProvider for ExecutionContextState {
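Usage note: with this change, a caller can install a metadata cache factory when the context is built. A minimal sketch (not part of this commit; the table name and path are placeholders, and it assumes the standard ExecutionConfig::new and ExecutionContext::with_config constructors):

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;

    fn make_context() -> Result<ExecutionContext> {
        // Install a factory at construction time; register_parquet then
        // threads it through to ParquetTable::try_new.
        let config = ExecutionConfig::new()
            .with_metadata_cache_factory(Arc::new(BasicMetadataCacheFactory::new()));
        let mut ctx = ExecutionContext::with_config(config);
        ctx.register_parquet("example", "data/example.parquet")?;
        Ok(ctx)
    }
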
21 changes: 18 additions & 3 deletions datafusion/src/logical_plan/builder.rs
@@ -27,12 +27,15 @@ use arrow::{
record_batch::RecordBatch,
};

use crate::error::{DataFusionError, Result};
use crate::{datasource::TableProvider, logical_plan::plan::ToStringifiedPlan};
use crate::{
datasource::{empty::EmptyTable, parquet::ParquetTable, CsvFile, MemTable},
prelude::CsvReadOptions,
};
use crate::{
error::{DataFusionError, Result},
physical_plan::parquet::MetadataCacheFactory,
};

use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
@@ -140,21 +143,33 @@ impl LogicalPlanBuilder {
/// Scan a Parquet data source
pub fn scan_parquet(
path: impl Into<String>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
projection: Option<Vec<usize>>,
max_concurrency: usize,
) -> Result<Self> {
let path = path.into();
Self::scan_parquet_with_name(path.clone(), projection, max_concurrency, path)
Self::scan_parquet_with_name(
path.clone(),
metadata_cache_factory,
projection,
max_concurrency,
path,
)
}

/// Scan a Parquet data source and register it with a given table name
pub fn scan_parquet_with_name(
path: impl Into<String>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
projection: Option<Vec<usize>>,
max_concurrency: usize,
table_name: impl Into<String>,
) -> Result<Self> {
let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
let provider = Arc::new(ParquetTable::try_new(
path,
metadata_cache_factory,
max_concurrency,
)?);
Self::scan(table_name, provider, projection)
}

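The builder API gains the same parameter. A minimal sketch of the updated call (not part of this commit; the path and concurrency values are placeholders):

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};
    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;

    fn scan(path: &str) -> Result<LogicalPlan> {
        // The factory is now an explicit argument; a projection of None
        // reads every column.
        LogicalPlanBuilder::scan_parquet(
            path,
            Arc::new(BasicMetadataCacheFactory::new()),
            None, // projection
            8,    // max_concurrency
        )?
        .build()
    }
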
4 changes: 4 additions & 0 deletions datafusion/src/physical_optimizer/pruning.rs
@@ -127,6 +127,10 @@ impl PruningPredicate {
.collect::<Vec<_>>();
let stat_schema = Schema::new(stat_fields);
let stat_dfschema = DFSchema::try_from(stat_schema.clone())?;
// TODO: By inspection, the metadata_cache_factory field -- improperly initialized here --
// is unused by DefaultPhysicalPlanner::default().create_physical_expr. So this code works,
// but it is fragile. Maybe ExecutionContextState::metadata_cache_factory should be an
// Option<_>, with users calling .unwrap or .expect to access it.
let execution_context_state = ExecutionContextState::new();
let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr(
&logical_predicate_expr,
1 change: 0 additions & 1 deletion datafusion/src/physical_plan/mod.rs
@@ -20,7 +20,6 @@
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
use crate::cube_ext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::{
error::{DataFusionError, Result},
50 changes: 46 additions & 4 deletions datafusion/src/physical_plan/parquet.rs
@@ -43,6 +43,7 @@ use arrow::{
};
use hashbrown::HashMap;
use log::debug;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::{
footer,
metadata::RowGroupMetaData,
@@ -126,7 +127,7 @@ struct ParquetPartitionMetrics {

/// Cache for Parquet Metadata
pub trait ParquetMetadataCache: Debug + Sync + Send {
/// Returns the metadata for the given file, possibly cached on key
/// Returns the metadata for the given file, possibly cached on key.
fn metadata(&self, key: &str, file: &File) -> Result<Arc<ParquetMetaData>>;

/// Creates a FileReader for the given filename
@@ -140,6 +141,23 @@ pub trait ParquetMetadataCache: Debug + Sync + Send {
}
}

// TODO: Rename to ParquetMetadataCacheFactory? Or rename to reflect the build_writer_props method?
/// Constructs the desired types of caches for Parquet Metadata.
pub trait MetadataCacheFactory: Sync + Send {
/// Makes a noop cache (which doesn't cache).
fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache>;
/// Makes an LRU-based cache.
fn make_lru_cache(
&self,
max_capacity: u64,
time_to_idle: Duration,
) -> Arc<dyn ParquetMetadataCache>;
/// Modifies and builds writer properties.
fn build_writer_props(&self, builder: WriterPropertiesBuilder) -> WriterProperties {
builder.build()
}
}

/// Default MetadataCache, does not cache anything
#[derive(Debug)]
pub struct NoopParquetMetadataCache;
@@ -153,7 +171,7 @@ impl NoopParquetMetadataCache {

impl ParquetMetadataCache for NoopParquetMetadataCache {
fn metadata(&self, _key: &str, file: &File) -> Result<Arc<ParquetMetaData>> {
Ok(Arc::new(footer::parse_metadata(file)?))
Ok(Arc::new(footer::parse_metadata(file, &None)?.0))
}
}

@@ -189,14 +207,37 @@ impl ParquetMetadataCache for LruParquetMetadataCache {
match self.cache.get(&k) {
Some(metadata) => Ok(metadata),
None => {
let metadata = Arc::new(footer::parse_metadata(file)?);
let metadata = Arc::new(footer::parse_metadata(file, &None)?.0);
self.cache.insert(k, metadata.clone());
Ok(metadata)
}
}
}
}

/// Default MetadataCacheFactory; constructs regular Noop or Lru ParquetMetadataCache objects.
pub struct BasicMetadataCacheFactory {}

impl BasicMetadataCacheFactory {
/// Constructor
pub fn new() -> BasicMetadataCacheFactory {
BasicMetadataCacheFactory {}
}
}

impl MetadataCacheFactory for BasicMetadataCacheFactory {
fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache> {
NoopParquetMetadataCache::new()
}
fn make_lru_cache(
&self,
max_capacity: u64,
time_to_idle: Duration,
) -> Arc<dyn ParquetMetadataCache> {
LruParquetMetadataCache::new(max_capacity, time_to_idle)
}
}

impl ParquetExec {
/// Create a new Parquet reader execution plan based on the specified Parquet filename or
/// directory containing Parquet files
@@ -1191,7 +1232,8 @@
.unwrap();
columns.push(column);
}
RowGroupMetaData::builder(schema_descr.clone())
let ordinal: i16 = 0;
RowGroupMetaData::builder(schema_descr.clone(), ordinal)
.set_num_rows(1000)
.set_total_byte_size(2000)
.set_column_metadata(columns)
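Implementation note: a custom factory only has to supply the two cache constructors, since build_writer_props has a default body that calls builder.build(). A hypothetical sketch (the struct name and cache sizes are invented, and it assumes LruParquetMetadataCache is exported alongside NoopParquetMetadataCache):

    use std::sync::Arc;
    use std::time::Duration;
    use datafusion::physical_plan::parquet::{
        LruParquetMetadataCache, MetadataCacheFactory, ParquetMetadataCache,
    };

    /// Hypothetical factory that hands out a small LRU cache even where a
    /// noop cache was requested, so repeated scans reuse parsed footers.
    struct AlwaysLruFactory;

    impl MetadataCacheFactory for AlwaysLruFactory {
        fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache> {
            LruParquetMetadataCache::new(64, Duration::from_secs(600))
        }
        fn make_lru_cache(
            &self,
            max_capacity: u64,
            time_to_idle: Duration,
        ) -> Arc<dyn ParquetMetadataCache> {
            LruParquetMetadataCache::new(max_capacity, time_to_idle)
        }
        // build_writer_props is inherited from the trait's default method.
    }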
