From 0e3484fc9bef8561092797477e4ea3375c155b91 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 15 Nov 2024 17:21:18 +0000 Subject: [PATCH] Preserve original field order when merging schemas See snowplow/schema-ddl#213 When a schema is evolved (e.g. from `1-0-0` to `1-0-1`) we create a merged struct column combining fields from new and old schema. For some loaders it is important that newly-added nested fields come after the original fields. E.g. Lake Loader with Hudi and Glue sync enabled. --- .../myvendor/myschema/jsonschema/10-0-0 | 14 +++++ .../myvendor/myschema/jsonschema/10-0-1 | 15 +++++ .../transform/NonAtomicFieldsSpec.scala | 55 ++++++++++++++++--- project/Dependencies.scala | 2 +- 4 files changed, 77 insertions(+), 9 deletions(-) create mode 100644 modules/loaders-common/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/10-0-0 create mode 100644 modules/loaders-common/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/10-0-1 diff --git a/modules/loaders-common/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/10-0-0 b/modules/loaders-common/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/10-0-0 new file mode 100644 index 0000000..e3a3e92 --- /dev/null +++ b/modules/loaders-common/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/10-0-0 @@ -0,0 +1,14 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "self": { + "vendor": "myvendor", + "name": "myschema", + "format": "jsonschema", + "version": "10-0-0" + }, + "type": "object", + "properties": { + "col_m": {"type": "string"}, + "col_n": {"type": "string"} + } +} diff --git a/modules/loaders-common/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/10-0-1 b/modules/loaders-common/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/10-0-1 new file mode 100644 index 0000000..4a9f9cc --- /dev/null +++ b/modules/loaders-common/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/10-0-1 @@ -0,0 +1,15 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "self": { + "vendor": "myvendor", + "name": "myschema", + "format": "jsonschema", + "version": "10-0-1" + }, + "type": "object", + "properties": { + "col_a": {"type": "string"}, + "col_m": {"type": "string"}, + "col_z": {"type": "string"} + } +} diff --git a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/NonAtomicFieldsSpec.scala b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/NonAtomicFieldsSpec.scala index 1092231..b747fa8 100644 --- a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/NonAtomicFieldsSpec.scala +++ b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/NonAtomicFieldsSpec.scala @@ -30,6 +30,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect { return a JSON field for the Iglu Central anything-a schema $ue5 return a field prefixed with underscore if field starts with a digit $ueDigit return a merged schema if the batch has a schema with empty properties and additionalProperties=false $emptyProp + return a merged struct with field order prioritizing original fields before new additions $fieldOrder when resolving for known schemas in contexts should return an un-merged schema if the batch uses the first schema in a series $c1 @@ -96,8 +97,8 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect { val expectedStruct = Type.Struct( NonEmptyVector.of( Field("col_a", Type.String, Required), - Field("col_c", Type.String, Nullable), - Field("col_b", Type.String, Nullable) + Field("col_b", Type.String, Nullable), + Field("col_c", Type.String, Nullable) ) ) @@ -131,8 +132,8 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect { val expectedStruct = Type.Struct( NonEmptyVector.of( Field("col_a", Type.String, Required), - Field("col_c", Type.String, Nullable), - Field("col_b", Type.String, Nullable) + Field("col_b", Type.String, Nullable), + Field("col_c", Type.String, Nullable) ) ) @@ -261,6 +262,44 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect { } } + def fieldOrder = { + + val tabledEntity = TabledEntity(TabledEntity.UnstructEvent, "myvendor", "myschema", 10) + + val input = Map( + tabledEntity -> Set((0, 0), (0, 1)) + ) + + val expected = { + val expectedStruct = Type.Struct( + NonEmptyVector.of( + // original fields + Field("col_m", Type.String, Nullable), + Field("col_n", Type.String, Nullable), + // newly added fields + Field("col_a", Type.String, Nullable), // earlier alphabetically + Field("col_z", Type.String, Nullable) // later alphabetically + ) + ) + + val expectedField = Field("unstruct_event_myvendor_myschema_10", expectedStruct, Nullable, Set.empty) + + TypedTabledEntity( + tabledEntity, + expectedField, + Set((0, 0), (0, 1)), + Nil + ) + } + + NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) => + (failures must beEmpty) and + (fields must haveSize(1)) and + (fields.head must beEqualTo(expected)) + } + + } + def c1 = { val tabledEntity = TabledEntity(TabledEntity.Context, "myvendor", "myschema", 7) @@ -310,8 +349,8 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect { NonEmptyVector.of( Field("_schema_version", Type.String, Required), Field("col_a", Type.String, Required), - Field("col_c", Type.String, Nullable), - Field("col_b", Type.String, Nullable) + Field("col_b", Type.String, Nullable), + Field("col_c", Type.String, Nullable) ) ) @@ -349,8 +388,8 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect { NonEmptyVector.of( Field("_schema_version", Type.String, Required), Field("col_a", Type.String, Required), - Field("col_c", Type.String, Nullable), - Field("col_b", Type.String, Nullable) + Field("col_b", Type.String, Nullable), + Field("col_c", Type.String, Nullable) ) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e050b3e..c68e448 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -37,7 +37,7 @@ object Dependencies { val azureSdk = "1.11.4" // Snowplow - val schemaDdl = "0.25.0" + val schemaDdl = "0.26.0" val badrows = "2.3.0" val igluClient = "4.0.0" val tracker = "2.0.0"