Skip to content

Commit

Permalink
[SPARK-47305][SQL] Fix PruneFilters to tag the isStreaming flag of Lo…
Browse files Browse the repository at this point in the history
…calRelation correctly when the plan has both batch and streaming

### What changes were proposed in this pull request?

This PR proposes to fix PruneFilters to tag the isStreaming flag of LocalRelation correctly when the plan has both batch and streaming.

### Why are the changes needed?

When filter is evaluated to be always false, PruneFilters replaces the filter with empty LocalRelation, which effectively prunes filter. The logic cares about migration of the isStreaming flag, but incorrectly migrated in some case, via picking up the value of isStreaming flag from root node rather than filter (or child).

isStreaming flag is true if the value of isStreaming flag from any of children is true. Flipping the coin, some children might have isStreaming flag as "false". If the filter being pruned is a descendant to such children (in other word, ancestor of streaming node), LocalRelation is incorrectly tagged as streaming where it should be batch.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT verifying the fix. The new UT fails without this PR and passes with this PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45406 from HeartSaVioR/SPARK-47305.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 8d6bd9b)
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Mar 7, 2024
1 parent 245a33c commit 7e5d592
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1636,9 +1636,9 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
// If the filter condition always evaluate to null or false,
// replace the input with an empty relation.
case Filter(Literal(null, _), child) =>
LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming)
LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming)
case Filter(Literal(false, BooleanType), child) =>
LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming)
LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming)
// If any deterministic condition is guaranteed to be true given the constraints on the child's
// output, remove the condition
case f @ Filter(fc, p: LogicalPlan) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Literal, UnspecifiedFrame}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal, UnspecifiedFrame}
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Expand, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructType}

Expand Down Expand Up @@ -221,6 +221,45 @@ class PropagateEmptyRelationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("SPARK-47305 correctly tag isStreaming when propagating empty relation " +
"with the plan containing batch and streaming") {
val data = Seq(Row(1))

val outputForStream = Seq($"a".int)
val schemaForStream = DataTypeUtils.fromAttributes(outputForStream)
val converterForStream = CatalystTypeConverters.createToCatalystConverter(schemaForStream)

val outputForBatch = Seq($"b".int)
val schemaForBatch = DataTypeUtils.fromAttributes(outputForBatch)
val converterForBatch = CatalystTypeConverters.createToCatalystConverter(schemaForBatch)

val streamingRelation = LocalRelation(
outputForStream,
data.map(converterForStream(_).asInstanceOf[InternalRow]),
isStreaming = true)
val batchRelation = LocalRelation(
outputForBatch,
data.map(converterForBatch(_).asInstanceOf[InternalRow]),
isStreaming = false)

val query = streamingRelation
.join(batchRelation.where(false).select($"b"), LeftOuter,
Some(EqualTo($"a", $"b")))

val analyzedQuery = query.analyze

val optimized = Optimize.execute(analyzedQuery)
// This is to deal with analysis for join condition. We expect the analyzed plan to contain
// filter and projection in batch relation, and know they will go away after optimization.
// The point to check here is that the node is replaced with "empty" LocalRelation, but the
// flag `isStreaming` is properly propagated.
val correctAnswer = analyzedQuery transform {
case Project(_, Filter(_, l: LocalRelation)) => l.copy(data = Seq.empty)
}

comparePlans(optimized, correctAnswer)
}

test("don't propagate empty streaming relation through agg") {
val output = Seq($"a".int)
val data = Seq(Row(1))
Expand Down

0 comments on commit 7e5d592

Please sign in to comment.