style: Obey linting rules
srh committed Sep 5, 2024
1 parent cb5d4b7 commit c9cc20a
Showing 7 changed files with 75 additions and 22 deletions.
11 changes: 9 additions & 2 deletions benchmarks/src/bin/tpch.rs
@@ -30,7 +30,6 @@ use ballista::context::BallistaContext;
 #[cfg(any())] // Ballista disabled in CubeStore.
 use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
 
-use datafusion::{arrow::datatypes::{DataType, Field, Schema}, physical_plan::parquet::BasicMetadataCacheFactory};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::arrow::util::pretty;
 use datafusion::datasource::parquet::ParquetTable;
@@ -42,6 +41,10 @@ use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
+use datafusion::{
+    arrow::datatypes::{DataType, Field, Schema},
+    physical_plan::parquet::BasicMetadataCacheFactory,
+};
 
 use structopt::StructOpt;
 
@@ -482,7 +485,11 @@ fn get_table(
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
-            Ok(Arc::new(ParquetTable::try_new(&path, Arc::new(BasicMetadataCacheFactory::new()), max_concurrency)?))
+            Ok(Arc::new(ParquetTable::try_new(
+                &path,
+                Arc::new(BasicMetadataCacheFactory::new()),
+                max_concurrency,
+            )?))
         }
         other => {
             unimplemented!("Invalid file format '{}'", other);
4 changes: 2 additions & 2 deletions datafusion-cli/src/main.rs
@@ -118,7 +118,7 @@ pub async fn main() {
     let files = file_paths
         .map(|file_path| File::open(file_path).unwrap())
         .collect::<Vec<_>>();
-    let mut ctx = ExecutionContext::with_config(execution_config); // TODO: Probably just ignore datafusion-cli, but consider setting up execution_config further
+    let mut ctx = ExecutionContext::with_config(execution_config);
     for file in files {
         let mut reader = BufReader::new(file);
         exec_from_lines(&mut ctx, &mut reader, print_options.clone()).await;
@@ -169,7 +169,7 @@ async fn exec_from_lines(
 }
 
 async fn exec_from_repl(execution_config: ExecutionConfig, print_options: PrintOptions) {
-    let mut ctx = ExecutionContext::with_config(execution_config); // TODO: Probably just ignore datafusion-cli, but consider setting up execution_config further
+    let mut ctx = ExecutionContext::with_config(execution_config);
 
     let mut rl = Editor::<()>::new();
     rl.load_history(".history").ok();
7 changes: 6 additions & 1 deletion datafusion-examples/examples/flight_server.rs
@@ -67,7 +67,12 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<SchemaResult>, Status> {
         let request = request.into_inner();
 
-        let table = ParquetTable::try_new(&request.path[0], Arc::new(BasicMetadataCacheFactory::new()), num_cpus::get()).unwrap();
+        let table = ParquetTable::try_new(
+            &request.path[0],
+            Arc::new(BasicMetadataCacheFactory::new()),
+            num_cpus::get(),
+        )
+        .unwrap();
 
         let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
         let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();
24 changes: 20 additions & 4 deletions datafusion/src/datasource/parquet.rs
@@ -44,9 +44,21 @@ pub struct ParquetTable {
 
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
-    pub fn try_new(path: impl Into<String>, metadata_cache_factory: Arc<dyn MetadataCacheFactory>, max_concurrency: usize) -> Result<Self> {
+    pub fn try_new(
+        path: impl Into<String>,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+        max_concurrency: usize,
+    ) -> Result<Self> {
         let path = path.into();
-        let parquet_exec = ParquetExec::try_from_path_with_cache(&path, None, None, 0, 1, None, metadata_cache_factory.make_noop_cache())?;
+        let parquet_exec = ParquetExec::try_from_path_with_cache(
+            &path,
+            None,
+            None,
+            0,
+            1,
+            None,
+            metadata_cache_factory.make_noop_cache(),
+        )?;
         let schema = parquet_exec.schema();
         Ok(Self {
             path,
@@ -118,7 +130,7 @@ impl TableProvider for ParquetTable {
                 .unwrap_or(batch_size),
             self.max_concurrency,
             limit,
-            self.metadata_cache_factory.make_noop_cache()
+            self.metadata_cache_factory.make_noop_cache(),
         )?))
     }
 
@@ -360,7 +372,11 @@ mod tests {
     fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, name);
-        let table = ParquetTable::try_new(&filename, Arc::new(BasicMetadataCacheFactory::new()), 2)?;
+        let table = ParquetTable::try_new(
+            &filename,
+            Arc::new(BasicMetadataCacheFactory::new()),
+            2,
+        )?;
         Ok(Arc::new(table))
     }
 
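For reference, a minimal sketch of calling the reformatted constructor. The file path and concurrency value are hypothetical; only the `try_new(path, metadata_cache_factory, max_concurrency)` shape comes from this diff:

    use std::sync::Arc;

    use datafusion::datasource::parquet::ParquetTable;
    use datafusion::error::Result;
    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;

    fn load_example_table() -> Result<Arc<ParquetTable>> {
        // Hypothetical file path; any local Parquet file would do.
        let table = ParquetTable::try_new(
            "/tmp/example.parquet",
            Arc::new(BasicMetadataCacheFactory::new()),
            4, // max_concurrency, chosen arbitrarily for the sketch
        )?;
        Ok(Arc::new(table))
    }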
12 changes: 8 additions & 4 deletions datafusion/src/execution/context.rs
@@ -26,7 +26,8 @@ use crate::{
         aggregate_statistics::AggregateStatistics, eliminate_limit::EliminateLimit,
         hash_build_probe_order::HashBuildProbeOrder,
     },
-    physical_optimizer::optimizer::PhysicalOptimizerRule, physical_plan::parquet::{BasicMetadataCacheFactory, MetadataCacheFactory},
+    physical_optimizer::optimizer::PhysicalOptimizerRule,
+    physical_plan::parquet::{BasicMetadataCacheFactory, MetadataCacheFactory},
 };
 use log::debug;
 use std::fs;
@@ -326,8 +327,12 @@ impl ExecutionContext {
     pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
         let table = {
             let m = self.state.lock().unwrap();
-            ParquetTable::try_new(filename, m.metadata_cache_factory().clone(), m.config.concurrency)?
-                .with_enable_pruning(m.config.parquet_pruning)
+            ParquetTable::try_new(
+                filename,
+                m.metadata_cache_factory().clone(),
+                m.config.concurrency,
+            )?
+            .with_enable_pruning(m.config.parquet_pruning)
         };
         self.register_table(name, Arc::new(table))?;
         Ok(())
@@ -680,7 +685,6 @@ pub struct ExecutionConfig {
     parquet_pruning: bool,
 }
 
-
 impl Default for ExecutionConfig {
     fn default() -> Self {
         Self {
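The `register_parquet` change above only re-wraps the call. For context, a minimal usage sketch, assuming this fork keeps DataFusion's synchronous `sql()` API; the table name and path are hypothetical:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    async fn query_parquet() -> Result<()> {
        let mut ctx = ExecutionContext::with_config(ExecutionConfig::new());
        // register_parquet threads the context's metadata cache factory
        // into ParquetTable::try_new, as shown in the hunk above.
        ctx.register_parquet("example", "/tmp/example.parquet")?;
        let df = ctx.sql("SELECT COUNT(*) FROM example")?;
        let _batches = df.collect().await?;
        Ok(())
    }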
19 changes: 16 additions & 3 deletions datafusion/src/logical_plan/builder.rs
@@ -27,12 +27,15 @@ use arrow::{
     record_batch::RecordBatch,
 };
 
-use crate::{error::{DataFusionError, Result}, physical_plan::parquet::MetadataCacheFactory};
 use crate::{datasource::TableProvider, logical_plan::plan::ToStringifiedPlan};
 use crate::{
     datasource::{empty::EmptyTable, parquet::ParquetTable, CsvFile, MemTable},
     prelude::CsvReadOptions,
 };
+use crate::{
+    error::{DataFusionError, Result},
+    physical_plan::parquet::MetadataCacheFactory,
+};
 
 use super::dfschema::ToDFSchema;
 use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
@@ -145,7 +148,13 @@ impl LogicalPlanBuilder {
         max_concurrency: usize,
     ) -> Result<Self> {
         let path = path.into();
-        Self::scan_parquet_with_name(path.clone(), metadata_cache_factory, projection, max_concurrency, path)
+        Self::scan_parquet_with_name(
+            path.clone(),
+            metadata_cache_factory,
+            projection,
+            max_concurrency,
+            path,
+        )
     }
 
     /// Scan a Parquet data source and register it with a given table name
@@ -156,7 +165,11 @@
         max_concurrency: usize,
         table_name: impl Into<String>,
     ) -> Result<Self> {
-        let provider = Arc::new(ParquetTable::try_new(path, metadata_cache_factory, max_concurrency)?);
+        let provider = Arc::new(ParquetTable::try_new(
+            path,
+            metadata_cache_factory,
+            max_concurrency,
+        )?);
         Self::scan(table_name, provider, projection)
     }
 
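A minimal sketch of the `scan_parquet` entry point after the reflow. The projection type (`Option<Vec<usize>>`) and the path are assumptions; the argument order comes from the hunk above:

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::logical_plan::LogicalPlanBuilder;
    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;

    fn build_parquet_scan() -> Result<()> {
        let plan = LogicalPlanBuilder::scan_parquet(
            "/tmp/example.parquet", // hypothetical path
            Arc::new(BasicMetadataCacheFactory::new()),
            None, // projection: assumed Option<Vec<usize>>; None reads all columns
            4,    // max_concurrency
        )?
        .build()?;
        println!("{:?}", plan);
        Ok(())
    }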
20 changes: 14 additions & 6 deletions datafusion/src/physical_plan/parquet.rs
@@ -147,7 +147,11 @@ pub trait MetadataCacheFactory: Sync + Send {
     /// Makes a noop cache (which doesn't cache)
     fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache>;
     /// Makes an LRU-based cache.
-    fn make_lru_cache(&self, max_capacity: u64, time_to_idle: Duration) -> Arc<dyn ParquetMetadataCache>;
+    fn make_lru_cache(
+        &self,
+        max_capacity: u64,
+        time_to_idle: Duration,
+    ) -> Arc<dyn ParquetMetadataCache>;
     /// Modifies and builds writer properties.
     fn build_writer_props(&self, builder: WriterPropertiesBuilder) -> WriterProperties {
         builder.build()
@@ -212,24 +216,28 @@ impl ParquetMetadataCache for LruParquetMetadataCache {
 }
 
 /// Constructs regular Noop or Lru MetadataCacheFactory objects.
-pub struct BasicMetadataCacheFactory {
-}
+pub struct BasicMetadataCacheFactory {}
 
 impl BasicMetadataCacheFactory {
     /// Constructor
-    pub fn new() -> BasicMetadataCacheFactory { BasicMetadataCacheFactory{} }
+    pub fn new() -> BasicMetadataCacheFactory {
+        BasicMetadataCacheFactory {}
+    }
 }
 
 impl MetadataCacheFactory for BasicMetadataCacheFactory {
     fn make_noop_cache(&self) -> Arc<dyn ParquetMetadataCache> {
         NoopParquetMetadataCache::new()
     }
-    fn make_lru_cache(&self, max_capacity: u64, time_to_idle: Duration) -> Arc<dyn ParquetMetadataCache> {
+    fn make_lru_cache(
+        &self,
+        max_capacity: u64,
+        time_to_idle: Duration,
+    ) -> Arc<dyn ParquetMetadataCache> {
         LruParquetMetadataCache::new(max_capacity, time_to_idle)
     }
 }
 
-
 impl ParquetExec {
     /// Create a new Parquet reader execution plan based on the specified Parquet filename or
     /// directory containing Parquet files
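For reference, a sketch of how the factory's two cache constructors are called. The capacity and idle-timeout values are arbitrary; the `(max_capacity: u64, time_to_idle: Duration)` signature is taken from the trait above:

    use std::time::Duration;

    use datafusion::physical_plan::parquet::{
        BasicMetadataCacheFactory, MetadataCacheFactory,
    };

    fn build_caches() {
        let factory = BasicMetadataCacheFactory::new();
        // Noop: footer metadata is re-read on every file open.
        let _noop = factory.make_noop_cache();
        // LRU: bounded cache; entries idle past `time_to_idle` are evicted.
        let _lru = factory.make_lru_cache(10_000, Duration::from_secs(600));
    }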
