Skip to content

Commit

Permalink
Refactor Union output computation out to reuse it in the single-pass …
Browse files Browse the repository at this point in the history
…Analyzer
  • Loading branch information
vladimirg-db committed Dec 24, 2024
1 parent 2c1c4d2 commit 895ae37
Showing 1 changed file with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,21 @@ object Union {
def apply(left: LogicalPlan, right: LogicalPlan): Union = {
Union (left :: right :: Nil)
}

// updating nullability to make all the children consistent
def mergeChildOutputs(childOutputs: Seq[Seq[Attribute]]): Seq[Attribute] = {
childOutputs.transpose.map { attrs =>
val firstAttr = attrs.head
val nullable = attrs.exists(_.nullable)
val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
if (firstAttr.dataType == newDt) {
firstAttr.withNullability(nullable)
} else {
AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
firstAttr.exprId, firstAttr.qualifier)
}
}
}
}

/**
Expand Down Expand Up @@ -526,20 +541,7 @@ case class Union(

private lazy val lazyOutput: Seq[Attribute] = computeOutput()

// updating nullability to make all the children consistent
private def computeOutput(): Seq[Attribute] = {
children.map(_.output).transpose.map { attrs =>
val firstAttr = attrs.head
val nullable = attrs.exists(_.nullable)
val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
if (firstAttr.dataType == newDt) {
firstAttr.withNullability(nullable)
} else {
AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
firstAttr.exprId, firstAttr.qualifier)
}
}
}
private def computeOutput(): Seq[Attribute] = Union.mergeChildOutputs(children.map(_.output))

/**
* Maps the constraints containing a given (original) sequence of attributes to those with a
Expand Down

0 comments on commit 895ae37

Please sign in to comment.