From 5b8177d67424a6aed5168258f041125f0158b051 Mon Sep 17 00:00:00 2001
From: Lordworms
Date: Sat, 10 Aug 2024 16:40:05 -0700
Subject: [PATCH] adding more situations

---
 .../combine_partial_final_agg.rs              |  6 ++-
 .../physical_optimizer/update_aggr_exprs.rs   |  4 +-
 datafusion/core/src/physical_planner.rs       | 39 +++++++++++--------
 datafusion/expr/src/logical_plan/plan.rs      | 14 ++++++-
 datafusion/expr/src/logical_plan/tree_node.rs |  4 ++
 .../optimizer/src/common_subexpr_eliminate.rs |  2 +
 .../src/eliminate_duplicated_expr.rs          |  7 +++-
 .../src/eliminate_group_by_constant.rs        | 13 ++++---
 .../optimizer/src/optimize_projections/mod.rs |  6 ++-
 .../physical-plan/src/aggregates/mod.rs       | 22 ++++++++++-
 .../src/aggregates/no_grouping.rs             | 25 ++++++++----
 .../sqllogictest/test_files/aggregate.slt     | 17 ++++++++
 12 files changed, 124 insertions(+), 35 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 843efcc7b0d2..8514369ece8f 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -94,7 +94,11 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
                             input_agg_exec.input_schema(),
                         )
                         .map(|combined_agg| {
-                            combined_agg.with_limit(agg_exec.limit())
+                            combined_agg
+                                .with_limit(agg_exec.limit())
+                                .with_is_global_group_by(
+                                    agg_exec.is_global_group_by(),
+                                )
                         })
                         .ok()
                         .map(Arc::new)
diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
index f8edf73e3d2a..fb3d635eaf11 100644
--- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
+++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -93,7 +93,9 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
                     input.equivalence_properties(),
                 )?;
 
-                let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_expr);
+                let aggr_exec = aggr_exec
+                    .with_new_aggr_exprs(aggr_expr)
+                    .with_is_global_group_by(aggr_exec.is_global_group_by);
 
                 Ok(Transformed::yes(Arc::new(aggr_exec) as _))
             } else {
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index ab0765ac0deb..11d517f085fe 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -662,6 +662,7 @@ impl DefaultPhysicalPlanner {
                 input,
                 group_expr,
                 aggr_expr,
+                is_global_group_by,
                 ..
             }) => {
                 // Initially need to perform the aggregate and then merge the partitions
@@ -691,14 +692,17 @@ impl DefaultPhysicalPlanner {
                 let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) =
                     multiunzip(agg_filter);
 
-                let initial_aggr = Arc::new(AggregateExec::try_new(
-                    AggregateMode::Partial,
-                    groups.clone(),
-                    aggregates.clone(),
-                    filters.clone(),
-                    input_exec,
-                    physical_input_schema.clone(),
-                )?);
+                let initial_aggr = Arc::new(
+                    AggregateExec::try_new(
+                        AggregateMode::Partial,
+                        groups.clone(),
+                        aggregates.clone(),
+                        filters.clone(),
+                        input_exec,
+                        physical_input_schema.clone(),
+                    )?
+                    .with_is_global_group_by(*is_global_group_by),
+                );
 
                 // update group column indices based on partial aggregate plan evaluation
                 let final_group: Vec<Arc<dyn PhysicalExpr>> =
@@ -732,14 +736,17 @@ impl DefaultPhysicalPlanner {
                         .collect(),
                 );
 
-                Arc::new(AggregateExec::try_new(
-                    next_partition_mode,
-                    final_grouping_set,
-                    updated_aggregates,
-                    filters,
-                    initial_aggr,
-                    physical_input_schema.clone(),
-                )?)
+                Arc::new(
+                    AggregateExec::try_new(
+                        next_partition_mode,
+                        final_grouping_set,
+                        updated_aggregates,
+                        filters,
+                        initial_aggr,
+                        physical_input_schema.clone(),
+                    )?
+                    .with_is_global_group_by(*is_global_group_by),
+                )
             }
             LogicalPlan::Projection(Projection { input, expr, .. }) => self
                 .create_project_physical_exec(
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index c5538d8880a7..17d898c66257 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -25,7 +25,9 @@ use std::sync::Arc;
 use super::dml::CopyTo;
 use super::DdlStatement;
 use crate::builder::{change_redundant_column, unnest_with_options};
-use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction};
+use crate::expr::{
+    is_constant_expression, Placeholder, Sort as SortExpr, WindowFunction,
+};
 use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
 use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
 use crate::logical_plan::extension::UserDefinedLogicalNode;
@@ -656,6 +658,7 @@ impl LogicalPlan {
                 group_expr,
                 aggr_expr,
                 schema: _,
+                is_global_group_by: _,
             }) => Aggregate::try_new(input, group_expr, aggr_expr)
                 .map(LogicalPlan::Aggregate),
             LogicalPlan::Sort(_) => Ok(self),
@@ -2629,6 +2632,8 @@ pub struct Aggregate {
     pub aggr_expr: Vec<Expr>,
     /// The schema description of the aggregate output
     pub schema: DFSchemaRef,
+    /// Records a constant-only GROUP BY so it can be passed on to the execution plan
+    pub is_global_group_by: bool,
 }
 
 impl Aggregate {
@@ -2693,6 +2698,8 @@ impl Aggregate {
             new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
         );
         Ok(Self {
+            is_global_group_by: group_expr.len() == 1
+                && is_constant_expression(&group_expr[0]),
             input,
             group_expr,
             aggr_expr,
@@ -2714,6 +2721,11 @@ impl Aggregate {
     pub fn group_expr_len(&self) -> Result<usize> {
         grouping_set_expr_count(&self.group_expr)
     }
+
+    pub fn with_is_global_group_by(mut self, is_global_group_by: bool) -> Self {
+        self.is_global_group_by = is_global_group_by;
+        self
+    }
 }
 
 /// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index dbe43128fd38..b77f6794859e 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -114,12 +114,14 @@ impl TreeNode for LogicalPlan {
                 group_expr,
                 aggr_expr,
                 schema,
+                is_global_group_by,
             }) => rewrite_arc(input, f)?.update_data(|input| {
                 LogicalPlan::Aggregate(Aggregate {
                     input,
                     group_expr,
                     aggr_expr,
                     schema,
+                    is_global_group_by,
                 })
             }),
             LogicalPlan::Sort(Sort { expr, input, fetch }) => rewrite_arc(input, f)?
@@ -604,6 +606,7 @@ impl LogicalPlan {
                 group_expr,
                 aggr_expr,
                 schema,
+                is_global_group_by,
             }) => map_until_stop_and_collect!(
                 group_expr.into_iter().map_until_stop_and_collect(&mut f),
                 aggr_expr,
@@ -615,6 +618,7 @@ impl LogicalPlan {
                 group_expr,
                 aggr_expr,
                 schema,
+                is_global_group_by,
             })
         }),
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 45e5409ae9ac..79749fbc9bf2 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -572,6 +572,7 @@ impl CommonSubexprEliminate {
         // Since group_expr may have changed, schema may also. Use try_new method.
         let new_agg = if transformed {
             Aggregate::try_new(new_input, new_group_expr, new_aggr_expr)?
+                .with_is_global_group_by(aggregate.is_global_group_by)
         } else {
             Aggregate::try_new_with_schema(
                 new_input,
@@ -579,6 +580,7 @@ impl CommonSubexprEliminate {
                 new_aggr_expr,
                 orig_schema,
             )?
+            .with_is_global_group_by(aggregate.is_global_group_by)
         };
 
         let new_agg = LogicalPlan::Aggregate(new_agg);
diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index e9d091d52b00..f86bb2eb1faf 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -116,8 +116,11 @@ impl OptimizerRule for EliminateDuplicatedExpr {
                     Transformed::no
                 };
 
-                Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
-                    .map(|f| transformed(LogicalPlan::Aggregate(f)))
+                Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr).map(|f| {
+                    transformed(LogicalPlan::Aggregate(
+                        f.with_is_global_group_by(agg.is_global_group_by),
+                    ))
+                })
             }
             _ => Ok(Transformed::no(plan)),
         }
diff --git a/datafusion/optimizer/src/eliminate_group_by_constant.rs b/datafusion/optimizer/src/eliminate_group_by_constant.rs
index 584e33edbe8d..06576500bc99 100644
--- a/datafusion/optimizer/src/eliminate_group_by_constant.rs
+++ b/datafusion/optimizer/src/eliminate_group_by_constant.rs
@@ -64,11 +64,14 @@ impl OptimizerRule for EliminateGroupByConstant {
                     return Ok(Transformed::no(LogicalPlan::Aggregate(aggregate)));
                 }
 
-                let simplified_aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
-                    aggregate.input,
-                    nonconst_group_expr.into_iter().cloned().collect(),
-                    aggregate.aggr_expr.clone(),
-                )?);
+                let simplified_aggregate = LogicalPlan::Aggregate(
+                    Aggregate::try_new(
+                        aggregate.input,
+                        nonconst_group_expr.into_iter().cloned().collect(),
+                        aggregate.aggr_expr.clone(),
+                    )?
+                    .with_is_global_group_by(aggregate.is_global_group_by),
+                );
 
                 let projection_expr =
                     aggregate.group_expr.into_iter().chain(aggregate.aggr_expr);
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index ac4ed87a4a1a..705409edb2cd 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -200,7 +200,11 @@ fn optimize_projections(
                     new_group_bys,
                     new_aggr_expr,
                 )
-                .map(LogicalPlan::Aggregate)
+                .map(|f| {
+                    LogicalPlan::Aggregate(
+                        f.with_is_global_group_by(aggregate.is_global_group_by),
+                    )
+                })
             });
         }
         LogicalPlan::Window(window) => {
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index d72da9b30049..bb9ff0f2087d 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -274,6 +274,12 @@ pub struct AggregateExec {
     /// Describes how the input is ordered relative to the group by columns
    input_order_mode: InputOrderMode,
     cache: PlanProperties,
+
+    /// Marks plans such as `GROUP BY true`: the constant group expression is
+    /// optimized away, so otherwise we could not tell whether an AggregateExec
+    /// originally had a GROUP BY at all, which influences the final result set
+    /// (a single null row versus an empty set); see the related issue
+    pub is_global_group_by: bool,
 }
 
 impl AggregateExec {
@@ -295,6 +301,7 @@ impl AggregateExec {
             input: Arc::clone(&self.input),
             schema: Arc::clone(&self.schema),
             input_schema: Arc::clone(&self.input_schema),
+            is_global_group_by: false,
         }
     }
 
@@ -427,6 +434,7 @@ impl AggregateExec {
             limit: None,
             input_order_mode,
             cache,
+            is_global_group_by: false,
         })
     }
 
@@ -435,6 +443,12 @@ impl AggregateExec {
         &self.mode
     }
 
+    /// Set whether this exec stems from a constant (global) GROUP BY
+    pub fn with_is_global_group_by(mut self, is_global_group_by: bool) -> Self {
+        self.is_global_group_by = is_global_group_by;
+        self
+    }
+
     /// Set the `limit` of this AggExec
     pub fn with_limit(mut self, limit: Option<usize>) -> Self {
         self.limit = limit;
@@ -475,6 +489,11 @@ impl AggregateExec {
         self.limit
     }
 
+    /// Whether this exec stems from a constant (global) GROUP BY
+    pub fn is_global_group_by(&self) -> bool {
+        self.is_global_group_by
+    }
+
     fn execute_typed(
         &self,
         partition: usize,
@@ -718,7 +737,8 @@ impl ExecutionPlan for AggregateExec {
             Arc::clone(&children[0]),
             Arc::clone(&self.input_schema),
             Arc::clone(&self.schema),
-        )?;
+        )?
+        .with_is_global_group_by(self.is_global_group_by);
         me.limit = self.limit;
 
         Ok(Arc::new(me))
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index 99417e4ee3e9..e991dfe8dfbf 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -74,6 +74,8 @@ impl AggregateStream {
         let agg_schema = Arc::clone(&agg.schema);
         let agg_filter_expr = agg.filter_expr.clone();
 
+        let is_global_group_by = agg.is_global_group_by;
+
         let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
         let input = agg.input.execute(partition, Arc::clone(&context))?;
 
@@ -102,7 +104,7 @@ impl AggregateStream {
             reservation,
             finished: false,
         };
-        let stream = futures::stream::unfold(inner, |mut this| async move {
+        let stream = futures::stream::unfold(inner, move |mut this| async move {
             if this.finished {
                 return None;
             }
@@ -140,11 +142,21 @@ impl AggregateStream {
                        let result =
                            finalize_aggregation(&mut this.accumulators, &this.mode)
                                .and_then(|columns| {
-                                    RecordBatch::try_new(
-                                        Arc::clone(&this.schema),
-                                        columns,
-                                    )
-                                    .map_err(Into::into)
+                                    //println!("null count is {:?} \n columns[0].len() is {:?} \n columns len is {:?}", columns[0].null_count(), columns[0].len(), columns.len());
+                                    if columns[0].null_count() == columns[0].len()
+                                        && columns.len() == 1
+                                        && is_global_group_by
+                                    {
+                                        Ok(RecordBatch::new_empty(Arc::clone(
+                                            &this.schema,
+                                        )))
+                                    } else {
+                                        RecordBatch::try_new(
+                                            Arc::clone(&this.schema),
+                                            columns,
+                                        )
+                                        .map_err(Into::into)
+                                    }
                                })
                                .record_output(&this.baseline_metrics);
 
@@ -153,7 +165,6 @@ impl AggregateStream {
                        result
                    }
                };
-
                this.finished = true;
                return Some((result, this));
            }
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 1e36fd05bb8e..f9f21a383ead 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -5581,5 +5581,22 @@ EXPLAIN SELECT AVG(v1) FROM t1 GROUP BY 1 + 1 = -1 having 1 + 1 = -1;
 logical_plan EmptyRelation
 physical_plan EmptyExec
 
+query R
+SELECT AVG(v1) FROM t1 GROUP BY false having true;
+----
+
+query I
+SELECT SUM(v1) FROM t1 GROUP BY false having 1 = 1;
+----
+
+
+statement ok
+insert into t1 values(1);
+
+query I
+SELECT SUM(v1) FROM t1 GROUP BY false having 1 = 1;
+----
+1
+
 statement ok
 drop table t1;
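The behavior these new tests pin down can be sketched with a hypothetical empty table `t` (not part of the patched test file): an aggregate without any GROUP BY always produces exactly one row, while an aggregate whose only group key is a constant produces one row per group, which means zero rows when the input is empty.

    -- assumed setup, for illustration only: CREATE TABLE t(v INT) with no rows inserted
    SELECT SUM(v) FROM t;                 -- one row whose value is NULL
    SELECT SUM(v) FROM t GROUP BY false;  -- zero rows

Because the optimizer strips the constant group key (see eliminate_group_by_constant.rs above), both forms reach AggregateExec with an empty grouping set; the is_global_group_by flag threaded through the plans is what lets no_grouping.rs return an empty batch instead of a single all-null row in the constant GROUP BY case.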