Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/clarify_infallable
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 5, 2024
2 parents 31b7335 + 7c6f891 commit 7aa374c
Show file tree
Hide file tree
Showing 243 changed files with 4,463 additions and 2,586 deletions.
39 changes: 18 additions & 21 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -70,31 +70,28 @@ impl RunOpt {
let sort_cases = vec![
(
"sort utf8",
vec![PhysicalSortExpr {
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("request_method", &schema)?,
options: Default::default(),
}],
}]),
),
(
"sort int",
vec![PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("response_bytes", &schema)?,
options: Default::default(),
}],
}]),
),
(
"sort decimal",
vec![
// sort decimal
PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
],
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
}]),
),
(
"sort integer tuple",
vec![
LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
options: Default::default(),
Expand All @@ -103,11 +100,11 @@ impl RunOpt {
expr: col("response_bytes", &schema)?,
options: Default::default(),
},
],
]),
),
(
"sort utf8 tuple",
vec![
LexOrdering::new(vec![
// sort utf8 tuple
PhysicalSortExpr {
expr: col("service", &schema)?,
Expand All @@ -125,11 +122,11 @@ impl RunOpt {
expr: col("image", &schema)?,
options: Default::default(),
},
],
]),
),
(
"sort mixed tuple",
vec![
LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("service", &schema)?,
options: Default::default(),
Expand All @@ -142,7 +139,7 @@ impl RunOpt {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
],
]),
),
];
for (title, expr) in sort_cases {
Expand Down Expand Up @@ -170,13 +167,13 @@ impl RunOpt {

async fn exec_sort(
ctx: &SessionContext,
expr: &[PhysicalSortExpr],
expr: &LexOrdering,
test_file: &TestParquetFile,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(ctx, None).await?;
let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
let exec = Arc::new(SortExec::new(expr.clone(), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
Expand Down
33 changes: 17 additions & 16 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion datafusion/common/src/display/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub enum PlanType {
FinalPhysicalPlanWithStats,
/// The final with schema, fully optimized physical plan which would be executed
FinalPhysicalPlanWithSchema,
/// An error creating the physical plan
PhysicalPlanError,
}

impl Display for PlanType {
Expand Down Expand Up @@ -91,6 +93,7 @@ impl Display for PlanType {
PlanType::FinalPhysicalPlanWithSchema => {
write!(f, "physical_plan_with_schema")
}
PlanType::PhysicalPlanError => write!(f, "physical_plan_error"),
}
}
}
Expand Down Expand Up @@ -118,7 +121,9 @@ impl StringifiedPlan {
/// `verbose_mode = true` will display all available plans
pub fn should_display(&self, verbose_mode: bool) -> bool {
match self.plan_type {
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
PlanType::FinalLogicalPlan
| PlanType::FinalPhysicalPlan
| PlanType::PhysicalPlanError => true,
_ => verbose_mode,
}
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
//! FunctionalDependencies keeps track of functional dependencies
//! inside DFSchema.

use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::vec::IntoIter;

use crate::utils::{merge_and_order_indices, set_difference};
use crate::{DFSchema, JoinType};
use crate::{DFSchema, HashSet, JoinType};

/// This object defines a constraint on a table.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
Expand Down
6 changes: 6 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub mod scalar;
pub mod stats;
pub mod test_util;
pub mod tree_node;
pub mod types;
pub mod utils;

/// Reexport arrow crate
Expand All @@ -65,6 +66,7 @@ pub use functional_dependencies::{
get_target_functional_dependencies, Constraint, Constraints, Dependency,
FunctionalDependence, FunctionalDependencies,
};
use hashbrown::hash_map::DefaultHashBuilder;
pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use param_value::ParamValues;
pub use scalar::{ScalarType, ScalarValue};
Expand All @@ -86,6 +88,10 @@ pub use error::{
_substrait_datafusion_err,
};

// The HashMap and HashSet implementations that should be used as the uniform defaults
pub type HashMap<K, V, S = DefaultHashBuilder> = hashbrown::HashMap<K, V, S>;
pub type HashSet<T, S = DefaultHashBuilder> = hashbrown::HashSet<T, S>;

/// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
///
Expand Down
88 changes: 88 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,48 @@ impl Statistics {
self
}

/// Project the statistics to the given column indices.
///
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
/// "b"}`.
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
let Some(projection) = projection else {
return self;
};

enum Slot {
/// The column is taken and put into the specified statistics location
Taken(usize),
/// The original columns is present
Present(ColumnStatistics),
}

// Convert to Vec<Slot> so we can avoid copying the statistics
let mut columns: Vec<_> = std::mem::take(&mut self.column_statistics)
.into_iter()
.map(Slot::Present)
.collect();

for idx in projection {
let next_idx = self.column_statistics.len();
let slot = std::mem::replace(
columns.get_mut(*idx).expect("projection out of bounds"),
Slot::Taken(next_idx),
);
match slot {
// The column was there, so just move it
Slot::Present(col) => self.column_statistics.push(col),
// The column was taken, so copy from the previous location
Slot::Taken(prev_idx) => self
.column_statistics
.push(self.column_statistics[prev_idx].clone()),
}
}

self
}

/// Calculates the statistics after `fetch` and `skip` operations apply.
/// Here, `self` denotes per-partition statistics. Use the `n_partitions`
/// parameter to compute global statistics in a multi-partition setting.
Expand Down Expand Up @@ -561,4 +603,50 @@ mod tests {
let p2 = precision.clone();
assert_eq!(precision, p2);
}

#[test]
fn test_project_none() {
let projection = None;
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
assert_eq!(stats, make_stats(vec![10, 20, 30]));
}

#[test]
fn test_project_empty() {
let projection = Some(vec![]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
assert_eq!(stats, make_stats(vec![]));
}

#[test]
fn test_project_swap() {
let projection = Some(vec![2, 1]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
assert_eq!(stats, make_stats(vec![30, 20]));
}

#[test]
fn test_project_repeated() {
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
}

// Make a Statistics structure with the specified null counts for each column
fn make_stats(counts: impl IntoIterator<Item = usize>) -> Statistics {
Statistics {
num_rows: Precision::Exact(42),
total_byte_size: Precision::Exact(500),
column_statistics: counts.into_iter().map(col_stats_i64).collect(),
}
}

fn col_stats_i64(null_count: usize) -> ColumnStatistics {
ColumnStatistics {
null_count: Precision::Exact(null_count),
max_value: Precision::Exact(ScalarValue::Int64(Some(42))),
min_value: Precision::Exact(ScalarValue::Int64(Some(64))),
distinct_count: Precision::Exact(100),
}
}
}
Loading

0 comments on commit 7aa374c

Please sign in to comment.