Skip to content

Commit

Permalink
[SPARK-49974][SQL] Move resolveRelations(...) out of the Analyzer.scala
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Move resolveRelation(...) and some of its dependencies out to a separate class, because it's reasonably self-contained.

### Why are the changes needed?

Analyzer.scala is 4K+ lines long, so it makes sense to gradually split it.

### Does this PR introduce _any_ user-facing change?

No, just the code is moved to a separate class.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

copilot.vim.

Closes apache#48475 from vladimirg-db/vladimirg-db/refactor-resolve-relations.

Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
  • Loading branch information
vladimirg-db authored and MaxGekk committed Oct 15, 2024
1 parent 2ccdaba commit 14c01eb
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.trees.AlwaysProcess
import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
import org.apache.spark.sql.catalyst.trees.TreePattern._
Expand Down Expand Up @@ -203,6 +202,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
with CheckAnalysis with SQLConfHelper with ColumnResolutionHelper {

private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
private val relationResolution = new RelationResolution(catalogManager)

override protected def validatePlanChanges(
previousPlan: LogicalPlan,
Expand Down Expand Up @@ -972,30 +972,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
}

private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty
private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
AnalysisContext.get.referredTempViewNames.exists { n =>
(n.length == nameParts.length) && n.zip(nameParts).forall {
case (a, b) => resolver(a, b)
}
}
}

// If we are resolving database objects (relations, functions, etc.) insides views, we may need to
// expand single or multi-part identifiers with the current catalog and namespace of when the
// view was created.
private def expandIdentifier(nameParts: Seq[String]): Seq[String] = {
if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts

if (nameParts.length == 1) {
AnalysisContext.get.catalogAndNamespace :+ nameParts.head
} else if (catalogManager.isCatalogRegistered(nameParts.head)) {
nameParts
} else {
AnalysisContext.get.catalogAndNamespace.head +: nameParts
}
}

/**
* Adds metadata columns to output for child relations when nodes are missing resolved attributes.
*
Expand Down Expand Up @@ -1122,7 +1098,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
resolveRelation(u).getOrElse(u)
relationResolution.resolveRelation(u).getOrElse(u)
case other => other
}

Expand All @@ -1139,7 +1115,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case write: V2WriteCommand =>
write.table match {
case u: UnresolvedRelation if !u.isStreaming =>
resolveRelation(u).map(unwrapRelationPlan).map {
relationResolution.resolveRelation(u).map(unwrapRelationPlan).map {
case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError(
v.desc.identifier, write)
case r: DataSourceV2Relation => write.withNewTable(r)
Expand All @@ -1154,12 +1130,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}

case u: UnresolvedRelation =>
resolveRelation(u).map(resolveViews).getOrElse(u)
relationResolution.resolveRelation(u).map(resolveViews).getOrElse(u)

case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
resolveRelation(u, timeTravelSpec).getOrElse(r)
relationResolution.resolveRelation(u, timeTravelSpec).getOrElse(r)

case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
lookupTableOrView(identifier).map {
Expand Down Expand Up @@ -1194,40 +1170,17 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}.getOrElse(u)
}

private def lookupTempView(identifier: Seq[String]): Option[TemporaryViewRelation] = {
// We are resolving a view and this name is not a temp view when that view was created. We
// return None earlier here.
if (isResolvingView && !isReferredTempViewName(identifier)) return None
v1SessionCatalog.getRawLocalOrGlobalTempView(identifier)
}

private def resolveTempView(
identifier: Seq[String],
isStreaming: Boolean = false,
isTimeTravel: Boolean = false): Option[LogicalPlan] = {
lookupTempView(identifier).map { v =>
val tempViewPlan = v1SessionCatalog.getTempViewRelation(v)
if (isStreaming && !tempViewPlan.isStreaming) {
throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted)
}
if (isTimeTravel) {
throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(identifier))
}
tempViewPlan
}
}

/**
* Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is
* for resolving DDL and misc commands.
*/
private def lookupTableOrView(
identifier: Seq[String],
viewOnly: Boolean = false): Option[LogicalPlan] = {
lookupTempView(identifier).map { tempView =>
relationResolution.lookupTempView(identifier).map { tempView =>
ResolvedTempView(identifier.asIdentifier, tempView.tableMeta)
}.orElse {
expandIdentifier(identifier) match {
relationResolution.expandIdentifier(identifier) match {
case CatalogAndIdentifier(catalog, ident) =>
if (viewOnly && !CatalogV2Util.isSessionCatalog(catalog)) {
throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
Expand All @@ -1246,113 +1199,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
}
}

private def createRelation(
catalog: CatalogPlugin,
ident: Identifier,
table: Option[Table],
options: CaseInsensitiveStringMap,
isStreaming: Boolean): Option[LogicalPlan] = {
table.map {
// To utilize this code path to execute V1 commands, e.g. INSERT,
// either it must be session catalog, or tracksPartitionsInCatalog
// must be false so it does not require use catalog to manage partitions.
// Obviously we cannot execute V1Table by V1 code path if the table
// is not from session catalog and the table still requires its catalog
// to manage partitions.
case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog)
|| !v1Table.catalogTable.tracksPartitionsInCatalog =>
if (isStreaming) {
if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
throw QueryCompilationErrors.permanentViewNotSupportedByStreamingReadingAPIError(
ident.quoted)
}
SubqueryAlias(
catalog.name +: ident.asMultipartIdentifier,
UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
} else {
v1SessionCatalog.getRelation(v1Table.v1Table, options)
}

case table =>
if (isStreaming) {
val v1Fallback = table match {
case withFallback: V2TableWithV1Fallback =>
Some(UnresolvedCatalogRelation(withFallback.v1Table, isStreaming = true))
case _ => None
}
SubqueryAlias(
catalog.name +: ident.asMultipartIdentifier,
StreamingRelationV2(None, table.name, table, options, table.columns.toAttributes,
Some(catalog), Some(ident), v1Fallback))
} else {
SubqueryAlias(
catalog.name +: ident.asMultipartIdentifier,
DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
}
}
}

/**
* Resolves relations to v1 relation if it's a v1 table from the session catalog, or to v2
* relation. This is for resolving DML commands and SELECT queries.
*/
private def resolveRelation(
u: UnresolvedRelation,
timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
val timeTravelSpecFromOptions = TimeTravelSpec.fromOptions(
u.options,
conf.getConf(SQLConf.TIME_TRAVEL_TIMESTAMP_KEY),
conf.getConf(SQLConf.TIME_TRAVEL_VERSION_KEY),
conf.sessionLocalTimeZone
)
if (timeTravelSpec.nonEmpty && timeTravelSpecFromOptions.nonEmpty) {
throw new AnalysisException("MULTIPLE_TIME_TRAVEL_SPEC", Map.empty[String, String])
}
val finalTimeTravelSpec = timeTravelSpec.orElse(timeTravelSpecFromOptions)
resolveTempView(u.multipartIdentifier, u.isStreaming, finalTimeTravelSpec.isDefined).orElse {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
val key =
((catalog.name +: ident.namespace :+ ident.name).toImmutableArraySeq,
finalTimeTravelSpec)
AnalysisContext.get.relationCache.get(key).map { cache =>
val cachedRelation = cache.transform {
case multi: MultiInstanceRelation =>
val newRelation = multi.newInstance()
newRelation.copyTagsFrom(multi)
newRelation
}
u.getTagValue(LogicalPlan.PLAN_ID_TAG).map { planId =>
val cachedConnectRelation = cachedRelation.clone()
cachedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG, planId)
cachedConnectRelation
}.getOrElse(cachedRelation)
}.orElse {
val writePrivilegesString =
Option(u.options.get(UnresolvedRelation.REQUIRED_WRITE_PRIVILEGES))
val table = CatalogV2Util.loadTable(
catalog, ident, finalTimeTravelSpec, writePrivilegesString)
val loaded = createRelation(
catalog, ident, table, u.clearWritePrivileges.options, u.isStreaming)
loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
u.getTagValue(LogicalPlan.PLAN_ID_TAG).map { planId =>
loaded.map { loadedRelation =>
val loadedConnectRelation = loadedRelation.clone()
loadedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG, planId)
loadedConnectRelation
}
}.getOrElse(loaded)
}
case _ => None
}
}
}

/** Consumes an unresolved relation and resolves it to a v1 or v2 relation or temporary view. */
def resolveRelationOrTempView(u: UnresolvedRelation): LogicalPlan = {
EliminateSubqueryAliases(resolveRelation(u).getOrElse(u))
}
}

/** Handle INSERT INTO for DSv2 */
Expand Down Expand Up @@ -2135,7 +1981,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
if (ResolveFunctions.lookupBuiltinOrTempFunction(nameParts, Some(f)).isDefined) {
f
} else {
val CatalogAndIdentifier(catalog, ident) = expandIdentifier(nameParts)
val CatalogAndIdentifier(catalog, ident) =
relationResolution.expandIdentifier(nameParts)
val fullName =
normalizeFuncName((catalog.name +: ident.namespace :+ ident.name).toImmutableArraySeq)
if (externalFunctionNameSet.contains(fullName)) {
Expand Down Expand Up @@ -2186,7 +2033,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
ResolvedNonPersistentFunc(nameParts.head, V1Function(info))
}
}.getOrElse {
val CatalogAndIdentifier(catalog, ident) = expandIdentifier(nameParts)
val CatalogAndIdentifier(catalog, ident) =
relationResolution.expandIdentifier(nameParts)
val fullName = catalog.name +: ident.namespace :+ ident.name
CatalogV2Util.loadFunction(catalog, ident).map { func =>
ResolvedPersistentFunc(catalog.asFunctionCatalog, ident, func)
Expand All @@ -2198,7 +2046,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
withPosition(u) {
try {
val resolvedFunc = resolveBuiltinOrTempTableFunction(u.name, u.functionArgs).getOrElse {
val CatalogAndIdentifier(catalog, ident) = expandIdentifier(u.name)
val CatalogAndIdentifier(catalog, ident) =
relationResolution.expandIdentifier(u.name)
if (CatalogV2Util.isSessionCatalog(catalog)) {
v1SessionCatalog.resolvePersistentTableFunction(
ident.asFunctionIdentifier, u.functionArgs)
Expand Down Expand Up @@ -2355,7 +2204,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
private[analysis] def resolveFunction(u: UnresolvedFunction): Expression = {
withPosition(u) {
resolveBuiltinOrTempFunction(u.nameParts, u.arguments, u).getOrElse {
val CatalogAndIdentifier(catalog, ident) = expandIdentifier(u.nameParts)
val CatalogAndIdentifier(catalog, ident) =
relationResolution.expandIdentifier(u.nameParts)
if (CatalogV2Util.isSessionCatalog(catalog)) {
resolveV1Function(ident.asFunctionIdentifier, u.arguments, u)
} else {
Expand Down
Loading

0 comments on commit 14c01eb

Please sign in to comment.