Skip to content

Commit

Permalink
materialize-elasticsearch: materialize fields with multiple types and…
Browse files Browse the repository at this point in the history
… non singly-typed arrays

Expands handling for fields that have multiple types, and arrays that do not
have a single type of item.

This is done by materializing the values for these fields as a synthetic object
and putting that in a "flattened" index mapping, the same way as other objects
are materialized. The generated object has a key of "json" and its value is the
JSON representation of the original value.
  • Loading branch information
williamhbaker committed Jan 14, 2025
1 parent e5153ea commit 4e6ecd3
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 81 deletions.
39 changes: 22 additions & 17 deletions materialize-elasticsearch/.snapshots/TestValidateAndApply
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
Big Schema Initial Constraints:
{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields are able to be materialized"}
{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Arrays with multiple or unknown item types cannot be materialized"}
{"Field":"arrayField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"}
{"Field":"boolField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document is required for a standard updates materialization"}
{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"Primary key locations are required"}
{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"}
{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"}
{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"}
{"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"}
{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"objField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"}
{"Field":"objField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"stringDateTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"stringDurationField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
Expand Down Expand Up @@ -39,14 +39,14 @@ Big Schema Initial Constraints:

Big Schema Re-validated Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Arrays with multiple or unknown item types cannot be materialized"}
{"Field":"arrayField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"boolField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"}
{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"}
{"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
Expand Down Expand Up @@ -78,14 +78,14 @@ Big Schema Re-validated Constraints:

Big Schema Changed Types Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"arrayField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"}
{"Field":"arrayField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"boolField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'boolField' is already being materialized as endpoint type 'BOOLEAN' but endpoint type 'LONG' is required by its schema '{ type: [integer] }'"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"intField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'intField' is already being materialized as endpoint type 'LONG' but endpoint type 'TEXT' is required by its schema '{ type: [string] }'"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"}
{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"}
{"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"numField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'numField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'BOOLEAN' is required by its schema '{ type: [boolean] }'"}
{"Field":"objField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'objField' is already being materialized as endpoint type 'FLATTENED' but endpoint type 'TEXT' is required by its schema '{ type: [string] }'"}
{"Field":"stringDateField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateField' is already being materialized as endpoint type 'DATE' but endpoint type 'TEXT' is required by its schema '{ type: [string] }'"}
Expand Down Expand Up @@ -117,11 +117,13 @@ Big Schema Changed Types Constraints:

Big Schema Materialized Resource Schema With All Fields Required:
{"Field":"_meta/flow_truncated","Type":"boolean"}
{"Field":"arrayField","Type":"flattened"}
{"Field":"boolField","Type":"boolean"}
{"Field":"flow_document","Type":"flattened"}
{"Field":"flow_published_at","Type":"date"}
{"Field":"intField","Type":"long"}
{"Field":"key","Type":"keyword"}
{"Field":"multipleField","Type":"flattened"}
{"Field":"numField","Type":"double"}
{"Field":"objField","Type":"flattened"}
{"Field":"stringDateField","Type":"date"}
Expand Down Expand Up @@ -153,11 +155,13 @@ Big Schema Materialized Resource Schema With All Fields Required:

Big Schema Materialized Resource Schema With No Fields Required:
{"Field":"_meta/flow_truncated","Type":"boolean"}
{"Field":"arrayField","Type":"flattened"}
{"Field":"boolField","Type":"boolean"}
{"Field":"flow_document","Type":"flattened"}
{"Field":"flow_published_at","Type":"date"}
{"Field":"intField","Type":"long"}
{"Field":"key","Type":"keyword"}
{"Field":"multipleField","Type":"flattened"}
{"Field":"numField","Type":"double"}
{"Field":"objField","Type":"flattened"}
{"Field":"stringDateField","Type":"date"}
Expand Down Expand Up @@ -189,14 +193,14 @@ Big Schema Materialized Resource Schema With No Fields Required:

Big Schema Changed Types With Table Replacement Constraints:
{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields are able to be materialized"}
{"Field":"arrayField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"}
{"Field":"arrayField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"boolField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document is required for a standard updates materialization"}
{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"Primary key locations are required"}
{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"}
{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"This field is able to be materialized"}
{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"}
{"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"}
{"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"numField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
Expand Down Expand Up @@ -234,6 +238,7 @@ Big Schema Materialized Resource Schema Changed Types With Table Replacement:
{"Field":"flow_published_at","Type":"date"}
{"Field":"intField","Type":"text"}
{"Field":"key","Type":"keyword"}
{"Field":"multipleField","Type":"flattened"}
{"Field":"nullField","Type":"flattened"}
{"Field":"numField","Type":"boolean"}
{"Field":"objField","Type":"text"}
Expand Down
4 changes: 4 additions & 0 deletions materialize-elasticsearch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,13 +490,16 @@ func (d driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boil
allFields := append(b.FieldSelection.Keys, b.FieldSelection.Values...)
fields := make([]string, 0, len(allFields))
floatFields := make([]bool, len(allFields))
wrapFields := make([]bool, len(allFields))

for idx, field := range allFields {
fields = append(fields, translateField(field))
if prop, err := propForField(field, b); err != nil {
return nil, nil, nil, err
} else if prop.Type == elasticTypeDouble {
floatFields[idx] = true
} else if mustWrapAndFlatten(b.Collection.GetProjection(field)) {
wrapFields[idx] = true
}
}

Expand All @@ -506,6 +509,7 @@ func (d driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boil
deltaUpdates: res.DeltaUpdates,
fields: fields,
floatFields: floatFields,
wrapFields: wrapFields,
docField: b.FieldSelection.Document,
})
}
Expand Down
6 changes: 6 additions & 0 deletions materialize-elasticsearch/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type binding struct {
// these and we'll replace them with NULL.
floatFields []bool

// Index of fields that are some type that cannot be materialized natively, and must be wrapped
// in a synthetic object and materialized into a flattened mapping.
wrapFields []bool

// Present if the binding includes the root document, empty if not. This is usually the default
// "flow_document" but may have an alternate user-defined projection name.
docField string
Expand Down Expand Up @@ -223,6 +227,8 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
if s == "Infinity" || s == "-Infinity" || s == "NaN" {
v = nil
}
} else if b.wrapFields[idx] {
v = map[string]any{"json": v}
}

doc[b.fields[idx]] = v
Expand Down
108 changes: 61 additions & 47 deletions materialize-elasticsearch/type_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ var numericStringTypes = map[boilerplate.StringWithNumericFormat]elasticProperty
}

func propForProjection(p *pf.Projection, types []string, fc json.RawMessage) (property, error) {
if mustWrapAndFlatten(p) {
return objProp(), nil
}

type fieldConfig struct {
Keyword bool `json:"keyword"`
}
Expand All @@ -58,17 +62,12 @@ func propForProjection(p *pf.Projection, types []string, fc json.RawMessage) (pr
return property{Type: numericStringTypes[numericString], Coerce: true}, nil
}

nonNullTypes := typesWithoutNull(types)
if len(nonNullTypes) == 0 {
panic("internal application error: expected a single non-null type")
}

switch t := nonNullTypes[0]; t {
switch t := typesWithoutNull(types)[0]; t {
case pf.JsonTypeArray:
// You can add an array of the same item to any field in Elasticsearch,
// and we only allow schematized arrays having a single type to be
// materialized.
return propForProjection(p, p.Inference.Array.ItemTypes, nil)
// Arrays of the same item type can be added to a field that has that
// type, so the created mapping will be for that singly-typed array
// item.
return propForProjection(p, p.Inference.Array.ItemTypes, fc)
case pf.JsonTypeBoolean:
return property{Type: elasticTypeBoolean}, nil
case pf.JsonTypeInteger:
Expand Down Expand Up @@ -103,15 +102,7 @@ func propForProjection(p *pf.Projection, types []string, fc json.RawMessage) (pr
}
}
case pf.JsonTypeObject:
return property{
Type: elasticTypeFlattened,
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html
// This setting is to avoid extremely long strings causing errors due to Elastic's
// requirement that strings do not have a byte length longer than 32766. Long strings
// will not be indexed or stored in the Lucene index, but will still be present in the
// _source field.
IgnoreAbove: 32766 / 4,
}, nil
return objProp(), nil
case pf.JsonTypeNumber:
return property{Type: elasticTypeDouble}, nil
default:
Expand Down Expand Up @@ -156,29 +147,16 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp
switch {
case p.IsPrimaryKey:
constraint.Type = pm.Response_Validated_Constraint_LOCATION_REQUIRED
constraint.Reason = "Primary key locations are required"
case p.IsRootDocumentProjection() && !deltaUpdates:
constraint.Type = pm.Response_Validated_Constraint_LOCATION_REQUIRED
constraint.Reason = "The root document is required for a standard updates materialization"
case p.IsRootDocumentProjection():
constraint.Reason = "All Locations that are part of the collections key are required"
case p.IsRootDocumentProjection() && deltaUpdates:
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "The root document should usually be materialized"
case p.Inference.IsSingleType() && slices.Contains(p.Inference.Types, "array"):
var itemTypes []string
if p.Inference.Array != nil {
itemTypes = typesWithoutNull(p.Inference.Array.ItemTypes)
}

if len(itemTypes) == 1 && itemTypes[0] == "array" {
constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
constraint.Reason = "Nested arrays cannot be materialized"
} else if len(itemTypes) == 1 {
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "Arrays with a single item type can be materialized"
} else {
constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
constraint.Reason = "Arrays with multiple or unknown item types cannot be materialized"
}
case p.IsRootDocumentProjection():
constraint.Type = pm.Response_Validated_Constraint_LOCATION_REQUIRED
constraint.Reason = "The root document must be materialized"
case len(p.Inference.Types) == 0:
constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
constraint.Reason = "Cannot materialize a field with no types"
case p.Field == "_meta/op":
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "The operation type should usually be materialized"
Expand All @@ -188,15 +166,16 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp
case p.Inference.IsSingleScalarType() || isNumeric:
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "The projection has a single scalar type"
case p.Inference.IsSingleType():
case slices.Equal(p.Inference.Types, []string{"null"}):
constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
constraint.Reason = "Cannot materialize a field where the only possible type is 'null'"
case p.Inference.IsSingleType() && slices.Contains(p.Inference.Types, "object"):
constraint.Type = pm.Response_Validated_Constraint_FIELD_OPTIONAL
constraint.Reason = "This field is able to be materialized"

constraint.Reason = "Object fields may be materialized"
default:
// Anything else is either multiple different types or a single 'null'
// type.
constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
constraint.Reason = "Cannot materialize this field"
// Any other case is one where the field is an array or has multiple types.
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "This field is able to be materialized"
}

return &constraint
Expand Down Expand Up @@ -230,3 +209,38 @@ func typesWithoutNull(ts []string) []string {

return out
}

// mustWrapAndFlatten reports whether the projection lacks a single concrete
// type that a mapping can be built from directly. It returns true when:
//   - the field does not have exactly one non-null type, or
//   - the field is an array with no item inference, or
//   - the field is an array whose items do not have exactly one non-null
//     type, or whose items are themselves arrays.
//
// In every other case it returns false.
func mustWrapAndFlatten(p *pf.Projection) bool {
	kinds := typesWithoutNull(p.Inference.Types)

	switch {
	case len(kinds) != 1:
		// Zero or several non-null types: no single type to map.
		return true
	case kinds[0] != "array":
		// Exactly one non-array type.
		return false
	case p.Inference.Array == nil:
		// An array about whose items nothing is known.
		return true
	}

	// Nested arrays don't work because we need some non-array type somewhere
	// in order to actually create the mapping.
	elems := typesWithoutNull(p.Inference.Array.ItemTypes)
	return len(elems) != 1 || elems[0] == "array"
}

// objProp returns the property used for fields stored with the "flattened"
// Elasticsearch mapping type.
func objProp() property {
	// See https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html
	// Elastic requires that strings do not have a byte length longer than
	// 32766, and extremely long strings would otherwise cause errors. Strings
	// over this limit will not be indexed or stored in the Lucene index, but
	// will still be present in the _source field.
	// NOTE(review): the division by 4 presumably allows for up to 4 bytes per
	// UTF-8 character — confirm against the linked documentation.
	const ignoreAboveLimit = 32766 / 4

	return property{
		Type:        elasticTypeFlattened,
		IgnoreAbove: ignoreAboveLimit,
	}
}
44 changes: 43 additions & 1 deletion materialize-elasticsearch/type_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,49 @@ func TestPropForProjection(t *testing.T) {
},
},
},
want: property{Type: elasticTypeFlattened, IgnoreAbove: 32766 / 4},
want: objProp(),
},
{
name: "array with array items",
in: &pf.Projection{
Inference: pf.Inference{
Types: []string{"array"},
Array: &pf.Inference_Array{
ItemTypes: []string{"array"},
},
},
},
want: objProp(),
},
{
name: "array with multiple item types",
in: &pf.Projection{
Inference: pf.Inference{
Types: []string{"array"},
Array: &pf.Inference_Array{
ItemTypes: []string{"object", "string"},
},
},
},
want: objProp(),
},
{
name: "array with unknown item types",
in: &pf.Projection{
Inference: pf.Inference{
Types: []string{"array"},
},
},
want: objProp(),
},
{
name: "multiple types",
in: &pf.Projection{
Inference: pf.Inference{
Types: []string{"array", "string", "object"},
},
},
want: objProp(),
},
}

Expand Down
Loading

0 comments on commit 4e6ecd3

Please sign in to comment.