
Commit

Merge branch 'branch-24.10' into json_to_structs
ttnghia committed Sep 27, 2024
2 parents 38dcf4a + fbd4db9 commit b0e8354
Showing 9 changed files with 260 additions and 171 deletions.
5 changes: 4 additions & 1 deletion docs/dev/lore.md
@@ -38,7 +38,10 @@ partitions.

You also need to set `spark.rapids.sql.lore.dumpPath` to tell LORE where to dump the data, the
value of which should point to a directory. All dumped data of a query will live in this
directory. A typical directory hierarchy would look like this:
directory. Note that the directory must either not exist, in which case it will be created, or be empty.
If the directory exists and contains files, an `IllegalArgumentException` will be thrown to prevent overwriting existing data.

A typical directory hierarchy would look like this:

```console
+ loreId-10/
```
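
As a hedged illustration of the documented behavior, here is a minimal Scala sketch of enabling a LORE dump from a Spark session. The dump directory and the `10[*]` id are illustrative; the config keys are referenced through `RapidsConf`, matching the test change further down.

```scala
// Minimal sketch, not the plugin's own code. Assumes the RAPIDS Accelerator is on
// the classpath and that /tmp/lore-dump (illustrative) is empty or does not exist.
import com.nvidia.spark.rapids.RapidsConf
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("lore-dump-sketch").getOrCreate()

// Where LORE dumps its data; must be an empty or not-yet-existing directory.
spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, "/tmp/lore-dump")
// Which LORE ids to dump; "10[*]" mirrors the "3[*]" format used in the
// GpuLoreSuite test below.
spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "10[*]")

// Any query executed now dumps its data under /tmp/lore-dump/loreId-10/...
```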
310 changes: 155 additions & 155 deletions integration_tests/src/main/python/json_matrix_test.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion jenkins/spark-nightly-build.sh
@@ -182,7 +182,7 @@ if [[ $SKIP_DEPLOY != 'true' ]]; then
distWithReducedPom "deploy"

# this deploys selected submodules that are unconditionally built with Spark 3.2.0
$MVN -B deploy -pl $DEPLOY_SUBMODULES \
$MVN -B deploy -pl "!${DIST_PL}" \
-Dbuildver=$SPARK_BASE_SHIM_VERSION \
-DskipTests \
-Dmaven.scaladoc.skip -Dmaven.scalastyle.skip=true \
@@ -381,6 +381,7 @@ private final class GpuSemaphore() extends Logging {
def completeTask(context: TaskContext): Unit = {
val taskAttemptId = context.taskAttemptId()
GpuTaskMetrics.get.updateRetry(taskAttemptId)
GpuTaskMetrics.get.updateMaxGpuMemory(taskAttemptId)
val refs = tasks.remove(taskAttemptId)
if (refs == null) {
throw new IllegalStateException(s"Completion of unknown task $taskAttemptId")
@@ -89,23 +89,21 @@ object GpuLore {
}

def dumpObject[T: ClassTag](obj: T, path: Path, hadoopConf: Configuration): Unit = {
withResource(path.getFileSystem(hadoopConf)) { fs =>
withResource(fs.create(path, false)) { fout =>
val serializerStream = SparkEnv.get.serializer.newInstance().serializeStream(fout)
withResource(serializerStream) { ser =>
ser.writeObject(obj)
}
val fs = path.getFileSystem(hadoopConf)
withResource(fs.create(path, true)) { fout =>
val serializerStream = SparkEnv.get.serializer.newInstance().serializeStream(fout)
withResource(serializerStream) { ser =>
ser.writeObject(obj)
}
}
}

def loadObject[T: ClassTag](path: Path, hadoopConf: Configuration): T = {
withResource(path.getFileSystem(hadoopConf)) { fs =>
withResource(fs.open(path)) { fin =>
val serializerStream = SparkEnv.get.serializer.newInstance().deserializeStream(fin)
withResource(serializerStream) { ser =>
ser.readObject().asInstanceOf[T]
}
val fs = path.getFileSystem(hadoopConf)
withResource(fs.open(path)) { fin =>
val serializerStream = SparkEnv.get.serializer.newInstance().deserializeStream(fin)
withResource(serializerStream) { ser =>
ser.readObject().asInstanceOf[T]
}
}
}
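
The rewrite above stops wrapping the Hadoop `FileSystem` in `withResource` (by default Hadoop caches and shares `FileSystem` instances, so closing one can break other users of the same instance) and switches `fs.create` to overwrite mode. A hedged usage sketch of the resulting helpers, with an illustrative path and assuming a running Spark application, since serialization goes through `SparkEnv.get.serializer`:

```scala
// Usage sketch only; the path is illustrative, a live SparkEnv is assumed, and
// GpuLore's package is taken to match the lore suite shown further down.
import com.nvidia.spark.rapids.lore.GpuLore
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val metaPath = new Path("/tmp/lore-dump/loreId-10/plan.meta") // hypothetical file

// dumpObject now overwrites an existing file (create(path, true)) and leaves the
// cached Hadoop FileSystem open instead of closing it.
GpuLore.dumpObject(Seq(1, 2, 3), metaPath, hadoopConf)

// loadObject reads it back through the same (cached) FileSystem.
val restored = GpuLore.loadObject[Seq[Int]](metaPath, hadoopConf)
assert(restored == Seq(1, 2, 3))
```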
@@ -186,6 +184,12 @@ object GpuLore {
idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement()
}
}
/**
* Executions that have checked the lore output root path.
* Key is [[SQLExecution.EXECUTION_ID_KEY]].
*/
private val loreOutputRootPathChecked: ConcurrentHashMap[String, Boolean] =
new ConcurrentHashMap[String, Boolean]()

def tagForLore(sparkPlan: SparkPlan, rapidsConf: RapidsConf): SparkPlan = {
val loreDumpIds = rapidsConf.loreDumpIds
@@ -197,6 +201,20 @@
s"when ${RapidsConf.LORE_DUMP_IDS.key} is set."))

val spark = SparkShimImpl.sessionFromPlan(sparkPlan)

Option(spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).foreach {
executionId =>
loreOutputRootPathChecked.computeIfAbsent(executionId, _ => {
val path = new Path(loreOutputRootPath)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (fs.exists(path) && fs.listStatus(path).nonEmpty) {
throw new IllegalArgumentException(
s"LORE dump path $loreOutputRootPath already exists and is not empty.")
}
true
})
}

val hadoopConf = {
val sc = spark.sparkContext
sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
@@ -72,7 +72,7 @@ class GpuLoreDumpRDD(info: LoreDumpRDDInfo, input: RDD[ColumnarBatch])
private def dumpCurrentBatch(): ColumnarBatch = {
val outputPath = pathOfBatch(split.index, batchIdx)
val outputStream = outputPath.getFileSystem(info.hadoopConf.value.value)
.create(outputPath, false)
.create(outputPath, true)
DumpUtils.dumpToParquet(nextBatch.get, outputStream)
nextBatch.get
}
@@ -343,6 +343,8 @@ object GpuJsonReadCommon {
.withLeadingZeros(options.allowNumericLeadingZeros)
.withNonNumericNumbers(options.allowNonNumericNumbers)
.withUnquotedControlChars(allowUnquotedControlChars)
.withCudfPruneSchema(true)
.withExperimental(true)
.build()
}
}
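
The two added calls opt the reader into cuDF-side schema pruning and the experimental cuDF JSON parser. Below is a hedged sketch of how such an options object might be assembled; the `JSONOptions.builder()` entry point and the `ai.rapids.cudf` import are assumed, and only the chained setters visible in the hunk above come from the source.

```scala
// Sketch under assumptions: JSONOptions.builder() and its package are assumed;
// the three booleans stand in for the corresponding Spark JSON reader options.
import ai.rapids.cudf.JSONOptions

def buildCudfJsonOptions(
    allowNumericLeadingZeros: Boolean,
    allowNonNumericNumbers: Boolean,
    allowUnquotedControlChars: Boolean): JSONOptions = {
  JSONOptions.builder()
    .withLeadingZeros(allowNumericLeadingZeros)
    .withNonNumericNumbers(allowNonNumericNumbers)
    .withUnquotedControlChars(allowUnquotedControlChars)
    .withCudfPruneSchema(true) // let cuDF prune columns not in the read schema
    .withExperimental(true)    // opt into the experimental cuDF JSON reader
    .build()
}
```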
@@ -78,6 +78,35 @@
override def value: NanoTime = NanoTime(_sum)
}

class HighWatermarkAccumulator extends AccumulatorV2[jl.Long, Long] {
private var _value = 0L
override def isZero: Boolean = _value == 0

override def copy(): HighWatermarkAccumulator = {
val newAcc = new HighWatermarkAccumulator
newAcc._value = this._value
newAcc
}

override def reset(): Unit = {
_value = 0
}

override def add(v: jl.Long): Unit = {
_value += v
}

override def merge(other: AccumulatorV2[jl.Long, Long]): Unit = other match {
case wa: HighWatermarkAccumulator =>
_value = _value.max(wa._value)
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

override def value: Long = _value
}
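
A small sketch of the accumulator's semantics with illustrative values: as the `updateMaxGpuMemory` comment further down notes, each task calls `add` at most once with its peak, and `merge` then keeps the maximum across tasks rather than a sum.

```scala
// Illustrative values only; demonstrates that merge keeps the high-water mark.
val taskA = new HighWatermarkAccumulator
taskA.add(512L * 1024 * 1024)           // task A peaked at 512 MiB

val taskB = new HighWatermarkAccumulator
taskB.add(2L * 1024 * 1024 * 1024)      // task B peaked at 2 GiB

val driverSide = new HighWatermarkAccumulator
driverSide.merge(taskA)
driverSide.merge(taskB)
assert(driverSide.value == 2L * 1024 * 1024 * 1024) // max across tasks, not the sum
```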

class GpuTaskMetrics extends Serializable {
private val semWaitTimeNs = new NanoSecondAccumulator
private val retryCount = new LongAccumulator
@@ -91,6 +120,8 @@ class GpuTaskMetrics extends Serializable {
private val readSpillFromHostTimeNs = new NanoSecondAccumulator
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

private val maxDeviceMemoryBytes = new HighWatermarkAccumulator

private val metrics = Map[String, AccumulatorV2[_, _]](
"gpuSemaphoreWait" -> semWaitTimeNs,
"gpuRetryCount" -> retryCount,
@@ -100,7 +131,8 @@ class GpuTaskMetrics extends Serializable {
"gpuSpillToHostTime" -> spillToHostTimeNs,
"gpuSpillToDiskTime" -> spillToDiskTimeNs,
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes
)

def register(sc: SparkContext): Unit = {
@@ -178,6 +210,18 @@ class GpuTaskMetrics extends Serializable {
retryComputationTime.add(compNs)
}
}

def updateMaxGpuMemory(taskAttemptId: Long): Unit = {
val maxMem = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskAttemptId)
if (maxMem > 0) {
// This metric tracks the max amount of memory that is allocated on the gpu during
// the lifespan of a task. However, this update function only gets called once on task
// completion, whereas the actual tracking of the max value during memory allocations
// lives in the JNI. Therefore, we stick to the convention of calling the add method here
// instead of adding a dedicated max method to the accumulator.
maxDeviceMemoryBytes.add(maxMem)
}
}
}

@@ -17,6 +17,7 @@
package com.nvidia.spark.rapids.lore

import com.nvidia.spark.rapids.{FunSuiteWithTempDir, GpuColumnarToRowExec, RapidsConf, SparkQueryCompareTestSuite}
import com.nvidia.spark.rapids.Arm.withResource
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
@@ -147,6 +148,26 @@ class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir w
}
}

test("Non-empty lore dump path") {
withGpuSparkSession{ spark =>
spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath)
spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "3[*]")

//Create a file in the root path
val path = new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/test")
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
withResource(fs.create(path, true)) { _ =>
}

val df = spark.range(0, 1000, 1, 100)
.selectExpr("id % 10 as key", "id % 100 as value")

assertThrows[IllegalArgumentException] {
df.collect()
}
}
}

private def doTestReplay(loreDumpIds: String)(dfFunc: SparkSession => DataFrame) = {
val loreId = OutputLoreId.parse(loreDumpIds).head._1
withGpuSparkSession { spark =>

0 comments on commit b0e8354
