diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala index 0a9e2b9c5a872..1d072509626b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala @@ -64,11 +64,13 @@ import org.apache.spark.sql.types.MetadataBuilder * operators which are nested in expressions. * @param scopes [[NameScopeStack]] to resolve the expression tree in the correct scope. * @param functionResolution [[FunctionResolution]] to resolve function expressions. + * @param planLogger [[PlanLogger]] to log expression tree resolution events. */ class ExpressionResolver( resolver: Resolver, scopes: NameScopeStack, - functionResolution: FunctionResolution) + functionResolution: FunctionResolution, + planLogger: PlanLogger) extends TreeNodeResolver[Expression, Expression] with ProducesUnresolvedSubtree with ResolvesExpressionChildren @@ -118,7 +120,9 @@ class ExpressionResolver( * In this case `IN` is an expression and `SELECT 1` is a nested operator tree for which * the [[ExpressionResolver]] would invoke the [[Resolver]]. */ - override def resolve(unresolvedExpression: Expression): Expression = + override def resolve(unresolvedExpression: Expression): Expression = { + planLogger.logExpressionTreeResolutionEvent(unresolvedExpression, "Unresolved expression tree") + if (unresolvedExpression .getTagValue(ExpressionResolver.SINGLE_PASS_SUBTREE_BOUNDARY) .nonEmpty) { @@ -126,7 +130,7 @@ class ExpressionResolver( } else { throwIfNodeWasResolvedEarlier(unresolvedExpression) - val resolvedExpr = unresolvedExpression match { + val resolvedExpression = unresolvedExpression match { case unresolvedBinaryArithmetic: BinaryArithmetic => binaryArithmeticResolver.resolve(unresolvedBinaryArithmetic) case unresolvedExtractANSIIntervalDays: ExtractANSIIntervalDays => @@ -157,10 +161,13 @@ class ExpressionResolver( } } - markNodeAsResolved(resolvedExpr) + markNodeAsResolved(resolvedExpression) + + planLogger.logExpressionTreeResolution(unresolvedExpression, resolvedExpression) - resolvedExpr + resolvedExpression } + } private def resolveNamedExpression( unresolvedNamedExpression: Expression, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala index fcf1eab0c04a9..dcddc0f30e258 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.catalyst.analysis.resolver import org.apache.spark.internal.{Logging, MDC, MessageWithContext} -import org.apache.spark.internal.LogKeys.QUERY_PLAN +import org.apache.spark.internal.LogKeys.{MESSAGE, QUERY_PLAN} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.internal.SQLConf @@ -27,32 +28,64 @@ import org.apache.spark.sql.internal.SQLConf * [[PlanLogger]] is used by the [[Resolver]] to log intermediate resolution results. */ class PlanLogger extends Logging { - private val logLevel = SQLConf.get.planChangeLogLevel + private val planChangeLogLevel = SQLConf.get.planChangeLogLevel + private val expressionTreeChangeLogLevel = SQLConf.get.expressionTreeChangeLogLevel - /** - * Logs the transition from the `unresolvedPlan` to the `resolvedPlan`. - */ - def log(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = { - logBasedOnLevel(() => createMessage(unresolvedPlan, resolvedPlan)) + def logPlanResolutionEvent(plan: LogicalPlan, event: String): Unit = { + log(() => log""" + |=== Plan resolution: ${MDC(MESSAGE, event)} === + |${MDC(QUERY_PLAN, plan.treeString)} + """.stripMargin, planChangeLogLevel) } - private def createMessage( - unresolvedPlan: LogicalPlan, - resolvedPlan: LogicalPlan): MessageWithContext = - log""" - |=== Unresolved/resolved operator subtree === + def logPlanResolution(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = { + log( + () => + log""" + |=== Unresolved plan -> Resolved plan === |${MDC( - QUERY_PLAN, - sideBySide(unresolvedPlan.treeString, resolvedPlan.treeString).mkString("\n") - )} - """.stripMargin - - private def logBasedOnLevel(createMessage: () => MessageWithContext): Unit = logLevel match { - case "TRACE" => logTrace(createMessage().message) - case "DEBUG" => logDebug(createMessage().message) - case "INFO" => logInfo(createMessage()) - case "WARN" => logWarning(createMessage()) - case "ERROR" => logError(createMessage()) - case _ => logTrace(createMessage().message) + QUERY_PLAN, + sideBySide(unresolvedPlan.treeString, resolvedPlan.treeString).mkString("\n") + )} + """.stripMargin, + planChangeLogLevel + ) } + + def logExpressionTreeResolutionEvent(expressionTree: Expression, event: String): Unit = { + log( + () => log""" + |=== Expression tree resolution: ${MDC(MESSAGE, event)} === + |${MDC(QUERY_PLAN, expressionTree.treeString)} + """.stripMargin, + expressionTreeChangeLogLevel + ) + } + + def logExpressionTreeResolution( + unresolvedExpressionTree: Expression, + resolvedExpressionTree: Expression): Unit = { + log( + () => + log""" + |=== Unresolved expression tree -> Resolved expression tree === + |${MDC( + QUERY_PLAN, + sideBySide(unresolvedExpressionTree.treeString, resolvedExpressionTree.treeString) + .mkString("\n") + )} + """.stripMargin, + expressionTreeChangeLogLevel + ) + } + + private def log(createMessage: () => MessageWithContext, logLevel: String): Unit = + logLevel match { + case "TRACE" => logTrace(createMessage().message) + case "DEBUG" => logDebug(createMessage().message) + case "INFO" => logInfo(createMessage()) + case "WARN" => logWarning(createMessage()) + case "ERROR" => logError(createMessage()) + case _ => logTrace(createMessage().message) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala index b0e6828a97a08..37b875abaade6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala @@ -70,11 +70,12 @@ class Resolver( with TracksResolvedNodes[LogicalPlan] with DelegatesResolutionToExtensions { private val scopes = new NameScopeStack + private val planLogger = new PlanLogger private val relationResolution = Resolver.createRelationResolution(catalogManager) private val functionResolution = new FunctionResolution(catalogManager, relationResolution) - private val expressionResolver = new ExpressionResolver(this, scopes, functionResolution) + private val expressionResolver = + new ExpressionResolver(this, scopes, functionResolution, planLogger) private val limitExpressionResolver = new LimitExpressionResolver(expressionResolver) - private val planLogger = new PlanLogger /** * [[relationMetadataProvider]] is used to resolve metadata for relations. It's initialized with @@ -101,6 +102,8 @@ class Resolver( def lookupMetadataAndResolve( unresolvedPlan: LogicalPlan, analyzerBridgeState: Option[AnalyzerBridgeState] = None): LogicalPlan = { + planLogger.logPlanResolutionEvent(unresolvedPlan, "Lookup metadata and resolve") + relationMetadataProvider = analyzerBridgeState match { case Some(analyzerBridgeState) => new BridgedRelationMetadataProvider( @@ -134,6 +137,8 @@ class Resolver( * producing a fully resolved plan or a descriptive error message. */ override def resolve(unresolvedPlan: LogicalPlan): LogicalPlan = { + planLogger.logPlanResolutionEvent(unresolvedPlan, "Unresolved plan") + throwIfNodeWasResolvedEarlier(unresolvedPlan) val resolvedPlan = @@ -167,7 +172,9 @@ class Resolver( } markNodeAsResolved(resolvedPlan) - planLogger.log(unresolvedPlan, resolvedPlan) + + planLogger.logPlanResolution(unresolvedPlan, resolvedPlan) + resolvedPlan } @@ -260,7 +267,10 @@ class Resolver( private def resolveRelation(unresolvedRelation: UnresolvedRelation): LogicalPlan = { relationMetadataProvider.getRelationWithResolvedMetadata(unresolvedRelation) match { case Some(relationWithResolvedMetadata) => - planLogger.log(unresolvedRelation, relationWithResolvedMetadata) + planLogger.logPlanResolutionEvent( + relationWithResolvedMetadata, + "Relation metadata retrieved" + ) withPosition(unresolvedRelation) { resolve(relationWithResolvedMetadata) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index be883b2112d19..67b5e45c7f8e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -400,6 +400,19 @@ object SQLConf { .booleanConf .createWithDefault(Utils.isTesting) + val EXPRESSION_TREE_CHANGE_LOG_LEVEL = buildConf("spark.sql.expressionTreeChangeLog.level") + .internal() + .doc("Configures the log level for logging the change from the unresolved expression tree to " + + "the resolved expression tree in the single-pass bottom-up Resolver. The value can be " + + "'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'trace'.") + .version("4.0.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel), + "Invalid value for 'spark.sql.expressionTreeChangeLog.level'. Valid values are " + + "'trace', 'debug', 'info', 'warn' and 'error'.") + .createWithDefault("trace") + val LIGHTWEIGHT_PLAN_CHANGE_VALIDATION = buildConf("spark.sql.lightweightPlanChangeValidation") .internal() .doc(s"Similar to ${PLAN_CHANGE_VALIDATION.key}, this validates plan changes and runs after " + @@ -5578,6 +5591,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES) + def expressionTreeChangeLogLevel: String = getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL) + def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED) def dynamicPartitionPruningUseStats: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_USE_STATS) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala index 28ccebc89bc52..b7bf73f326fa8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionResolution import org.apache.spark.sql.catalyst.analysis.resolver.{ ExpressionResolver, NameScopeStack, + PlanLogger, Resolver } import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, ExprId} @@ -127,7 +128,8 @@ class TracksResolvedNodesSuite extends QueryTest with SharedSparkSession { new FunctionResolution( spark.sessionState.catalogManager, Resolver.createRelationResolution(spark.sessionState.catalogManager) - ) + ), + new PlanLogger ) } }