Add value_from_statisics to AggregateUDFImpl, remove special case for min/max/count aggregate statistics #12296

Merged
merged 5 commits into from
Sep 30, 2024
Changes from 1 commit
26 changes: 26 additions & 0 deletions datafusion/expr/src/udaf.rs
@@ -262,6 +262,19 @@ impl AggregateUDF {
self.inner.is_descending()
}

/// Returns true if the function is min. Used by the optimizer
pub fn is_min(&self) -> bool {
Contributor

This is not what we should do.
We need to understand the context and provide a general function instead of a specialized name-matching function.

default_value is a good example: it is only used by count for now, but it could be extended to any function if needed.
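
To make the distinction concrete, here is a minimal, self-contained sketch contrasting name matching with a property-style hook. Everything in it (`ToyAggregate`, `ToyCount`, `ToyApproxCount`, `ToySum`, and both helper functions) is hypothetical and only loosely mirrors `default_value`; it is not the DataFusion API.

```rust
/// Toy stand-in for an aggregate UDF trait; all names here are hypothetical.
trait ToyAggregate {
    fn name(&self) -> &str;

    /// Property-style hook: the value produced when every input is null.
    /// `None` models SQL NULL, which is the default for most aggregates.
    fn default_value(&self) -> Option<i64> {
        None
    }
}

struct ToyCount;
struct ToyApproxCount; // a user-defined function that also behaves like count
struct ToySum;

impl ToyAggregate for ToyCount {
    fn name(&self) -> &str {
        "count"
    }
    fn default_value(&self) -> Option<i64> {
        Some(0)
    }
}

impl ToyAggregate for ToyApproxCount {
    fn name(&self) -> &str {
        "approx_count"
    }
    fn default_value(&self) -> Option<i64> {
        Some(0)
    }
}

impl ToyAggregate for ToySum {
    fn name(&self) -> &str {
        "sum"
    }
}

/// Name matching: the caller has to know every function that behaves like count.
fn default_by_name(agg: &dyn ToyAggregate) -> Option<i64> {
    if agg.name() == "count" {
        Some(0)
    } else {
        None
    }
}

/// Property-based: the caller asks the function itself.
fn default_by_property(agg: &dyn ToyAggregate) -> Option<i64> {
    agg.default_value()
}
```

In this sketch the name-matching helper misses `ToyApproxCount`, while the property-based helper handles it without the caller knowing the function exists.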

Contributor Author

@jayzhan211, looking at this comment from @alamb in #11151 (comment), it may be that those "general functions" do not exist. I will do some homework.

Contributor

jayzhan211 Sep 3, 2024

If there is really no "general context", I think it is fine to just leave them as they are. Having specific function-name matching in the Impl trait doesn't make sense to me 🤔

Contributor

I agree with @jayzhan211 that it is important that these methods describe some property of the function rather than "what the function is".

What if we added a function like this:

/// Return the value of this aggregate function from statistics.
///
/// If the value of this aggregate can be determined using only the
/// statistics, return `Some(value)`; otherwise return `None` (the default).
///
/// # Arguments
/// * `statistics`: the statistics describing the input to this aggregate function
/// * `arguments`: the arguments passed to the aggregate function
///
/// The value of some aggregate functions, such as `COUNT`, `MIN` and `MAX`,
/// can be determined using statistics, if known.
fn value_from_stats(&self, statistics: &Statistics, arguments: &[Arc<dyn PhysicalExpr>]) -> Option<ScalarValue> { None }

I think you could then implement this function for min / max and count (moving the logic out of the aggregate statistics optimizer). It might need some other information (such as the schema, for types), but I think it would be pretty straightforward.
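
A rough, self-contained sketch of how such a hook could look for count, min and max follows. It uses toy types only: `ToyPrecision`, `ToyScalar`, `ToyColumnStats`, `ToyStatistics`, `ToyAggregate` and `bound_from_stats` are all hypothetical, a plain column index stands in for the `arguments` parameter, and the rule that COUNT(col) is the exact row count minus the exact null count is an assumption made for the example rather than something stated in this thread.

```rust
/// Toy precision wrapper, loosely modelled on the `Precision::Exact` used in the diff.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyPrecision<T> {
    Exact(T),
    Absent,
}

/// Toy scalar: `Int64(None)` models SQL NULL.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyScalar {
    Int64(Option<i64>),
}

/// Toy per-column statistics (hypothetical fields).
struct ToyColumnStats {
    min: ToyPrecision<i64>,
    max: ToyPrecision<i64>,
    null_count: ToyPrecision<usize>,
}

/// Toy table statistics.
struct ToyStatistics {
    num_rows: ToyPrecision<usize>,
    columns: Vec<ToyColumnStats>,
}

trait ToyAggregate {
    /// Return the aggregate's value when statistics alone determine it.
    /// `column` stands in for the `arguments` parameter of the proposal.
    fn value_from_stats(&self, _stats: &ToyStatistics, _column: usize) -> Option<ToyScalar> {
        None
    }
}

struct ToyCount;
struct ToyMin;
struct ToyMax;
struct ToySum; // keeps the default: never answered from statistics in this sketch

impl ToyAggregate for ToySum {}

impl ToyAggregate for ToyCount {
    fn value_from_stats(&self, stats: &ToyStatistics, column: usize) -> Option<ToyScalar> {
        // Assumption: COUNT(col) = exact row count minus exact null count.
        let rows = match stats.num_rows {
            ToyPrecision::Exact(n) => n,
            ToyPrecision::Absent => return None,
        };
        let nulls = match stats.columns.get(column)?.null_count {
            ToyPrecision::Exact(n) => n,
            ToyPrecision::Absent => return None,
        };
        let non_null = i64::try_from(rows.checked_sub(nulls)?).ok()?;
        Some(ToyScalar::Int64(Some(non_null)))
    }
}

/// Shared logic for MIN and MAX: NULL on empty input, otherwise the exact bound.
fn bound_from_stats(
    stats: &ToyStatistics,
    column: usize,
    bound: impl Fn(&ToyColumnStats) -> ToyPrecision<i64>,
) -> Option<ToyScalar> {
    match stats.num_rows {
        // MIN/MAX with 0 rows is always null.
        ToyPrecision::Exact(0) => Some(ToyScalar::Int64(None)),
        ToyPrecision::Exact(_) => match bound(stats.columns.get(column)?) {
            ToyPrecision::Exact(v) => Some(ToyScalar::Int64(Some(v))),
            ToyPrecision::Absent => None,
        },
        ToyPrecision::Absent => None,
    }
}

impl ToyAggregate for ToyMin {
    fn value_from_stats(&self, stats: &ToyStatistics, column: usize) -> Option<ToyScalar> {
        bound_from_stats(stats, column, |c| c.min)
    }
}

impl ToyAggregate for ToyMax {
    fn value_from_stats(&self, stats: &ToyStatistics, column: usize) -> Option<ToyScalar> {
        bound_from_stats(stats, column, |c| c.max)
    }
}
```

The point of the shape is that a caller never asks which function it is holding; any aggregate that cannot be answered from statistics simply keeps the default `None`.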

self.inner.is_min()
}

/// Returns true if the function is max. Used by the optimizer
pub fn is_max(&self) -> bool {
self.inner.is_max()
}
/// Returns true if the function is count. Used by the optimizer
pub fn is_count(&self) -> bool {
self.inner.is_count()
}
/// See [`AggregateUDFImpl::default_value`] for more details.
pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
self.inner.default_value(data_type)
@@ -575,6 +588,19 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
None
}

// Returns true if the function is min. Used by the optimizer
fn is_min(&self) -> bool {
false
}
// Returns true if the function is max. Used by the optimizer
fn is_max(&self) -> bool {
false
}
// Returns true if the function is count. Used by the optimizer
fn is_count(&self) -> bool {
false
}

/// Returns default value of the function given the input is all `null`.
///
/// Most of the aggregate function return Null if input is Null,
4 changes: 4 additions & 0 deletions datafusion/functions-aggregate/src/count.rs
@@ -291,6 +291,10 @@ impl AggregateUDFImpl for Count {
fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(0)))
}

fn is_count(&self) -> bool {
true
}
}

#[derive(Debug)]
8 changes: 8 additions & 0 deletions datafusion/functions-aggregate/src/min_max.rs
@@ -272,6 +272,10 @@ impl AggregateUDFImpl for Max {
fn is_descending(&self) -> Option<bool> {
Some(true)
}

fn is_max(&self) -> bool {
true
}
fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
}
@@ -1052,6 +1056,10 @@ impl AggregateUDFImpl for Min {
Some(false)
}

fn is_min(&self) -> bool {
true
}

fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
}
39 changes: 5 additions & 34 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
@@ -141,7 +141,7 @@ fn take_optimizable_column_and_table_count(
stats: &Statistics,
) -> Option<(ScalarValue, String)> {
let col_stats = &stats.column_statistics;
- if is_non_distinct_count(agg_expr) {
+ if agg_expr.fun().is_count() && !agg_expr.is_distinct() {
if let Precision::Exact(num_rows) = stats.num_rows {
let exprs = agg_expr.expressions();
if exprs.len() == 1 {
@@ -181,7 +181,7 @@ fn take_optimizable_min(
match *num_rows {
0 => {
// MIN/MAX with 0 rows is always null
- if is_min(agg_expr) {
+ if agg_expr.fun().is_min() {
if let Ok(min_data_type) =
ScalarValue::try_from(agg_expr.field().data_type())
{
@@ -191,7 +191,7 @@ fn take_optimizable_min(
}
value if value > 0 => {
let col_stats = &stats.column_statistics;
- if is_min(agg_expr) {
+ if agg_expr.fun().is_min() {
let exprs = agg_expr.expressions();
if exprs.len() == 1 {
// TODO optimize with exprs other than Column
@@ -227,7 +227,7 @@ fn take_optimizable_max(
match *num_rows {
0 => {
// MIN/MAX with 0 rows is always null
- if is_max(agg_expr) {
+ if agg_expr.fun().is_max() {
if let Ok(max_data_type) =
ScalarValue::try_from(agg_expr.field().data_type())
{
@@ -237,7 +237,7 @@ fn take_optimizable_max(
}
value if value > 0 => {
let col_stats = &stats.column_statistics;
- if is_max(agg_expr) {
+ if agg_expr.fun().is_max() {
let exprs = agg_expr.expressions();
if exprs.len() == 1 {
// TODO optimize with exprs other than Column
@@ -263,32 +263,3 @@ fn take_optimizable_max(
}
None
}

- // TODO: Move this check into AggregateUDFImpl
- // https://github.com/apache/datafusion/issues/11153
- fn is_non_distinct_count(agg_expr: &AggregateFunctionExpr) -> bool {
- if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() {
- return true;
- }
- false
- }
-
- // TODO: Move this check into AggregateUDFImpl
- // https://github.com/apache/datafusion/issues/11153
- fn is_min(agg_expr: &AggregateFunctionExpr) -> bool {
- if agg_expr.fun().name().to_lowercase() == "min" {
- return true;
- }
- false
- }
-
- // TODO: Move this check into AggregateUDFImpl
- // https://github.com/apache/datafusion/issues/11153
- fn is_max(agg_expr: &AggregateFunctionExpr) -> bool {
- if agg_expr.fun().name().to_lowercase() == "max" {
- return true;
- }
- false
- }
-
- // See tests in datafusion/core/tests/physical_optimizer
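
For comparison with the `is_count` / `is_min` / `is_max` checks in this file, the following self-contained sketch shows what the optimizer side might look like if it relied on a single general hook instead. All names (`ToyStatistics`, `ToyAggregate`, `ToyCountStar`, `ToyAvg`, `fold_aggregates`) are hypothetical, and the all-or-nothing policy (fold only when every aggregate can be answered from statistics) is an assumption made for the example, not something shown in this diff.

```rust
/// Toy statistics: only an optional exact row count (hypothetical).
struct ToyStatistics {
    exact_num_rows: Option<usize>,
}

/// Toy aggregate trait with one general hook instead of per-function flags.
trait ToyAggregate {
    fn name(&self) -> &str;

    /// `Some(value)` if statistics alone determine the result, else `None`.
    fn value_from_stats(&self, _stats: &ToyStatistics) -> Option<i64> {
        None
    }
}

struct ToyCountStar;
struct ToyAvg;

impl ToyAggregate for ToyCountStar {
    fn name(&self) -> &str {
        "count"
    }
    fn value_from_stats(&self, stats: &ToyStatistics) -> Option<i64> {
        // COUNT(*) equals the row count when it is known exactly.
        stats.exact_num_rows.and_then(|n| i64::try_from(n).ok())
    }
}

impl ToyAggregate for ToyAvg {
    // Keeps the default hook: cannot be answered from a row count alone.
    fn name(&self) -> &str {
        "avg"
    }
}

/// Optimizer-side sketch: replace the aggregation with constants only if every
/// aggregate can be answered from statistics; otherwise return `None` and
/// leave the plan untouched. Each entry pairs the output name with its value.
fn fold_aggregates(
    aggs: &[&dyn ToyAggregate],
    stats: &ToyStatistics,
) -> Option<Vec<(String, i64)>> {
    aggs.iter()
        .map(|agg| {
            agg.value_from_stats(stats)
                .map(|value| (agg.name().to_string(), value))
        })
        .collect()
}
```

With this shape the optimizer contains no function names at all; adding statistics support for another aggregate would only require overriding the hook on that aggregate.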