diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 1c8f7a97dd7fe..7c36e3bc79af7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -783,6 +783,7 @@ object View {
       "spark.sql.hive.convertMetastoreParquet",
       "spark.sql.hive.convertMetastoreOrc",
       "spark.sql.hive.convertInsertingPartitionedTable",
+      "spark.sql.hive.convertInsertingUnpartitionedTable",
       "spark.sql.hive.convertMetastoreCtas"
     ).contains(key) || key.startsWith("spark.sql.catalog.")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index d71d0d43683cb..cb5e7e7f42d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -360,6 +360,7 @@ object ViewHelper extends SQLConfHelper with Logging {
     "spark.sql.hive.convertMetastoreParquet",
     "spark.sql.hive.convertMetastoreOrc",
     "spark.sql.hive.convertInsertingPartitionedTable",
+    "spark.sql.hive.convertInsertingUnpartitionedTable",
     "spark.sql.hive.convertMetastoreCtas",
     SQLConf.ADDITIONAL_REMOTE_REPOSITORIES.key)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5972a9df78ecc..42e30ac0db934 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -194,6 +194,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
  * - When writing to non-partitioned Hive-serde Parquet/Orc tables
  * - When writing to partitioned Hive-serde Parquet/Orc tables when
  *   `spark.sql.hive.convertInsertingPartitionedTable` is true
+ * - When writing to unpartitioned Hive-serde Parquet/Orc tables when
+ *   `spark.sql.hive.convertInsertingUnpartitionedTable` is true
  * - When writing to directory with Hive-serde
  * - When writing to non-partitioned Hive-serde Parquet/ORC tables using CTAS
  * - When scanning Hive-serde Parquet/ORC tables
@@ -230,7 +232,8 @@ case class RelationConversions(
     case InsertIntoStatement(
         r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName)
         if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-          (!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) &&
+          (r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE) ||
+            !r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE)) &&
           isConvertible(r) =>
       InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), partition,
         cols, query, overwrite, ifPartitionNotExists, byName)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 68f34bd2beb01..b381b35e5f59a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -154,6 +154,16 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)
 
+  val CONVERT_INSERTING_UNPARTITIONED_TABLE =
buildConf("spark.sql.hive.convertInsertingUnpartitionedTable") + .doc("When set to true, and `spark.sql.hive.convertMetastoreParquet` or " + + "`spark.sql.hive.convertMetastoreOrc` is true, the built-in ORC/Parquet writer is used" + + "to process inserting into unpartitioned ORC/Parquet tables created by using the HiveSQL " + + "syntax.") + .version("4.0.0") + .booleanConf + .createWithDefault(true) + val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas") .doc("When set to true, Spark will try to use built-in data source writer " + "instead of Hive serde in CTAS. This flag is effective only if " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index e52d9b639dc4f..610fc246cd845 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -284,6 +284,43 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } + test("SPARK-47850 ORC conversation could be applied for unpartitioned table insertion") { + withTempView("single") { + val singleRowDF = Seq((0, "foo")).toDF("key", "value") + singleRowDF.createOrReplaceTempView("single") + Seq("true", "false").foreach { conversion => + withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true", + HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE.key -> conversion) { + withTable("dummy_orc_unpartitioned") { + spark.sql( + s""" + |CREATE TABLE dummy_orc_unpartitioned(key INT, value STRING) + |STORED AS ORC + """.stripMargin) + + spark.sql( + s""" + |INSERT INTO TABLE dummy_orc_unpartitioned + |SELECT key, value FROM single + """.stripMargin) + + val orcUnpartitionedTable = TableIdentifier("dummy_orc_unpartitioned", Some("default")) + if (conversion == "true") { + // if converted, we refresh the cached relation. + assert(getCachedDataSourceTable(orcUnpartitionedTable) === null) + } else { + // otherwise, not cached. + assert(getCachedDataSourceTable(orcUnpartitionedTable) === null) + } + + val df = spark.sql("SELECT key, value FROM dummy_orc_unpartitioned WHERE key=0") + checkAnswer(df, singleRowDF) + } + } + } + } + } + test("SPARK-32234 read ORC table with column names all starting with '_col'") { Seq("native", "hive").foreach { orcImpl => Seq("false", "true").foreach { vectorized =>