[FEA] Do a hash based re-partition instead of a sort based fallback for hash aggregate #8391

Open
revans2 opened this issue May 24, 2023 · 0 comments · May be fixed by #11116
Labels
feature request (New feature or request), performance (A performance related task/issue), reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin)

Comments


revans2 commented May 24, 2023

Is your feature request related to a problem? Please describe.
Currently, if the intermediate data for a hash aggregate is too large, we fall back to sorting the data, splitting it on key boundaries, and then merging the intermediate results.

```scala
// Use the out of core sort iterator to sort the batches by grouping key
outOfCoreIter = Some(GpuOutOfCoreSortIterator(
  aggregatedBatchIter,
  sorter,
  LazilyGeneratedOrdering.forSchema(TrampolineUtil.fromAttributes(groupingAttributes)),
  configuredTargetBatchSize,
  opTime = metrics.opTime,
  sortTime = metrics.sortTime,
  outputBatches = NoopMetric,
  outputRows = NoopMetric,
  peakDevMemory = NoopMetric))

// The out of core sort iterator does not guarantee that a batch contains all of the values
// for a particular key, so add a key batching iterator to enforce this. That allows each batch
// to be merge-aggregated safely since all values associated with a particular key are
// guaranteed to be in the same batch.
val keyBatchingIter = new GpuKeyBatchingIterator(
  outOfCoreIter.get,
  sorter,
  aggBatchTypes.toArray,
  configuredTargetBatchSize,
  numInputRows = NoopMetric,
  numInputBatches = NoopMetric,
  numOutputRows = NoopMetric,
  numOutputBatches = NoopMetric,
  concatTime = metrics.concatTime,
  opTime = metrics.opTime,
  peakDevMemory = NoopMetric)
```

This works, but we know that sorting is very expensive, especially compared to hash partitioning.

#8390 was filed to see if we can find a cheaper way to avoid spilling as much memory while we build up batches, and also as a way to avoid doing the sort/key partitioning.

This issue is here to replace the sort/key partitioning entirely, and it may even replace some of #8390 assuming that it works out well.

Describe the solution you'd like
I would like a patch that replaces the outOfCoreIter and keyBatchingIter with something closer to what happens in GpuSubPartitionHashJoin:

https://github.com/NVIDIA/spark-rapids/blob/branch-23.06/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala

The idea would be to partition each intermediate batch using a hash seed that is different from the one used by the shuffle's hash partitioning. Unlike join, I think we could use a different partitioning implementation because we don't need to join keys of possibly different types together, but it would be nice to keep the code common if possible.
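A minimal CPU-side sketch of the seeding idea, using plain Scala collections and `scala.util.hashing.MurmurHash3` rather than the plugin's GPU batch types; `KeyedRow` and `repartitionBatch` are hypothetical stand-ins, not plugin APIs:

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical stand-in for a batch of pre-aggregated rows keyed by the grouping columns.
// In the plugin this would be a ColumnarBatch on the GPU; plain Scala is used here only to
// show the seeding idea.
case class KeyedRow(key: Product, values: Array[Any])

// Split one intermediate batch numSubPartitions ways using a seed that differs from the one
// the upstream shuffle's hash partitioning used, so keys that all hashed into this task get
// spread back out instead of colliding into the same sub-partition again.
def repartitionBatch(
    batch: Seq[KeyedRow],
    numSubPartitions: Int,
    seed: Int): Array[Seq[KeyedRow]] = {
  val buckets = Array.fill(numSubPartitions)(Vector.newBuilder[KeyedRow])
  batch.foreach { row =>
    val h = MurmurHash3.productHash(row.key, seed)
    // Non-negative modulo so negative hashes still map to a valid bucket.
    val p = ((h % numSubPartitions) + numSubPartitions) % numSubPartitions
    buckets(p) += row
  }
  buckets.map(_.result(): Seq[KeyedRow])
}
```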

I was thinking that as soon as we see that the intermediate batches are larger than the target batch size, and we are not going to just output the data without a merge because of #7404, we would start to re-partition the intermediate results.

In the first version of this we would do it just like join: one pass through the data, partitioning it 16 ways. After that first pass we can combine small partitions together and process them to produce a result that we can output. If any partitions are larger than the target batch size we can split them further, with a heuristic that should be fairly accurate based on the size of the data, but again using a different seed than the previous passes. Then we can merge them and output them from the second pass.
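A rough sketch of that two-pass flow, continuing the hypothetical `KeyedRow`/`repartitionBatch` stand-ins from the sketch above (not plugin code):

```scala
// Continuing the stand-ins above: a simplified two-pass driver. The real iterator would only
// take this path once the intermediate data exceeds the target batch size, sizes would be
// tracked in bytes of device memory rather than rows, and small sub-partitions would be
// combined before merging; all of that is omitted here for brevity.
def subPartitionAndMerge(
    batches: Iterator[Seq[KeyedRow]],
    targetRows: Int,
    mergeAgg: Seq[KeyedRow] => Seq[KeyedRow]): Iterator[Seq[KeyedRow]] = {
  val firstPassSeed  = 0x1337   // illustrative values; must differ from the shuffle's seed
  val secondPassSeed = 0x7331   // and from the first pass
  val numPartitions  = 16

  // Pass 1: spread every intermediate batch across 16 sub-partitions.
  val firstPass = Array.fill(numPartitions)(Vector.newBuilder[KeyedRow])
  batches.foreach { batch =>
    repartitionBatch(batch, numPartitions, firstPassSeed).zipWithIndex.foreach {
      case (part, i) => firstPass(i) ++= part
    }
  }

  firstPass.iterator.map(_.result()).flatMap { part =>
    if (part.size <= targetRows) {
      // Small enough: merge-aggregate and emit directly.
      Iterator.single(mergeAgg(part))
    } else {
      // Pass 2: still too big, so split again (with the other seed) before merging.
      // The split count is a size-based heuristic.
      val splits = math.max(2, part.size / targetRows + 1)
      repartitionBatch(part, splits, secondPassSeed).iterator.map(mergeAgg)
    }
  }
}
```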

If #8390 looks good it might be worth thinking about playing some games with merging intermediate results after we do the partitioning, instead of doing a second pass at partitioning. But at that point we likely have 4 GiB of data cached (512 MiB per partition with 16 partitions), and I don't know how likely it is that all of that has spilled, or, if it has spilled, whether the cost to read that data back in is worth the savings in merging it.

For now I would say just stick with something simple, and then we can modify it as needs arise.

As for testing, I want to see a number of different situations exercised from both a performance and a reliability standpoint. All of these would involve a lot of data going into a small number of tasks, as if we had way too few shuffle partitions for the size of the data. A lot of this is really about the cardinality and ordering of the grouping keys.

I want to see what happens when the keys'
1. cardinality is high and the data is highly grouped (almost sorted by the key; we should get good combining initially, but not after a first pass)
2. cardinality is high and the data is randomly distributed (we should get almost no combining in the partial aggregation)
3. cardinality is low and the data is highly grouped
4. cardinality is low and the data is randomly distributed
5. cardinality is medium and the data is highly grouped
6. cardinality is medium and the data is randomly distributed

By high cardinality I mean each key shows up 2 to 3 times in the entire dataset, for medium 200 to 300 times, and for low 20,000 to 30,000 times. Either way we want enough data that a single task cannot hold all of it in GPU memory, at least a few hundred GiB in total.
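A rough data-generation sketch for these scenarios, assuming the plain Spark DataFrame API (spark-shell style); the names, seeds, and row counts are illustrative only and would need scaling up to reach a few hundred GiB:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical generator for the scenarios above. "Grouped" keys come from a monotonically
// increasing id divided by the repeat count, so the data is nearly sorted by key; "random"
// keys come from hashing the id into the same key space.
def genKeys(
    spark: SparkSession,
    totalRows: Long,
    repeatsPerKey: Long,
    grouped: Boolean): DataFrame = {
  val numKeys = totalRows / repeatsPerKey
  val keyCol =
    if (grouped) (col("id") / repeatsPerKey).cast("long")
    else pmod(hash(col("id")), lit(numKeys))
  spark.range(totalRows).select(keyCol.as("key"), rand(42).as("value"))
}

// High cardinality: each key appears ~2-3 times; medium: ~200-300; low: ~20,000-30,000.
val spark = SparkSession.builder().getOrCreate()
val highGrouped  = genKeys(spark, totalRows = 3L * 1000 * 1000 * 1000, repeatsPerKey = 3,     grouped = true)
val mediumRandom = genKeys(spark, totalRows = 3L * 1000 * 1000 * 1000, repeatsPerKey = 300,   grouped = false)
val lowRandom    = genKeys(spark, totalRows = 3L * 1000 * 1000 * 1000, repeatsPerKey = 30000, grouped = false)
```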

With this I would also like to see how it behaves with large numbers of aggregations. That can increase the size of the input data by adding lots of new columns, and when combined with #8382 we might also get lots of batches with relatively few rows in each.
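And a hypothetical way to exercise the many-aggregations case, reusing `highGrouped` from the data-gen sketch above; the number of extra columns and aggregates is arbitrary:

```scala
import org.apache.spark.sql.functions._

// Hypothetical "wide" aggregation. Each extra aggregate adds columns to the intermediate
// (pre-merge) batches that the re-partition path has to carry.
val wideInput = (0 until 20).foldLeft(highGrouped) { (df, i) =>
  df.withColumn(s"v$i", col("value") * lit(i))
}
val aggs = (0 until 20).flatMap { i =>
  Seq(sum(col(s"v$i")).as(s"sum_$i"), max(col(s"v$i")).as(s"max_$i"))
}
val wideResult = wideInput.groupBy("key").agg(aggs.head, aggs.tail: _*)
```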

@revans2 added the feature request (New feature or request) and ? - Needs Triage (Need team to review and classify) labels May 24, 2023
@mattahrens added the reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin) label May 25, 2023
@revans2 added the performance (A performance related task/issue) label May 25, 2023
@mattahrens removed the ? - Needs Triage (Need team to review and classify) label May 31, 2023
@binmahone linked a pull request Jul 1, 2024 that will close this issue