From 4e6ecd3a38235e4f43bef1812e025880e70c5dab Mon Sep 17 00:00:00 2001 From: Will Baker Date: Tue, 14 Jan 2025 14:47:38 -0500 Subject: [PATCH] materialize-elasticsearch: materialize fields with multiple types and non singly-typed arrays Expands handling for fields that have multiple types, and arrays that do not have a single type of item. This is done by materializing the values for these fields as a synthetic object and putting that in a "flattened" index mapping, the same way as other objects are materialized. The generated object has a key of "json" and its value is the JSON representation of the original value. --- .../.snapshots/TestValidateAndApply | 39 ++++--- materialize-elasticsearch/driver.go | 4 + materialize-elasticsearch/transactor.go | 6 + materialize-elasticsearch/type_mapping.go | 108 ++++++++++-------- .../type_mapping_test.go | 44 ++++++- .../materialize-elasticsearch/snapshot.json | 100 +++++++++++++--- 6 files changed, 220 insertions(+), 81 deletions(-) diff --git a/materialize-elasticsearch/.snapshots/TestValidateAndApply b/materialize-elasticsearch/.snapshots/TestValidateAndApply index 5b4cedb291..24abc7e31d 100644 --- a/materialize-elasticsearch/.snapshots/TestValidateAndApply +++ b/materialize-elasticsearch/.snapshots/TestValidateAndApply @@ -1,15 +1,15 @@ Big Schema Initial Constraints: {"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields are able to be materialized"} -{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Arrays with multiple or unknown item types cannot be materialized"} +{"Field":"arrayField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"} {"Field":"boolField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} -{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document is required for a standard updates materialization"} +{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"} {"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} -{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"Primary key locations are required"} -{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"} -{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"} +{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"} +{"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"} +{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} {"Field":"numField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} -{"Field":"objField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"} +{"Field":"objField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} {"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"stringDateTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"stringDurationField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} @@ -39,14 +39,14 @@ Big Schema Initial Constraints: Big Schema Re-validated Constraints: {"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Arrays with multiple or unknown item types cannot be materialized"} +{"Field":"arrayField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"boolField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"} {"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} -{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"} -{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"} +{"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} {"Field":"numField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} @@ -78,14 +78,14 @@ Big Schema Re-validated Constraints: Big Schema Changed Types Constraints: {"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"arrayField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"} +{"Field":"arrayField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"boolField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'boolField' is already being materialized as endpoint type 'BOOLEAN' but endpoint type 'LONG' is required by its schema '{ type: [integer] }'"} {"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"} {"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"intField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'intField' is already being materialized as endpoint type 'LONG' but endpoint type 'TEXT' is required by its schema '{ type: [string] }'"} {"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} -{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"} -{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"} +{"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} {"Field":"numField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'numField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'BOOLEAN' is required by its schema '{ type: [boolean] }'"} {"Field":"objField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'objField' is already being materialized as endpoint type 'FLATTENED' but endpoint type 'TEXT' is required by its schema '{ type: [string] }'"} {"Field":"stringDateField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateField' is already being materialized as endpoint type 'DATE' but endpoint type 'TEXT' is required by its schema '{ type: [string] }'"} @@ -117,11 +117,13 @@ Big Schema Changed Types Constraints: Big Schema Materialized Resource Schema With All Fields Required: {"Field":"_meta/flow_truncated","Type":"boolean"} +{"Field":"arrayField","Type":"flattened"} {"Field":"boolField","Type":"boolean"} {"Field":"flow_document","Type":"flattened"} {"Field":"flow_published_at","Type":"date"} {"Field":"intField","Type":"long"} {"Field":"key","Type":"keyword"} +{"Field":"multipleField","Type":"flattened"} {"Field":"numField","Type":"double"} {"Field":"objField","Type":"flattened"} {"Field":"stringDateField","Type":"date"} @@ -153,11 +155,13 @@ Big Schema Materialized Resource Schema With All Fields Required: Big Schema Materialized Resource Schema With No Fields Required: {"Field":"_meta/flow_truncated","Type":"boolean"} +{"Field":"arrayField","Type":"flattened"} {"Field":"boolField","Type":"boolean"} {"Field":"flow_document","Type":"flattened"} {"Field":"flow_published_at","Type":"date"} {"Field":"intField","Type":"long"} {"Field":"key","Type":"keyword"} +{"Field":"multipleField","Type":"flattened"} {"Field":"numField","Type":"double"} {"Field":"objField","Type":"flattened"} {"Field":"stringDateField","Type":"date"} @@ -189,14 +193,14 @@ Big Schema Materialized Resource Schema With No Fields Required: Big Schema Changed Types With Table Replacement Constraints: {"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields are able to be materialized"} -{"Field":"arrayField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"} +{"Field":"arrayField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} {"Field":"boolField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} -{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document is required for a standard updates materialization"} +{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"} {"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} -{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"Primary key locations are required"} -{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"} -{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"} +{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"} +{"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"} +{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} {"Field":"numField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} @@ -234,6 +238,7 @@ Big Schema Materialized Resource Schema Changed Types With Table Replacement: {"Field":"flow_published_at","Type":"date"} {"Field":"intField","Type":"text"} {"Field":"key","Type":"keyword"} +{"Field":"multipleField","Type":"flattened"} {"Field":"nullField","Type":"flattened"} {"Field":"numField","Type":"boolean"} {"Field":"objField","Type":"text"} diff --git a/materialize-elasticsearch/driver.go b/materialize-elasticsearch/driver.go index fede893bc5..0f6c93a9fe 100644 --- a/materialize-elasticsearch/driver.go +++ b/materialize-elasticsearch/driver.go @@ -490,6 +490,7 @@ func (d driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boil allFields := append(b.FieldSelection.Keys, b.FieldSelection.Values...) fields := make([]string, 0, len(allFields)) floatFields := make([]bool, len(allFields)) + wrapFields := make([]bool, len(allFields)) for idx, field := range allFields { fields = append(fields, translateField(field)) @@ -497,6 +498,8 @@ func (d driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boil return nil, nil, nil, err } else if prop.Type == elasticTypeDouble { floatFields[idx] = true + } else if mustWrapAndFlatten(b.Collection.GetProjection(field)) { + wrapFields[idx] = true } } @@ -506,6 +509,7 @@ func (d driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boil deltaUpdates: res.DeltaUpdates, fields: fields, floatFields: floatFields, + wrapFields: wrapFields, docField: b.FieldSelection.Document, }) } diff --git a/materialize-elasticsearch/transactor.go b/materialize-elasticsearch/transactor.go index 2d9400c385..5ab3085e82 100644 --- a/materialize-elasticsearch/transactor.go +++ b/materialize-elasticsearch/transactor.go @@ -40,6 +40,10 @@ type binding struct { // these and we'll replace them with NULL. floatFields []bool + // Index of fields that are some type that cannot be materialized natively, and must be wrapped + // in a synthetic object and materialized into a flattened mapping. + wrapFields []bool + // Present if the binding includes the root document, empty if not. This is usually the default // "flow_document" but may have an alternate user-defined projection name. docField string @@ -223,6 +227,8 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) { if s == "Infinity" || s == "-Infinity" || s == "NaN" { v = nil } + } else if b.wrapFields[idx] { + v = map[string]any{"json": v} } doc[b.fields[idx]] = v diff --git a/materialize-elasticsearch/type_mapping.go b/materialize-elasticsearch/type_mapping.go index e9c8da6f09..85da3666a9 100644 --- a/materialize-elasticsearch/type_mapping.go +++ b/materialize-elasticsearch/type_mapping.go @@ -43,6 +43,10 @@ var numericStringTypes = map[boilerplate.StringWithNumericFormat]elasticProperty } func propForProjection(p *pf.Projection, types []string, fc json.RawMessage) (property, error) { + if mustWrapAndFlatten(p) { + return objProp(), nil + } + type fieldConfig struct { Keyword bool `json:"keyword"` } @@ -58,17 +62,12 @@ func propForProjection(p *pf.Projection, types []string, fc json.RawMessage) (pr return property{Type: numericStringTypes[numericString], Coerce: true}, nil } - nonNullTypes := typesWithoutNull(types) - if len(nonNullTypes) == 0 { - panic("internal application error: expected a single non-null type") - } - - switch t := nonNullTypes[0]; t { + switch t := typesWithoutNull(types)[0]; t { case pf.JsonTypeArray: - // You can add an array of the same item to any field in Elasticsearch, - // and we only allow schematized arrays having a single type to be - // materialized. - return propForProjection(p, p.Inference.Array.ItemTypes, nil) + // Arrays of the same item type can be added to a field that has that + // type, so the created mapping will be for that singly-typed array + // item. + return propForProjection(p, p.Inference.Array.ItemTypes, fc) case pf.JsonTypeBoolean: return property{Type: elasticTypeBoolean}, nil case pf.JsonTypeInteger: @@ -103,15 +102,7 @@ func propForProjection(p *pf.Projection, types []string, fc json.RawMessage) (pr } } case pf.JsonTypeObject: - return property{ - Type: elasticTypeFlattened, - // See https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html - // This setting is to avoid extremely long strings causing errors due to Elastic's - // requirement that strings do not have a byte length longer than 32766. Long strings - // will not be indexed or stored in the Lucene index, but will still be present in the - // _source field. - IgnoreAbove: 32766 / 4, - }, nil + return objProp(), nil case pf.JsonTypeNumber: return property{Type: elasticTypeDouble}, nil default: @@ -156,29 +147,16 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp switch { case p.IsPrimaryKey: constraint.Type = pm.Response_Validated_Constraint_LOCATION_REQUIRED - constraint.Reason = "Primary key locations are required" - case p.IsRootDocumentProjection() && !deltaUpdates: - constraint.Type = pm.Response_Validated_Constraint_LOCATION_REQUIRED - constraint.Reason = "The root document is required for a standard updates materialization" - case p.IsRootDocumentProjection(): + constraint.Reason = "All Locations that are part of the collections key are required" + case p.IsRootDocumentProjection() && deltaUpdates: constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED constraint.Reason = "The root document should usually be materialized" - case p.Inference.IsSingleType() && slices.Contains(p.Inference.Types, "array"): - var itemTypes []string - if p.Inference.Array != nil { - itemTypes = typesWithoutNull(p.Inference.Array.ItemTypes) - } - - if len(itemTypes) == 1 && itemTypes[0] == "array" { - constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN - constraint.Reason = "Nested arrays cannot be materialized" - } else if len(itemTypes) == 1 { - constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED - constraint.Reason = "Arrays with a single item type can be materialized" - } else { - constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN - constraint.Reason = "Arrays with multiple or unknown item types cannot be materialized" - } + case p.IsRootDocumentProjection(): + constraint.Type = pm.Response_Validated_Constraint_LOCATION_REQUIRED + constraint.Reason = "The root document must be materialized" + case len(p.Inference.Types) == 0: + constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN + constraint.Reason = "Cannot materialize a field with no types" case p.Field == "_meta/op": constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED constraint.Reason = "The operation type should usually be materialized" @@ -188,15 +166,16 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp case p.Inference.IsSingleScalarType() || isNumeric: constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED constraint.Reason = "The projection has a single scalar type" - case p.Inference.IsSingleType(): + case slices.Equal(p.Inference.Types, []string{"null"}): + constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN + constraint.Reason = "Cannot materialize a field where the only possible type is 'null'" + case p.Inference.IsSingleType() && slices.Contains(p.Inference.Types, "object"): constraint.Type = pm.Response_Validated_Constraint_FIELD_OPTIONAL - constraint.Reason = "This field is able to be materialized" - + constraint.Reason = "Object fields may be materialized" default: - // Anything else is either multiple different types or a single 'null' - // type. - constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN - constraint.Reason = "Cannot materialize this field" + // Any other case is one where the field is an array or has multiple types. + constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED + constraint.Reason = "This field is able to be materialized" } return &constraint @@ -230,3 +209,38 @@ func typesWithoutNull(ts []string) []string { return out } + +func mustWrapAndFlatten(p *pf.Projection) bool { + nonNullTypes := typesWithoutNull(p.Inference.Types) + + if len(nonNullTypes) != 1 { + return true + } + + if nonNullTypes[0] == "array" { + if p.Inference.Array == nil { + return true + } + + items := typesWithoutNull(p.Inference.Array.ItemTypes) + if len(items) != 1 || items[0] == "array" { + // Nested arrays don't work because we need some non-array type + // somewhere in order to actually create the mapping. + return true + } + } + + return false +} + +func objProp() property { + return property{ + Type: elasticTypeFlattened, + // See https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html + // This setting is to avoid extremely long strings causing errors due to Elastic's + // requirement that strings do not have a byte length longer than 32766. Long strings + // will not be indexed or stored in the Lucene index, but will still be present in the + // _source field. + IgnoreAbove: 32766 / 4, + } +} diff --git a/materialize-elasticsearch/type_mapping_test.go b/materialize-elasticsearch/type_mapping_test.go index bde93416a7..c961f640a6 100644 --- a/materialize-elasticsearch/type_mapping_test.go +++ b/materialize-elasticsearch/type_mapping_test.go @@ -47,7 +47,49 @@ func TestPropForProjection(t *testing.T) { }, }, }, - want: property{Type: elasticTypeFlattened, IgnoreAbove: 32766 / 4}, + want: objProp(), + }, + { + name: "array with array items", + in: &pf.Projection{ + Inference: pf.Inference{ + Types: []string{"array"}, + Array: &pf.Inference_Array{ + ItemTypes: []string{"array"}, + }, + }, + }, + want: objProp(), + }, + { + name: "array with multiple item types", + in: &pf.Projection{ + Inference: pf.Inference{ + Types: []string{"array"}, + Array: &pf.Inference_Array{ + ItemTypes: []string{"object", "string"}, + }, + }, + }, + want: objProp(), + }, + { + name: "array with unknown item types", + in: &pf.Projection{ + Inference: pf.Inference{ + Types: []string{"array"}, + }, + }, + want: objProp(), + }, + { + name: "multiple types", + in: &pf.Projection{ + Inference: pf.Inference{ + Types: []string{"array", "string", "object"}, + }, + }, + want: objProp(), }, } diff --git a/tests/materialize/materialize-elasticsearch/snapshot.json b/tests/materialize/materialize-elasticsearch/snapshot.json index a93b7bca8e..fce7e6c47d 100644 --- a/tests/materialize/materialize-elasticsearch/snapshot.json +++ b/tests/materialize/materialize-elasticsearch/snapshot.json @@ -457,6 +457,9 @@ }, "flow_published_at": "1970-01-01T00:00:13.000000000Z", "id": 1, + "multiple": { + "json": 1 + }, "nested": { "id": "i1" }, @@ -491,6 +494,9 @@ }, "flow_published_at": "1970-01-01T00:00:14.000000000Z", "id": 2, + "multiple": { + "json": 2.2 + }, "nested": { "id": "i2" }, @@ -525,6 +531,9 @@ }, "flow_published_at": "1970-01-01T00:00:15.000000000Z", "id": 3, + "multiple": { + "json": true + }, "nested": { "id": "i3" }, @@ -559,6 +568,9 @@ }, "flow_published_at": "1970-01-01T00:00:16.000000000Z", "id": 4, + "multiple": { + "json": false + }, "nested": { "id": "i4" }, @@ -593,6 +605,9 @@ }, "flow_published_at": "1970-01-01T00:00:17.000000000Z", "id": 5, + "multiple": { + "json": "string five" + }, "nested": { "id": "i5" }, @@ -631,6 +646,13 @@ }, "flow_published_at": "1970-01-01T01:00:19.000000000Z", "id": 6, + "multiple": { + "json": [ + "one", + 2, + true + ] + }, "nested": { "id": "i6" }, @@ -667,6 +689,11 @@ }, "flow_published_at": "1970-01-01T01:00:20.000000000Z", "id": 7, + "multiple": { + "json": { + "object": "seven" + } + }, "nested": { "id": "i7" }, @@ -701,6 +728,9 @@ }, "flow_published_at": "1970-01-01T01:00:21.000000000Z", "id": 8, + "multiple": { + "json": null + }, "nested": { "id": "i8" }, @@ -735,6 +765,9 @@ }, "flow_published_at": "1970-01-01T01:00:22.000000000Z", "id": 9, + "multiple": { + "json": null + }, "nested": { "id": "i9" }, @@ -769,6 +802,9 @@ }, "flow_published_at": "1970-01-01T01:00:23.000000000Z", "id": 10, + "multiple": { + "json": null + }, "nested": { "id": "i10" }, @@ -798,9 +834,13 @@ }, "flow_published_at": "1970-01-01T01:00:13.000000000Z", "id": 1, - "int_and_str": 1, + "int_and_str": { + "json": 1 + }, "int_str": "10", - "num_and_str": 1.1, + "num_and_str": { + "json": 1.1 + }, "num_str": "10.1", "time": "00:00:00Z" }, @@ -822,9 +862,13 @@ }, "flow_published_at": "1970-01-01T01:00:14.000000000Z", "id": 2, - "int_and_str": 2, + "int_and_str": { + "json": 2 + }, "int_str": "20", - "num_and_str": 2.1, + "num_and_str": { + "json": 2.1 + }, "num_str": "20.1", "time": "14:20:12.33Z" }, @@ -846,9 +890,13 @@ }, "flow_published_at": "1970-01-01T00:00:11.000000000Z", "id": 3, - "int_and_str": 3, + "int_and_str": { + "json": 3 + }, "int_str": "30", - "num_and_str": 3.1, + "num_and_str": { + "json": 3.1 + }, "num_str": "30.1", "time": "23:59:38.10Z" }, @@ -870,9 +918,13 @@ }, "flow_published_at": "1970-01-01T00:00:12.000000000Z", "id": 4, - "int_and_str": "4", + "int_and_str": { + "json": "4" + }, "int_str": "40", - "num_and_str": "4.1", + "num_and_str": { + "json": "4.1" + }, "num_str": "40.1", "time": "23:59:38Z" }, @@ -894,9 +946,13 @@ }, "flow_published_at": "1970-01-01T01:00:15.000000000Z", "id": 5, - "int_and_str": "5", + "int_and_str": { + "json": "5" + }, "int_str": "50", - "num_and_str": "5.1", + "num_and_str": { + "json": "5.1" + }, "num_str": "50.1", "time": "23:59:59Z" }, @@ -912,9 +968,13 @@ }, "flow_published_at": "1970-01-01T01:00:16.000000000Z", "id": 8, - "int_and_str": null, + "int_and_str": { + "json": null + }, "int_str": null, - "num_and_str": null, + "num_and_str": { + "json": null + }, "num_str": null, "time": null }, @@ -930,9 +990,13 @@ }, "flow_published_at": "1970-01-01T01:00:17.000000000Z", "id": 9, - "int_and_str": null, + "int_and_str": { + "json": null + }, "int_str": null, - "num_and_str": null, + "num_and_str": { + "json": null + }, "num_str": null, "time": null }, @@ -948,9 +1012,13 @@ }, "flow_published_at": "1970-01-01T01:00:18.000000000Z", "id": 10, - "int_and_str": null, + "int_and_str": { + "json": null + }, "int_str": null, - "num_and_str": null, + "num_and_str": { + "json": null + }, "num_str": null, "time": null }