
Grouped Aggregate in row format #2375

Merged: 9 commits merged into apache:master on May 7, 2022

Conversation

yjshen (Member) commented Apr 29, 2022:

Which issue does this PR close?

Closes #2452.
Partly fixes #2455.

Rationale for this change

Using the row format in grouped aggregation has several benefits over the current Vec<ScalarValue> approach (a minimal sketch of benefit 1 follows the list):

  1. group keys can be compared directly as Vec<u8>
  2. memory is saved by storing state without per-value datatype information
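
A minimal, self-contained sketch of benefit 1 (illustrative only, not this PR's code): with a row format, each group key is a plain byte buffer, so the hash map compares raw bytes instead of walking a Vec<ScalarValue>.

    use std::collections::HashMap;

    fn main() {
        // Map from encoded group key (raw row bytes) to the group's state index.
        let mut groups: HashMap<Vec<u8>, usize> = HashMap::new();

        // Pretend each key is a (u64, utf8) pair encoded into one contiguous row.
        let key_a: Vec<u8> = vec![0, 0, 0, 0, 0, 0, 0, 1, b'x'];
        let key_b: Vec<u8> = vec![0, 0, 0, 0, 0, 0, 0, 2, b'y'];

        for key in [key_a.clone(), key_b, key_a] {
            let next = groups.len();
            // Hashing and equality are plain byte comparisons, no ScalarValue walk.
            let idx = *groups.entry(key).or_insert(next);
            println!("row maps to group {idx}");
        }
    }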

What changes are included in this PR?

  1. A new accumulator trait that manipulates data in row format, with the five most basic accumulators: Max, Min, Sum, Count, and Avg (a rough sketch of the trait shape follows this list).
  2. RowAccessor for fast, in-place updates of Vec<u8> row fields.
  3. Branching in AggregateExec to use row-based group aggregation when applicable.
  4. Making the datafusion-row crate a default dependency of datafusion-core and datafusion-physical-expr.
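
A rough, hypothetical sketch of what such a row-based accumulator trait looks like, pieced together from the diff excerpts quoted later in this conversation; the trait name and the placeholder types stand in for the real datafusion/arrow types and are not the exact API added here.

    use std::sync::Arc;

    // Placeholders standing in for arrow's ArrayRef and this PR's RowAccessor.
    type ArrayRef = Arc<dyn std::any::Any>;
    struct RowAccessor;
    type Result<T> = std::result::Result<T, String>;

    trait RowAccumulatorSketch {
        /// Offset of this accumulator's first state field within the state row.
        fn state_index(&self) -> usize;
        /// Fold a batch of input values into the row-format state, in place.
        fn update_batch(&mut self, values: &[ArrayRef], accessor: &mut RowAccessor) -> Result<()>;
    }

    fn main() {} // the sketch only needs to compile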

Are there any user-facing changes?

No.

The github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels on Apr 29, 2022.
andygrove (Member) commented:

> The current PR seems scary in size, maybe I should move the physical_plan folder re-org as a separate PR first.

I think that would help.

Are we replacing HashAggregate completely with the new row-based aggregate, or do we want to support both? Does hash aggregate still have advantages for some use cases? Maybe we can have a config setting for which one to use?

yjshen (Member, Author) commented Apr 30, 2022:

Sorry for mixing two things into one PR. I will divide this into separate PRs, one for each of these ideas:

  1. Promote physical-plan/hash_aggregates.rs to a directory and rename it to aggregates. We already have a hash-based implementation, GroupedHashAggregateStream, for aggregates with grouping keys, and a non-hash implementation for aggregates without grouping keys (it keeps a single record of state but is named HashAggregateStream, although it is not related to hashing at all).
  2. Use the row format to store grouping keys and accumulator states when all accumulator states are fixed-size. Use Vec<ScalarValue> for all other cases (when there is at least one variable-length accumulator state, or any of the AggregateExprs doesn't support a row-based accumulator yet).

> Maybe we can have a config setting for which one to use

I think the choice between row-based accumulator states and Vec<ScalarValue>-based accumulator states should depend on row-based accumulator capability at query execution time: we only use row-based aggregate states when all of the query's accumulators support them (a small sketch of this check follows). This assumes we are sure, based on benchmark results, that the row-based version always outperforms the Vec<ScalarValue> version whenever it is applicable.
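
A hypothetical sketch of that decision; the names are illustrative, not this PR's API. The row-based path is taken only when every aggregate expression supports a row accumulator and all of its state fields are fixed-size.

    // Summary of per-aggregate capabilities (hypothetical type).
    struct AggExprInfo {
        has_row_accumulator: bool,
        all_states_fixed_size: bool,
    }

    fn use_row_based_states(aggregates: &[AggExprInfo]) -> bool {
        aggregates
            .iter()
            .all(|a| a.has_row_accumulator && a.all_states_fixed_size)
    }

    fn main() {
        let aggs = vec![
            AggExprInfo { has_row_accumulator: true, all_states_fixed_size: true },
            AggExprInfo { has_row_accumulator: false, all_states_fixed_size: true },
        ];
        // Falls back to Vec<ScalarValue> states because one accumulator lacks support.
        assert!(!use_row_based_states(&aggs));
    }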

@yjshen yjshen self-assigned this May 1, 2022
@yjshen yjshen marked this pull request as ready for review May 5, 2022 11:19
@yjshen yjshen changed the title WIP: Use row format for aggregate Grouped Aggregate in row format May 5, 2022
alamb (Contributor) commented May 5, 2022:

I am starting to check this out -- I'll try to finish today but I may run out of time.

alamb (Contributor) commented May 5, 2022:

@yjshen do you have any benchmark numbers you can share?

alamb (Contributor) left a review:

I have some superficial comments on the API design -- I hope to dig more into the implementation later today.

This is looking very cool @yjshen

Review comments were left on:

  • datafusion/core/Cargo.toml
  • datafusion/physical-expr/src/aggregate/accumulator_v2.rs
  • datafusion/core/src/physical_plan/hash_utils.rs
  • datafusion/physical-expr/src/aggregate/mod.rs
alamb (Contributor) left a review:

I reviewed this code, and all in all I think it is great. Nice work @yjshen 🏆

I would like to see the following two things prior to approving this PR:

  • See some sort of performance benchmarks showing this is faster than master
  • Try this change against the IOx test suite

I will plan to try against IOx tomorrow.

Concern about Code Duplication

I am somewhat concerned that this PR ends up with a parallel implementation of GroupByHash as well as some of the aggregates.

This approach is fairly nice because it is backwards compatible, and thus this PR allows us to make incremental progress 👍

However, I worry that we now have two slightly different implementations which will diverge over time, and we will be stuck with these two forms forever if we don't have the discipline to complete the transition. This would make the codebase more complicated and harder to work with over time.

Perhaps I can make this less concerning by enumerating what work remains to entirely switch to RowAggregate (and remove AggregateStream entirely).

    None => {
        this.finished = true;
        let timer = this.baseline_metrics.elapsed_compute().timer();
        let result = create_batch_from_map(
alamb (Contributor) commented on the excerpt above:

This code effectively makes one massive output record batch -- I think that is also what GroupedHashAggregateStream does, but in my opinion it would be better to stream the output (i.e., respect the batch_size configuration). Maybe we can file a ticket to do so.
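
A minimal sketch of that streaming idea (illustrative only; plain vectors stand in for RecordBatches and batch_size is passed explicitly):

    // Split the grouped output into chunks of `batch_size` rows instead of
    // emitting one huge batch.
    fn emit_in_batches<T: Clone>(rows: &[T], batch_size: usize) -> Vec<Vec<T>> {
        rows.chunks(batch_size.max(1))
            .map(|chunk| chunk.to_vec())
            .collect()
    }

    fn main() {
        let rows: Vec<u32> = (0..10).collect();
        let batches = emit_in_batches(&rows, 4);
        assert_eq!(batches.len(), 3); // 4 + 4 + 2 rows
    }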

yjshen (Member, Author) replied:

Yes, I plan to do this in #1570 as my next step.

    @@ -338,6 +455,42 @@ impl Accumulator for SumAccumulator {
        }
    }

    #[derive(Debug)]
    struct SumAccumulatorV2 {
        index: usize,
alamb (Contributor) commented on the excerpt above:

I think it would help to document what this is an index into -- i.e., document what the parameters are.

yjshen (Member, Author) replied:

Added a new method state_index(&self) -> usize and explained its meaning in the RowAccumulator doc.

        s: &ScalarValue,
    ) -> Result<()> {
        match (dt, s) {
            // float64 coerces everything to f64
alamb (Contributor) commented on the excerpt above:

🤔 I almost wonder how valuable supporting all these types is -- I wonder if we could use u64 or i64 accumulators for all integer types and f64 for floats, and reduce the code. I don't think this PR makes things any better or worse, but these type match statements are so common and repetitive.

yjshen (Member, Author) replied:

Agreed. We should clean this up, probably by checking type coercions.

        accessor: &mut RowAccessor,
    ) -> Result<()> {
        let values = &values[0];
        add_to_row(&self.datatype, self.index, accessor, &sum_batch(values)?)?;
alamb (Contributor) commented on the excerpt above:

I wonder if it is necessary to go through sum_batch here (which turns the sum into a ScalarValue) -- perhaps we could call the appropriate sum kernel followed by a direct update.

yjshen (Member, Author) replied:

I initially used sum_batch here mainly to reduce code duplication with SumAccumulator; besides, there's a decimal sum_batch that isn't included in the compute kernels yet.

alamb (Contributor) added:

Also possibly related: #2447.

Further review comments were left on:

  • datafusion/core/src/physical_plan/aggregates/row_hash.rs
yjshen (Member, Author) commented May 6, 2022:

Thanks @alamb, for the detailed review ❤️. I'll try to answer or fix them today.

Micro benchmark: aggregate_query_sql

Existing aggregate_query_sql with a newly added case:

    c.bench_function("aggregate_query_group_by_u64_multiple_keys", |b| {
        b.iter(|| {
            query(
                ctx.clone(),
                "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
                 FROM t GROUP BY u64_wide, utf8",
            )
        })
    });
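
For reference, a criterion bench like this is typically run with something along the lines of cargo bench --bench aggregate_query_sql from the crate that defines it (the exact invocation is assumed here, not stated in the thread).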

I'm using a compound group by key with many distinct values in the newly added case.

The results are:

The master branch

aggregate_query_group_by                        
                        time:   [2.0366 ms 2.0448 ms 2.0531 ms]

aggregate_query_group_by_with_filter                        
                        time:   [1.4311 ms 1.4338 ms 1.4366 ms]

aggregate_query_group_by_u64 15 12                        
                        time:   [2.0208 ms 2.0283 ms 2.0358 ms]

aggregate_query_group_by_with_filter_u64 15 12                        
                        time:   [1.4242 ms 1.4269 ms 1.4296 ms]

aggregate_query_group_by_with_filter_multiple_keys                        
                        time:   [1.4709 ms 1.4732 ms 1.4756 ms]

aggregate_query_group_by_u64_multiple_keys                        
                        time:   [14.239 ms 14.408 ms 14.589 ms]

This PR

aggregate_query_group_by                        
                        time:   [2.7145 ms 2.7273 ms 2.7400 ms]

aggregate_query_group_by_with_filter                        
                        time:   [1.4331 ms 1.4364 ms 1.4397 ms]

aggregate_query_group_by_u64 15 12                        
                        time:   [2.7358 ms 2.7493 ms 2.7631 ms]

aggregate_query_group_by_with_filter_u64 15 12                        
                        time:   [1.4269 ms 1.4298 ms 1.4328 ms]

aggregate_query_group_by_with_filter_multiple_keys                        
                        time:   [1.4761 ms 1.4788 ms 1.4818 ms]

aggregate_query_group_by_u64_multiple_keys                        
                        time:   [12.580 ms 12.787 ms 12.999 ms]

Improved: the newly introduced case with many distinct groups.

Regressed: group-by with fewer groups.
I tried to check the regression on aggregate_query_group_by and aggregate_query_group_by_u64 15 12 by flamegraphing them, but found only 12 samples in GroupedHashAggregateStream::next, which account for 1.49% of all samples.

Edit: after moving physical plan creation out of the bench timing, there is still no big difference in the flamegraph; AggregateStreamV1/V2::next shows fewer than 10 samples, about 1% of all samples.

TPC-H query 1 (aggregate with four distinct states)

cargo run --release --features "mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path /home/yijie/sort_test/tpch-parquet --format parquet --query 1 --batch-size 4096

The master branch

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "/home/yijie/sort_test/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 192.6 ms and returned 4 rows
Query 1 iteration 1 took 189.0 ms and returned 4 rows
Query 1 iteration 2 took 196.0 ms and returned 4 rows
Query 1 avg time: 192.55 ms

This PR

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "/home/yijie/sort_test/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 189.8 ms and returned 4 rows
Query 1 iteration 1 took 187.9 ms and returned 4 rows
Query 1 iteration 2 took 186.3 ms and returned 4 rows
Query 1 avg time: 188.00 ms

No difference in performance is observed, which is expected since there are few groups and the work is mainly in-cache state calculation.

TPC-H q1 modified, with more groups:

select
    l_orderkey,
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1998-09-02'
group by
    l_orderkey,
    l_returnflag,
    l_linestatus
order by
    l_orderkey,
    l_returnflag,
    l_linestatus;

The master branch

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "/home/yijie/sort_test/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 3956.7 ms and returned 2084634 rows
Query 1 iteration 1 took 3885.2 ms and returned 2084634 rows
Query 1 iteration 2 took 3928.9 ms and returned 2084634 rows
Query 1 avg time: 3923.62 ms

This PR:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "/home/yijie/sort_test/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 3219.5 ms and returned 2084634 rows
Query 1 iteration 1 took 3130.5 ms and returned 2084634 rows
Query 1 iteration 2 took 3107.4 ms and returned 2084634 rows
Query 1 avg time: 3152.48 ms

There are noticeable performance improvements as the number of groups grows.

@@ -144,7 +173,8 @@ fn sum_decimal_batch(
 }
 }

 // sums the array and returns a ScalarValue of its corresponding type.
-pub(crate) fn sum_batch(values: &ArrayRef) -> Result<ScalarValue> {
+pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> Result<ScalarValue> {
+    let values = &cast(values, sum_type)?;
yjshen (Member, Author) commented on this change:

This is the partial fix for #2455. We should cast the input array to the sum result datatype first to reduce the possibility of overflow. Further, we should have a wrapping sum kernel as well as a try_sum kernel to produce wrapped results or nulls in the case of overflow.
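
A minimal sketch of that casting approach, written directly against the arrow crate rather than this PR's code (error handling simplified): the i32 input is cast to the Int64 sum type before summing, so accumulation happens in the wider type.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array, Int64Array};
    use arrow::compute::{cast, sum};
    use arrow::datatypes::DataType;

    fn main() -> Result<(), arrow::error::ArrowError> {
        let input: ArrayRef = Arc::new(Int32Array::from(vec![i32::MAX, 1, 2]));

        // Cast to the sum result type first, then sum in the wider type.
        let widened = cast(&input, &DataType::Int64)?;
        let widened = widened
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("cast produced an Int64 array");
        let total: Option<i64> = sum(widened);

        println!("{total:?}"); // Some(2147483650), no i32 overflow
        Ok(())
    }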

alamb (Contributor) replied:

I also wonder if we could internally consider summing smaller integers using u128 and then detecting overflow at the end. 🤔
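
A tiny sketch of that idea in plain Rust (illustrative only; i128 is used here since the inputs are signed): accumulate in a 128-bit integer and check the result type once, at the end.

    fn checked_sum_i64(values: &[i32]) -> Option<i64> {
        // Widen every element and sum without intermediate overflow, ...
        let total: i128 = values.iter().map(|v| *v as i128).sum();
        // ... then detect overflow of the target type only once.
        i64::try_from(total).ok()
    }

    fn main() {
        assert_eq!(checked_sum_i64(&[i32::MAX, 1, 2]), Some(2_147_483_650));
        assert_eq!(checked_sum_i64(&[]), Some(0));
    }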

alamb (Contributor) left a review:

Those are very nice benchmark improvements @yjshen 👍

I ran the IOx test suite against this branch (https://github.com/influxdata/influxdb_iox/pull/4531) and it seems to have worked great

@andygrove andygrove merged commit 6786203 into apache:master May 7, 2022
yjshen (Member, Author) commented May 8, 2022:

> Perhaps I can make this less concerning by enumerating what work remains to entirely switch to RowAggregate (and remove AggregateStream entirely).

@alamb @andygrove I revisited our current row implementation and listed all the TODO items I could think of in #1861; in the process, I think we can eliminate these code duplications and keep improving performance.

Successfully merging this pull request may close these issues.

  • Sum should not panicked when overflow
  • GroupedHashAggregate in row format