Skip to content

Commit

Permalink
[SPARK-50650][SQL] Improve logging in single-pass Analyzer
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

1. Log initial unresolved plans. This was we see the full plan, and track the downwards traversal.
2. Log expression tree changes in the same manner as operator tree changes.

### Why are the changes needed?

To make single-pass Analyzer debugging easier.

Examples:
![image](https://github.com/user-attachments/assets/507c3daf-0fe2-48a9-a66c-73e038550511)

![image](https://github.com/user-attachments/assets/8d8c9fbc-7371-4d29-a8c5-71aba57a65e5)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49271 from vladimirg-db/vladimirg-db/single-pass-analyzer/improve-plan-logger.

Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
vladimirg-db authored and cloud-fan committed Dec 26, 2024
1 parent be2da52 commit a483dfd
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ import org.apache.spark.sql.types.MetadataBuilder
* operators which are nested in expressions.
* @param scopes [[NameScopeStack]] to resolve the expression tree in the correct scope.
* @param functionResolution [[FunctionResolution]] to resolve function expressions.
* @param planLogger [[PlanLogger]] to log expression tree resolution events.
*/
class ExpressionResolver(
resolver: Resolver,
scopes: NameScopeStack,
functionResolution: FunctionResolution)
functionResolution: FunctionResolution,
planLogger: PlanLogger)
extends TreeNodeResolver[Expression, Expression]
with ProducesUnresolvedSubtree
with ResolvesExpressionChildren
Expand Down Expand Up @@ -118,15 +120,17 @@ class ExpressionResolver(
* In this case `IN` is an expression and `SELECT 1` is a nested operator tree for which
* the [[ExpressionResolver]] would invoke the [[Resolver]].
*/
override def resolve(unresolvedExpression: Expression): Expression =
override def resolve(unresolvedExpression: Expression): Expression = {
planLogger.logExpressionTreeResolutionEvent(unresolvedExpression, "Unresolved expression tree")

if (unresolvedExpression
.getTagValue(ExpressionResolver.SINGLE_PASS_SUBTREE_BOUNDARY)
.nonEmpty) {
unresolvedExpression
} else {
throwIfNodeWasResolvedEarlier(unresolvedExpression)

val resolvedExpr = unresolvedExpression match {
val resolvedExpression = unresolvedExpression match {
case unresolvedBinaryArithmetic: BinaryArithmetic =>
binaryArithmeticResolver.resolve(unresolvedBinaryArithmetic)
case unresolvedExtractANSIIntervalDays: ExtractANSIIntervalDays =>
Expand Down Expand Up @@ -157,10 +161,13 @@ class ExpressionResolver(
}
}

markNodeAsResolved(resolvedExpr)
markNodeAsResolved(resolvedExpression)

planLogger.logExpressionTreeResolution(unresolvedExpression, resolvedExpression)

resolvedExpr
resolvedExpression
}
}

private def resolveNamedExpression(
unresolvedNamedExpression: Expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.analysis.resolver

import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKeys.QUERY_PLAN
import org.apache.spark.internal.LogKeys.{MESSAGE, QUERY_PLAN}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -27,32 +28,71 @@ import org.apache.spark.sql.internal.SQLConf
* [[PlanLogger]] is used by the [[Resolver]] to log intermediate resolution results.
*/
class PlanLogger extends Logging {
private val logLevel = SQLConf.get.planChangeLogLevel
private val planChangeLogLevel = SQLConf.get.planChangeLogLevel
private val expressionTreeChangeLogLevel = SQLConf.get.expressionTreeChangeLogLevel

/**
* Logs the transition from the `unresolvedPlan` to the `resolvedPlan`.
*/
def log(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
logBasedOnLevel(() => createMessage(unresolvedPlan, resolvedPlan))
def logPlanResolutionEvent(plan: LogicalPlan, event: String): Unit = {
log(() => log"""
|=== Plan resolution: ${MDC(MESSAGE, event)} ===
|${MDC(QUERY_PLAN, plan.treeString)}
""".stripMargin, planChangeLogLevel)
}

private def createMessage(
unresolvedPlan: LogicalPlan,
resolvedPlan: LogicalPlan): MessageWithContext =
log"""
|=== Unresolved/resolved operator subtree ===
def logPlanResolution(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
log(
() =>
log"""
|=== Unresolved plan -> Resolved plan ===
|${MDC(
QUERY_PLAN,
sideBySide(unresolvedPlan.treeString, resolvedPlan.treeString).mkString("\n")
)}
""".stripMargin

private def logBasedOnLevel(createMessage: () => MessageWithContext): Unit = logLevel match {
case "TRACE" => logTrace(createMessage().message)
case "DEBUG" => logDebug(createMessage().message)
case "INFO" => logInfo(createMessage())
case "WARN" => logWarning(createMessage())
case "ERROR" => logError(createMessage())
case _ => logTrace(createMessage().message)
QUERY_PLAN,
sideBySide(
unresolvedPlan.withNewChildren(resolvedPlan.children).treeString,
resolvedPlan.treeString
).mkString("\n")
)}
""".stripMargin,
planChangeLogLevel
)
}

def logExpressionTreeResolutionEvent(expressionTree: Expression, event: String): Unit = {
log(
() => log"""
|=== Expression tree resolution: ${MDC(MESSAGE, event)} ===
|${MDC(QUERY_PLAN, expressionTree.treeString)}
""".stripMargin,
expressionTreeChangeLogLevel
)
}

def logExpressionTreeResolution(
unresolvedExpressionTree: Expression,
resolvedExpressionTree: Expression): Unit = {
log(
() =>
log"""
|=== Unresolved expression tree -> Resolved expression tree ===
|${MDC(
QUERY_PLAN,
sideBySide(
unresolvedExpressionTree
.withNewChildren(resolvedExpressionTree.children)
.treeString,
resolvedExpressionTree.treeString
).mkString("\n")
)}
""".stripMargin,
expressionTreeChangeLogLevel
)
}

private def log(createMessage: () => MessageWithContext, logLevel: String): Unit =
logLevel match {
case "TRACE" => logTrace(createMessage().message)
case "DEBUG" => logDebug(createMessage().message)
case "INFO" => logInfo(createMessage())
case "WARN" => logWarning(createMessage())
case "ERROR" => logError(createMessage())
case _ => logTrace(createMessage().message)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ class Resolver(
with TracksResolvedNodes[LogicalPlan]
with DelegatesResolutionToExtensions {
private val scopes = new NameScopeStack
private val planLogger = new PlanLogger
private val relationResolution = Resolver.createRelationResolution(catalogManager)
private val functionResolution = new FunctionResolution(catalogManager, relationResolution)
private val expressionResolver = new ExpressionResolver(this, scopes, functionResolution)
private val expressionResolver =
new ExpressionResolver(this, scopes, functionResolution, planLogger)
private val limitExpressionResolver = new LimitExpressionResolver(expressionResolver)
private val planLogger = new PlanLogger

/**
* [[relationMetadataProvider]] is used to resolve metadata for relations. It's initialized with
Expand All @@ -101,6 +102,8 @@ class Resolver(
def lookupMetadataAndResolve(
unresolvedPlan: LogicalPlan,
analyzerBridgeState: Option[AnalyzerBridgeState] = None): LogicalPlan = {
planLogger.logPlanResolutionEvent(unresolvedPlan, "Lookup metadata and resolve")

relationMetadataProvider = analyzerBridgeState match {
case Some(analyzerBridgeState) =>
new BridgedRelationMetadataProvider(
Expand Down Expand Up @@ -134,6 +137,8 @@ class Resolver(
* producing a fully resolved plan or a descriptive error message.
*/
override def resolve(unresolvedPlan: LogicalPlan): LogicalPlan = {
planLogger.logPlanResolutionEvent(unresolvedPlan, "Unresolved plan")

throwIfNodeWasResolvedEarlier(unresolvedPlan)

val resolvedPlan =
Expand Down Expand Up @@ -167,7 +172,9 @@ class Resolver(
}

markNodeAsResolved(resolvedPlan)
planLogger.log(unresolvedPlan, resolvedPlan)

planLogger.logPlanResolution(unresolvedPlan, resolvedPlan)

resolvedPlan
}

Expand Down Expand Up @@ -260,7 +267,10 @@ class Resolver(
private def resolveRelation(unresolvedRelation: UnresolvedRelation): LogicalPlan = {
relationMetadataProvider.getRelationWithResolvedMetadata(unresolvedRelation) match {
case Some(relationWithResolvedMetadata) =>
planLogger.log(unresolvedRelation, relationWithResolvedMetadata)
planLogger.logPlanResolutionEvent(
relationWithResolvedMetadata,
"Relation metadata retrieved"
)

withPosition(unresolvedRelation) {
resolve(relationWithResolvedMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,19 @@ object SQLConf {
.booleanConf
.createWithDefault(Utils.isTesting)

val EXPRESSION_TREE_CHANGE_LOG_LEVEL = buildConf("spark.sql.expressionTreeChangeLog.level")
.internal()
.doc("Configures the log level for logging the change from the unresolved expression tree to " +
"the resolved expression tree in the single-pass bottom-up Resolver. The value can be " +
"'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'trace'.")
.version("4.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
"Invalid value for 'spark.sql.expressionTreeChangeLog.level'. Valid values are " +
"'trace', 'debug', 'info', 'warn' and 'error'.")
.createWithDefault("trace")

val LIGHTWEIGHT_PLAN_CHANGE_VALIDATION = buildConf("spark.sql.lightweightPlanChangeValidation")
.internal()
.doc(s"Similar to ${PLAN_CHANGE_VALIDATION.key}, this validates plan changes and runs after " +
Expand Down Expand Up @@ -5578,6 +5591,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES)

def expressionTreeChangeLogLevel: String = getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL)

def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)

def dynamicPartitionPruningUseStats: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_USE_STATS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionResolution
import org.apache.spark.sql.catalyst.analysis.resolver.{
ExpressionResolver,
NameScopeStack,
PlanLogger,
Resolver
}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, ExprId}
Expand Down Expand Up @@ -127,7 +128,8 @@ class TracksResolvedNodesSuite extends QueryTest with SharedSparkSession {
new FunctionResolution(
spark.sessionState.catalogManager,
Resolver.createRelationResolution(spark.sessionState.catalogManager)
)
),
new PlanLogger
)
}
}

0 comments on commit a483dfd

Please sign in to comment.