Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve original field order when merging parquet Fields #213

Merged
merged 4 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@ object Field {
Field(name, Type.Array(constructedType.value, itemNullability), nullability)
}

/**
 * Normalizes a field: the name is replaced by `normalizeName(field)` and the field type is
 * normalized recursively via `normalizeType`.
 *
 * @param field the field to normalize
 * @return a copy of `field` with normalized name and type
 */
def normalize(field: Field): Field =
  field.copy(name = normalizeName(field), fieldType = normalizeType(field.fieldType))

/**
 * Normalizes a type.
 *
 *  - Struct: every member field is normalized, duplicates are collapsed with
 *    `collapseDuplicateFields`, and the remaining fields are sorted by name.
 *  - Array: the element type is normalized recursively (so structs nested at any array
 *    depth are covered); the array's own nullability is kept as is.
 *  - Any other type is returned unchanged.
 */
private def normalizeType(t: Type): Type =
  t match {
    case Type.Struct(fields) =>
      // Sort after collapsing, so the order is defined on the final (normalized) names.
      Type.Struct(collapseDuplicateFields(fields.map(normalize)).sortBy(_.name))
    case Type.Array(el, nullability) =>
      Type.Array(normalizeType(el), nullability)
    case other => other
  }

private def collapseDuplicateFields(normFields: NonEmptyVector[Field]): NonEmptyVector[Field] = {
val endMap = normFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,16 @@ object Migrations {
val mergedType: Option[Type] = sourceType match {
case sourceStruct@Struct(sourceFields) => targetType match {
case targetStruct@Type.Struct(targetFields) =>
val forwardMigration = sourceFields.map(srcField => MigrationFieldPair(srcField.name :: path, srcField, targetStruct.focus(srcField.name)).migrations)
val forwardMigration = sourceFields.map {
srcField =>
MigrationFieldPair(srcField.name :: path, srcField, targetStruct.focus(srcField.name)).migrations
}
benjben marked this conversation as resolved.
Show resolved Hide resolved

// Comparing struct target fields to the source. This will detect additions.
val reverseMigration = targetFields.map(tgtField => MigrationFieldPair(tgtField.name :: path, tgtField, sourceStruct.focus(tgtField.name)).migrations)
val reverseMigration = targetFields.map {
tgtField =>
MigrationFieldPair(tgtField.name :: path, tgtField, sourceStruct.focus(tgtField.name)).migrations
}

migrations ++= forwardMigration.iterator.flatMap(_.migrations)

Expand All @@ -96,22 +102,23 @@ object Migrations {
val tgtFields = reverseMigration.toVector.traverse(_.result).toVector.flatten
val tgtFieldNames = tgtFields.map(_.name)
val allSrcFields = forwardMigration.toVector.traverse(_.result).toVector.flatten
val allSrcFieldMap = allSrcFields.map(f => f.name -> f).toMap
// swap fields in src and target as they would be rearranged in nested structs or arrays
val reorderedTgtFields = tgtFields.map { t =>
allSrcFieldMap.get(t.name) match {
case Some(value) if value.fieldType.isInstanceOf[Struct] => value
case Some(value) if value.fieldType.isInstanceOf[Array] => value
case _ => t
}
val allSrcFieldNames = allSrcFields.map(_.name)

val srcFields: Vector[Field] = allSrcFields.map {
srcField =>
if (tgtFieldNames.contains(srcField.name))
srcField
else
// drop not null constraints from removed fields.
srcField.copy(nullability = Type.Nullability.Nullable)
}
val newTgtFields = tgtFields.filter {
tgtField =>
!allSrcFieldNames.contains(tgtField.name)
}
val srcFields: Vector[Field] = allSrcFields.filter(srcField => !tgtFieldNames.contains(srcField.name)).map(
// drop not null constraints from removed fields.
_.copy(nullability = Type.Nullability.Nullable)
)

// failed migration would produce no fields in source
NonEmptyVector.fromVector(reorderedTgtFields ++ srcFields).map { nonEmpty =>
NonEmptyVector.fromVector(srcFields ++ newTgtFields).map { nonEmpty =>
Type.Struct(nonEmpty)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class FieldSpec extends org.specs2.Specification { def is = s2"""
normalName handles camel case and disallowed characters $e13
normalize would collapse colliding names $e14
normalize would collapse colliding names with deterministic type selection $e15
normalize would sort fields in order of name $e16
normalize would sort nested fields in order of name $e17
"""

// a helper
Expand Down Expand Up @@ -409,15 +411,15 @@ class FieldSpec extends org.specs2.Specification { def is = s2"""
fieldType = Type.Struct(
fields = NonEmptyVector.of(
Field(
name = "_ga",
name = "__b",
fieldType = Type.Integer,
nullability = Nullable,
accessors = Set("_ga", "_Ga")
nullability = Nullable
),
Field(
name = "__b",
name = "_ga",
fieldType = Type.Integer,
nullability = Nullable
nullability = Nullable,
accessors = Set("_ga", "_Ga")
),
)
),
Expand Down Expand Up @@ -458,5 +460,65 @@ class FieldSpec extends org.specs2.Specification { def is = s2"""
(input1 must_== expected) and (input2 must_== expected)
}

def e16 = {
  // Input names are deliberately out of order and mixed-case.
  val unsortedFields = NonEmptyVector
    .of("Apple", "cherry", "banana", "Damson")
    .map(rawName => Field(name = rawName, fieldType = Type.String, nullability = Nullable))

  // (expected name, expected accessor) pairs, listed in the name order the result must have.
  val sortedFields = NonEmptyVector
    .of("apple" -> "Apple", "banana" -> "banana", "cherry" -> "cherry", "damson" -> "Damson")
    .map { case (expectedName, accessor) =>
      Field(name = expectedName, fieldType = Type.String, nullability = Nullable, accessors = Set(accessor))
    }

  val input = Field("top", Type.Struct(unsortedFields), Nullable)
  val expected = Field("top", Type.Struct(sortedFields), Nullable)

  Field.normalize(input) must beEqualTo(expected)
}

def e17 = {
  // Places the same struct type both inside an array and directly as a nested object,
  // under a single top-level struct field.
  def wrapInTop(nested: Type): Field = {
    val arr = Field("nested_arr", Type.Array(nested, Nullable), Nullable)
    val struct = Field("nested_obj", nested, Nullable)
    Field("top", Type.Struct(NonEmptyVector.of(arr, struct)), Nullable)
  }

  // Input names are deliberately out of order and mixed-case.
  val unsortedNested = Type.Struct(
    NonEmptyVector
      .of("Apple", "cherry", "banana", "Damson")
      .map(rawName => Field(name = rawName, fieldType = Type.String, nullability = Nullable))
  )

  // (expected name, expected accessor) pairs, listed in the name order the result must have.
  val sortedNested = Type.Struct(
    NonEmptyVector
      .of("apple" -> "Apple", "banana" -> "banana", "cherry" -> "cherry", "damson" -> "Damson")
      .map { case (expectedName, accessor) =>
        Field(name = expectedName, fieldType = Type.String, nullability = Nullable, accessors = Set(accessor))
      }
  )

  val input = wrapInTop(unsortedNested)
  val expected = wrapInTop(sortedNested)

  Field.normalize(input) must beEqualTo(expected)
}

// Helper: wraps `name` in a nullable string Field and returns what Field.normalizeName yields for it.
private def fieldNormalName(name: String) = {
  val probe = Field(name, Type.String, nullability = Nullable)
  Field.normalizeName(probe)
}
}
Loading
Loading