Commit c1f3ad8 ("nit")
pan3793 committed Apr 17, 2024
1 parent 4dea7d0

Showing 3 changed files with 11 additions and 8 deletions.
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, Ins
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
+import org.apache.spark.sql.hive.execution.InsertIntoHiveTable.BY_CTAS
 import org.apache.spark.sql.internal.HiveSerDe

@@ -248,11 +249,11 @@ case class RelationConversions(
     // that only matches table insertion inside Hive CTAS.
     // This pattern would not cause conflicts because this rule is always applied before
     // `HiveAnalysis` and both of these rules are running once.
-    case i@InsertIntoHiveTable(
+    case i @ InsertIntoHiveTable(
         tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, _)
       if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
         tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
-        conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) && i.getTagValue(i.BY_CTAS).isDefined =>
+        conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) && i.getTagValue(BY_CTAS).isDefined =>
       // validation is required to be done here before relation conversion.
       DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
       val hiveTable = DDLUtils.readHiveTable(tableDesc)
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDe
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}
+import org.apache.spark.sql.hive.execution.InsertIntoHiveTable.BY_CTAS

 /**
  * Create table and insert the query result into it.

@@ -105,7 +106,7 @@ case class CreateHiveTableAsSelectCommand(
       overwrite = false,
       ifPartitionNotExists = false,
       outputColumnNames = outputColumnNames)
-    insertHive.setTagValue(insertHive.BY_CTAS, ())
+    insertHive.setTagValue(BY_CTAS, ())
     insertHive
   }

@@ -81,11 +81,6 @@ case class InsertIntoHiveTable(
     @transient hiveTmpPath: HiveTempPath
 ) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils {

-  /**
-   * A tag to identify if this command is created by a CTAS.
-   */
-  val BY_CTAS = TreeNodeTag[Unit]("by_ctas")
-
   override def staticPartitions: TablePartitionSpec = {
     partition.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
   }

@@ -241,6 +236,12 @@
 }

 object InsertIntoHiveTable extends V1WritesHiveUtils {
+
+  /**
+   * A tag to identify if this command is created by a CTAS.
+   */
+  val BY_CTAS = TreeNodeTag[Unit]("by_ctas")
+
   def apply(
       table: CatalogTable,
       partition: Map[String, Option[String]],
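Context for the "nit": the change moves the `BY_CTAS` tag from the `InsertIntoHiveTable` case class into its companion object, so callers import one shared tag instead of reaching through a particular instance (`i.BY_CTAS`, `insertHive.BY_CTAS`). Since `TreeNodeTag` is a case class compared by its `name`, behavior should be unchanged; this is purely a style cleanup. Below is a minimal, self-contained sketch of the companion-object tag pattern, where `Node`, its tag map, and the `Demo` driver are hypothetical stand-ins for Spark's `TreeNode` machinery, with `setTagValue`/`getTagValue` mirroring the real API:

import scala.collection.mutable

// Stand-in for org.apache.spark.sql.catalyst.trees.TreeNodeTag:
// a case class, so two tags with the same name compare equal.
case class TreeNodeTag[T](name: String)

// Hypothetical stand-in for a Spark TreeNode carrying a tag map.
class Node {
  private val tags = mutable.Map.empty[TreeNodeTag[_], Any]
  def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = tags(tag) = value
  def getTagValue[T](tag: TreeNodeTag[T]): Option[T] =
    tags.get(tag).map(_.asInstanceOf[T])
}

object Node {
  // After this commit, the tag lives on the companion object, so every
  // caller refers to the same value via a plain import.
  val BY_CTAS = TreeNodeTag[Unit]("by_ctas")
}

object Demo extends App {
  import Node.BY_CTAS
  val insertHive = new Node
  insertHive.setTagValue(BY_CTAS, ())                // producer side: the CTAS command
  println(insertHive.getTagValue(BY_CTAS).isDefined) // consumer side: prints "true"
}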
