From d61654b0efa14bb4dd25414556fd180ed2050512 Mon Sep 17 00:00:00 2001
From: Liangcai Li
Date: Thu, 29 Feb 2024 09:19:07 +0800
Subject: [PATCH] GpuInsertIntoHiveTable supports parquet

Signed-off-by: Firestarman
---
 .../main/python/hive_parquet_write_test.py    | 172 ++++++++++++++++++
 .../spark/rapids/GpuParquetFileFormat.scala   |   4 +-
 ...leFormat.scala => GpuHiveFileFormat.scala} | 152 +++++++++++++---
 .../rapids/shims/GpuInsertIntoHiveTable.scala |  17 +-
 .../rapids/shims/GpuInsertIntoHiveTable.scala |  17 +-
 5 files changed, 321 insertions(+), 41 deletions(-)
 create mode 100644 integration_tests/src/main/python/hive_parquet_write_test.py
 rename sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/{GpuHiveTextFileFormat.scala => GpuHiveFileFormat.scala} (53%)

diff --git a/integration_tests/src/main/python/hive_parquet_write_test.py b/integration_tests/src/main/python/hive_parquet_write_test.py
new file mode 100644
index 00000000000..d6657db18ae
--- /dev/null
+++ b/integration_tests/src/main/python/hive_parquet_write_test.py
@@ -0,0 +1,172 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from asserts import assert_gpu_and_cpu_sql_writes_are_equal_collect
+from data_gen import *
+from hive_write_test import _restricted_timestamp
+from marks import allow_non_gpu, ignore_order
+from spark_session import with_cpu_session, is_before_spark_320
+
+# Disable the metastore conversion from the Hive write to Spark's native data source
+# write, to test "GpuInsertIntoHiveTable" for Parquet write.
+_write_to_hive_conf = {"spark.sql.hive.convertMetastoreParquet": False}
+
+_hive_basic_gens = [
+    byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen,
+    DateGen(start=date(1590, 1, 1)), _restricted_timestamp(),
+    DecimalGen(precision=19, scale=1, nullable=True),
+    DecimalGen(precision=23, scale=5, nullable=True),
+    DecimalGen(precision=36, scale=3, nullable=True)]
+
+_hive_basic_struct_gen = StructGen(
+    [['c'+str(ind), c_gen] for ind, c_gen in enumerate(_hive_basic_gens)])
+
+_hive_struct_gens = [
+    _hive_basic_struct_gen,
+    StructGen([['child0', byte_gen], ['child1', _hive_basic_struct_gen]]),
+    StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
+
+_hive_array_gens = [ArrayGen(sub_gen) for sub_gen in _hive_basic_gens] + [
+    ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
+    ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
+    ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]
+
+_hive_map_gens = [simple_string_to_string_map_gen] + [MapGen(f(nullable=False), f()) for f in [
+    BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen,
+    lambda nullable=True: _restricted_timestamp(nullable=nullable),
+    lambda nullable=True: DateGen(start=date(1590, 1, 1), nullable=nullable),
+    lambda nullable=True: DecimalGen(precision=19, scale=1, nullable=nullable),
+    lambda nullable=True: DecimalGen(precision=36, scale=5, nullable=nullable)]]
+
+_hive_write_gens = [_hive_basic_gens, _hive_struct_gens, _hive_array_gens, _hive_map_gens]
+
+
+@allow_non_gpu(*non_utc_allow)
+@ignore_order(local=True)
+@pytest.mark.parametrize("is_ctas", [True, False], ids=['CTAS', 'CTTW'])
+@pytest.mark.parametrize("gens", _hive_write_gens, ids=idfn)
+def test_write_parquet_into_hive_table(spark_tmp_table_factory, is_ctas, gens):
+
+    def gen_table(spark):
+        gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
+        types_sql_str = ','.join('{} {}'.format(
+            name, gen.data_type.simpleString()) for name, gen in gen_list)
+        data_table = spark_tmp_table_factory.get()
+        gen_df(spark, gen_list).createOrReplaceTempView(data_table)
+        return data_table, types_sql_str
+
+    (input_table, input_schema) = with_cpu_session(gen_table)
+
+    def write_to_hive_sql(spark, output_table):
+        if is_ctas:
+            # Create Table As Select
+            return [
+                "CREATE TABLE {} STORED AS PARQUET AS SELECT * FROM {}".format(
+                    output_table, input_table)
+            ]
+        else:
+            # Create Table Then Write
+            return [
+                "CREATE TABLE {} ({}) STORED AS PARQUET".format(output_table, input_schema),
+                "INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
+            ]
+
+    assert_gpu_and_cpu_sql_writes_are_equal_collect(
+        spark_tmp_table_factory,
+        write_to_hive_sql,
+        _write_to_hive_conf)
+
+
+@allow_non_gpu(*non_utc_allow)
+@ignore_order(local=True)
+@pytest.mark.parametrize("is_static", [True, False], ids=['Static_Partition', 'Dynamic_Partition'])
+def test_write_parquet_into_partitioned_hive_table(spark_tmp_table_factory, is_static):
+    # Generate a hive table in Parquet format
+    def gen_table(spark):
+        dates = [date(2024, 2, 28), date(2024, 2, 27), date(2024, 2, 26)]
+        gen_list = [('a', int_gen),
+                    ('b', long_gen),
+                    ('c', short_gen),
+                    ('d', string_gen),
+                    ('part', SetValuesGen(DateType(), dates))]
+        data_table = spark_tmp_table_factory.get()
+        gen_df(spark, gen_list).createOrReplaceTempView(data_table)
+        return data_table
+
+    input_table = with_cpu_session(gen_table)
+
+    def partitioned_write_to_hive_sql(spark, output_table):
+        sql_create_part_table = (
+            "CREATE TABLE {} (a INT, b LONG, c SHORT, d STRING) "
+            "PARTITIONED BY (part DATE) STORED AS PARQUET"
+        ).format(output_table)
+        if is_static:
+            return [
+                # sql_1: Create the partitioned hive table
+                sql_create_part_table,
+                # sql_2: Static partition write only to the partition part='2024-02-25'
+                "INSERT OVERWRITE TABLE {} PARTITION (part='2024-02-25') "
+                "SELECT a, b, c, d FROM {}".format(output_table, input_table)
+            ]
+        else:
+            return [
+                # sql_1: Create the partitioned hive table
+                sql_create_part_table,
+                # sql_2: Dynamic partition write
+                "INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
+            ]
+    all_confs = copy_and_update(_write_to_hive_conf, {
+        "hive.exec.dynamic.partition.mode": "nonstrict"})
+    assert_gpu_and_cpu_sql_writes_are_equal_collect(
+        spark_tmp_table_factory,
+        partitioned_write_to_hive_sql,
+        all_confs)
+
+
+zstd_param = pytest.param('ZSTD',
+    marks=pytest.mark.skipif(is_before_spark_320(), reason="zstd is not supported before 320"))
+
+@allow_non_gpu(*non_utc_allow)
+@ignore_order(local=True)
+@pytest.mark.parametrize("comp_type", ['UNCOMPRESSED', 'SNAPPY', zstd_param])
+def test_write_compressed_parquet_into_hive_table(spark_tmp_table_factory, comp_type):
+    # Generate a hive table in Parquet format
+    def gen_table(spark):
+        gens = _hive_basic_gens + _hive_struct_gens + _hive_array_gens + _hive_map_gens
+        gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
+        types_sql_str = ','.join('{} {}'.format(
+            name, gen.data_type.simpleString()) for name, gen in gen_list)
+        data_table = spark_tmp_table_factory.get()
+        gen_df(spark, gen_list).createOrReplaceTempView(data_table)
+        return data_table, types_sql_str
+
+    input_table, schema_str = with_cpu_session(gen_table)
+
+    def write_to_hive_sql(spark, output_table):
+        return [
+            # Create the table with the given compression type
+            "CREATE TABLE {} ({}) STORED AS PARQUET "
+            "TBLPROPERTIES ('parquet.compression'='{}')".format(
+                output_table, schema_str, comp_type),
+            # Insert into the table
+            "INSERT OVERWRITE TABLE {} SELECT * FROM {}".format(output_table, input_table)
+        ]
+
+    assert_gpu_and_cpu_sql_writes_are_equal_collect(
+        spark_tmp_table_factory,
+        write_to_hive_sql,
+        _write_to_hive_conf)
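For readers who want to try the scenario outside the pytest harness, the sketch below drives the same kind of SQL through a plain SparkSession. It is illustrative only: the table names (src_data, ctas_tbl, part_tbl) and the tiny generated data set are invented here and are not part of the test suite above.

```scala
import org.apache.spark.sql.SparkSession

object HiveParquetWriteRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-parquet-write-repro")
      .enableHiveSupport()
      .getOrCreate()

    // Keep the write on the InsertIntoHiveTable path instead of Spark's native
    // data source path, matching _write_to_hive_conf in the test above.
    spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

    spark.range(100)
      .selectExpr("id AS a", "CAST(id AS STRING) AS d", "DATE'2024-02-28' AS part")
      .createOrReplaceTempView("src_data")

    // CTAS variant
    spark.sql("CREATE TABLE ctas_tbl STORED AS PARQUET AS SELECT a, d FROM src_data")

    // Partitioned "create then write" variant with a dynamic partition column
    spark.sql("CREATE TABLE part_tbl (a BIGINT, d STRING) " +
      "PARTITIONED BY (part DATE) STORED AS PARQUET")
    spark.sql("INSERT OVERWRITE TABLE part_tbl SELECT a, d, part FROM src_data")
  }
}
```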
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
index e8ae977b1f6..25105386b3d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -321,7 +321,7 @@ class GpuParquetWriter(
       new GpuColumnVector(cv.dataType, deepTransformColumn(cv.getBase, cv.dataType))
         .asInstanceOf[org.apache.spark.sql.vectorized.ColumnVector]
     }
-    new ColumnarBatch(transformedCols)
+    new ColumnarBatch(transformedCols, batch.numRows())
   }
 }
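The one-line change above matters because Spark's single-argument ColumnarBatch constructor leaves the row count at zero until setNumRows is called, so a batch rebuilt only from transformed columns could report no rows downstream. A minimal sketch of the distinction, separate from the plugin code:

```scala
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

object ColumnarBatchRowCount {
  // new ColumnarBatch(cols) leaves numRows at 0; passing the count explicitly,
  // as the patch does with batch.numRows(), keeps it correct even when the
  // projection yields zero columns.
  def rebatch(cols: Array[ColumnVector], numRows: Int): ColumnarBatch =
    new ColumnarBatch(cols, numRows)
}
```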
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveTextFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
similarity index 53%
rename from sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveTextFileFormat.scala
rename to sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
index 4595ea87ed3..e9eafff53ca 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveTextFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,8 +17,9 @@
 package org.apache.spark.sql.hive.rapids
 
 import java.nio.charset.Charset
+import java.util.Locale
 
-import ai.rapids.cudf.{CSVWriterOptions, DType, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter}
+import ai.rapids.cudf.{CompressionType, CSVWriterOptions, DType, ParquetWriterOptions, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter}
 import com.google.common.base.Charsets
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.Arm.withResource
@@ -27,14 +28,88 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.hive.rapids.GpuHiveTextFileUtils._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
 import org.apache.spark.sql.hive.rapids.shims.GpuInsertIntoHiveTableMeta
-import org.apache.spark.sql.types.{DataType, StringType, StructType}
+import org.apache.spark.sql.rapids.execution.TrampolineUtil
+import org.apache.spark.sql.types.{DataType, Decimal, DecimalType, StringType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-object GpuHiveTextFileFormat extends Logging {
+object GpuHiveFileFormat extends Logging {
+  private val parquetOutputFormatClass =
+    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
+  private val parquetSerdeClass =
+    "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
 
-  private def checkIfEnabled(meta: GpuInsertIntoHiveTableMeta): Unit = {
+  def tagGpuSupport(meta: GpuInsertIntoHiveTableMeta): Option[ColumnarFileFormat] = {
+    val insertCmd = meta.wrapped
+    // Bucketing write
+    if (insertCmd.table.bucketSpec.isDefined) {
+      meta.willNotWorkOnGpu("bucketed tables are not supported yet")
+    }
+
+    // Infer the file format from the serde string, similar to what Spark does in
+    // RelationConversions for Hive.
+    val serde = insertCmd.table.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    val tempFileFormat = if (serde.contains("parquet")) {
+      // Parquet-specific tagging
+      tagGpuSupportForParquet(meta)
+    } else {
+      // Default to the text file format
+      tagGpuSupportForText(meta)
+    }
+
+    if (meta.canThisBeReplaced) {
+      Some(tempFileFormat)
+    } else {
+      None
+    }
+  }
+
+  private def tagGpuSupportForParquet(meta: GpuInsertIntoHiveTableMeta): ColumnarFileFormat = {
+    val insertCmd = meta.wrapped
+    val storage = insertCmd.table.storage
+    // Config checks for enabling/disabling the Parquet write
+
+    // FIXME: Do we need to check the serde and output format classes here?
+    if (storage.outputFormat.getOrElse("") != parquetOutputFormatClass) {
+      meta.willNotWorkOnGpu(s"unsupported output-format found: ${storage.outputFormat}, " +
+        s"only $parquetOutputFormatClass is currently supported for Parquet")
+    }
+    if (storage.serde.getOrElse("") != parquetSerdeClass) {
+      meta.willNotWorkOnGpu(s"unsupported serde found: ${storage.serde}, " +
+        s"only $parquetSerdeClass is currently supported for Parquet")
+    }
+
+    // Decimal type check
+    val hasIntOrLongBackedDec = insertCmd.query.schema.exists { field =>
+      TrampolineUtil.dataTypeExistsRecursively(field.dataType, {
+        case dec: DecimalType if dec.precision <= Decimal.MAX_LONG_DIGITS => true
+        case _ => false
+      })
+    }
+    if (hasIntOrLongBackedDec) {
+      meta.willNotWorkOnGpu("decimal values that fit in an int or a long are not supported " +
+        s"for Parquet. Hive always writes decimal as a binary array, but the GPU writes it " +
+        s"as an int or a long")
+    }
+
+    // FIXME: Do we need a new format type for the Hive Parquet write?
+    FileFormatChecks.tag(meta, insertCmd.table.schema, ParquetFormatType, WriteFileOp)
+
+    // Compression type
+    val parquetOptions = new ParquetOptions(insertCmd.table.properties, insertCmd.conf)
+    val compressionType =
+      GpuParquetFileFormat.parseCompressionType(parquetOptions.compressionCodecClassName)
+        .getOrElse {
+          meta.willNotWorkOnGpu("compression codec " +
            s"${parquetOptions.compressionCodecClassName} is not supported for Parquet")
          CompressionType.NONE
        }
+    new GpuHiveParquetFileFormat(compressionType)
+  }
+
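The decimal check above falls back to the CPU whenever any (possibly nested) decimal fits in an int or a long: the GPU Parquet writer stores such values as 32-bit or 64-bit integers, while Hive writes them as a binary array, so the two outputs would not match. A simplified, non-recursive restatement of the condition, assuming Decimal.MAX_LONG_DIGITS is 18:

```scala
import org.apache.spark.sql.types.{DecimalType, StructType}

object SmallDecimalCheck {
  // Returns true for schemas the tagging above would reject. The real check also
  // recurses into arrays, maps and structs via dataTypeExistsRecursively.
  def hasIntOrLongBackedDecimal(schema: StructType): Boolean =
    schema.exists(f => f.dataType match {
      case d: DecimalType => d.precision <= 18 // Decimal.MAX_LONG_DIGITS
      case _ => false
    })
}
// For example, DecimalType(9, 2) and DecimalType(18, 5) trigger the fallback,
// while DecimalType(19, 1) does not.
```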
+  private def tagGpuSupportForText(meta: GpuInsertIntoHiveTableMeta): ColumnarFileFormat = {
+    import org.apache.spark.sql.hive.rapids.GpuHiveTextFileUtils._
     if (!meta.conf.isHiveDelimitedTextEnabled) {
       meta.willNotWorkOnGpu("Hive text I/O has been disabled. To enable this, " +
         s"set ${RapidsConf.ENABLE_HIVE_TEXT} to true")
@@ -43,21 +118,16 @@ object GpuHiveTextFileFormat extends Logging {
       meta.willNotWorkOnGpu("writing Hive delimited text tables has been disabled, " +
         s"to enable this, set ${RapidsConf.ENABLE_HIVE_TEXT_WRITE} to true")
     }
-  }
-
-  def tagGpuSupport(meta: GpuInsertIntoHiveTableMeta)
-    : Option[ColumnarFileFormat] = {
-    checkIfEnabled(meta)
 
     val insertCommand = meta.wrapped
     val storage = insertCommand.table.storage
     if (storage.outputFormat.getOrElse("") != textOutputFormat) {
       meta.willNotWorkOnGpu(s"unsupported output-format found: ${storage.outputFormat}, " +
-        s"only $textOutputFormat is currently supported")
+        s"only $textOutputFormat is currently supported for text")
     }
     if (storage.serde.getOrElse("") != lazySimpleSerDe) {
       meta.willNotWorkOnGpu(s"unsupported serde found: ${storage.serde}, " +
-        s"only $lazySimpleSerDe is currently supported")
+        s"only $lazySimpleSerDe is currently supported for text")
     }
 
     val serializationFormat = storage.properties.getOrElse(serializationKey, "1")
@@ -86,28 +156,60 @@ object GpuHiveTextFileFormat extends Logging {
       meta.willNotWorkOnGpu("only UTF-8 is supported as the charset")
     }
 
-    if (insertCommand.table.bucketSpec.isDefined) {
-      meta.willNotWorkOnGpu("bucketed tables are not supported")
-    }
-
-    if (insertCommand.conf.getConfString("hive.exec.compress.output", "false").toLowerCase
-        != "false") {
+    if (insertCommand.conf.getConfString("hive.exec.compress.output", "false").toBoolean) {
       meta.willNotWorkOnGpu("compressed output is not supported, " +
         "set hive.exec.compress.output to false to enable writing Hive text via GPU")
     }
 
-    FileFormatChecks.tag(meta,
-      insertCommand.table.schema,
-      HiveDelimitedTextFormatType,
-      WriteFileOp)
+    FileFormatChecks.tag(meta, insertCommand.table.schema, HiveDelimitedTextFormatType,
+      WriteFileOp)
+
+    new GpuHiveTextFileFormat()
+  }
+}
+
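With the object above in place, the branch taken by tagGpuSupport is driven entirely by the serde string recorded in the table's storage descriptor. If it is unclear which serde a given table carries, it can be inspected with plain SQL; the "Serde Library" and "OutputFormat" rows of the output are the values the checks above compare against. The table name part_tbl is only an example:

```scala
import org.apache.spark.sql.SparkSession

object ShowHiveTableSerde {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    // A table created with STORED AS PARQUET reports the ParquetHiveSerDe and
    // MapredParquetOutputFormat classes; a STORED AS TEXTFILE table reports
    // LazySimpleSerDe instead, which routes to the text path.
    spark.sql("DESCRIBE FORMATTED part_tbl").show(100, truncate = false)
  }
}
```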
+class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFormat {
+
+  override def prepareWrite(sparkSession: SparkSession, job: Job,
+      options: Map[String, String], dataSchema: StructType): ColumnarOutputWriterFactory = {
+
+    // Avoid referencing the outer object.
+    val compressionType = compType
+    new ColumnarOutputWriterFactory {
+      override def getFileExtension(context: TaskAttemptContext): String =
+        compressionType match {
+          case CompressionType.NONE => ".parquet"
+          case ct => s".${ct.name().toLowerCase(Locale.ROOT)}.parquet"
+        }
+
+      override def newInstance(path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): ColumnarOutputWriter = {
+        new GpuHiveParquetWriter(path, dataSchema, context, compressionType)
+      }
+    }
+  }
+}
+
+class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
+    context: TaskAttemptContext, compType: CompressionType)
+  extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true) {
 
-    Some(new GpuHiveTextFileFormat())
+  override protected val tableWriter: CudfTableWriter = {
+    val optionsBuilder = SchemaUtils
+      .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
+        writeInt96 = true, // Hive 1.2 writes timestamps as INT96
+        parquetFieldIdEnabled = false)
+      .withCompressionType(compType)
+    Table.writeParquetChunked(optionsBuilder.build(), this)
   }
+
 }
 
 class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging {
 
-  override def supportDataType(dataType: DataType): Boolean = isSupportedType(dataType)
+  override def supportDataType(dataType: DataType): Boolean =
+    GpuHiveTextFileUtils.isSupportedType(dataType)
 
   override def prepareWrite(sparkSession: SparkSession, job: Job,
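GpuHiveParquetWriter hard-codes writeInt96 = true because Hive's Parquet convention is the legacy INT96 timestamp encoding. For comparison only, on Spark's native (non-Hive) Parquet path the same behavior is a session setting rather than a constant; the snippet below assumes an active SparkSession named spark:

```scala
// Make Spark's native Parquet writer also emit INT96 timestamps, matching what
// the Hive Parquet path above always does.
spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
```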
diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala
index 92fb72801c8..a04d35acc22 100644
--- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala
+++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala
@@ -57,7 +57,7 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.client.hive._
 import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
-import org.apache.spark.sql.hive.rapids.{GpuHiveTextFileFormat, GpuSaveAsHiveFile, RapidsHiveErrors}
+import org.apache.spark.sql.hive.rapids.{GpuHiveFileFormat, GpuSaveAsHiveFile, RapidsHiveErrors}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable,
@@ -69,16 +69,17 @@ final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable,
   private var fileFormat: Option[ColumnarFileFormat] = None
 
   override def tagSelfForGpuInternal(): Unit = {
-    // Only Hive delimited text writes are currently supported.
-    // Check whether that is the format currently in play.
-    fileFormat = GpuHiveTextFileFormat.tagGpuSupport(this)
+    fileFormat = GpuHiveFileFormat.tagGpuSupport(this)
   }
 
   override def convertToGpu(): GpuDataWritingCommand = {
+    val format = fileFormat.getOrElse(
+      throw new IllegalStateException("fileFormat missing, tagSelfForGpu not called?"))
+
     GpuInsertIntoHiveTable(
       table = wrapped.table,
       partition = wrapped.partition,
-      fileFormat = this.fileFormat.get,
+      fileFormat = format,
       query = wrapped.query,
       overwrite = wrapped.overwrite,
       ifPartitionNotExists = wrapped.ifPartitionNotExists,
@@ -326,8 +327,10 @@ case class GpuInsertIntoHiveTable(
           if (!fs.delete(path, true)) {
             throw RapidsHiveErrors.cannotRemovePartitionDirError(path)
           }
-          // Don't let Hive do overwrite operation since it is slower.
-          doHiveOverwrite = false
+          // Don't let Hive do the overwrite operation, since it is slower. But still give
+          // users a chance to forcibly override this for customized cases where the Hive
+          // move operation has been optimized.
+          doHiveOverwrite = hadoopConf.getBoolean("hive.movetask.enable.dir.move", false)
         }
       }
     }
diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala
index 9105ab50e1e..418ad22a486 100644
--- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala
+++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
-import org.apache.spark.sql.hive.rapids.{GpuHiveTextFileFormat, GpuSaveAsHiveFile, RapidsHiveErrors}
+import org.apache.spark.sql.hive.rapids.{GpuHiveFileFormat, GpuSaveAsHiveFile, RapidsHiveErrors}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable,
@@ -59,16 +59,17 @@ final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable,
   private var fileFormat: Option[ColumnarFileFormat] = None
 
   override def tagSelfForGpuInternal(): Unit = {
-    // Only Hive delimited text writes are currently supported.
-    // Check whether that is the format currently in play.
-    fileFormat = GpuHiveTextFileFormat.tagGpuSupport(this)
+    fileFormat = GpuHiveFileFormat.tagGpuSupport(this)
   }
 
   override def convertToGpu(): GpuDataWritingCommand = {
+    val format = fileFormat.getOrElse(
+      throw new IllegalStateException("fileFormat missing, tagSelfForGpu not called?"))
+
     GpuInsertIntoHiveTable(
       table = wrapped.table,
       partition = wrapped.partition,
-      fileFormat = this.fileFormat.get,
+      fileFormat = format,
       query = wrapped.query,
       overwrite = wrapped.overwrite,
       ifPartitionNotExists = wrapped.ifPartitionNotExists,
@@ -315,8 +316,10 @@ case class GpuInsertIntoHiveTable(
           if (!fs.delete(path, true)) {
             throw RapidsHiveErrors.cannotRemovePartitionDirError(path)
           }
-          // Don't let Hive do overwrite operation since it is slower.
-          doHiveOverwrite = false
+          // Don't let Hive do the overwrite operation, since it is slower. But still give
+          // users a chance to forcibly override this for customized cases where the Hive
+          // move operation has been optimized.
+          doHiveOverwrite = hadoopConf.getBoolean("hive.movetask.enable.dir.move", false)
         }
       }
     }
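With this change, overwriting an existing partition still deletes the old directory on the Spark side by default, but the Hadoop configuration key shown above can opt back into Hive's own move task. How that key is set depends on the deployment; a hedged example, and one that only has an effect together with this patch:

```scala
// Either at submit time:
//   spark-submit --conf spark.hadoop.hive.movetask.enable.dir.move=true ...
// or programmatically, before running the INSERT OVERWRITE statements:
spark.sparkContext.hadoopConfiguration.setBoolean("hive.movetask.enable.dir.move", true)
```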