Skip to content

Commit

Permalink
feat: Add MetadataCacheFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
srh committed Aug 29, 2024
1 parent 11027d5 commit 32c4a03
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 19 deletions.
4 changes: 2 additions & 2 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use ballista::context::BallistaContext;
#[cfg(any())] // Ballista disabled in CubeStore.
use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::{arrow::datatypes::{DataType, Field, Schema}, physical_plan::parquet::BasicMetadataCacheFactory};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty;
use datafusion::datasource::parquet::ParquetTable;
Expand Down Expand Up @@ -482,7 +482,7 @@ fn get_table(
}
"parquet" => {
let path = format!("{}/{}", path, table);
Ok(Arc::new(ParquetTable::try_new(&path, max_concurrency)?))
Ok(Arc::new(ParquetTable::try_new(&path, Arc::new(BasicMetadataCacheFactory::new()), max_concurrency)?))
}
other => {
unimplemented!("Invalid file format '{}'", other);
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub async fn main() {
let files = file_paths
.map(|file_path| File::open(file_path).unwrap())
.collect::<Vec<_>>();
let mut ctx = ExecutionContext::with_config(execution_config);
let mut ctx = ExecutionContext::with_config(execution_config); // TODO: Probably just ignore datafusion-cli, but consider setting up execution_config further
for file in files {
let mut reader = BufReader::new(file);
exec_from_lines(&mut ctx, &mut reader, print_options.clone()).await;
Expand Down Expand Up @@ -169,7 +169,7 @@ async fn exec_from_lines(
}

async fn exec_from_repl(execution_config: ExecutionConfig, print_options: PrintOptions) {
let mut ctx = ExecutionContext::with_config(execution_config);
let mut ctx = ExecutionContext::with_config(execution_config); // TODO: Probably just ignore datafusion-cli, but consider setting up execution_config further

let mut rl = Editor::<()>::new();
rl.load_history(".history").ok();
Expand Down
4 changes: 3 additions & 1 deletion datafusion-examples/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
// under the License.

use std::pin::Pin;
use std::sync::Arc;

use arrow_flight::SchemaAsIpc;
use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
use futures::Stream;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
Expand Down Expand Up @@ -65,7 +67,7 @@ impl FlightService for FlightServiceImpl {
) -> Result<Response<SchemaResult>, Status> {
let request = request.into_inner();

let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
let table = ParquetTable::try_new(&request.path[0], Arc::new(BasicMetadataCacheFactory::new()), num_cpus::get()).unwrap();

let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();
Expand Down
15 changes: 10 additions & 5 deletions datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
use crate::error::Result;
use crate::logical_plan::{combine_filters, Expr};
use crate::physical_plan::parquet::ParquetExec;
use crate::physical_plan::parquet::{MetadataCacheFactory, ParquetExec};
use crate::physical_plan::ExecutionPlan;

use super::datasource::TableProviderFilterPushDown;

/// Table-based representation of a `ParquetFile`.
pub struct ParquetTable {
path: String,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
schema: SchemaRef,
statistics: Statistics,
max_concurrency: usize,
Expand All @@ -43,12 +44,13 @@ pub struct ParquetTable {

impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
pub fn try_new(path: impl Into<String>, max_concurrency: usize) -> Result<Self> {
pub fn try_new(path: impl Into<String>, metadata_cache_factory: Arc<dyn MetadataCacheFactory>, max_concurrency: usize) -> Result<Self> {
let path = path.into();
let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
let parquet_exec = ParquetExec::try_from_path_with_cache(&path, None, None, 0, 1, None, metadata_cache_factory.make_noop_cache())?;
let schema = parquet_exec.schema();
Ok(Self {
path,
metadata_cache_factory,
schema,
statistics: parquet_exec.statistics().to_owned(),
max_concurrency,
Expand Down Expand Up @@ -107,7 +109,7 @@ impl TableProvider for ParquetTable {
} else {
None
};
Ok(Arc::new(ParquetExec::try_from_path(
Ok(Arc::new(ParquetExec::try_from_path_with_cache(
&self.path,
projection.clone(),
predicate,
Expand All @@ -116,6 +118,7 @@ impl TableProvider for ParquetTable {
.unwrap_or(batch_size),
self.max_concurrency,
limit,
self.metadata_cache_factory.make_noop_cache()
)?))
}

Expand All @@ -130,6 +133,8 @@ impl TableProvider for ParquetTable {

#[cfg(test)]
mod tests {
use crate::physical_plan::parquet::BasicMetadataCacheFactory;

use super::*;
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
Expand Down Expand Up @@ -355,7 +360,7 @@ mod tests {
fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let table = ParquetTable::try_new(&filename, 2)?;
let table = ParquetTable::try_new(&filename, Arc::new(BasicMetadataCacheFactory::new()), 2)?;
Ok(Arc::new(table))
}

Expand Down
23 changes: 21 additions & 2 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
aggregate_statistics::AggregateStatistics, eliminate_limit::EliminateLimit,
hash_build_probe_order::HashBuildProbeOrder,
},
physical_optimizer::optimizer::PhysicalOptimizerRule,
physical_optimizer::optimizer::PhysicalOptimizerRule, physical_plan::parquet::{BasicMetadataCacheFactory, MetadataCacheFactory},
};
use log::debug;
use std::fs;
Expand Down Expand Up @@ -290,6 +290,7 @@ impl ExecutionContext {
self.state.clone(),
&LogicalPlanBuilder::scan_parquet(
filename,
self.state.lock().unwrap().metadata_cache_factory().clone(),
None,
self.state.lock().unwrap().config.concurrency,
)?
Expand Down Expand Up @@ -325,7 +326,7 @@ impl ExecutionContext {
pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
let table = {
let m = self.state.lock().unwrap();
ParquetTable::try_new(filename, m.config.concurrency)?
ParquetTable::try_new(filename, m.metadata_cache_factory().clone(), m.config.concurrency)?
.with_enable_pruning(m.config.parquet_pruning)
};
self.register_table(name, Arc::new(table))?;
Expand Down Expand Up @@ -655,6 +656,8 @@ pub struct ExecutionConfig {
pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
/// Responsible for constructing ParquetMetadataCaches.
pub metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
/// Default catalog name for table resolution
default_catalog: String,
/// Default schema name for table resolution
Expand All @@ -677,6 +680,7 @@ pub struct ExecutionConfig {
parquet_pruning: bool,
}


impl Default for ExecutionConfig {
fn default() -> Self {
Self {
Expand All @@ -700,6 +704,7 @@ impl Default for ExecutionConfig {
Arc::new(AddCoalescePartitionsExec::new()),
],
query_planner: Arc::new(DefaultQueryPlanner {}),
metadata_cache_factory: Arc::new(BasicMetadataCacheFactory::new()),
default_catalog: "datafusion".to_owned(),
default_schema: "public".to_owned(),
create_default_catalog_and_schema: true,
Expand Down Expand Up @@ -743,6 +748,15 @@ impl ExecutionConfig {
self
}

/// Replace the default metadata cache factory
pub fn with_metadata_cache_factory(
mut self,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
) -> Self {
self.metadata_cache_factory = metadata_cache_factory;
self
}

/// Replace the physical optimizer rules
pub fn with_physical_optimizer_rules(
mut self,
Expand Down Expand Up @@ -903,6 +917,11 @@ impl ExecutionContextState {
))
})
}

/// Returns the MetadataCacheFactory
pub fn metadata_cache_factory(&self) -> &Arc<dyn MetadataCacheFactory> {
&self.config.metadata_cache_factory
}
}

impl ContextProvider for ExecutionContextState {
Expand Down
8 changes: 5 additions & 3 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::{
record_batch::RecordBatch,
};

use crate::error::{DataFusionError, Result};
use crate::{error::{DataFusionError, Result}, physical_plan::parquet::MetadataCacheFactory};
use crate::{datasource::TableProvider, logical_plan::plan::ToStringifiedPlan};
use crate::{
datasource::{empty::EmptyTable, parquet::ParquetTable, CsvFile, MemTable},
Expand Down Expand Up @@ -140,21 +140,23 @@ impl LogicalPlanBuilder {
/// Scan a Parquet data source
pub fn scan_parquet(
path: impl Into<String>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
projection: Option<Vec<usize>>,
max_concurrency: usize,
) -> Result<Self> {
let path = path.into();
Self::scan_parquet_with_name(path.clone(), projection, max_concurrency, path)
Self::scan_parquet_with_name(path.clone(), metadata_cache_factory, projection, max_concurrency, path)
}

/// Scan a Parquet data source and register it with a given table name
pub fn scan_parquet_with_name(
path: impl Into<String>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
projection: Option<Vec<usize>>,
max_concurrency: usize,
table_name: impl Into<String>,
) -> Result<Self> {
let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
let provider = Arc::new(ParquetTable::try_new(path, metadata_cache_factory, max_concurrency)?);
Self::scan(table_name, provider, projection)
}

Expand Down
4 changes: 4 additions & 0 deletions datafusion/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl PruningPredicate {
.collect::<Vec<_>>();
let stat_schema = Schema::new(stat_fields);
let stat_dfschema = DFSchema::try_from(stat_schema.clone())?;
// TODO: We know that the metadata_cache_factory field -- improperly initialized here -- is
// unused by DefaultPhysicalPlanner::default().create_physical_expr by inspection. So this
// code works, but is fragile. Maybe ExecutionContextState::metadata_cache_factory should
// be an Option<_> and users use .unwrap or .expect to access it.
let execution_context_state = ExecutionContextState::new();
let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr(
&logical_predicate_expr,
Expand Down
42 changes: 38 additions & 4 deletions datafusion/src/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use arrow::{
};
use hashbrown::HashMap;
use log::debug;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::{
footer,
metadata::RowGroupMetaData,
Expand Down Expand Up @@ -126,7 +127,7 @@ struct ParquetPartitionMetrics {

/// Cache for Parquet Metadata
pub trait ParquetMetadataCache: Debug + Sync + Send {
/// Returns the metadata for the given file, possibly cached on key
/// Returns the metadata for the given file, possibly cached on key.
fn metadata(&self, key: &str, file: &File) -> Result<Arc<ParquetMetaData>>;

/// Creates a FileReader for the given filename
Expand All @@ -140,6 +141,19 @@ pub trait ParquetMetadataCache: Debug + Sync + Send {
}
}

// TODO: Rename to ParquetMetadataCacheFactory? Rename for build_writer_props field?
/// Constructs the desired types of caches for Parquet Metadata.
pub trait MetadataCacheFactory: Sync + Send {
/// Makes a noop cache (which doesn't cache)
fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache>;
/// Makes an LRU-based cache.
fn make_lru_cache(&self, max_capacity: u64, time_to_idle: Duration) -> Arc<dyn ParquetMetadataCache>;
/// Modifies and builds writer properties.
fn build_writer_props(&self, builder: WriterPropertiesBuilder) -> WriterProperties {
builder.build()
}
}

/// Default MetadataCache, does not cache anything
#[derive(Debug)]
pub struct NoopParquetMetadataCache;
Expand All @@ -153,7 +167,7 @@ impl NoopParquetMetadataCache {

impl ParquetMetadataCache for NoopParquetMetadataCache {
fn metadata(&self, _key: &str, file: &File) -> Result<Arc<ParquetMetaData>> {
Ok(Arc::new(footer::parse_metadata(file)?))
Ok(Arc::new(footer::parse_metadata(file, &None)?.0))
}
}

Expand Down Expand Up @@ -189,14 +203,33 @@ impl ParquetMetadataCache for LruParquetMetadataCache {
match self.cache.get(&k) {
Some(metadata) => Ok(metadata),
None => {
let metadata = Arc::new(footer::parse_metadata(file)?);
let metadata = Arc::new(footer::parse_metadata(file, &None)?.0);
self.cache.insert(k, metadata.clone());
Ok(metadata)
}
}
}
}

/// Constructs regular Noop or Lru MetadataCacheFactory objects.
pub struct BasicMetadataCacheFactory {
}

impl BasicMetadataCacheFactory {
/// Constructor
pub fn new() -> BasicMetadataCacheFactory { BasicMetadataCacheFactory{} }
}

impl MetadataCacheFactory for BasicMetadataCacheFactory {
fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache> {
NoopParquetMetadataCache::new()
}
fn make_lru_cache(&self, max_capacity: u64, time_to_idle: Duration) -> Arc<dyn ParquetMetadataCache> {
LruParquetMetadataCache::new(max_capacity, time_to_idle)
}
}


impl ParquetExec {
/// Create a new Parquet reader execution plan based on the specified Parquet filename or
/// directory containing Parquet files
Expand Down Expand Up @@ -1191,7 +1224,8 @@ mod tests {
.unwrap();
columns.push(column);
}
RowGroupMetaData::builder(schema_descr.clone())
let ordinal: i16 = 0;
RowGroupMetaData::builder(schema_descr.clone(), ordinal)
.set_num_rows(1000)
.set_total_byte_size(2000)
.set_column_metadata(columns)
Expand Down

0 comments on commit 32c4a03

Please sign in to comment.