Skip to content

Commit

Permalink
Preserve original field order when merging schemas
Browse files Browse the repository at this point in the history
See snowplow/schema-ddl#213

When a schema is evolved (e.g. from `1-0-0` to `1-0-1`) we create a
merged struct column combining fields from new and old schema.

For some loaders it is important that newly-added nested fields come
after the original fields. E.g. Lake Loader with Hudi and Glue sync
enabled.
  • Loading branch information
istreeter committed Nov 25, 2024
1 parent 937db11 commit 0e3484f
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "myschema",
"format": "jsonschema",
"version": "10-0-0"
},
"type": "object",
"properties": {
"col_m": {"type": "string"},
"col_n": {"type": "string"}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "myschema",
"format": "jsonschema",
"version": "10-0-1"
},
"type": "object",
"properties": {
"col_a": {"type": "string"},
"col_m": {"type": "string"},
"col_z": {"type": "string"}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
return a JSON field for the Iglu Central anything-a schema $ue5
return a field prefixed with underscore if field starts with a digit $ueDigit
return a merged schema if the batch has a schema with empty properties and additionalProperties=false $emptyProp
return a merged struct with field order prioritizing original fields before new additions $fieldOrder

when resolving for known schemas in contexts should
return an un-merged schema if the batch uses the first schema in a series $c1
Expand Down Expand Up @@ -96,8 +97,8 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("col_a", Type.String, Required),
Field("col_c", Type.String, Nullable),
Field("col_b", Type.String, Nullable)
Field("col_b", Type.String, Nullable),
Field("col_c", Type.String, Nullable)
)
)

Expand Down Expand Up @@ -131,8 +132,8 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("col_a", Type.String, Required),
Field("col_c", Type.String, Nullable),
Field("col_b", Type.String, Nullable)
Field("col_b", Type.String, Nullable),
Field("col_c", Type.String, Nullable)
)
)

Expand Down Expand Up @@ -261,6 +262,44 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
}
}

def fieldOrder = {

val tabledEntity = TabledEntity(TabledEntity.UnstructEvent, "myvendor", "myschema", 10)

val input = Map(
tabledEntity -> Set((0, 0), (0, 1))
)

val expected = {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
// original fields
Field("col_m", Type.String, Nullable),
Field("col_n", Type.String, Nullable),
// newly added fields
Field("col_a", Type.String, Nullable), // earlier alphabetically
Field("col_z", Type.String, Nullable) // later alphabetically
)
)

val expectedField = Field("unstruct_event_myvendor_myschema_10", expectedStruct, Nullable, Set.empty)

TypedTabledEntity(
tabledEntity,
expectedField,
Set((0, 0), (0, 1)),
Nil
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
}

}

def c1 = {

val tabledEntity = TabledEntity(TabledEntity.Context, "myvendor", "myschema", 7)
Expand Down Expand Up @@ -310,8 +349,8 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
NonEmptyVector.of(
Field("_schema_version", Type.String, Required),
Field("col_a", Type.String, Required),
Field("col_c", Type.String, Nullable),
Field("col_b", Type.String, Nullable)
Field("col_b", Type.String, Nullable),
Field("col_c", Type.String, Nullable)
)
)

Expand Down Expand Up @@ -349,8 +388,8 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
NonEmptyVector.of(
Field("_schema_version", Type.String, Required),
Field("col_a", Type.String, Required),
Field("col_c", Type.String, Nullable),
Field("col_b", Type.String, Nullable)
Field("col_b", Type.String, Nullable),
Field("col_c", Type.String, Nullable)
)
)

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object Dependencies {
val azureSdk = "1.11.4"

// Snowplow
val schemaDdl = "0.25.0"
val schemaDdl = "0.26.0"
val badrows = "2.3.0"
val igluClient = "4.0.0"
val tracker = "2.0.0"
Expand Down

0 comments on commit 0e3484f

Please sign in to comment.