Skip to content

Commit

Permalink
optimzing Expand+Aggregate in sqlw with many count distinct
Browse files Browse the repository at this point in the history
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
  • Loading branch information
binmahone committed May 14, 2024
1 parent c8e492e commit cdc867d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import scala.util.Random

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuExpressionsUtils.NullVecCache
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
Expand Down Expand Up @@ -54,7 +55,9 @@ class GpuExpandExecMeta(
val projections = gpuProjections.map(_.map(_.convertToGpu()))
GpuExpandExec(projections, expand.output, childPlans.head.convertIfNeeded())(
useTieredProject = conf.isTieredProjectEnabled,
preprojectEnabled = conf.isExpandPreprojectEnabled)
preprojectEnabled = conf.isExpandPreprojectEnabled,
cacheNullMaxCount = conf.expandCachingNullVecMaxCount,
coalesceAfter = conf.isCoalesceAfterExpandEnabled)
}
}

Expand All @@ -72,11 +75,17 @@ case class GpuExpandExec(
output: Seq[Attribute],
child: SparkPlan)(
useTieredProject: Boolean = false,
preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec {
preprojectEnabled: Boolean = false,
cacheNullMaxCount: Int = 0,
override val coalesceAfter: Boolean = true
) extends ShimUnaryExecNode with GpuExec {

override def otherCopyArgs: Seq[AnyRef] = Seq[AnyRef](
useTieredProject.asInstanceOf[java.lang.Boolean],
preprojectEnabled.asInstanceOf[java.lang.Boolean])
preprojectEnabled.asInstanceOf[java.lang.Boolean],
cacheNullMaxCount.asInstanceOf[java.lang.Integer],
coalesceAfter.asInstanceOf[java.lang.Boolean]
)

private val PRE_PROJECT_TIME = "preprojectTime"
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
Expand Down Expand Up @@ -127,7 +136,7 @@ case class GpuExpandExec(
}

child.executeColumnar().mapPartitions { it =>
new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it))
new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it), cacheNullMaxCount)
}
}

Expand Down Expand Up @@ -191,7 +200,8 @@ case class GpuExpandExec(
class GpuExpandIterator(
boundProjections: Seq[GpuTieredProject],
metrics: Map[String, GpuMetric],
it: Iterator[ColumnarBatch])
it: Iterator[ColumnarBatch],
cacheNullMaxCount: Int)
extends Iterator[ColumnarBatch] {

private var sb: Option[SpillableColumnarBatch] = None
Expand All @@ -206,9 +216,20 @@ class GpuExpandIterator(
Option(TaskContext.get()).foreach { tc =>
onTaskCompletion(tc) {
sb.foreach(_.close())

if (cacheNullMaxCount > 0) {
import scala.collection.JavaConverters._
GpuExpressionsUtils.cachedNullVectors.get().values().asScala.foreach(_.close())
GpuExpressionsUtils.cachedNullVectors.get().clear()
}
}
}

if (cacheNullMaxCount > 0 && GpuExpressionsUtils.cachedNullVectors.get() == null) {
GpuExpressionsUtils.cachedNullVectors.set(new NullVecCache(cacheNullMaxCount))
}


override def hasNext: Boolean = sb.isDefined || it.hasNext

override def next(): ColumnarBatch = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import com.nvidia.spark.Retryable
import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.{ShimBinaryExpression, ShimExpression, ShimTernaryExpression, ShimUnaryExpression}
import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String

Expand All @@ -52,6 +53,40 @@ object GpuExpressionsUtils {
"implemented and should have been disabled")
}

// This is only for ExpandExec which will generate a lot of null vectors
case class NullVecKey(d: DataType, n: Int)

class NullVecCache(private val maxNulls: Int)
extends util.LinkedHashMap[NullVecKey, GpuColumnVector](100, 0.75f, true) {
private var totalNulls: Long = 0L

override def clear(): Unit = {
super.clear()
totalNulls = 0
}

override def put(key: NullVecKey, v: GpuColumnVector): GpuColumnVector = {
if (v.getRowCount > maxNulls) {
throw new IllegalStateException(s"spark.rapids.sql.expandCachingNullVec.maxNulls" +
s"($maxNulls) is set too small to hold single vector with ${v.getRowCount} rows.")
}
val iter = entrySet().iterator()
while (iter.hasNext && totalNulls > maxNulls - v.getRowCount) {
val entry = iter.next()
iter.remove()
totalNulls -= entry.getValue.getRowCount
}

val ret = super.put(key, v)
totalNulls += v.getRowCount
ret
}

override def remove(key: Any): GpuColumnVector = throw new UnsupportedOperationException()
}

val cachedNullVectors = new ThreadLocal[NullVecCache]()

/**
* Tries to resolve a `GpuColumnVector` from a Scala `Any`.
*
Expand All @@ -73,7 +108,18 @@ object GpuExpressionsUtils {
def resolveColumnVector(any: Any, numRows: Int): GpuColumnVector = {
withResourceIfAllowed(any) {
case c: GpuColumnVector => c.incRefCount()
case s: GpuScalar => GpuColumnVector.from(s, numRows, s.dataType)
case s: GpuScalar =>
if (!s.isValid && cachedNullVectors.get() != null) {
if (!cachedNullVectors.get.containsKey(NullVecKey.apply(s.dataType, numRows))) {
cachedNullVectors.get.put(NullVecKey.apply(s.dataType, numRows),
GpuColumnVector.from(s, numRows, s.dataType))
}

val ret = cachedNullVectors.get().get(NullVecKey.apply(s.dataType, numRows))
ret.incRefCount()
} else {
GpuColumnVector.from(s, numRows, s.dataType)
}
case other =>
throw new IllegalArgumentException(s"Cannot resolve a ColumnVector from the value:" +
s" $other. Please convert it to a GpuScalar or a GpuColumnVector before returning.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
case _: GpuDataSourceScanExec => true
case _: DataSourceV2ScanExecBase => true
case _: RDDScanExec => true // just in case an RDD was reading in data
case _: ExpandExec => true
case _ => false
}

Expand Down
18 changes: 18 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,20 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)

val ENABLE_COALESCE_AFTER_EXPAND = conf("spark.rapids.sql.coalesceAfterExpand.enabled")
.doc("When set to false disables the coalesce after GPU Expand. ")
.internal()
.booleanConf
.createWithDefault(false)

val EXPAND_CACHING_NULL_VEC_MAX_NULL_COUNT =
conf("spark.rapids.sql.expandCachingNullVec.maxNulls")
.doc("Max number of null scalar in null vectors to cache for GPU Expand. " +
"If the number of null scala exceeds this value, the null vectors will not be cached." +
"The value has to be positive for caching to be enabled.")
.internal().integerConf
.createWithDefault(0)

val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
.doc("When reading an ORC file, the source data schemas(schemas of ORC file) may differ " +
Expand Down Expand Up @@ -2628,6 +2642,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isExpandPreprojectEnabled: Boolean = get(ENABLE_EXPAND_PREPROJECT)

lazy val isCoalesceAfterExpandEnabled: Boolean = get(ENABLE_COALESCE_AFTER_EXPAND)

lazy val expandCachingNullVecMaxCount: Int = get(EXPAND_CACHING_NULL_VEC_MAX_NULL_COUNT)

lazy val multiThreadReadNumThreads: Int = {
// Use the largest value set among all the options.
val deprecatedConfs = Seq(
Expand Down

0 comments on commit cdc867d

Please sign in to comment.