Skip to content

Commit

Permalink
[SPARK-50659][SQL] Refactor Union output computation out to reuse it …
Browse files Browse the repository at this point in the history
…in the single-pass Analyzer

### What changes were proposed in this pull request?

Refactor `computeOutput` out to the `Union` companion object.

### Why are the changes needed?

To reuse this piece of code in the single-pass Analyzer.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49283 from vladimirg-db/vladimirg-db/refactor-out-union-compute-output-to-use-in-single-pass-analyzer.

Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
vladimirg-db authored and cloud-fan committed Dec 25, 2024
1 parent 495e248 commit 062bc4c
Showing 1 changed file with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,21 @@ object Union {
def apply(left: LogicalPlan, right: LogicalPlan): Union = {
Union (left :: right :: Nil)
}

// updating nullability to make all the children consistent
def mergeChildOutputs(childOutputs: Seq[Seq[Attribute]]): Seq[Attribute] = {
childOutputs.transpose.map { attrs =>
val firstAttr = attrs.head
val nullable = attrs.exists(_.nullable)
val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
if (firstAttr.dataType == newDt) {
firstAttr.withNullability(nullable)
} else {
AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
firstAttr.exprId, firstAttr.qualifier)
}
}
}
}

/**
Expand Down Expand Up @@ -526,20 +541,7 @@ case class Union(

private lazy val lazyOutput: Seq[Attribute] = computeOutput()

// updating nullability to make all the children consistent
private def computeOutput(): Seq[Attribute] = {
children.map(_.output).transpose.map { attrs =>
val firstAttr = attrs.head
val nullable = attrs.exists(_.nullable)
val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
if (firstAttr.dataType == newDt) {
firstAttr.withNullability(nullable)
} else {
AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
firstAttr.exprId, firstAttr.qualifier)
}
}
}
private def computeOutput(): Seq[Attribute] = Union.mergeChildOutputs(children.map(_.output))

/**
* Maps the constraints containing a given (original) sequence of attributes to those with a
Expand Down

0 comments on commit 062bc4c

Please sign in to comment.