Skip to content

Commit

Permalink
Fix a test error related to SerializedTableColumn
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman committed May 16, 2024
1 parent 9ce9b12 commit 807b9c4
Showing 1 changed file with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
ProjectExec(exprs, c2r)
}.getOrElse(c2r)
p.withNewChildren(Array(newChild))
case exec: GpuShuffleExchangeExecBase =>
addPostShuffleCoalesce(
exec.withNewChildren(Seq(optimizeGpuPlanTransitions(exec.child))))
case p =>
p.withNewChildren(p.children.map(optimizeGpuPlanTransitions))
}
Expand Down Expand Up @@ -514,6 +511,24 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
p.withNewChildren(p.children.map(c => insertCoalesce(c, shouldDisable)))
}

/**
 * Rewrites the plan so that every GPU shuffle exchange is immediately followed by a
 * coalescing node, with the intent of coalescing the serialized tables on the host
 * before copying the data to the GPU.
 *
 * The coalescing node is selected by `GpuShuffleEnv.serializingOnGpu(rapidsConf)`:
 * when it is true the shuffle is wrapped in `GpuCoalesceBatches` with a `TargetSize`
 * goal, otherwise it is wrapped in `GpuShuffleCoalesceExec`. Both use
 * `rapidsConf.gpuTargetBatchSizeBytes` as the target size.
 *
 * @note This should not be used in combination with the RAPIDS shuffle.
 * @param plan root of the (sub)plan to rewrite
 * @return the plan with the rule applied recursively to all children
 */
private def insertShuffleCoalesce(plan: SparkPlan): SparkPlan = {
  // Rebuilds a node after applying this rule to each of its children.
  def withCoalescedChildren(node: SparkPlan): SparkPlan =
    node.withNewChildren(node.children.map(insertShuffleCoalesce))

  plan match {
    // always follow a GPU shuffle with a coalesce node
    case shuffle: GpuShuffleExchangeExecBase if GpuShuffleEnv.serializingOnGpu(rapidsConf) =>
      GpuCoalesceBatches(withCoalescedChildren(shuffle),
        TargetSize(rapidsConf.gpuTargetBatchSizeBytes))
    case shuffle: GpuShuffleExchangeExecBase =>
      GpuShuffleCoalesceExec(withCoalescedChildren(shuffle),
        rapidsConf.gpuTargetBatchSizeBytes)
    // any other node is left in place; only its children are rewritten
    case other =>
      withCoalescedChildren(other)
  }
}

/**
* Inserts a transition to be running on the CPU columnar
*/
Expand Down Expand Up @@ -786,6 +801,10 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
}
updatedPlan = insertColumnarFromGpu(updatedPlan)
updatedPlan = insertCoalesce(updatedPlan)
// only insert shuffle coalesces when the GPU shuffle is not in use (i.e. the normal shuffle)
if (!GpuShuffleEnv.useGPUShuffle(rapidsConf)) {
updatedPlan = insertShuffleCoalesce(updatedPlan)
}
if (plan.conf.adaptiveExecutionEnabled) {
updatedPlan = optimizeAdaptiveTransitions(updatedPlan, None)
} else {
Expand Down

0 comments on commit 807b9c4

Please sign in to comment.