
Commit

Merge C2C code to main
Signed-off-by: Chong Gao <[email protected]>
Chong Gao committed Nov 13, 2024
1 parent fcede85 commit 58b7f48
Showing 12 changed files with 1,196 additions and 26 deletions.
15 changes: 15 additions & 0 deletions integration_tests/run_pyspark_from_build.sh
@@ -364,6 +364,21 @@ EOF
fi
export PYSP_TEST_spark_rapids_memory_gpu_allocSize=${PYSP_TEST_spark_rapids_memory_gpu_allocSize:-'1536m'}

if [[ "$VELOX_TEST" -eq 1 ]]; then
if [ -z "${VELOX_JARS}" ]; then
echo "Error: Environment VELOX_JARS is not set."
exit 1
fi
export PYSP_TEST_spark_jars="${PYSP_TEST_spark_jars},${VELOX_JARS//:/,}"
export PYSP_TEST_spark_memory_offHeap_enabled=true
export PYSP_TEST_spark_memory_offHeap_size=512M
export PYSP_TEST_spark_gluten_loadLibFromJar=true
export PYSP_TEST_spark_rapids_sql_loadVelox=true
if [[ "$VELOX_HDFS_TEST" -eq 1 ]]; then
export PYSP_TEST_spark_rapids_sql_velox_useVeloxHDFS=true
fi
fi

SPARK_SHELL_SMOKE_TEST="${SPARK_SHELL_SMOKE_TEST:-0}"
if [[ "${SPARK_SHELL_SMOKE_TEST}" != "0" ]]; then
echo "Running spark-shell smoke test..."
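For reference, the new PYSP_TEST_spark_* variables above map to ordinary Spark configuration keys (PYSP_TEST_spark_memory_offHeap_size becomes spark.memory.offHeap.size, and so on). A minimal Scala sketch of the equivalent session setup, assuming a placeholder path for the jars that ${VELOX_JARS} would supply:

    import org.apache.spark.sql.SparkSession

    // Hedged sketch only: the conf keys mirror the script above, while the app
    // name, master and jar path are placeholders for this illustration.
    val spark = SparkSession.builder()
      .appName("velox-integration-sketch")
      .master("local[*]")
      .config("spark.jars", "/path/to/gluten-velox-bundle.jar")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "512M")
      .config("spark.gluten.loadLibFromJar", "true")
      .config("spark.rapids.sql.loadVelox", "true")
      .config("spark.rapids.sql.velox.useVeloxHDFS", "true") // only when VELOX_HDFS_TEST=1
      .getOrCreate()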
21 changes: 21 additions & 0 deletions pom.xml
@@ -825,6 +825,7 @@
<maven.scalastyle.skip>false</maven.scalastyle.skip>
<dist.jar.compress>true</dist.jar.compress>
<spark330.iceberg.version>0.14.1</spark330.iceberg.version>
<gluten.version>1.2.0</gluten.version>
<!--
If true, disables verification that all Shims be built as of one and the same git
commit hash. Do not use for CI!
@@ -1067,6 +1068,26 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>spark-sql-columnar-shims-spark32</artifactId>
<version>${gluten.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>backends-velox</artifactId>
<version>${gluten.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.gluten</groupId>
<artifactId>spark-sql-columnar-shims-spark34</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>

28 changes: 28 additions & 0 deletions sql-plugin/pom.xml
@@ -102,6 +102,34 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>spark-sql-columnar-shims-spark32</artifactId>
<version>${gluten.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>backends-velox</artifactId>
<version>${gluten.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.gluten</groupId>
<artifactId>spark-sql-columnar-shims-spark34</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-velox_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<!-- #if scala-2.13 --><!--
<profiles>
@@ -21,7 +21,6 @@ import java.net.URI
import java.nio.ByteBuffer
import java.nio.channels.SeekableByteChannel
import java.nio.charset.StandardCharsets
import java.util
import java.util.{Collections, Locale}
import java.util.concurrent._

@@ -58,6 +57,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rapids.velox.{FileCopyRange, VeloxHDFS}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -464,7 +464,6 @@ private case class GpuParquetFileFilterHandler(

private val PARQUET_ENCRYPTION_CONFS = Seq("parquet.encryption.kms.client.class",
"parquet.encryption.kms.client.class", "parquet.crypto.factory.class")
private val PARQUET_MAGIC_ENCRYPTED = "PARE".getBytes(StandardCharsets.US_ASCII)

private def isParquetTimeInInt96(parquetType: Type): Boolean = {
parquetType match {
@@ -536,8 +535,12 @@ private case class GpuParquetFileFilterHandler(
private def readFooterBuffer(
filePath: Path,
conf: Configuration): HostMemoryBuffer = {
PerfIO.readParquetFooterBuffer(filePath, conf, verifyParquetMagic)
.getOrElse(readFooterBufUsingHadoop(filePath, conf))
PerfIO.readParquetFooterBuffer(
filePath, conf, GpuParquetUtils.verifyParquetMagic)
.getOrElse(
VeloxHDFS.readParquetFooterBuffer(filePath, conf)
.getOrElse(readFooterBufUsingHadoop(filePath, conf))
)
}
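The new readFooterBuffer above chains three readers: PerfIO first, then VeloxHDFS, then the plain Hadoop path. A self-contained sketch of the same Option-based fallback pattern, using stand-in reader functions rather than the real implementations:

    // Stand-ins for PerfIO.readParquetFooterBuffer, VeloxHDFS.readParquetFooterBuffer
    // and readFooterBufUsingHadoop: an optional reader returns None when it is not
    // available, and the next reader in the chain is tried instead.
    def readViaPerfIO(path: String): Option[Array[Byte]] = None
    def readViaVeloxHDFS(path: String): Option[Array[Byte]] = None
    def readViaHadoop(path: String): Array[Byte] =
      "PAR1".getBytes(java.nio.charset.StandardCharsets.US_ASCII)

    def readFooter(path: String): Array[Byte] =
      readViaPerfIO(path).getOrElse(
        readViaVeloxHDFS(path).getOrElse(readViaHadoop(path)))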

private def readFooterBufUsingHadoop(filePath: Path, conf: Configuration): HostMemoryBuffer = {
@@ -558,7 +561,7 @@ private case class GpuParquetFileFilterHandler(
val magic = new Array[Byte](MAGIC.length)
inputStream.readFully(magic)
val footerIndex = footerLengthIndex - footerLength
verifyParquetMagic(filePath, magic)
GpuParquetUtils.verifyParquetMagic(filePath, magic)
if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
throw new RuntimeException(s"corrupted file: the footer index is not within " +
s"the file: $footerIndex")
@@ -583,21 +586,6 @@ private case class GpuParquetFileFilterHandler(
}
}


private def verifyParquetMagic(filePath: Path, magic: Array[Byte]): Unit = {
if (!util.Arrays.equals(MAGIC, magic)) {
if (util.Arrays.equals(PARQUET_MAGIC_ENCRYPTED, magic)) {
throw new RuntimeException("The GPU does not support reading encrypted Parquet " +
"files. To read encrypted or columnar encrypted files, disable the GPU Parquet " +
s"reader via ${RapidsConf.ENABLE_PARQUET_READ.key}.")
} else {
throw new RuntimeException(s"$filePath is not a Parquet file. " +
s"Expected magic number at tail ${util.Arrays.toString(MAGIC)} " +
s"but found ${util.Arrays.toString(magic)}")
}
}
}

private def readAndFilterFooter(
file: PartitionedFile,
conf : Configuration,
@@ -659,8 +647,15 @@ private case class GpuParquetFileFilterHandler(
}
}
}.getOrElse {
ParquetFileReader.readFooter(conf, filePath,
ParquetMetadataConverter.range(file.start, file.start + file.length))
VeloxHDFS.readParquetFooterBuffer(filePath, conf).map { hmb =>
withResource(hmb) { _ =>
ParquetFileReader.readFooter(new HMBInputFile(hmb),
ParquetMetadataConverter.range(file.start, file.start + file.length))
}
}.getOrElse {
ParquetFileReader.readFooter(conf, filePath,
ParquetMetadataConverter.range(file.start, file.start + file.length))
}
}
}
}
@@ -1604,13 +1599,36 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
conf, out.buffer, filePath.toUri,
coalescedRanges.map(r => IntRangeWithOffset(r.offset, r.length, r.outputOffset))
).getOrElse {
// try to read data through VeloxHDFS if necessary
val streamHandle = VeloxHDFS.createInputFileStream(filePathString)
if (streamHandle > 0) {
// Build FileCopyRanges while computing the total read size
val ranges = ArrayBuffer.empty[FileCopyRange]
val bufferAddr = out.buffer.getAddress
val readSize = coalescedRanges.foldLeft(0L) { (acc, blockCopy) =>
ranges += FileCopyRange(
blockCopy.offset,
blockCopy.length,
bufferAddr + blockCopy.outputOffset
)
out.seek(blockCopy.outputOffset + blockCopy.length)
acc + blockCopy.length
}
// leverage velox::HdfsReadFile (which is based on libhdfs3) to buffer HDFS files
VeloxHDFS.copyRangesFromFile(
filePathString, streamHandle, ranges,
closeAfterFinished = true
)
readSize
} else {
withResource(filePath.getFileSystem(conf).open(filePath)) { in =>
val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
coalescedRanges.foldLeft(0L) { (acc, blockCopy) =>
acc + copyDataRange(blockCopy, in, out, copyBuffer)
}
}
}
}
// try to cache the remote ranges that were copied
remoteCopies.foreach { range =>
metrics.getOrElse(GpuMetric.FILECACHE_DATA_RANGE_MISSES, NoopMetric) += 1
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,10 +16,14 @@

package com.nvidia.spark.rapids

import java.nio.charset.StandardCharsets
import java.util
import java.util.Locale

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileWriter.MAGIC
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ColumnChunkMetaData, ColumnPath}
import org.apache.parquet.schema.MessageType

@@ -83,4 +87,27 @@ object GpuParquetUtils extends Logging {

block
}

/**
* Verify the magic bytes stored in the Parquet footer
*
* @param filePath the path of the Parquet file
* @param magic the magic bytes extracted from the file
*/
def verifyParquetMagic(filePath: Path, magic: Array[Byte]): Unit = {
if (!util.Arrays.equals(MAGIC, magic)) {
if (util.Arrays.equals(PARQUET_MAGIC_ENCRYPTED, magic)) {
throw new RuntimeException("The GPU does not support reading encrypted Parquet " +
"files. To read encrypted or columnar encrypted files, disable the GPU Parquet " +
s"reader via ${RapidsConf.ENABLE_PARQUET_READ.key}.")
} else {
throw new RuntimeException(s"$filePath is not a Parquet file. " +
s"Expected magic number at tail ${util.Arrays.toString(MAGIC)} " +
s"but found ${util.Arrays.toString(magic)}")
}
}
}

private val PARQUET_MAGIC_ENCRYPTED = "PARE".getBytes(StandardCharsets.US_ASCII)

}
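A hedged usage sketch of the relocated helper: read the four trailing magic bytes of a local file and verify them. The local path and RandomAccessFile handling are illustrative only, not how the plugin actually reads footers:

    import java.io.RandomAccessFile
    import org.apache.hadoop.fs.Path
    import com.nvidia.spark.rapids.GpuParquetUtils

    val filePath = new Path("file:///tmp/example.parquet") // hypothetical file
    val raf = new RandomAccessFile("/tmp/example.parquet", "r")
    try {
      val magic = new Array[Byte](4)        // MAGIC ("PAR1") is four bytes long
      raf.seek(raf.length() - 4)            // the magic bytes sit at the file tail
      raf.readFully(magic)
      GpuParquetUtils.verifyParquetMagic(filePath, magic) // throws for "PARE" or junk
    } finally {
      raf.close()
    }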
@@ -34,7 +34,7 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

private class GpuRowToColumnConverter(schema: StructType) extends Serializable {
class GpuRowToColumnConverter(schema: StructType) extends Serializable {
private val converters = schema.fields.map {
f => GpuRowToColumnConverter.getConverterForType(f.dataType, f.nullable)
}
@@ -594,7 +594,8 @@ class RowToColumnarIterator(
numOutputRows: GpuMetric = NoopMetric,
numOutputBatches: GpuMetric = NoopMetric,
streamTime: GpuMetric = NoopMetric,
opTime: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] {
opTime: GpuMetric = NoopMetric,
acquireGpuTime: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] {

private val targetSizeBytes = localGoal.targetSizeBytes
private var targetRows = 0
@@ -650,7 +651,11 @@ class RowToColumnarIterator(
// note that TaskContext.get() can return null during unit testing so we wrap it in an
// option here
Option(TaskContext.get())
.foreach(ctx => GpuSemaphore.acquireIfNecessary(ctx))
.foreach { ctx =>
val acquireGpuStart = System.nanoTime()
GpuSemaphore.acquireIfNecessary(ctx)
acquireGpuTime += System.nanoTime() - acquireGpuStart
}

val ret = withResource(new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN,
opTime)) { _ =>
59 changes: 59 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1684,6 +1684,53 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(false)

val PARQUET_VELOX_READER = conf("spark.rapids.sql.parquet.useVelox")
.doc("Use velox to do ParquetScan (on CPUs)")
.internal()
.booleanConf
.createWithDefault(true)

object VeloxFilterPushdownType extends Enumeration {
val ALL_SUPPORTED, NONE, UNCHANGED = Value
}

val PUSH_DOWN_FILTERS_TO_VELOX = conf("spark.rapids.sql.parquet.pushDownFiltersToVelox")
.doc("Push down all supported filters to Velox if set to ALL_SUPPORTED. " +
"If set to NONE, no filters will be pushed down so all filters are on the GPU. " +
"If set to UNCHANGED, filters will be both pushed down and keeped on the GPU. " +
"UNCHANGED is to make the behavior same as before.")
.internal()
.stringConf
.transform(_.toUpperCase(java.util.Locale.ROOT))
.checkValues(VeloxFilterPushdownType.values.map(_.toString))
.createWithDefault(VeloxFilterPushdownType.ALL_SUPPORTED.toString)
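A self-contained sketch of how the mode string for this conf can be resolved against the enumeration above; the Mode object mirrors VeloxFilterPushdownType from the diff, while the resolve helper and main method are purely illustrative:

    object VeloxFilterPushdownSketch {
      // Mirrors VeloxFilterPushdownType defined above.
      object Mode extends Enumeration {
        val ALL_SUPPORTED, NONE, UNCHANGED = Value
      }

      // The conf upper-cases its value with Locale.ROOT before validation, so any
      // casing of the three names resolves to the same mode.
      def resolve(confValue: String): Mode.Value =
        Mode.withName(confValue.toUpperCase(java.util.Locale.ROOT))

      def main(args: Array[String]): Unit = {
        println(resolve("all_supported")) // ALL_SUPPORTED: push supported filters to Velox
        println(resolve("none"))          // NONE: keep every filter on the GPU
        println(resolve("unchanged"))     // UNCHANGED: push down and also keep on the GPU
      }
    }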

val ENABLE_NATIVE_VELOX_CONVERTER = conf("spark.rapids.sql.enableNativeVeloxConverter")
.doc("Re-formatting VeloxColumn to align with the memory layout of GpuColumn directly")
.internal()
.booleanConf
.createWithDefault(true)

val PARQUET_VELOX_PRELOAD_CAP = conf("spark.rapids.sql.parquet.veloxPreloadedBatches")
.doc("Preloading capacity of VeloxParquetScan. If > 0, will enable preloading" +
" VeloxScanIterator asynchronously in a separate thread")
.internal()
.integerConf
.createWithDefault(0)

val ENABLE_VELOX_HDFS = conf("spark.rapids.sql.velox.useVeloxHDFS")
.doc("Use HDFS reader of velox to do buffering instead of Hadoop Java API")
.internal()
.startupOnly()
.booleanConf
.createWithDefault(false)

val LOAD_VELOX = conf("spark.rapids.sql.loadVelox")
.doc("Load Velox (through Gluten) as a spark driver plugin")
.startupOnly()
.booleanConf
.createWithDefault(true)

val HASH_AGG_REPLACE_MODE = conf("spark.rapids.sql.hashAgg.replaceMode")
.doc("Only when hash aggregate exec has these modes (\"all\" by default): " +
"\"all\" (try to replace all aggregates, default), " +
@@ -2816,6 +2863,18 @@

lazy val avroDebugDumpAlways: Boolean = get(AVRO_DEBUG_DUMP_ALWAYS)

lazy val parquetVeloxReader: Boolean = get(PARQUET_VELOX_READER)

lazy val pushDownFiltersToVelox: String = get(PUSH_DOWN_FILTERS_TO_VELOX)

lazy val enableNativeVeloxConverter: Boolean = get(ENABLE_NATIVE_VELOX_CONVERTER)

lazy val parquetVeloxPreloadCapacity: Int = get(PARQUET_VELOX_PRELOAD_CAP)

lazy val enableVeloxHDFS: Boolean = get(ENABLE_VELOX_HDFS)

lazy val loadVelox: Boolean = get(LOAD_VELOX)

lazy val hashAggReplaceMode: String = get(HASH_AGG_REPLACE_MODE)

lazy val partialMergeDistinctEnabled: Boolean = get(PARTIAL_MERGE_DISTINCT_ENABLED)