Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/docs_on_slt
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 5, 2024
2 parents a813fe1 + 7c6f891 commit b7107b9
Show file tree
Hide file tree
Showing 117 changed files with 907 additions and 504 deletions.
39 changes: 18 additions & 21 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -70,31 +70,28 @@ impl RunOpt {
let sort_cases = vec![
(
"sort utf8",
vec![PhysicalSortExpr {
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("request_method", &schema)?,
options: Default::default(),
}],
}]),
),
(
"sort int",
vec![PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("response_bytes", &schema)?,
options: Default::default(),
}],
}]),
),
(
"sort decimal",
vec![
// sort decimal
PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
],
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
}]),
),
(
"sort integer tuple",
vec![
LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
options: Default::default(),
Expand All @@ -103,11 +100,11 @@ impl RunOpt {
expr: col("response_bytes", &schema)?,
options: Default::default(),
},
],
]),
),
(
"sort utf8 tuple",
vec![
LexOrdering::new(vec![
// sort utf8 tuple
PhysicalSortExpr {
expr: col("service", &schema)?,
Expand All @@ -125,11 +122,11 @@ impl RunOpt {
expr: col("image", &schema)?,
options: Default::default(),
},
],
]),
),
(
"sort mixed tuple",
vec![
LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("service", &schema)?,
options: Default::default(),
Expand All @@ -142,7 +139,7 @@ impl RunOpt {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
],
]),
),
];
for (title, expr) in sort_cases {
Expand Down Expand Up @@ -170,13 +167,13 @@ impl RunOpt {

async fn exec_sort(
ctx: &SessionContext,
expr: LexOrderingRef<'_>,
expr: &LexOrdering,
test_file: &TestParquetFile,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(ctx, None).await?;
let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), scan));
let exec = Arc::new(SortExec::new(expr.clone(), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
Expand Down
33 changes: 17 additions & 16 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion datafusion/common/src/display/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub enum PlanType {
FinalPhysicalPlanWithStats,
/// The final, fully optimized physical plan (with schema) which would be executed
FinalPhysicalPlanWithSchema,
/// An error creating the physical plan
PhysicalPlanError,
}

impl Display for PlanType {
Expand Down Expand Up @@ -91,6 +93,7 @@ impl Display for PlanType {
PlanType::FinalPhysicalPlanWithSchema => {
write!(f, "physical_plan_with_schema")
}
PlanType::PhysicalPlanError => write!(f, "physical_plan_error"),
}
}
}
Expand Down Expand Up @@ -118,7 +121,9 @@ impl StringifiedPlan {
/// `verbose_mode = true` will display all available plans
pub fn should_display(&self, verbose_mode: bool) -> bool {
match self.plan_type {
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
PlanType::FinalLogicalPlan
| PlanType::FinalPhysicalPlan
| PlanType::PhysicalPlanError => true,
_ => verbose_mode,
}
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
//! FunctionalDependencies keeps track of functional dependencies
//! inside DFSchema.

use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::vec::IntoIter;

use crate::utils::{merge_and_order_indices, set_difference};
use crate::{DFSchema, JoinType};
use crate::{DFSchema, HashSet, JoinType};

/// This object defines a constraint on a table.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub use functional_dependencies::{
get_target_functional_dependencies, Constraint, Constraints, Dependency,
FunctionalDependence, FunctionalDependencies,
};
use hashbrown::hash_map::DefaultHashBuilder;
pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use param_value::ParamValues;
pub use scalar::{ScalarType, ScalarValue};
Expand All @@ -87,6 +88,10 @@ pub use error::{
_substrait_datafusion_err,
};

// The HashMap and HashSet implementations that should be used as the uniform defaults
/// Alias for `hashbrown::HashMap` whose hasher parameter `S` defaults to
/// hashbrown's `DefaultHashBuilder`.
pub type HashMap<K, V, S = DefaultHashBuilder> = hashbrown::HashMap<K, V, S>;
/// Alias for `hashbrown::HashSet` whose hasher parameter `S` defaults to
/// hashbrown's `DefaultHashBuilder`.
pub type HashSet<T, S = DefaultHashBuilder> = hashbrown::HashSet<T, S>;

/// Downcast an Arrow Array to a concrete type, return a `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
///
Expand Down
78 changes: 73 additions & 5 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,35 @@ impl Statistics {
return self;
};

// todo: it would be nice to avoid cloning column statistics if
// possible (e.g. if the projection did not contain duplicates)
self.column_statistics = projection
.iter()
.map(|&i| self.column_statistics[i].clone())
// Tracks, for each original column, whether its statistics have already been
// moved into the projected output.
enum Slot {
/// The column was already moved out; holds the index in the output
/// statistics vector where it was most recently placed
Taken(usize),
/// The original column is still present and has not been moved yet
Present(ColumnStatistics),
}

// Convert to Vec<Slot> so we can avoid copying the statistics
let mut columns: Vec<_> = std::mem::take(&mut self.column_statistics)
.into_iter()
.map(Slot::Present)
.collect();

for idx in projection {
let next_idx = self.column_statistics.len();
let slot = std::mem::replace(
columns.get_mut(*idx).expect("projection out of bounds"),
Slot::Taken(next_idx),
);
match slot {
// The column was there, so just move it
Slot::Present(col) => self.column_statistics.push(col),
// The column was taken, so copy from the previous location
Slot::Taken(prev_idx) => self
.column_statistics
.push(self.column_statistics[prev_idx].clone()),
}
}

self
}

Expand Down Expand Up @@ -581,4 +603,50 @@ mod tests {
let p2 = precision.clone();
assert_eq!(precision, p2);
}

#[test]
fn test_project_none() {
    // Without a projection the statistics are expected to come back unchanged.
    let no_projection: Option<Vec<usize>> = None;
    let projected = make_stats(vec![10, 20, 30]).project(no_projection.as_ref());
    let expected = make_stats(vec![10, 20, 30]);
    assert_eq!(projected, expected);
}

#[test]
fn test_project_empty() {
    // An empty projection is expected to leave no column statistics.
    let empty_projection: Option<Vec<usize>> = Some(Vec::new());
    let projected = make_stats(vec![10, 20, 30]).project(empty_projection.as_ref());
    let expected = make_stats(vec![]);
    assert_eq!(projected, expected);
}

#[test]
fn test_project_swap() {
    // Selecting columns 2 then 1 is expected to reorder them and drop column 0.
    let indices: Vec<usize> = vec![2, 1];
    let projected = make_stats(vec![10, 20, 30]).project(Some(&indices));
    let expected = make_stats(vec![30, 20]);
    assert_eq!(projected, expected);
}

#[test]
fn test_project_repeated() {
    // Indices that appear more than once are expected to yield one copy of the
    // column statistics per occurrence, in projection order.
    let indices: Vec<usize> = vec![1, 2, 1, 1, 0, 2];
    let projected = make_stats(vec![10, 20, 30]).project(Some(&indices));
    let expected = make_stats(vec![20, 30, 20, 20, 10, 30]);
    assert_eq!(projected, expected);
}

/// Builds a `Statistics` fixture with fixed row count and byte size, holding
/// one column per entry of `counts`; each entry becomes that column's null count.
fn make_stats(counts: impl IntoIterator<Item = usize>) -> Statistics {
    let column_statistics: Vec<_> = counts.into_iter().map(col_stats_i64).collect();
    let num_rows = Precision::Exact(42);
    let total_byte_size = Precision::Exact(500);
    Statistics {
        num_rows,
        total_byte_size,
        column_statistics,
    }
}

/// Builds an `Int64` column-statistics fixture whose only varying field is
/// `null_count`; the other fields are fixed, exact placeholder values.
fn col_stats_i64(null_count: usize) -> ColumnStatistics {
    ColumnStatistics {
        null_count: Precision::Exact(null_count),
        // Keep the fixture self-consistent: min (42) must not exceed max (64).
        max_value: Precision::Exact(ScalarValue::Int64(Some(64))),
        min_value: Precision::Exact(ScalarValue::Int64(Some(42))),
        distinct_count: Precision::Exact(100),
    }
}
}
3 changes: 1 addition & 2 deletions datafusion/core/src/bin/print_functions_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
// under the License.

use datafusion::execution::SessionStateDefaults;
use datafusion_common::{not_impl_err, Result};
use datafusion_common::{not_impl_err, HashSet, Result};
use datafusion_expr::{
aggregate_doc_sections, scalar_doc_sections, window_doc_sections, AggregateUDF,
DocSection, Documentation, ScalarUDF, WindowUDF,
};
use hashbrown::HashSet;
use itertools::Itertools;
use std::env::args;
use std::fmt::Write as _;
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/catalog_common/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
//! [`ListingSchemaProvider`]: [`SchemaProvider`] that scans ObjectStores for tables automatically

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::path::Path;
use std::sync::{Arc, Mutex};

use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory};
use crate::execution::context::SessionState;

use datafusion_common::{Constraints, DFSchema, DataFusionError, TableReference};
use datafusion_common::{
Constraints, DFSchema, DataFusionError, HashMap, TableReference,
};
use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;
Expand Down
Loading

0 comments on commit b7107b9

Please sign in to comment.