Skip to content

Commit

Permalink
Improve logging in single-pass Analyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimirg-db committed Dec 23, 2024
1 parent 876450c commit 0dd8452
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ import org.apache.spark.sql.types.MetadataBuilder
* operators which are nested in expressions.
* @param scopes [[NameScopeStack]] to resolve the expression tree in the correct scope.
* @param functionResolution [[FunctionResolution]] to resolve function expressions.
* @param planLogger [[PlanLogger]] to log expression tree resolution events.
*/
class ExpressionResolver(
resolver: Resolver,
scopes: NameScopeStack,
functionResolution: FunctionResolution)
functionResolution: FunctionResolution,
planLogger: PlanLogger)
extends TreeNodeResolver[Expression, Expression]
with ProducesUnresolvedSubtree
with ResolvesExpressionChildren
Expand Down Expand Up @@ -118,15 +120,17 @@ class ExpressionResolver(
* In this case `IN` is an expression and `SELECT 1` is a nested operator tree for which
* the [[ExpressionResolver]] would invoke the [[Resolver]].
*/
override def resolve(unresolvedExpression: Expression): Expression =
override def resolve(unresolvedExpression: Expression): Expression = {
planLogger.logExpressionTreeResolutionEvent(unresolvedExpression, "Unresolved expression tree")

if (unresolvedExpression
.getTagValue(ExpressionResolver.SINGLE_PASS_SUBTREE_BOUNDARY)
.nonEmpty) {
unresolvedExpression
} else {
throwIfNodeWasResolvedEarlier(unresolvedExpression)

val resolvedExpr = unresolvedExpression match {
val resolvedExpression = unresolvedExpression match {
case unresolvedBinaryArithmetic: BinaryArithmetic =>
binaryArithmeticResolver.resolve(unresolvedBinaryArithmetic)
case unresolvedExtractANSIIntervalDays: ExtractANSIIntervalDays =>
Expand Down Expand Up @@ -157,10 +161,13 @@ class ExpressionResolver(
}
}

markNodeAsResolved(resolvedExpr)
markNodeAsResolved(resolvedExpression)

planLogger.logExpressionTreeResolution(unresolvedExpression, resolvedExpression)

resolvedExpr
resolvedExpression
}
}

private def resolveNamedExpression(
unresolvedNamedExpression: Expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.analysis.resolver

import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKeys.QUERY_PLAN
import org.apache.spark.internal.LogKeys.{MESSAGE, QUERY_PLAN}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -27,32 +28,64 @@ import org.apache.spark.sql.internal.SQLConf
* [[PlanLogger]] is used by the [[Resolver]] to log intermediate resolution results.
*/
class PlanLogger extends Logging {
private val logLevel = SQLConf.get.planChangeLogLevel
private val planChangeLogLevel = SQLConf.get.planChangeLogLevel
private val expressionTreeChangeLogLevel = SQLConf.get.expressionTreeChangeLogLevel

/**
* Logs the transition from the `unresolvedPlan` to the `resolvedPlan`.
*/
def log(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
logBasedOnLevel(() => createMessage(unresolvedPlan, resolvedPlan))
def logPlanResolutionEvent(plan: LogicalPlan, event: String): Unit = {
log(() => log"""
|=== Plan resolution: ${MDC(MESSAGE, event)} ===
|${MDC(QUERY_PLAN, plan.treeString)}
""".stripMargin, planChangeLogLevel)
}

private def createMessage(
unresolvedPlan: LogicalPlan,
resolvedPlan: LogicalPlan): MessageWithContext =
log"""
|=== Unresolved/resolved operator subtree ===
def logPlanResolution(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
log(
() =>
log"""
|=== Unresolved plan -> Resolved plan ===
|${MDC(
QUERY_PLAN,
sideBySide(unresolvedPlan.treeString, resolvedPlan.treeString).mkString("\n")
)}
""".stripMargin

private def logBasedOnLevel(createMessage: () => MessageWithContext): Unit = logLevel match {
case "TRACE" => logTrace(createMessage().message)
case "DEBUG" => logDebug(createMessage().message)
case "INFO" => logInfo(createMessage())
case "WARN" => logWarning(createMessage())
case "ERROR" => logError(createMessage())
case _ => logTrace(createMessage().message)
QUERY_PLAN,
sideBySide(unresolvedPlan.treeString, resolvedPlan.treeString).mkString("\n")
)}
""".stripMargin,
planChangeLogLevel
)
}

def logExpressionTreeResolutionEvent(expressionTree: Expression, event: String): Unit = {
log(
() => log"""
|=== Expression tree resolution: ${MDC(MESSAGE, event)} ===
|${MDC(QUERY_PLAN, expressionTree.treeString)}
""".stripMargin,
expressionTreeChangeLogLevel
)
}

def logExpressionTreeResolution(
unresolvedExpressionTree: Expression,
resolvedExpressionTree: Expression): Unit = {
log(
() =>
log"""
|=== Unresolved expression tree -> Resolved expression tree ===
|${MDC(
QUERY_PLAN,
sideBySide(unresolvedExpressionTree.treeString, resolvedExpressionTree.treeString)
.mkString("\n")
)}
""".stripMargin,
expressionTreeChangeLogLevel
)
}

private def log(createMessage: () => MessageWithContext, logLevel: String): Unit =
logLevel match {
case "TRACE" => logTrace(createMessage().message)
case "DEBUG" => logDebug(createMessage().message)
case "INFO" => logInfo(createMessage())
case "WARN" => logWarning(createMessage())
case "ERROR" => logError(createMessage())
case _ => logTrace(createMessage().message)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ class Resolver(
with TracksResolvedNodes[LogicalPlan]
with DelegatesResolutionToExtensions {
private val scopes = new NameScopeStack
private val planLogger = new PlanLogger
private val relationResolution = Resolver.createRelationResolution(catalogManager)
private val functionResolution = new FunctionResolution(catalogManager, relationResolution)
private val expressionResolver = new ExpressionResolver(this, scopes, functionResolution)
private val expressionResolver =
new ExpressionResolver(this, scopes, functionResolution, planLogger)
private val limitExpressionResolver = new LimitExpressionResolver(expressionResolver)
private val planLogger = new PlanLogger

/**
* [[relationMetadataProvider]] is used to resolve metadata for relations. It's initialized with
Expand All @@ -101,6 +102,8 @@ class Resolver(
def lookupMetadataAndResolve(
unresolvedPlan: LogicalPlan,
analyzerBridgeState: Option[AnalyzerBridgeState] = None): LogicalPlan = {
planLogger.logPlanResolutionEvent(unresolvedPlan, "Lookup metadata and resolve")

relationMetadataProvider = analyzerBridgeState match {
case Some(analyzerBridgeState) =>
new BridgedRelationMetadataProvider(
Expand Down Expand Up @@ -134,6 +137,8 @@ class Resolver(
* producing a fully resolved plan or a descriptive error message.
*/
override def resolve(unresolvedPlan: LogicalPlan): LogicalPlan = {
planLogger.logPlanResolutionEvent(unresolvedPlan, "Unresolved plan")

throwIfNodeWasResolvedEarlier(unresolvedPlan)

val resolvedPlan =
Expand Down Expand Up @@ -167,7 +172,9 @@ class Resolver(
}

markNodeAsResolved(resolvedPlan)
planLogger.log(unresolvedPlan, resolvedPlan)

planLogger.logPlanResolution(unresolvedPlan, resolvedPlan)

resolvedPlan
}

Expand Down Expand Up @@ -260,7 +267,10 @@ class Resolver(
private def resolveRelation(unresolvedRelation: UnresolvedRelation): LogicalPlan = {
relationMetadataProvider.getRelationWithResolvedMetadata(unresolvedRelation) match {
case Some(relationWithResolvedMetadata) =>
planLogger.log(unresolvedRelation, relationWithResolvedMetadata)
planLogger.logPlanResolutionEvent(
relationWithResolvedMetadata,
"Relation metadata retrieved"
)

withPosition(unresolvedRelation) {
resolve(relationWithResolvedMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,19 @@ object SQLConf {
.booleanConf
.createWithDefault(Utils.isTesting)

val EXPRESSION_TREE_CHANGE_LOG_LEVEL = buildConf("spark.sql.expressionTreeChangeLog.level")
.internal()
.doc("Configures the log level for logging the change from the unresolved expression tree to " +
"the resolved expression tree in the single-pass bottom-up Resolver. The value can be " +
"'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'trace'.")
.version("4.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
"Invalid value for 'spark.sql.expressionTreeChangeLog.level'. Valid values are " +
"'trace', 'debug', 'info', 'warn' and 'error'.")
.createWithDefault("trace")

val LIGHTWEIGHT_PLAN_CHANGE_VALIDATION = buildConf("spark.sql.lightweightPlanChangeValidation")
.internal()
.doc(s"Similar to ${PLAN_CHANGE_VALIDATION.key}, this validates plan changes and runs after " +
Expand Down Expand Up @@ -5578,6 +5591,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES)

def expressionTreeChangeLogLevel: String = getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL)

def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)

def dynamicPartitionPruningUseStats: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_USE_STATS)
Expand Down

0 comments on commit 0dd8452

Please sign in to comment.