GpuInsertIntoHiveTable supports parquet
Signed-off-by: Firestarman <[email protected]>
firestarman committed May 28, 2024
1 parent 64bbb36 commit d61654b
Showing 5 changed files with 321 additions and 41 deletions.
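This commit lets the GPU-accelerated GpuInsertIntoHiveTable command write Parquet-backed Hive tables (previously only Hive delimited text was handled). Below is a minimal, illustrative PySpark sketch of the kind of write the new tests exercise; the session setup, view name, and table name are hypothetical, and spark.sql.hive.convertMetastoreParquet is set to false as in the test configuration so the Hive serde write path (and thus GpuInsertIntoHiveTable) is taken:

from pyspark.sql import SparkSession

# Illustrative sketch only: the view and table names are made up for this example.
spark = (SparkSession.builder
         .appName("hive-parquet-write-sketch")
         # Keep the Hive serde write path instead of converting it to a data source write.
         .config("spark.sql.hive.convertMetastoreParquet", "false")
         .enableHiveSupport()
         .getOrCreate())

spark.range(100).selectExpr("id AS a", "CAST(id AS STRING) AS b") \
    .createOrReplaceTempView("src_view")

# Create Table As Select (CTAS) into a Hive Parquet table.
spark.sql("CREATE TABLE demo_parquet_tbl STORED AS PARQUET AS SELECT * FROM src_view")

# Insert-overwrite into the existing Hive Parquet table.
spark.sql("INSERT OVERWRITE TABLE demo_parquet_tbl SELECT * FROM src_view")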
172 changes: 172 additions & 0 deletions integration_tests/src/main/python/hive_parquet_write_test.py
@@ -0,0 +1,172 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_sql_writes_are_equal_collect
from data_gen import *
from hive_write_test import _restricted_timestamp
from marks import allow_non_gpu, ignore_order
from spark_session import with_cpu_session, is_before_spark_320

# Disable Spark's conversion of the Hive Parquet write into a data source write, so that
# "GpuInsertIntoHiveTable" is exercised for the Parquet write path.
_write_to_hive_conf = {"spark.sql.hive.convertMetastoreParquet": False}

_hive_basic_gens = [
byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen,
DateGen(start=date(1590, 1, 1)), _restricted_timestamp(),
DecimalGen(precision=19, scale=1, nullable=True),
DecimalGen(precision=23, scale=5, nullable=True),
DecimalGen(precision=36, scale=3, nullable=True)]

_hive_basic_struct_gen = StructGen(
[['c'+str(ind), c_gen] for ind, c_gen in enumerate(_hive_basic_gens)])

_hive_struct_gens = [
_hive_basic_struct_gen,
StructGen([['child0', byte_gen], ['child1', _hive_basic_struct_gen]]),
StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]

_hive_array_gens = [ArrayGen(sub_gen) for sub_gen in _hive_basic_gens] + [
ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]

_hive_map_gens = [simple_string_to_string_map_gen] + [MapGen(f(nullable=False), f()) for f in [
BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen,
lambda nullable=True: _restricted_timestamp(nullable=nullable),
lambda nullable=True: DateGen(start=date(1590, 1, 1), nullable=nullable),
lambda nullable=True: DecimalGen(precision=19, scale=1, nullable=nullable),
lambda nullable=True: DecimalGen(precision=36, scale=5, nullable=nullable)]]

_hive_write_gens = [_hive_basic_gens, _hive_struct_gens, _hive_array_gens, _hive_map_gens]


@allow_non_gpu(*non_utc_allow)
@ignore_order(local=True)
@pytest.mark.parametrize("is_ctas", [True, False], ids=['CTAS', 'CTTW'])
@pytest.mark.parametrize("gens", _hive_write_gens, ids=idfn)
def test_write_parquet_into_hive_table(spark_tmp_table_factory, is_ctas, gens):

def gen_table(spark):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
types_sql_str = ','.join('{} {}'.format(
name, gen.data_type.simpleString()) for name, gen in gen_list)
data_table = spark_tmp_table_factory.get()
gen_df(spark, gen_list).createOrReplaceTempView(data_table)
return data_table, types_sql_str

(input_table, input_schema) = with_cpu_session(gen_table)

def write_to_hive_sql(spark, output_table):
if is_ctas:
# Create Table As Select
return [
"CREATE TABLE {} STORED AS PARQUET AS SELECT * FROM {}".format(
output_table, input_table)
]
else:
# Create Table Then Write
return [
"CREATE TABLE {} ({}) STORED AS PARQUET".format(output_table, input_schema),
"INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
]

assert_gpu_and_cpu_sql_writes_are_equal_collect(
spark_tmp_table_factory,
write_to_hive_sql,
_write_to_hive_conf)


@allow_non_gpu(*non_utc_allow)
@ignore_order(local=True)
@pytest.mark.parametrize("is_static", [True, False], ids=['Static_Partition', 'Dynamic_Partition'])
def test_write_parquet_into_partitioned_hive_table(spark_tmp_table_factory, is_static):
# Generate hive table in Parquet format
def gen_table(spark):
dates = [date(2024, 2, 28), date(2024, 2, 27), date(2024, 2, 26)]
gen_list = [('a', int_gen),
('b', long_gen),
('c', short_gen),
('d', string_gen),
('part', SetValuesGen(DateType(), dates))]
data_table = spark_tmp_table_factory.get()
gen_df(spark, gen_list).createOrReplaceTempView(data_table)
return data_table

input_table = with_cpu_session(gen_table)

def partitioned_write_to_hive_sql(spark, output_table):
sql_create_part_table = (
"CREATE TABLE {} (a INT, b LONG, c SHORT, d STRING) "
"PARTITIONED BY (part DATE) STORED AS PARQUET"
).format(output_table)
if is_static:
return [
# sql_1: Create partitioned hive table
sql_create_part_table,
# sql_2: Static partition write only to the partition part='2024-02-25'
"INSERT OVERWRITE TABLE {} PARTITION (part='2024-02-25') "
"SELECT a, b, c, d FROM {}".format(output_table, input_table)
]
else:
return [
# sql_1: Create partitioned hive table
sql_create_part_table,
# sql_2: Dynamic partition write
"INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
]
all_confs = copy_and_update(_write_to_hive_conf, {
"hive.exec.dynamic.partition.mode": "nonstrict"})
assert_gpu_and_cpu_sql_writes_are_equal_collect(
spark_tmp_table_factory,
partitioned_write_to_hive_sql,
all_confs)


zstd_param = pytest.param('ZSTD',
marks=pytest.mark.skipif(is_before_spark_320(), reason="zstd is not supported before 320"))

@allow_non_gpu(*non_utc_allow)
@ignore_order(local=True)
@pytest.mark.parametrize("comp_type", ['UNCOMPRESSED', 'SNAPPY', zstd_param])
def test_write_compressed_parquet_into_hive_table(spark_tmp_table_factory, comp_type):
# Generate hive table in Parquet format
def gen_table(spark):
gens = _hive_basic_gens + _hive_struct_gens + _hive_array_gens + _hive_map_gens
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
types_sql_str = ','.join('{} {}'.format(
name, gen.data_type.simpleString()) for name, gen in gen_list)
data_table = spark_tmp_table_factory.get()
gen_df(spark, gen_list).createOrReplaceTempView(data_table)
return data_table, types_sql_str

input_table, schema_str = with_cpu_session(gen_table)

def write_to_hive_sql(spark, output_table):
return [
# Create table with compression type
"CREATE TABLE {} ({}) STORED AS PARQUET "
"TBLPROPERTIES ('parquet.compression'='{}')".format(
output_table, schema_str, comp_type),
# Insert into table
"INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
]

assert_gpu_and_cpu_sql_writes_are_equal_collect(
spark_tmp_table_factory,
write_to_hive_sql,
_write_to_hive_conf)
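A note on the decimal generators used above: they all have precision 19 or more. The GpuHiveFileFormat change further down falls back to the CPU for any decimal whose precision is at most Spark's Decimal.MAX_LONG_DIGITS (18), because the GPU writer stores such values as an int or a long while Hive writes them as a binary array. A hypothetical helper mirroring that check (the function name is illustrative):

# Hypothetical mirror of the precision check added in GpuHiveFileFormat below.
MAX_LONG_DIGITS = 18  # Spark's Decimal.MAX_LONG_DIGITS

def decimal_supported_by_gpu_hive_parquet_write(precision: int) -> bool:
    # Precision <= 18 fits in an int or a long, which the GPU writer would emit,
    # while Hive expects a binary encoding, so those columns are rejected.
    return precision > MAX_LONG_DIGITS

assert not decimal_supported_by_gpu_hive_parquet_write(18)
assert decimal_supported_by_gpu_hive_parquet_write(19)  # e.g. DecimalGen(precision=19) above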
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -321,7 +321,7 @@ class GpuParquetWriter(
new GpuColumnVector(cv.dataType, deepTransformColumn(cv.getBase, cv.dataType))
.asInstanceOf[org.apache.spark.sql.vectorized.ColumnVector]
}
new ColumnarBatch(transformedCols)
new ColumnarBatch(transformedCols, batch.numRows())
}
}

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,8 +17,9 @@
package org.apache.spark.sql.hive.rapids

import java.nio.charset.Charset
import java.util.Locale

import ai.rapids.cudf.{CSVWriterOptions, DType, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter}
import ai.rapids.cudf.{CompressionType, CSVWriterOptions, DType, ParquetWriterOptions, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter}
import com.google.common.base.Charsets
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
Expand All @@ -27,14 +28,88 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.rapids.GpuHiveTextFileUtils._
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.hive.rapids.shims.GpuInsertIntoHiveTableMeta
import org.apache.spark.sql.types.{DataType, StringType, StructType}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DataType, Decimal, DecimalType, StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

object GpuHiveTextFileFormat extends Logging {
object GpuHiveFileFormat extends Logging {
private val parquetOutputFormatClass =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
private val parquetSerdeClass =
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"

private def checkIfEnabled(meta: GpuInsertIntoHiveTableMeta): Unit = {
def tagGpuSupport(meta: GpuInsertIntoHiveTableMeta): Option[ColumnarFileFormat] = {
val insertCmd = meta.wrapped
// Bucketing write
if (insertCmd.table.bucketSpec.isDefined) {
meta.willNotWorkOnGpu("bucketed tables are not supported yet")
}

// Infer the file format from the serde string, similar to what Spark does in
// RelationConversions for Hive.
val serde = insertCmd.table.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
val tempFileFormat = if (serde.contains("parquet")) {
// Parquet specific tagging
tagGpuSupportForParquet(meta)
} else {
// Default to text file format
tagGpuSupportForText(meta)
}

if (meta.canThisBeReplaced) {
Some(tempFileFormat)
} else {
None
}
}

private def tagGpuSupportForParquet(meta: GpuInsertIntoHiveTableMeta): ColumnarFileFormat = {
val insertCmd = meta.wrapped
val storage = insertCmd.table.storage
// Configs check for Parquet write enabling/disabling

// FIXME Need to check serde and output format classes ?
if (storage.outputFormat.getOrElse("") != parquetOutputFormatClass) {
meta.willNotWorkOnGpu(s"unsupported output-format found: ${storage.outputFormat}, " +
s"only $parquetOutputFormatClass is currently supported for Parquet")
}
if (storage.serde.getOrElse("") != parquetSerdeClass) {
meta.willNotWorkOnGpu(s"unsupported serde found: ${storage.serde}, " +
s"only $parquetSerdeClass is currently supported for Parquet")
}

// Decimal type check
val hasIntOrLongBackedDec = insertCmd.query.schema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, {
case dec: DecimalType if dec.precision <= Decimal.MAX_LONG_DIGITS => true
case _ => false
})
}
if (hasIntOrLongBackedDec) {
meta.willNotWorkOnGpu("decimal can fit inside an int or a long is not supported " +
s"for Parquet. Hive always writes decimal as binary array but GPU writes it " +
s"as an int or a long")
}

// FIXME Need a new format type for Hive Parquet write ?
FileFormatChecks.tag(meta, insertCmd.table.schema, ParquetFormatType, WriteFileOp)

// Compression type
val parquetOptions = new ParquetOptions(insertCmd.table.properties, insertCmd.conf)
val compressionType =
GpuParquetFileFormat.parseCompressionType(parquetOptions.compressionCodecClassName)
.getOrElse {
meta.willNotWorkOnGpu("compression codec " +
s"${parquetOptions.compressionCodecClassName} is not supported for Parquet")
CompressionType.NONE
}
new GpuHiveParquetFileFormat(compressionType)
}

private def tagGpuSupportForText(meta: GpuInsertIntoHiveTableMeta): ColumnarFileFormat = {
import org.apache.spark.sql.hive.rapids.GpuHiveTextFileUtils._
if (!meta.conf.isHiveDelimitedTextEnabled) {
meta.willNotWorkOnGpu("Hive text I/O has been disabled. To enable this, " +
s"set ${RapidsConf.ENABLE_HIVE_TEXT} to true")
@@ -43,21 +118,16 @@ object GpuHiveTextFileFormat extends Logging {
meta.willNotWorkOnGpu("writing Hive delimited text tables has been disabled, " +
s"to enable this, set ${RapidsConf.ENABLE_HIVE_TEXT_WRITE} to true")
}
}

def tagGpuSupport(meta: GpuInsertIntoHiveTableMeta)
: Option[ColumnarFileFormat] = {
checkIfEnabled(meta)

val insertCommand = meta.wrapped
val storage = insertCommand.table.storage
if (storage.outputFormat.getOrElse("") != textOutputFormat) {
meta.willNotWorkOnGpu(s"unsupported output-format found: ${storage.outputFormat}, " +
s"only $textOutputFormat is currently supported")
s"only $textOutputFormat is currently supported for text")
}
if (storage.serde.getOrElse("") != lazySimpleSerDe) {
meta.willNotWorkOnGpu(s"unsupported serde found: ${storage.serde}, " +
s"only $lazySimpleSerDe is currently supported")
s"only $lazySimpleSerDe is currently supported for text")
}

val serializationFormat = storage.properties.getOrElse(serializationKey, "1")
@@ -86,28 +156,60 @@ object GpuHiveTextFileFormat extends Logging {
meta.willNotWorkOnGpu("only UTF-8 is supported as the charset")
}

if (insertCommand.table.bucketSpec.isDefined) {
meta.willNotWorkOnGpu("bucketed tables are not supported")
}

if (insertCommand.conf.getConfString("hive.exec.compress.output", "false").toLowerCase
!= "false") {
if (insertCommand.conf.getConfString("hive.exec.compress.output", "false").toBoolean) {
meta.willNotWorkOnGpu("compressed output is not supported, " +
"set hive.exec.compress.output to false to enable writing Hive text via GPU")
}

FileFormatChecks.tag(meta,
insertCommand.table.schema,
HiveDelimitedTextFormatType,
WriteFileOp)
FileFormatChecks.tag(meta, insertCommand.table.schema, HiveDelimitedTextFormatType,
WriteFileOp)

new GpuHiveTextFileFormat()
}
}

class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFormat {

override def prepareWrite(sparkSession: SparkSession, job: Job,
options: Map[String, String], dataSchema: StructType): ColumnarOutputWriterFactory = {

// Avoid referencing the outer object.
val compressionType = compType
new ColumnarOutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String =
compressionType match {
case CompressionType.NONE => ".parquet"
case ct => s".${ct.name().toLowerCase(Locale.ROOT)}.parquet"
}

override def newInstance(path: String,
dataSchema: StructType,
context: TaskAttemptContext): ColumnarOutputWriter = {
new GpuHiveParquetWriter(path, dataSchema, context, compressionType)
}
}
}
}

class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
context: TaskAttemptContext, compType: CompressionType)
extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true) {

Some(new GpuHiveTextFileFormat())
override protected val tableWriter: CudfTableWriter = {
val optionsBuilder = SchemaUtils
.writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
writeInt96 = true, // Hive 1.2 writes timestamps as INT96
parquetFieldIdEnabled = false)
.withCompressionType(compType)
Table.writeParquetChunked(optionsBuilder.build(), this)
}

}

class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging {

override def supportDataType(dataType: DataType): Boolean = isSupportedType(dataType)
override def supportDataType(dataType: DataType): Boolean =
GpuHiveTextFileUtils.isSupportedType(dataType)

override def prepareWrite(sparkSession: SparkSession,
job: Job,
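In summary, the new GpuHiveFileFormat.tagGpuSupport dispatches on the table's serde string: a serde containing "parquet" goes through the Parquet checks and produces a GpuHiveParquetFileFormat with the codec parsed from the table properties, while everything else keeps the existing delimited-text path. The chosen codec also determines the output file extension. A rough Python sketch of that dispatch and naming logic (the helper functions are illustrative, not part of the plugin):

# Illustrative mirror of the serde dispatch and file-extension logic in the Scala diff above.
PARQUET_SERDE = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"

def pick_write_path(serde: str) -> str:
    # GpuHiveFileFormat.tagGpuSupport: a serde containing "parquet" takes the
    # Parquet tagging path, anything else falls through to delimited-text tagging.
    return "parquet" if "parquet" in serde.lower() else "text"

def parquet_file_extension(compression: str) -> str:
    # GpuHiveParquetFileFormat.getFileExtension: NONE maps to ".parquet",
    # any other codec is folded into the extension (e.g. ".zstd.parquet").
    comp = compression.upper()
    return ".parquet" if comp == "NONE" else ".{}.parquet".format(comp.lower())

assert pick_write_path(PARQUET_SERDE) == "parquet"
assert parquet_file_extension("ZSTD") == ".zstd.parquet"
assert parquet_file_extension("NONE") == ".parquet"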