adding more situations
Lordworms committed Aug 10, 2024
1 parent 0cebec7 commit 5b8177d
Showing 12 changed files with 124 additions and 35 deletions.
@@ -94,7 +94,11 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
input_agg_exec.input_schema(),
)
.map(|combined_agg| {
combined_agg.with_limit(agg_exec.limit())
combined_agg
.with_limit(agg_exec.limit())
.with_is_global_group_by(
agg_exec.is_global_group_by(),
)
})
.ok()
.map(Arc::new)
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -93,7 +93,9 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
input.equivalence_properties(),
)?;

let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_expr);
let aggr_exec = aggr_exec
.with_new_aggr_exprs(aggr_expr)
.with_is_global_group_by(aggr_exec.is_global_group_by);

Ok(Transformed::yes(Arc::new(aggr_exec) as _))
} else {
39 changes: 23 additions & 16 deletions datafusion/core/src/physical_planner.rs
@@ -662,6 +662,7 @@ impl DefaultPhysicalPlanner {
input,
group_expr,
aggr_expr,
is_global_group_by,
..
}) => {
// Initially need to perform the aggregate and then merge the partitions
@@ -691,14 +692,17 @@
let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) =
multiunzip(agg_filter);

let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
filters.clone(),
input_exec,
physical_input_schema.clone(),
)?);
let initial_aggr = Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
filters.clone(),
input_exec,
physical_input_schema.clone(),
)?
.with_is_global_group_by(*is_global_group_by),
);

// update group column indices based on partial aggregate plan evaluation
let final_group: Vec<Arc<dyn PhysicalExpr>> =
@@ -732,14 +736,17 @@
.collect(),
);

Arc::new(AggregateExec::try_new(
next_partition_mode,
final_grouping_set,
updated_aggregates,
filters,
initial_aggr,
physical_input_schema.clone(),
)?)
Arc::new(
AggregateExec::try_new(
next_partition_mode,
final_grouping_set,
updated_aggregates,
filters,
initial_aggr,
physical_input_schema.clone(),
)?
.with_is_global_group_by(*is_global_group_by),
)
}
LogicalPlan::Projection(Projection { input, expr, .. }) => self
.create_project_physical_exec(
14 changes: 13 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
@@ -25,7 +25,9 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction};
use crate::expr::{
is_constant_expression, Placeholder, Sort as SortExpr, WindowFunction,
};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
@@ -656,6 +658,7 @@ impl LogicalPlan {
group_expr,
aggr_expr,
schema: _,
is_global_group_by: _,
}) => Aggregate::try_new(input, group_expr, aggr_expr)
.map(LogicalPlan::Aggregate),
LogicalPlan::Sort(_) => Ok(self),
@@ -2629,6 +2632,8 @@ pub struct Aggregate {
pub aggr_expr: Vec<Expr>,
/// The schema description of the aggregate output
pub schema: DFSchemaRef,
/// Whether the `GROUP BY` consists of a single constant expression; used to
/// pass this information down to the execution plan
pub is_global_group_by: bool,
}

impl Aggregate {
@@ -2693,6 +2698,8 @@ impl Aggregate {
new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
);
Ok(Self {
is_global_group_by: group_expr.len() == 1
&& is_constant_expression(&group_expr[0]),
input,
group_expr,
aggr_expr,
@@ -2714,6 +2721,11 @@ impl Aggregate {
pub fn group_expr_len(&self) -> Result<usize> {
grouping_set_expr_count(&self.group_expr)
}

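/// Set whether this aggregation originated from a constant `GROUP BY`
/// expression, so that the information survives after the constant grouping
/// expression is optimized away.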
pub fn with_is_global_group_by(mut self, is_global_group_by: bool) -> Self {
self.is_global_group_by = is_global_group_by;
self
}
}

/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
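Note: the hunk above imports `is_constant_expression` from the crate's `expr` module, but the helper's body does not appear in the hunks shown for this commit. As a rough sketch of what such a check presumably does (treat an expression as constant when it references no columns), something along these lines would suffice; the exact match arms are an assumption, not the code added by this commit.

use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::Expr;

/// Hypothetical sketch only; the real helper may cover more expression kinds.
fn is_constant_expression(expr: &Expr) -> bool {
    match expr {
        // A literal is constant by definition.
        Expr::Literal(_) => true,
        // A binary expression is constant when both operands are.
        Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
            is_constant_expression(left) && is_constant_expression(right)
        }
        // Unary wrappers are constant when their operand is.
        Expr::Not(inner) | Expr::Negative(inner) => is_constant_expression(inner),
        // Anything that might reference a column is treated as non-constant.
        _ => false,
    }
}

With a check like this in place, `Aggregate::try_new` can set `is_global_group_by` whenever the query has exactly one grouping expression and that expression is constant, as the constructor change above does.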
4 changes: 4 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
@@ -114,12 +114,14 @@ impl TreeNode for LogicalPlan {
group_expr,
aggr_expr,
schema,
is_global_group_by,
}) => rewrite_arc(input, f)?.update_data(|input| {
LogicalPlan::Aggregate(Aggregate {
input,
group_expr,
aggr_expr,
schema,
is_global_group_by,
})
}),
LogicalPlan::Sort(Sort { expr, input, fetch }) => rewrite_arc(input, f)?
@@ -604,6 +606,7 @@ impl LogicalPlan {
group_expr,
aggr_expr,
schema,
is_global_group_by,
}) => map_until_stop_and_collect!(
group_expr.into_iter().map_until_stop_and_collect(&mut f),
aggr_expr,
@@ -615,6 +618,7 @@
group_expr,
aggr_expr,
schema,
is_global_group_by,
})
}),

2 changes: 2 additions & 0 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -572,13 +572,15 @@ impl CommonSubexprEliminate {
// Since group_expr may have changed, schema may also. Use try_new method.
let new_agg = if transformed {
Aggregate::try_new(new_input, new_group_expr, new_aggr_expr)?
.with_is_global_group_by(aggregate.is_global_group_by)
} else {
Aggregate::try_new_with_schema(
new_input,
new_group_expr,
new_aggr_expr,
orig_schema,
)?
.with_is_global_group_by(aggregate.is_global_group_by)
};
let new_agg = LogicalPlan::Aggregate(new_agg);

7 changes: 5 additions & 2 deletions datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -116,8 +116,11 @@ impl OptimizerRule for EliminateDuplicatedExpr {
Transformed::no
};

Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
.map(|f| transformed(LogicalPlan::Aggregate(f)))
Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr).map(|f| {
transformed(LogicalPlan::Aggregate(
f.with_is_global_group_by(agg.is_global_group_by),
))
})
}
_ => Ok(Transformed::no(plan)),
}
13 changes: 8 additions & 5 deletions datafusion/optimizer/src/eliminate_group_by_constant.rs
@@ -64,11 +64,14 @@ impl OptimizerRule for EliminateGroupByConstant {
return Ok(Transformed::no(LogicalPlan::Aggregate(aggregate)));
}

let simplified_aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
aggregate.input,
nonconst_group_expr.into_iter().cloned().collect(),
aggregate.aggr_expr.clone(),
)?);
let simplified_aggregate = LogicalPlan::Aggregate(
Aggregate::try_new(
aggregate.input,
nonconst_group_expr.into_iter().cloned().collect(),
aggregate.aggr_expr.clone(),
)?
.with_is_global_group_by(aggregate.is_global_group_by),
);

let projection_expr =
aggregate.group_expr.into_iter().chain(aggregate.aggr_expr);
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/optimize_projections/mod.rs
@@ -200,7 +200,11 @@ fn optimize_projections(
new_group_bys,
new_aggr_expr,
)
.map(LogicalPlan::Aggregate)
.map(|f| {
LogicalPlan::Aggregate(
f.with_is_global_group_by(aggregate.is_global_group_by),
)
})
});
}
LogicalPlan::Window(window) => {
22 changes: 21 additions & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
@@ -274,6 +274,12 @@ pub struct AggregateExec {
/// Describes how the input is ordered relative to the group by columns
input_order_mode: InputOrderMode,
cache: PlanProperties,

/// Marks queries such as `GROUP BY true`: the constant group expression is
/// removed by the optimizer, so without this flag we could not tell whether an
/// `AggregateExec` originally had a `GROUP BY` clause. That distinction
/// influences the final result set on empty input (a single null row versus an
/// empty set). See <https://github.com/apache/datafusion/issues/11748>
pub is_global_group_by: bool,
}

impl AggregateExec {
@@ -295,6 +301,7 @@ impl AggregateExec {
input: Arc::clone(&self.input),
schema: Arc::clone(&self.schema),
input_schema: Arc::clone(&self.input_schema),
is_global_group_by: false,
}
}

@@ -427,6 +434,7 @@ impl AggregateExec {
limit: None,
input_order_mode,
cache,
is_global_group_by: false,
})
}

@@ -435,6 +443,12 @@ impl AggregateExec {
&self.mode
}

/// Set whether this aggregation originated from a constant `GROUP BY`
/// expression (e.g. `GROUP BY true`)
pub fn with_is_global_group_by(mut self, is_global_group_by: bool) -> Self {
self.is_global_group_by = is_global_group_by;
self
}

/// Set the `limit` of this AggExec
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
@@ -475,6 +489,11 @@ impl AggregateExec {
self.limit
}

/// Whether this exec originated from a constant (global) `GROUP BY`
pub fn is_global_group_by(&self) -> bool {
self.is_global_group_by
}

fn execute_typed(
&self,
partition: usize,
@@ -718,7 +737,8 @@ impl ExecutionPlan for AggregateExec {
Arc::clone(&children[0]),
Arc::clone(&self.input_schema),
Arc::clone(&self.schema),
)?;
)?
.with_is_global_group_by(self.is_global_group_by);
me.limit = self.limit;

Ok(Arc::new(me))
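Note on the semantics the new flag preserves: over an empty input, an aggregate with no GROUP BY produces exactly one row (SUM and AVG evaluate to NULL over zero rows), whereas GROUP BY over a constant produces no groups and therefore no rows, yet the optimizer rewrites both queries into the same grouping-free plan. A minimal sketch of the intended end-to-end behavior, written against DataFusion's public SessionContext API rather than the repository's sqllogictest harness; the assertions describe the expected post-fix semantics and are not code from this commit.

use datafusion::prelude::*;

// Expected row counts for an empty table, per
// https://github.com/apache/datafusion/issues/11748
async fn demo() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t1 (v1 INT)").await?.collect().await?;

    // No GROUP BY: a global aggregate over zero rows yields one NULL row.
    let batches = ctx.sql("SELECT SUM(v1) FROM t1").await?.collect().await?;
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 1);

    // GROUP BY a constant: zero input rows means zero groups, so the result
    // is empty even though the optimized plan has no group expressions left.
    let batches = ctx
        .sql("SELECT SUM(v1) FROM t1 GROUP BY false")
        .await?
        .collect()
        .await?;
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
    Ok(())
}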
25 changes: 18 additions & 7 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -74,6 +74,8 @@ impl AggregateStream {
let agg_schema = Arc::clone(&agg.schema);
let agg_filter_expr = agg.filter_expr.clone();

let is_global_group_by = agg.is_global_group_by;

let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
let input = agg.input.execute(partition, Arc::clone(&context))?;

@@ -102,7 +104,7 @@ impl AggregateStream {
reservation,
finished: false,
};
let stream = futures::stream::unfold(inner, |mut this| async move {
let stream = futures::stream::unfold(inner, move |mut this| async move {
if this.finished {
return None;
}
@@ -140,11 +142,21 @@ impl AggregateStream {
let result =
finalize_aggregation(&mut this.accumulators, &this.mode)
.and_then(|columns| {
RecordBatch::try_new(
Arc::clone(&this.schema),
columns,
)
.map_err(Into::into)
// A constant GROUP BY over an empty input must produce an empty
// result set rather than a single all-NULL row.
if is_global_group_by
&& columns.len() == 1
&& columns[0].null_count() == columns[0].len()
{
Ok(RecordBatch::new_empty(Arc::clone(
&this.schema,
)))
} else {
RecordBatch::try_new(
Arc::clone(&this.schema),
columns,
)
.map_err(Into::into)
}
})
.record_output(&this.baseline_metrics);

@@ -153,7 +165,6 @@
result
}
};

this.finished = true;
return Some((result, this));
}
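The stream-level check above distinguishes two outputs that look very different to the user: a one-row batch whose single column is NULL, and a zero-row batch with the same schema. A small standalone illustration of that distinction using the arrow crate directly; the column name and type are illustrative only.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "sum(t1.v1)",
        DataType::Int64,
        true,
    )]));

    // What a grouping-free aggregate emits for empty input: one NULL row.
    let column: ArrayRef = Arc::new(Int64Array::from(vec![None::<i64>]));
    let null_row = RecordBatch::try_new(Arc::clone(&schema), vec![column])?;
    assert_eq!(null_row.num_rows(), 1);

    // What GROUP BY <constant> over empty input should emit: no rows at all.
    let empty = RecordBatch::new_empty(schema);
    assert_eq!(empty.num_rows(), 0);

    Ok(())
}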
17 changes: 17 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -5581,5 +5581,22 @@ EXPLAIN SELECT AVG(v1) FROM t1 GROUP BY 1 + 1 = -1 having 1 + 1 = -1;
logical_plan EmptyRelation
physical_plan EmptyExec

query R
SELECT AVG(v1) FROM t1 GROUP BY false having true;
----

query I
SELECT SUM(v1) FROM t1 GROUP BY false having 1 = 1;
----


statement ok
insert into t1 values(1);

query I
SELECT SUM(v1) FROM t1 GROUP BY false having 1 = 1;
----
1

statement ok
drop table t1;
