GpuInsertIntoHiveTable supports parquet #11

Merged
merged 1 commit on
May 28, 2024
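
For context on the title: the change routes Hive Parquet table writes (the GpuInsertIntoHiveTable command) onto the GPU rather than only Hive delimited text. Below is a minimal sketch of the kind of statements the new tests drive through that path; the session object, table, and view names are illustrative and not taken from the PR.

# Assumes an existing Hive-enabled SparkSession `spark` with the RAPIDS plugin loaded.
# Keeping convertMetastoreParquet off holds the write on the Hive insert path
# (GpuInsertIntoHiveTable) instead of Spark's converted data source Parquet writer.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

# Create Table As Select into a Parquet-backed Hive table.
spark.sql("CREATE TABLE out_ctas STORED AS PARQUET AS SELECT * FROM src_view")

# Or create the table first, then insert into it.
spark.sql("CREATE TABLE out_tab (a INT, d STRING) STORED AS PARQUET")
spark.sql("INSERT OVERWRITE TABLE out_tab SELECT a, d FROM src_view")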
172 changes: 172 additions & 0 deletions integration_tests/src/main/python/hive_parquet_write_test.py
@@ -0,0 +1,172 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_sql_writes_are_equal_collect
from data_gen import *
from hive_write_test import _restricted_timestamp
from marks import allow_non_gpu, ignore_order
from spark_session import with_cpu_session, is_before_spark_320

# Disable the conversion from a Hive metastore write to a Spark data source write,
# to test "GpuInsertIntoHiveTable" for Parquet write.
_write_to_hive_conf = {"spark.sql.hive.convertMetastoreParquet": False}

_hive_basic_gens = [
    byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen,
    DateGen(start=date(1590, 1, 1)), _restricted_timestamp(),
    DecimalGen(precision=19, scale=1, nullable=True),
    DecimalGen(precision=23, scale=5, nullable=True),
    DecimalGen(precision=36, scale=3, nullable=True)]

_hive_basic_struct_gen = StructGen(
    [['c'+str(ind), c_gen] for ind, c_gen in enumerate(_hive_basic_gens)])

_hive_struct_gens = [
    _hive_basic_struct_gen,
    StructGen([['child0', byte_gen], ['child1', _hive_basic_struct_gen]]),
    StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]

_hive_array_gens = [ArrayGen(sub_gen) for sub_gen in _hive_basic_gens] + [
    ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
    ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
    ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]

_hive_map_gens = [simple_string_to_string_map_gen] + [MapGen(f(nullable=False), f()) for f in [
    BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen,
    lambda nullable=True: _restricted_timestamp(nullable=nullable),
    lambda nullable=True: DateGen(start=date(1590, 1, 1), nullable=nullable),
    lambda nullable=True: DecimalGen(precision=19, scale=1, nullable=nullable),
    lambda nullable=True: DecimalGen(precision=36, scale=5, nullable=nullable)]]

_hive_write_gens = [_hive_basic_gens, _hive_struct_gens, _hive_array_gens, _hive_map_gens]


@allow_non_gpu(*non_utc_allow)
@ignore_order(local=True)
@pytest.mark.parametrize("is_ctas", [True, False], ids=['CTAS', 'CTTW'])
@pytest.mark.parametrize("gens", _hive_write_gens, ids=idfn)
def test_write_parquet_into_hive_table(spark_tmp_table_factory, is_ctas, gens):

    def gen_table(spark):
        gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
        types_sql_str = ','.join('{} {}'.format(
            name, gen.data_type.simpleString()) for name, gen in gen_list)
        data_table = spark_tmp_table_factory.get()
        gen_df(spark, gen_list).createOrReplaceTempView(data_table)
        return data_table, types_sql_str

    (input_table, input_schema) = with_cpu_session(gen_table)

    def write_to_hive_sql(spark, output_table):
        if is_ctas:
            # Create Table As Select
            return [
                "CREATE TABLE {} STORED AS PARQUET AS SELECT * FROM {}".format(
                    output_table, input_table)
            ]
        else:
            # Create Table Then Write
            return [
                "CREATE TABLE {} ({}) STORED AS PARQUET".format(output_table, input_schema),
                "INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
            ]

    assert_gpu_and_cpu_sql_writes_are_equal_collect(
        spark_tmp_table_factory,
        write_to_hive_sql,
        _write_to_hive_conf)


@allow_non_gpu(*non_utc_allow)
@ignore_order(local=True)
@pytest.mark.parametrize("is_static", [True, False], ids=['Static_Partition', 'Dynamic_Partition'])
def test_write_parquet_into_partitioned_hive_table(spark_tmp_table_factory, is_static):
    # Generate hive table in Parquet format
    def gen_table(spark):
        dates = [date(2024, 2, 28), date(2024, 2, 27), date(2024, 2, 26)]
        gen_list = [('a', int_gen),
                    ('b', long_gen),
                    ('c', short_gen),
                    ('d', string_gen),
                    ('part', SetValuesGen(DateType(), dates))]
        data_table = spark_tmp_table_factory.get()
        gen_df(spark, gen_list).createOrReplaceTempView(data_table)
        return data_table

    input_table = with_cpu_session(gen_table)

    def partitioned_write_to_hive_sql(spark, output_table):
        sql_create_part_table = (
            "CREATE TABLE {} (a INT, b LONG, c SHORT, d STRING) "
            "PARTITIONED BY (part DATE) STORED AS PARQUET"
        ).format(output_table)
        if is_static:
            return [
                # sql_1: Create partitioned hive table
                sql_create_part_table,
                # sql_2: Static partition write only to partition '2024-02-25'
                "INSERT OVERWRITE TABLE {} PARTITION (part='2024-02-25') "
                "SELECT a, b, c, d FROM {}".format(output_table, input_table)
            ]
        else:
            return [
                # sql_1: Create partitioned hive table
                sql_create_part_table,
                # sql_2: Dynamic partition write
                "INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
            ]

    all_confs = copy_and_update(_write_to_hive_conf, {
        "hive.exec.dynamic.partition.mode": "nonstrict"})
    assert_gpu_and_cpu_sql_writes_are_equal_collect(
        spark_tmp_table_factory,
        partitioned_write_to_hive_sql,
        all_confs)


zstd_param = pytest.param('ZSTD',
    marks=pytest.mark.skipif(is_before_spark_320(), reason="zstd is not supported before 320"))

@allow_non_gpu(*non_utc_allow)
@ignore_order(local=True)
@pytest.mark.parametrize("comp_type", ['UNCOMPRESSED', 'SNAPPY', zstd_param])
def test_write_compressed_parquet_into_hive_table(spark_tmp_table_factory, comp_type):
    # Generate hive table in Parquet format
    def gen_table(spark):
        gens = _hive_basic_gens + _hive_struct_gens + _hive_array_gens + _hive_map_gens
        gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
        types_sql_str = ','.join('{} {}'.format(
            name, gen.data_type.simpleString()) for name, gen in gen_list)
        data_table = spark_tmp_table_factory.get()
        gen_df(spark, gen_list).createOrReplaceTempView(data_table)
        return data_table, types_sql_str

    input_table, schema_str = with_cpu_session(gen_table)

    def write_to_hive_sql(spark, output_table):
        return [
            # Create table with compression type
            "CREATE TABLE {} ({}) STORED AS PARQUET "
            "TBLPROPERTIES ('parquet.compression'='{}')".format(
                output_table, schema_str, comp_type),
            # Insert into table
            "INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
        ]

    assert_gpu_and_cpu_sql_writes_are_equal_collect(
        spark_tmp_table_factory,
        write_to_hive_sql,
        _write_to_hive_conf)
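
For readers who want to exercise the partitioned case outside the test harness, a rough standalone sketch follows. It is not part of the PR: the session settings mirror the test configuration, while the plugin config, table, and view names are assumptions, and the DATE partition column used by the test is simplified to a STRING here.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("hive-parquet-write-sketch")
         .enableHiveSupport()
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.sql.hive.convertMetastoreParquet", "false")
         .config("hive.exec.dynamic.partition.mode", "nonstrict")
         .getOrCreate())

spark.createDataFrame(
    [(1, "x", "2024-02-28"), (2, "y", "2024-02-27")],
    ["a", "d", "part"]).createOrReplaceTempView("src_view")

spark.sql("CREATE TABLE part_tab (a INT, d STRING) "
          "PARTITIONED BY (part STRING) STORED AS PARQUET")
# Dynamic partition overwrite, matching the non-static branch of the test above.
spark.sql("INSERT OVERWRITE TABLE part_tab SELECT a, d, part FROM src_view")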
@@ -1,5 +1,5 @@
/*
 * Copyright (c) 2019-2023, NVIDIA CORPORATION.
 * Copyright (c) 2019-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -321,7 +321,7 @@ class GpuParquetWriter(
        new GpuColumnVector(cv.dataType, deepTransformColumn(cv.getBase, cv.dataType))
          .asInstanceOf[org.apache.spark.sql.vectorized.ColumnVector]
      }
      new ColumnarBatch(transformedCols)
      new ColumnarBatch(transformedCols, batch.numRows())
    }
  }

@@ -1,5 +1,5 @@
/*
 * Copyright (c) 2023, NVIDIA CORPORATION.
 * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -17,8 +17,9 @@
package org.apache.spark.sql.hive.rapids

import java.nio.charset.Charset
import java.util.Locale

import ai.rapids.cudf.{CSVWriterOptions, DType, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter}
import ai.rapids.cudf.{CompressionType, CSVWriterOptions, DType, ParquetWriterOptions, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter}
import com.google.common.base.Charsets
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
@@ -27,14 +28,88 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.rapids.GpuHiveTextFileUtils._
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.hive.rapids.shims.GpuInsertIntoHiveTableMeta
import org.apache.spark.sql.types.{DataType, StringType, StructType}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DataType, Decimal, DecimalType, StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

object GpuHiveTextFileFormat extends Logging {
object GpuHiveFileFormat extends Logging {
  private val parquetOutputFormatClass =
    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
  private val parquetSerdeClass =
    "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"

  private def checkIfEnabled(meta: GpuInsertIntoHiveTableMeta): Unit = {
  def tagGpuSupport(meta: GpuInsertIntoHiveTableMeta): Option[ColumnarFileFormat] = {
    val insertCmd = meta.wrapped
    // Bucketing write
    if (insertCmd.table.bucketSpec.isDefined) {
      meta.willNotWorkOnGpu("bucketed tables are not supported yet")
    }

    // Infer the file format from the serde string, similar to what Spark does in
    // RelationConversions for Hive.
    val serde = insertCmd.table.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
    val tempFileFormat = if (serde.contains("parquet")) {
      // Parquet specific tagging
      tagGpuSupportForParquet(meta)
    } else {
      // Default to text file format
      tagGpuSupportForText(meta)
    }

    if (meta.canThisBeReplaced) {
      Some(tempFileFormat)
    } else {
      None
    }
  }

  private def tagGpuSupportForParquet(meta: GpuInsertIntoHiveTableMeta): ColumnarFileFormat = {
    val insertCmd = meta.wrapped
    val storage = insertCmd.table.storage
    // Configs check for Parquet write enabling/disabling

    // FIXME Need to check serde and output format classes ?
    if (storage.outputFormat.getOrElse("") != parquetOutputFormatClass) {
      meta.willNotWorkOnGpu(s"unsupported output-format found: ${storage.outputFormat}, " +
        s"only $parquetOutputFormatClass is currently supported for Parquet")
    }
    if (storage.serde.getOrElse("") != parquetSerdeClass) {
      meta.willNotWorkOnGpu(s"unsupported serde found: ${storage.serde}, " +
        s"only $parquetSerdeClass is currently supported for Parquet")
    }

    // Decimal type check
    val hasIntOrLongBackedDec = insertCmd.query.schema.exists { field =>
      TrampolineUtil.dataTypeExistsRecursively(field.dataType, {
        case dec: DecimalType if dec.precision <= Decimal.MAX_LONG_DIGITS => true
        case _ => false
      })
    }
    if (hasIntOrLongBackedDec) {
      meta.willNotWorkOnGpu("decimals that can fit inside an int or a long are not " +
        "supported for Parquet. Hive always writes decimals as binary arrays, but the " +
        "GPU writes them as an int or a long")
    }

    // FIXME Need a new format type for Hive Parquet write ?
    FileFormatChecks.tag(meta, insertCmd.table.schema, ParquetFormatType, WriteFileOp)

    // Compression type
    val parquetOptions = new ParquetOptions(insertCmd.table.properties, insertCmd.conf)
    val compressionType =
      GpuParquetFileFormat.parseCompressionType(parquetOptions.compressionCodecClassName)
        .getOrElse {
          meta.willNotWorkOnGpu("compression codec " +
            s"${parquetOptions.compressionCodecClassName} is not supported for Parquet")
          CompressionType.NONE
        }
    new GpuHiveParquetFileFormat(compressionType)
  }

  private def tagGpuSupportForText(meta: GpuInsertIntoHiveTableMeta): ColumnarFileFormat = {
    import org.apache.spark.sql.hive.rapids.GpuHiveTextFileUtils._
    if (!meta.conf.isHiveDelimitedTextEnabled) {
      meta.willNotWorkOnGpu("Hive text I/O has been disabled. To enable this, " +
        s"set ${RapidsConf.ENABLE_HIVE_TEXT} to true")
@@ -43,21 +118,16 @@ object GpuHiveTextFileFormat extends Logging {
meta.willNotWorkOnGpu("writing Hive delimited text tables has been disabled, " +
s"to enable this, set ${RapidsConf.ENABLE_HIVE_TEXT_WRITE} to true")
}
}

def tagGpuSupport(meta: GpuInsertIntoHiveTableMeta)
: Option[ColumnarFileFormat] = {
checkIfEnabled(meta)

val insertCommand = meta.wrapped
val storage = insertCommand.table.storage
if (storage.outputFormat.getOrElse("") != textOutputFormat) {
meta.willNotWorkOnGpu(s"unsupported output-format found: ${storage.outputFormat}, " +
s"only $textOutputFormat is currently supported")
s"only $textOutputFormat is currently supported for text")
}
if (storage.serde.getOrElse("") != lazySimpleSerDe) {
meta.willNotWorkOnGpu(s"unsupported serde found: ${storage.serde}, " +
s"only $lazySimpleSerDe is currently supported")
s"only $lazySimpleSerDe is currently supported for text")
}

val serializationFormat = storage.properties.getOrElse(serializationKey, "1")
@@ -86,28 +156,60 @@ object GpuHiveTextFileFormat extends Logging {
meta.willNotWorkOnGpu("only UTF-8 is supported as the charset")
}

if (insertCommand.table.bucketSpec.isDefined) {
meta.willNotWorkOnGpu("bucketed tables are not supported")
}

if (insertCommand.conf.getConfString("hive.exec.compress.output", "false").toLowerCase
!= "false") {
if (insertCommand.conf.getConfString("hive.exec.compress.output", "false").toBoolean) {
meta.willNotWorkOnGpu("compressed output is not supported, " +
"set hive.exec.compress.output to false to enable writing Hive text via GPU")
}

FileFormatChecks.tag(meta,
insertCommand.table.schema,
HiveDelimitedTextFormatType,
WriteFileOp)
FileFormatChecks.tag(meta, insertCommand.table.schema, HiveDelimitedTextFormatType,
WriteFileOp)

new GpuHiveTextFileFormat()
}
}

class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFormat {

override def prepareWrite(sparkSession: SparkSession, job: Job,
options: Map[String, String], dataSchema: StructType): ColumnarOutputWriterFactory = {

// Avoid referencing the outer object.
val compressionType = compType
new ColumnarOutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String =
compressionType match {
case CompressionType.NONE => ".parquet"
case ct => s".${ct.name().toLowerCase(Locale.ROOT)}.parquet"
}

override def newInstance(path: String,
dataSchema: StructType,
context: TaskAttemptContext): ColumnarOutputWriter = {
new GpuHiveParquetWriter(path, dataSchema, context, compressionType)
}
}
}
}

class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
context: TaskAttemptContext, compType: CompressionType)
extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true) {

Some(new GpuHiveTextFileFormat())
override protected val tableWriter: CudfTableWriter = {
val optionsBuilder = SchemaUtils
.writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
writeInt96 = true, // Hive 1.2 write timestamp as INT96
parquetFieldIdEnabled = false)
.withCompressionType(compType)
Table.writeParquetChunked(optionsBuilder.build(), this)
}

}

class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging {

override def supportDataType(dataType: DataType): Boolean = isSupportedType(dataType)
override def supportDataType(dataType: DataType): Boolean =
GpuHiveTextFileUtils.isSupportedType(dataType)

override def prepareWrite(sparkSession: SparkSession,
job: Job,
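
One detail worth noting from the new GpuHiveParquetFileFormat above: the codec parsed from the table's parquet.compression property also determines the written file extension. A small Python paraphrase of that getFileExtension logic, for illustration only:

# Mirrors GpuHiveParquetFileFormat.getFileExtension: NONE maps to ".parquet",
# any other codec becomes ".<codec>.parquet".
def parquet_file_extension(compression_type: str) -> str:
    if compression_type == "NONE":
        return ".parquet"
    return ".{}.parquet".format(compression_type.lower())

# A table created with TBLPROPERTIES ('parquet.compression'='ZSTD'), as in the
# compression test above, would therefore produce files ending in ".zstd.parquet".
print(parquet_file_extension("ZSTD"))    # .zstd.parquet
print(parquet_file_extension("SNAPPY"))  # .snappy.parquet
print(parquet_file_extension("NONE"))    # .parquet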