Skip to content

Commit

Permalink
materialize-elasticsearch: materialize singly-typed arrays
Browse files Browse the repository at this point in the history
With array inference information available in the connector protocol,
materializations like Elasticsearch can create appropriately typed "array" index
mappings for schematized arrays where there is known to be a single type of
item.
  • Loading branch information
williamhbaker committed Jan 14, 2025
1 parent 7f596b0 commit e5153ea
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 38 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/databricks/databricks-sql-go v1.5.5
github.com/dropbox/dropbox-sdk-go-unofficial/v6 v6.0.5
github.com/elastic/go-elasticsearch/v8 v8.9.0
github.com/estuary/flow v0.4.1-0.20240815183205-bba5ae475f12
github.com/estuary/flow v0.5.9
github.com/estuary/vitess v0.15.10
github.com/evanphx/json-patch/v5 v5.9.0
github.com/firebolt-db/firebolt-go-sdk v1.2.0
Expand Down Expand Up @@ -67,9 +67,10 @@ require (
github.com/trinodb/trino-go-client v0.313.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20220527110425-ba4adb87a31b
go.gazette.dev/core v0.89.1-0.20240418133910-d612fdcfd24a
go.gazette.dev/core v0.100.0
go.mongodb.org/mongo-driver v1.16.1
golang.org/x/crypto v0.24.0
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/text v0.16.0
Expand Down Expand Up @@ -167,6 +168,7 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
Expand Down Expand Up @@ -232,7 +234,6 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.22.0 // indirect
Expand Down
25 changes: 13 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/estuary/flow v0.4.1-0.20240815183205-bba5ae475f12 h1:wUvdCL6Uy4+otsfND3RlXRUjB8zdWT+1U50HCYjLuxI=
github.com/estuary/flow v0.4.1-0.20240815183205-bba5ae475f12/go.mod h1:9LZsqpHimSOWuo47gKABSgrrfZQ0DMXzXOdlY+ziBSg=
github.com/estuary/flow v0.5.9 h1:AhZ/gomj9avI5zQ9T8OTMQfWTPRDsipMMHIMpp5i3GA=
github.com/estuary/flow v0.5.9/go.mod h1:fRYaE62AKCeBcthEs+M+Ofc9L7AQdRaNKjES3+tFziM=
github.com/estuary/vitess v0.15.10 h1:oXJgcG0HGZWuwjdvSp4pIFIAXtKLaR9Fl9VBynSszFA=
github.com/estuary/vitess v0.15.10/go.mod h1:rSIE8UMfexjblBloleGw6BqQIKggGmU91TduCNIaJEA=
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
Expand Down Expand Up @@ -574,8 +574,8 @@ github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pf
github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM=
github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg=
github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI=
github.com/gorilla/schema v1.2.0 h1:YufUaxZYCKGFuAq3c96BOhjgd5nmXiOY9NGzF247Tsc=
github.com/gorilla/schema v1.2.0/go.mod h1:kgLaKoK1FELgZqMAVxx/5cbj0kT+57qxUrAlIO2eleU=
github.com/gorilla/schema v1.4.1 h1:jUg5hUjCSDZpNGLuXQOgIWGdlgrIdYvgQ0wZtdK1M3E=
github.com/gorilla/schema v1.4.1/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
Expand All @@ -586,6 +586,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDa
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
Expand Down Expand Up @@ -968,14 +969,14 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.einride.tech/aip v0.67.1 h1:d/4TW92OxXBngkSOwWS2CH5rez869KpKMaN44mdxkFI=
go.einride.tech/aip v0.67.1/go.mod h1:ZGX4/zKw8dcgzdLsrvpOOGxfxI2QSk12SlP7d6c0/XI=
go.etcd.io/etcd/api/v3 v3.5.14 h1:vHObSCxyB9zlF60w7qzAdTcGaglbJOpSj1Xj9+WGxq0=
go.etcd.io/etcd/api/v3 v3.5.14/go.mod h1:BmtWcRlQvwa1h3G2jvKYwIQy4PkHlDej5t7uLMUdJUU=
go.etcd.io/etcd/client/pkg/v3 v3.5.14 h1:SaNH6Y+rVEdxfpA2Jr5wkEvN6Zykme5+YnbCkxvuWxQ=
go.etcd.io/etcd/client/pkg/v3 v3.5.14/go.mod h1:8uMgAokyG1czCtIdsq+AGyYQMvpIKnSvPjFMunkgeZI=
go.etcd.io/etcd/client/v3 v3.5.14 h1:CWfRs4FDaDoSz81giL7zPpZH2Z35tbOrAJkkjMqOupg=
go.etcd.io/etcd/client/v3 v3.5.14/go.mod h1:k3XfdV/VIHy/97rqWjoUzrj9tk7GgJGH9J8L4dNXmAk=
go.gazette.dev/core v0.89.1-0.20240418133910-d612fdcfd24a h1:pu/yxw6Lsv+HhkpRT/MldB2zfEKjKNBX5B/8on5rEoA=
go.gazette.dev/core v0.89.1-0.20240418133910-d612fdcfd24a/go.mod h1:TdjN2Q0A8NsL1C7hFjbl8BzGKobyFNPmhJFi54GvNyU=
go.etcd.io/etcd/api/v3 v3.5.17 h1:cQB8eb8bxwuxOilBpMJAEo8fAONyrdXTHUNcMd8yT1w=
go.etcd.io/etcd/api/v3 v3.5.17/go.mod h1:d1hvkRuXkts6PmaYk2Vrgqbv7H4ADfAKhyJqHNLJCB4=
go.etcd.io/etcd/client/pkg/v3 v3.5.17 h1:XxnDXAWq2pnxqx76ljWwiQ9jylbpC4rvkAeRVOUKKVw=
go.etcd.io/etcd/client/pkg/v3 v3.5.17/go.mod h1:4DqK1TKacp/86nJk4FLQqo6Mn2vvQFBmruW3pP14H/w=
go.etcd.io/etcd/client/v3 v3.5.17 h1:o48sINNeWz5+pjy/Z0+HKpj/xSnBkuVhVvXkjEXbqZY=
go.etcd.io/etcd/client/v3 v3.5.17/go.mod h1:j2d4eXTHWkT2ClBgnnEPm/Wuu7jsqku41v9DZ3OtjQo=
go.gazette.dev/core v0.100.0 h1:lOSwhWIuZhYZ+br/4LN/osEfP6yQ+J1qM8pxuXmoD0I=
go.gazette.dev/core v0.100.0/go.mod h1:1rj+daAL/cy+dt9XZGzsLBJl5BeLkeCiRmlcHaSCH/I=
go.mongodb.org/mongo-driver v1.16.1 h1:rIVLL3q0IHM39dvE+z2ulZLp9ENZKThVfuvN/IiN4l8=
go.mongodb.org/mongo-driver v1.16.1/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
Expand Down
4 changes: 2 additions & 2 deletions materialize-elasticsearch/.snapshots/TestValidateAndApply
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Big Schema Initial Constraints:
{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields are able to be materialized"}
{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"}
{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Arrays with multiple or unknown item types cannot be materialized"}
{"Field":"boolField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document is required for a standard updates materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
Expand Down Expand Up @@ -39,7 +39,7 @@ Big Schema Initial Constraints:

Big Schema Re-validated Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize this field"}
{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Arrays with multiple or unknown item types cannot be materialized"}
{"Field":"boolField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
Expand Down
2 changes: 0 additions & 2 deletions materialize-elasticsearch/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
} else {
for idx, v := range append(it.Key, it.Values...) {
if b, ok := v.([]byte); ok {
// An object or array field is received as raw JSON bytes. We currently only support
// objects.
v = json.RawMessage(b)
}
if s, ok := v.(string); b.floatFields[idx] && ok {
Expand Down
72 changes: 53 additions & 19 deletions materialize-elasticsearch/type_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@ type property struct {
}

func propForField(field string, binding *pf.MaterializationSpec_Binding) (property, error) {
return propForProjection(binding.Collection.GetProjection(field), binding.FieldSelection.FieldConfigJsonMap[field])
p := binding.Collection.GetProjection(field)
return propForProjection(p, p.Inference.Types, binding.FieldSelection.FieldConfigJsonMap[field])
}

var numericStringTypes = map[boilerplate.StringWithNumericFormat]elasticPropertyType{
boilerplate.StringFormatInteger: elasticTypeLong,
boilerplate.StringFormatNumber: elasticTypeDouble,
}

func propForProjection(p *pf.Projection, fc json.RawMessage) (property, error) {
func propForProjection(p *pf.Projection, types []string, fc json.RawMessage) (property, error) {
type fieldConfig struct {
Keyword bool `json:"keyword"`
}
Expand All @@ -57,27 +58,34 @@ func propForProjection(p *pf.Projection, fc json.RawMessage) (property, error) {
return property{Type: numericStringTypes[numericString], Coerce: true}, nil
}

typesWithoutNull := func(ts []string) []string {
out := []string{}
for _, t := range ts {
if t != "null" {
out = append(out, t)
}
}
return out
nonNullTypes := typesWithoutNull(types)
if len(nonNullTypes) == 0 {
panic("internal application error: expected a single non-null type")
}

switch t := typesWithoutNull(p.Inference.Types)[0]; t {
switch t := nonNullTypes[0]; t {
case pf.JsonTypeArray:
// You can add an array of the same item to any field in Elasticsearch,
// and we only allow schematized arrays having a single type to be
// materialized.
return propForProjection(p, p.Inference.Array.ItemTypes, nil)
case pf.JsonTypeBoolean:
return property{Type: elasticTypeBoolean}, nil
case pf.JsonTypeInteger:
return property{Type: elasticTypeLong}, nil
case pf.JsonTypeString:
if p.Inference.String_.ContentEncoding == "base64" {
inf := p.Inference.String_
if inf == nil {
// This simplifies handling for arrays with string item types, since
// these will not have a string inference set.
inf = &pf.Inference_String{}
}

if inf.ContentEncoding == "base64" {
return property{Type: elasticTypeBinary}, nil
}

switch f := p.Inference.String_.Format; f {
switch f := inf.Format; f {
// Formats for "integer" and "number" are handled above.
case "date":
return property{Type: elasticTypeDate}, nil
Expand Down Expand Up @@ -155,6 +163,22 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp
case p.IsRootDocumentProjection():
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "The root document should usually be materialized"
case p.Inference.IsSingleType() && slices.Contains(p.Inference.Types, "array"):
var itemTypes []string
if p.Inference.Array != nil {
itemTypes = typesWithoutNull(p.Inference.Array.ItemTypes)
}

if len(itemTypes) == 1 && itemTypes[0] == "array" {
constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
constraint.Reason = "Nested arrays cannot be materialized"
} else if len(itemTypes) == 1 {
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "Arrays with a single item type can be materialized"
} else {
constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
constraint.Reason = "Arrays with multiple or unknown item types cannot be materialized"
}
case p.Field == "_meta/op":
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "The operation type should usually be materialized"
Expand All @@ -164,14 +188,13 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp
case p.Inference.IsSingleScalarType() || isNumeric:
constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED
constraint.Reason = "The projection has a single scalar type"
case p.Inference.IsSingleType() && !slices.Contains(p.Inference.Types, "array"):
case p.Inference.IsSingleType():
constraint.Type = pm.Response_Validated_Constraint_FIELD_OPTIONAL
constraint.Reason = "This field is able to be materialized"

default:
// Anything else is either multiple different types, a single 'null' type, or an array type
// which we currently don't support. We could potentially support array types if they made
// the "elements" configuration available and that was a single type.
// Anything else is either multiple different types or a single 'null'
// type.
constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
constraint.Reason = "Cannot materialize this field"
}
Expand All @@ -180,7 +203,7 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp
}

func (constrainter) Compatible(existing boilerplate.EndpointField, proposed *pf.Projection, fc json.RawMessage) (bool, error) {
prop, err := propForProjection(proposed, fc)
prop, err := propForProjection(proposed, proposed.Inference.Types, fc)
if err != nil {
return false, err
}
Expand All @@ -189,10 +212,21 @@ func (constrainter) Compatible(existing boilerplate.EndpointField, proposed *pf.
}

func (constrainter) DescriptionForType(p *pf.Projection, fc json.RawMessage) (string, error) {
prop, err := propForProjection(p, fc)
prop, err := propForProjection(p, p.Inference.Types, fc)
if err != nil {
return "", err
}

return string(prop.Type), nil
}

func typesWithoutNull(ts []string) []string {
out := []string{}
for _, t := range ts {
if t != "null" {
out = append(out, t)
}
}

return out
}
61 changes: 61 additions & 0 deletions materialize-elasticsearch/type_mapping_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"testing"

pf "github.com/estuary/flow/go/protocols/flow"
"github.com/stretchr/testify/require"
)

func TestPropForProjection(t *testing.T) {
tests := []struct {
name string
in *pf.Projection
want property
}{
{
name: "array with string items - nullable",
in: &pf.Projection{
Inference: pf.Inference{
Types: []string{"array", "null"},
Array: &pf.Inference_Array{
ItemTypes: []string{"string", "null"},
},
},
},
want: property{Type: elasticTypeText},
},
{
name: "array with string items - not nullable",
in: &pf.Projection{
Inference: pf.Inference{
Types: []string{"array"},
Array: &pf.Inference_Array{
ItemTypes: []string{"string"},
},
},
},
want: property{Type: elasticTypeText},
},
{
name: "array with object items",
in: &pf.Projection{
Inference: pf.Inference{
Types: []string{"array"},
Array: &pf.Inference_Array{
ItemTypes: []string{"object"},
},
},
},
want: property{Type: elasticTypeFlattened, IgnoreAbove: 32766 / 4},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := propForProjection(tt.in, tt.in.Inference.Types, nil)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}
Loading

0 comments on commit e5153ea

Please sign in to comment.