[DISCUSSION]: Inconsistent Behavior Between prefer_existing_sort and AggregateExec's required_input_ordering #14231

Open
mertak-synnada opened this issue Jan 22, 2025 · 6 comments

Comments

@mertak-synnada
Contributor

I noticed an inconsistency in how the optimizer handles ordering in certain scenarios, particularly involving the prefer_existing_sort configuration and the creation behavior of AggregateExec.

1. Background on prefer_existing_sort

The prefer_existing_sort configuration, part of the enforce_distribution optimizer rule, determines whether the optimizer should use an order-preserving RepartitionExec or a non-order-preserving one. If order needs to be satisfied above the RepartitionExec, a SortExec is added.

if (!ordering_satisfied || !order_preserving_variants_desirable)
    && child.data
{
    child = replace_order_preserving_variants(child)?;
    // If ordering requirements were satisfied before repartitioning,
    // make sure ordering requirements are still satisfied after.
    if ordering_satisfied {
        // Make sure to satisfy ordering requirement:
        child = add_sort_above_with_check(
            child,
            required_input_ordering.clone(),
            None,
        );
    }
}
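To make the branch above easier to reason about, here is a minimal, self-contained sketch of the same decision on a toy plan type. `Plan`, `adjust_child`, and this simplified `replace_order_preserving_variants` are illustrative stand-ins, not DataFusion's real types or signatures. The `child.data` check and error propagation from the original snippet are deliberately left out.

```rust
/// Toy stand-in for the physical operators discussed here.
/// Illustrative only; not DataFusion's real plan nodes.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Source,
    Repartition { preserve_order: bool, input: Box<Plan> },
    Sort { input: Box<Plan> },
}

/// Simplified model: turn every order-preserving repartition in the
/// subtree into a plain (non-order-preserving) one.
fn replace_order_preserving_variants(plan: Plan) -> Plan {
    match plan {
        Plan::Repartition { input, .. } => Plan::Repartition {
            preserve_order: false,
            input: Box::new(replace_order_preserving_variants(*input)),
        },
        Plan::Sort { input } => Plan::Sort {
            input: Box::new(replace_order_preserving_variants(*input)),
        },
        Plan::Source => Plan::Source,
    }
}

/// Mirrors the branch in the snippet: the order-preserving variant is
/// kept only when the ordering is satisfied AND the variant is
/// desirable. Otherwise it is replaced, and a sort is added on top if
/// the ordering had been satisfied before the replacement.
fn adjust_child(
    child: Plan,
    ordering_satisfied: bool,
    order_preserving_variants_desirable: bool,
) -> Plan {
    if !ordering_satisfied || !order_preserving_variants_desirable {
        let child = replace_order_preserving_variants(child);
        if ordering_satisfied {
            return Plan::Sort { input: Box::new(child) };
        }
        return child;
    }
    child
}
```

In this model, the only case that introduces a sort is "ordering satisfied, but order-preserving variants not desirable".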

2. Creation Behavior of AggregateExec

AggregateExec sets its required_input_ordering based solely on its group-by expressions without checking any configuration like prefer_existing_sort. This effectively makes the ordering a hard requirement.

let indices = get_ordered_partition_by_indices(&groupby_exprs, &input);
let mut new_requirement = LexRequirement::new(
    indices
        .iter()
        .map(|&idx| PhysicalSortRequirement {
            expr: Arc::clone(&groupby_exprs[idx]),
            options: None,
        })
        .collect::<Vec<_>>(),
);
let req = get_finer_aggregate_exprs_requirement(
    &mut aggr_expr,
    &group_by,
    input_eq_properties,
    &mode,
)?;
new_requirement.inner.extend(req);
new_requirement = new_requirement.collapse();

3. The issue

These two behaviors interact badly: if the order is preserved below a RepartitionExec and there is an AggregateExec above it, the optimizer adds a SortExec regardless of how prefer_existing_sort is set, because the ordering is now a hard requirement.

AggregateExec: mode=FinalPartitioned, ...
  SortExec: ..., preserve_partitioning=[true]
    RepartitionExec: ....
      CsvExec: ...

While AggregateExec benefits from receiving ordered input, adding a SortExec in this context can incur a significant performance cost, negating any benefits of preserving the order.

4. Possible solutions:

A straightforward approach could involve AggregateExec respecting the prefer_existing_sort configuration before adding ordering requirements. However, this introduces challenges:

The prefer_existing_sort setting exists at the optimizer level, and injecting it into AggregateExec may lead to poor design. Also, evaluating this configuration at runtime feels conceptually incorrect.

Given these challenges, I wanted to open a discussion on alternative solutions or design approaches to address this behavior.

Looking forward to hearing the community's thoughts on this!

@mertak-synnada mertak-synnada changed the title Inconsistent Behavior Between prefer_existing_sort and AggregateExec's required_input_ordering [DISCUSSION]: Inconsistent Behavior Between prefer_existing_sort and AggregateExec's required_input_ordering Jan 23, 2025
@ozankabak
Contributor

@alamb, before we attempt this, do you have any thoughts you can share?

@alamb
Contributor

alamb commented Jan 23, 2025

The description of the issue on this ticket makes sense to me

AggregateExec sets its required_input_ordering based solely on its group-by expressions without checking any configuration like prefer_existing_sort. This effectively makes the ordering a hard requirement.

Yes I agree this is non ideal

While AggregateExec benefits from receiving ordered input, adding a SortExec in this context can incur a significant performance cost, negating any benefits of preserving the order.

100% agree

A straightforward approach could involve AggregateExec respecting the prefer_existing_sort configuration before adding ordering requirements. However, this introduces challenges:

Agree. Looking at the code, it seems to me like input_order_mode and sort_requirement are inter-related concepts but that is not currently reflected in the code

let input_order_mode = if indices.len() == groupby_exprs.len()
    && !indices.is_empty()
    && group_by.groups.len() == 1
{
    InputOrderMode::Sorted
} else if !indices.is_empty() {
    InputOrderMode::PartiallySorted(indices)
} else {
    InputOrderMode::Linear
};

What about the following:

  1. If the input is already sorted by the appropriate group keys, set input_order_mode accordingly and set the required input sort order to match the existing sort order.
  2. If the input is not already sorted on the appropriate group keys, set input_order_mode to InputOrderMode::Linear and set required_sort_expression to None (i.e. don't re-sort the data).

This would mean:

  1. The optimizer can take advantage of existing sort orders
  2. It will not re-sort the input if it isn't already sorted

The downside is that the optimizer would not re-sort data even when it might be beneficial (e.g. if the data could be sorted cheaply with a prefix and then use a smaller hash table). However, making that optimization work requires deciding whether a partial sort + ordered group by is better than a hash group by.

Our optimizer today has no cost model / framework to evaluate such tradeoffs (and there are known challenges with cost estimate based optimizers anyways)
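The two-step proposal above can be sketched on a simplified model where columns are plain strings. This is only an illustration under stated assumptions: `ordered_group_by_indices` is a hypothetical, much simpler stand-in for get_ordered_partition_by_indices, and `plan_aggregate` is an invented name, not an existing DataFusion function. The point is that the order mode and the required ordering are derived together from what the input already provides, so an unsorted input never produces a sort requirement.

```rust
#[derive(Debug, PartialEq)]
enum InputOrderMode {
    Linear,
    PartiallySorted(Vec<usize>),
    Sorted,
}

/// Hypothetical helper: indices of group-by columns covered by a
/// leading run of the input ordering. Stops at the first ordering
/// column that is not a (new) group-by column.
fn ordered_group_by_indices(group_by: &[&str], input_ordering: &[&str]) -> Vec<usize> {
    let mut indices = Vec::new();
    for col in input_ordering {
        match group_by.iter().position(|g| g == col) {
            Some(idx) if !indices.contains(&idx) => indices.push(idx),
            _ => break,
        }
    }
    indices
}

/// Derive the order mode and the required input ordering together.
/// If the input offers no usable ordering, fall back to Linear and
/// require nothing, so no sort would need to be added.
fn plan_aggregate(
    group_by: &[&str],
    input_ordering: &[&str],
    num_grouping_sets: usize,
) -> (InputOrderMode, Option<Vec<String>>) {
    let indices = ordered_group_by_indices(group_by, input_ordering);
    if indices.is_empty() {
        return (InputOrderMode::Linear, None);
    }
    // Only require the ordering that the input already provides.
    let requirement: Vec<String> =
        indices.iter().map(|&i| group_by[i].to_string()).collect();
    let mode = if indices.len() == group_by.len() && num_grouping_sets == 1 {
        InputOrderMode::Sorted
    } else {
        InputOrderMode::PartiallySorted(indices)
    };
    (mode, Some(requirement))
}
```

For example, grouping by (a, b) on input ordered by (a, b, c) yields Sorted with requirement [a, b], while input ordered by (c, a) yields Linear with no requirement.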

@ozankabak
Contributor

Maybe we can create an API like beneficial_input_ordering to complement required_input_ordering, and optimizer rule(s) can check that information and combine it with user flags such as prefer_existing_sort when deciding to add (or not add) SortExecs?
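One possible shape for such an API, sketched on toy types rather than the real ExecutionPlan trait. Everything here is hypothetical: the `PlanNode` trait, the `beneficial_input_ordering` method, the `SortDecision` enum, and the `sort_for_beneficial_ordering` parameter (a stand-in for whatever user flag a rule would consult) do not exist in DataFusion. The sketch only shows how a rule could treat "required" and "beneficial" orderings differently.

```rust
/// Hypothetical operator interface; not DataFusion's ExecutionPlan.
trait PlanNode {
    /// Hard requirement: the operator is incorrect without it.
    fn required_input_ordering(&self) -> Option<Vec<String>>;
    /// Soft hint (hypothetical): the operator runs better with it.
    fn beneficial_input_ordering(&self) -> Option<Vec<String>> {
        None
    }
}

#[derive(Debug, PartialEq)]
enum SortDecision {
    AddSort(Vec<String>),
    NoSort,
}

/// The input satisfies `wanted` if `wanted` is a prefix of its ordering.
fn satisfies(input_ordering: &[String], wanted: &[String]) -> bool {
    input_ordering.starts_with(wanted)
}

/// A rule always enforces required orderings, but only sorts for a
/// beneficial ordering when the (hypothetical) flag opts in.
fn decide_sort(
    node: &dyn PlanNode,
    input_ordering: &[String],
    sort_for_beneficial_ordering: bool,
) -> SortDecision {
    if let Some(required) = node.required_input_ordering() {
        if !satisfies(input_ordering, &required) {
            return SortDecision::AddSort(required);
        }
    }
    if let Some(beneficial) = node.beneficial_input_ordering() {
        if sort_for_beneficial_ordering && !satisfies(input_ordering, &beneficial) {
            return SortDecision::AddSort(beneficial);
        }
    }
    SortDecision::NoSort
}

/// Toy aggregate: ordering on the group keys helps but is not required.
struct Aggregate {
    group_by: Vec<String>,
}

impl PlanNode for Aggregate {
    fn required_input_ordering(&self) -> Option<Vec<String>> {
        None
    }
    fn beneficial_input_ordering(&self) -> Option<Vec<String>> {
        Some(self.group_by.clone())
    }
}
```

With this split, the operator only reports what would help, and the decision to pay for a sort stays in the optimizer rule.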

@alamb
Contributor

alamb commented Jan 24, 2025

My preference is to try and keep the core of datafusion focused on executing the plans as provided as much as possible, and performing "always good optimizations"

For optimizations where there is some tradeoff (like choosing between sorting for sort merge join or hashing, for example) I strongly suggest we keep as much of that out of the core as possible (and use user defined passes instead).

The rationale is that when tradeoffs are present, no particular choice will be ideal for all use cases (which is why we already have prefer_existing_sort). I can imagine some systems that want to prioritize plans that require less memory but more compute, as well as other systems that would prefer maximum performance even if it takes more memory, etc.

If we make the optimizer passes in the core of datafusion have baked in tradeoffs/heuristics I think it will just get more and more complicated as people try to change how the tradeoffs work

I feel strongly enough about this to help with the project

@ozankabak
Contributor

I agree with this. Therefore I am trying to think of an API that will expose the information necessary for downstream rules/projects/users to use them if they want, but we won't have to make decisions for them unless they are of the "always good" kind.

IIRC, we have the "benefits_from" kind of API for partitioning. I wonder if doing the same makes sense here too. I will think more about this.

@ozankabak
Contributor

I feel strongly enough about this to help with the project

Great, let's collaborate on this!


3 participants