Commit

[SPARK-47850][SQL][HIVE] Support converting insert for unpartitioned Hive table
pan3793 committed Apr 15, 2024
Parent: e7d0ba7 · Commit: f7dd647
Showing 5 changed files with 53 additions and 1 deletion.
@@ -783,6 +783,7 @@ object View {
       "spark.sql.hive.convertMetastoreParquet",
       "spark.sql.hive.convertMetastoreOrc",
       "spark.sql.hive.convertInsertingPartitionedTable",
+      "spark.sql.hive.convertInsertingUnpartitionedTable",
       "spark.sql.hive.convertMetastoreCtas"
     ).contains(key) || key.startsWith("spark.sql.catalog.")
   }

@@ -360,6 +360,7 @@ object ViewHelper extends SQLConfHelper with Logging {
     "spark.sql.hive.convertMetastoreParquet",
     "spark.sql.hive.convertMetastoreOrc",
     "spark.sql.hive.convertInsertingPartitionedTable",
+    "spark.sql.hive.convertInsertingUnpartitionedTable",
     "spark.sql.hive.convertMetastoreCtas",
     SQLConf.ADDITIONAL_REMOTE_REPOSITORIES.key)

@@ -194,6 +194,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
  * - When writing to non-partitioned Hive-serde Parquet/Orc tables
  * - When writing to partitioned Hive-serde Parquet/Orc tables when
  *   `spark.sql.hive.convertInsertingPartitionedTable` is true
+ * - When writing to unpartitioned Hive-serde Parquet/Orc tables when
+ *   `spark.sql.hive.convertInsertingUnpartitionedTable` is true
  * - When writing to directory with Hive-serde
  * - When writing to non-partitioned Hive-serde Parquet/ORC tables using CTAS
  * - When scanning Hive-serde Parquet/ORC tables

@@ -230,7 +232,8 @@ case class RelationConversions(
       case InsertIntoStatement(
           r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName)
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            (!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE))
+            (r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE) ||
+              !r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE))
             && isConvertible(r) =>
         InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), partition, cols,
           query, overwrite, ifPartitionNotExists, byName)
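
Before this change, the guard read `!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)`, so inserts into unpartitioned Hive-serde tables were converted unconditionally; the rewrite gives unpartitioned tables the same opt-out that partitioned tables already had. As a rough sketch of the new decision (the helper name is invented for illustration, and the real guard additionally requires `isConvertible(r)`, i.e. a Parquet/ORC serde with the matching `convertMetastore*` flag enabled):

// Hypothetical helper mirroring the rewritten guard above: conversion of an
// INSERT now applies only when the flag matching the table's layout is on.
def shouldConvertInsert(
    isPartitioned: Boolean,
    convertPartitioned: Boolean,    // spark.sql.hive.convertInsertingPartitionedTable
    convertUnpartitioned: Boolean   // spark.sql.hive.convertInsertingUnpartitionedTable
): Boolean = {
  if (isPartitioned) convertPartitioned else convertUnpartitioned
}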
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala (10 additions, 0 deletions)

@@ -154,6 +154,16 @@ private[spark] object HiveUtils extends Logging {
       .booleanConf
       .createWithDefault(true)

+  val CONVERT_INSERTING_UNPARTITIONED_TABLE =
+    buildConf("spark.sql.hive.convertInsertingUnpartitionedTable")
+      .doc("When set to true, and `spark.sql.hive.convertMetastoreParquet` or " +
+        "`spark.sql.hive.convertMetastoreOrc` is true, the built-in ORC/Parquet writer is " +
+        "used to process inserting into unpartitioned ORC/Parquet tables created by using " +
+        "the HiveSQL syntax.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
     .doc("When set to true, Spark will try to use built-in data source writer " +
       "instead of Hive serde in CTAS. This flag is effective only if " +

@@ -284,6 +284,43 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
     }
   }

+  test("SPARK-47850 ORC conversion could be applied for unpartitioned table insertion") {
+    withTempView("single") {
+      val singleRowDF = Seq((0, "foo")).toDF("key", "value")
+      singleRowDF.createOrReplaceTempView("single")
+      Seq("true", "false").foreach { conversion =>
+        withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
+          HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE.key -> conversion) {
+          withTable("dummy_orc_unpartitioned") {
+            spark.sql(
+              s"""
+                 |CREATE TABLE dummy_orc_unpartitioned(key INT, value STRING)
+                 |STORED AS ORC
+               """.stripMargin)
+
+            spark.sql(
+              s"""
+                 |INSERT INTO TABLE dummy_orc_unpartitioned
+                 |SELECT key, value FROM single
+               """.stripMargin)
+
+            val orcUnpartitionedTable = TableIdentifier("dummy_orc_unpartitioned", Some("default"))
+            if (conversion == "true") {
+              // if converted, we refresh the cached relation.
+              assert(getCachedDataSourceTable(orcUnpartitionedTable) === null)
+            } else {
+              // otherwise, not cached.
+              assert(getCachedDataSourceTable(orcUnpartitionedTable) === null)
+            }
+
+            val df = spark.sql("SELECT key, value FROM dummy_orc_unpartitioned WHERE key=0")
+            checkAnswer(df, singleRowDF)
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-32234 read ORC table with column names all starting with '_col'") {
     Seq("native", "hive").foreach { orcImpl =>
       Seq("false", "true").foreach { vectorized =>
