
Merge SortMergeJoin filtered batches into larger batches #14160

Merged
merged 4 commits into apache:main on Jan 22, 2025

Conversation

comphead
Contributor

Which issue does this PR close?

Closes #14050.

Rationale for this change

Filtered SortMergeJoin emits output after every left-row shift, which produces many small (or empty) batches and is not performant. This change merges the filtered batches into larger chunks close to batch_size.
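As a rough sketch of the idea (hypothetical names, not the code actually added by this PR), the join stream can buffer the small filtered batches and emit one merged batch once the buffered rows reach the target size, e.g. using arrow's concat_batches:

    use arrow::compute::concat_batches;
    use arrow::datatypes::SchemaRef;
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;

    /// Accumulates small filtered batches and emits one merged batch once the
    /// buffered row count reaches `target_batch_size`.
    struct OutputBuffer {
        schema: SchemaRef,
        target_batch_size: usize,
        buffered: Vec<RecordBatch>,
        buffered_rows: usize,
    }

    impl OutputBuffer {
        fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
            Self { schema, target_batch_size, buffered: vec![], buffered_rows: 0 }
        }

        /// Push one (possibly tiny) filtered batch; returns a merged batch
        /// once enough rows have been collected, otherwise `None`.
        fn push(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>, ArrowError> {
            self.buffered_rows += batch.num_rows();
            self.buffered.push(batch);
            if self.buffered_rows >= self.target_batch_size {
                self.flush()
            } else {
                Ok(None)
            }
        }

        /// Merge and drain whatever is buffered (also used at end of input).
        fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
            if self.buffered.is_empty() {
                return Ok(None);
            }
            let merged = concat_batches(&self.schema, &self.buffered)?;
            self.buffered.clear();
            self.buffered_rows = 0;
            Ok(Some(merged))
        }
    }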

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Jan 17, 2025
@comphead comphead marked this pull request as draft January 17, 2025 01:17
@comphead comphead marked this pull request as ready for review January 17, 2025 18:36
@comphead comphead changed the title from "Merge SortMergeJoin filtered batches into bigger batches" to "Merge SortMergeJoin filtered batches into larger batches" on Jan 17, 2025
@ozankabak
Contributor

Can we use the BatchCoalescer and BatchSplitter objects in SMJ as well? AFAICT other joins use such common mechanisms

@comphead
Contributor Author

Thanks @ozankabak, I'll check it out.

@comphead
Contributor Author

comphead commented Jan 19, 2025

Tbh, I was not able to find BatchCoalescer in the joins; the closest was CoalesceBatchesExec, which appears in a bunch of physical plan nodes including sort_preserving_merge.rs 🤔 But it would make the code cleaner for sure!

I'd appreciate it if you could point me to how it is used in the joins so the join code base stays consistent.
WDYT about moving forward with this PR to close the bug, and I'll create another ticket to improve the codebase using BatchCoalescer?

@berkaysynnada
Contributor

BatchCoalescer is not used in joins yet; instead, a CoalesceBatchesExec is placed after joins that have a filter, since their output batches might have a lower row count than the target batch size. So why can't we follow the same pattern in SMJ? If collecting batches inside the join itself is more performant, then we should refactor the other joins as well.

On the other hand, BatchSplitter is used in the other joins, and SMJ could (should) have it too, as there is no other way of splitting batches according to the target batch size.
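For the splitting side, the core operation is slicing an oversized batch into target-sized chunks; a minimal sketch (a hypothetical split_batch helper, not DataFusion's actual BatchSplitter API) using arrow's zero-copy RecordBatch::slice:

    use arrow::record_batch::RecordBatch;

    /// Splits a batch larger than `target_batch_size` into slices of at most
    /// `target_batch_size` rows each. `RecordBatch::slice` is zero-copy.
    fn split_batch(batch: &RecordBatch, target_batch_size: usize) -> Vec<RecordBatch> {
        assert!(target_batch_size > 0);
        let mut out = Vec::new();
        let mut offset = 0;
        while offset < batch.num_rows() {
            let len = target_batch_size.min(batch.num_rows() - offset);
            out.push(batch.slice(offset, len));
            offset += len;
        }
        out
    }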

@comphead
Contributor Author

Thanks @berkaysynnada for your feedback. If I got you right, you prefer to have CoalesceBatchesExec called just AFTER SortMergeJoinExec in the physical planner, like for the other join types?

I checked some tests in joins.rs, and it looks like CoalesceBatchesExec is called before the join:

    let expected = {
        [
            "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
            "  CoalesceBatchesExec: target_batch_size=8192",
            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            // "     CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
            "  CoalesceBatchesExec: target_batch_size=8192",
            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            // "     CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
        ]
    };

perhaps I'm missing something?

@berkaysynnada
Contributor

The CoalesceBatchesExec's in your example exist because of the hash repartitions (the CoalesceBatches rule adds a CoalesceBatchesExec on top of FilterExec, HashJoinExec, and hash repartitions).

I've thought about this, and I believe the optimal solution is to make all join operators capable of performing both coalescing and splitting in a built-in manner, because the output of a join can be either smaller or larger than the target batch size. Ideally, there should be no need (or only minimal need) for CoalesceBatchesExec.

To achieve this built-in coalescing and splitting, we can leverage existing tools like BatchSplitter and BatchCoalescer (although there are no current examples of BatchCoalescer being used in joins). My suggestion is to generalize these tools so they can be utilized by any operator and applied wherever this mechanism is needed. As this pattern becomes more common, it will be easier to expand its usage and simplify its application.
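As a sketch of what such a generalized, operator-agnostic interface could look like (all names below are hypothetical, not DataFusion's current BatchCoalescer/BatchSplitter API):

    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;

    /// Outcome of handing one output batch to the sizer.
    enum SizedOutput {
        /// Not enough rows buffered yet; keep pushing.
        Pending,
        /// One or more batches, each at most `target_batch_size` rows.
        Ready(Vec<RecordBatch>),
    }

    /// A batch-size normalizer any operator could own: small outputs are
    /// coalesced, oversized outputs are split.
    trait BatchSizer {
        /// Accept one output batch from the operator.
        fn push(&mut self, batch: RecordBatch) -> Result<SizedOutput, ArrowError>;
        /// Flush any remaining buffered rows at end of input.
        fn finish(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
    }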

@comphead
Contributor Author

Thanks @berkaysynnada. The built-in option could probably be implemented by passing a BatchCoalescer into the join instead of writing custom code like in this implementation.

WDYT if we merge this PR to fix the bug for now, and I start a discussion to unify the coalesce/split approach for the joins?

Contributor

@berkaysynnada berkaysynnada left a comment

Opening an issue for this sounds good. I've taken a look at the changes and LGTM. Are you planning to add some tests to prevent someone from breaking this coalescing behavior?

@comphead
Contributor Author

Filed #14238

@comphead
Contributor Author

Filed #14239 for tests, thanks @berkaysynnada for the review

@comphead comphead merged commit 0ba6e70 into apache:main Jan 22, 2025
26 checks passed
Labels
physical-expr Physical Expressions
Development

Successfully merging this pull request may close these issues.

Optimize filtered SortMergeJoin to avoid producing small/empty batches
3 participants