From 56766fb8afb36638043ac867530647315d9b9bd2 Mon Sep 17 00:00:00 2001
From: Bobby Iliev
Date: Tue, 28 May 2024 20:39:25 +0300
Subject: [PATCH] Add Kafka Sink Headers Attribute

---
 docs/resources/sink_kafka.md               |  1 +
 integration/sink.tf                        | 29 ++++++++++++++
 integration/table.tf                       | 19 +++++++++
 pkg/materialize/sink_kafka.go              | 10 +++++
 pkg/materialize/sink_kafka_test.go         | 36 +++++++++++++++++
 pkg/provider/acceptance_sink_kafka_test.go | 46 ++++++++++++++++++++++
 pkg/resources/resource_sink_kafka.go       | 10 +++++
 pkg/resources/resource_sink_kafka_test.go  |  3 +-
 8 files changed, 153 insertions(+), 1 deletion(-)

diff --git a/docs/resources/sink_kafka.md b/docs/resources/sink_kafka.md
index 4bb313c8..272d238a 100644
--- a/docs/resources/sink_kafka.md
+++ b/docs/resources/sink_kafka.md
@@ -63,6 +63,7 @@ resource "materialize_sink_kafka" "example_sink_kafka" {
 - `database_name` (String) The identifier for the sink database in Materialize. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
 - `envelope` (Block List, Max: 1) How to interpret records (e.g. Debezium, Upsert). (see [below for nested schema](#nestedblock--envelope))
 - `format` (Block List, Max: 1) How to decode raw bytes from different formats into data structures it can understand at runtime. (see [below for nested schema](#nestedblock--format))
+- `headers` (String) The name of a column containing additional headers to add to each message emitted by the sink. The column must be of type map[text => text] or map[text => bytea].
 - `key` (List of String) An optional list of columns to use for the Kafka key. If unspecified, the Kafka key is left unset.
 - `key_not_enforced` (Boolean) Disable Materialize's validation of the key's uniqueness.
 - `ownership_role` (String) The ownership role of the object.
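
For reviewers, a sketch of the DDL the provider emits once `headers` is set, following the clause ordering in the builder's Create() (KEY ... NOT ENFORCED, then HEADERS, then FORMAT and ENVELOPE). All identifiers are illustrative, and the headers column is assumed to be of type map[text => text] on the sink's upstream table:

    CREATE SINK "materialize"."public"."sink_kafka_headers"
      FROM "materialize"."public"."simple_table_sink"
      INTO KAFKA CONNECTION "materialize"."public"."kafka_connection" (TOPIC 'topic1')
      KEY (key_column) NOT ENFORCED
      HEADERS kafka_header
      FORMAT JSON
      ENVELOPE UPSERT;
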
diff --git a/integration/sink.tf b/integration/sink.tf
index 6f1a503b..87fadaf8 100644
--- a/integration/sink.tf
+++ b/integration/sink.tf
@@ -86,6 +86,35 @@ resource "materialize_sink_kafka" "sink_kafka_cluster" {
   }
 }
 
+
+resource "materialize_sink_kafka" "sink_kafka_headers" {
+  name             = "sink_kafka_headers"
+  schema_name      = materialize_schema.schema.name
+  database_name    = materialize_database.database.name
+  cluster_name     = materialize_cluster.cluster_sink.name
+  topic            = "topic1"
+  key              = ["key_column"]
+  key_not_enforced = true
+  snapshot         = true
+  headers          = "kafka_header"
+  from {
+    name          = materialize_table.simple_table_sink.name
+    database_name = materialize_table.simple_table_sink.database_name
+    schema_name   = materialize_table.simple_table_sink.schema_name
+  }
+  kafka_connection {
+    name          = materialize_connection_kafka.kafka_connection.name
+    database_name = materialize_connection_kafka.kafka_connection.database_name
+    schema_name   = materialize_connection_kafka.kafka_connection.schema_name
+  }
+  format {
+    json = true
+  }
+  envelope {
+    upsert = true
+  }
+}
+
 output "qualified_sink_kafka" {
   value = materialize_sink_kafka.sink_kafka.qualified_sql_name
 }
diff --git a/integration/table.tf b/integration/table.tf
index ef002c79..8b1c4d86 100644
--- a/integration/table.tf
+++ b/integration/table.tf
@@ -31,6 +31,25 @@ resource "materialize_table" "simple_table" {
   }
 }
 
+resource "materialize_table" "simple_table_sink" {
+  name          = "simple_table_sink"
+  schema_name   = materialize_schema.schema.name
+  database_name = materialize_database.database.name
+  comment       = "table sink comment"
+
+  column {
+    name = "key_column"
+    type = "text"
+  }
+  column {
+    name = "kafka_header"
+    type = "map[text => text]"
+  }
+  lifecycle {
+    ignore_changes = [column]
+  }
+}
+
 resource "materialize_table_grant" "table_grant_select" {
   role_name = materialize_role.role_1.name
   privilege = "SELECT"
diff --git a/pkg/materialize/sink_kafka.go b/pkg/materialize/sink_kafka.go
index c0ad5e97..ca53a24f 100644
--- a/pkg/materialize/sink_kafka.go
+++ b/pkg/materialize/sink_kafka.go
@@ -35,6 +35,7 @@ type SinkKafkaBuilder struct {
 	format         SinkFormatSpecStruct
 	envelope       KafkaSinkEnvelopeStruct
 	snapshot       bool
+	headers        string
 	keyNotEnforced bool
 }
 
@@ -95,6 +96,11 @@ func (b *SinkKafkaBuilder) Snapshot(s bool) *SinkKafkaBuilder {
 	return b
 }
 
+func (b *SinkKafkaBuilder) Headers(h string) *SinkKafkaBuilder {
+	b.headers = h
+	return b
+}
+
 func (b *SinkKafkaBuilder) KeyNotEnforced(s bool) *SinkKafkaBuilder {
 	b.keyNotEnforced = true
 	return b
 }
@@ -132,6 +138,10 @@ func (b *SinkKafkaBuilder) Create() error {
 		q.WriteString(` NOT ENFORCED`)
 	}
 
+	if b.headers != "" {
+		q.WriteString(fmt.Sprintf(` HEADERS %s`, b.headers))
+	}
+
 	if b.format.Json {
 		q.WriteString(` FORMAT JSON`)
 	}
diff --git a/pkg/materialize/sink_kafka_test.go b/pkg/materialize/sink_kafka_test.go
index ff3c70be..2beb7e5b 100644
--- a/pkg/materialize/sink_kafka_test.go
+++ b/pkg/materialize/sink_kafka_test.go
@@ -356,3 +356,39 @@ func TestSinkKafkaAvroDocsCreate(t *testing.T) {
 		}
 	})
 }
+
+func TestSinkKafkaHeadersCreate(t *testing.T) {
+	testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
+		mock.ExpectExec(
+			`CREATE SINK "database"."schema"."sink"
+			FROM "database"."schema"."src"
+			INTO KAFKA CONNECTION "database"."schema"."kafka_conn" \(TOPIC 'testdrive-snk1-seed'\)
+			HEADERS headers
+			FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "materialize"."public"."csr_conn"
+			ENVELOPE DEBEZIUM;`,
+		).WillReturnResult(sqlmock.NewResult(1, 1))
+
+		o := MaterializeObject{Name: "sink", SchemaName: "schema", DatabaseName: "database"}
+		b := NewSinkKafkaBuilder(db, o)
+		b.From(IdentifierSchemaStruct{Name: "src", SchemaName: "schema", DatabaseName: "database"})
+		b.KafkaConnection(IdentifierSchemaStruct{Name: "kafka_conn", SchemaName: "schema", DatabaseName: "database"})
+		b.Topic("testdrive-snk1-seed")
+		b.Format(
+			SinkFormatSpecStruct{
+				Avro: &SinkAvroFormatSpec{
+					SchemaRegistryConnection: IdentifierSchemaStruct{
+						Name:         "csr_conn",
+						DatabaseName: "materialize",
+						SchemaName:   "public",
+					},
+				},
+			},
+		)
+		b.Headers("headers")
+		b.Envelope(KafkaSinkEnvelopeStruct{Debezium: true})
+
+		if err := b.Create(); err != nil {
+			t.Fatal(err)
+		}
+	})
+}
diff --git a/pkg/provider/acceptance_sink_kafka_test.go b/pkg/provider/acceptance_sink_kafka_test.go
index 91041626..0a1a0667 100644
--- a/pkg/provider/acceptance_sink_kafka_test.go
+++ b/pkg/provider/acceptance_sink_kafka_test.go
@@ -40,6 +40,14 @@ func TestAccSinkKafka_basic(t *testing.T) {
 					resource.TestCheckResourceAttr("materialize_sink_kafka.test_role", "name", sink2Name),
 					resource.TestCheckResourceAttr("materialize_sink_kafka.test_role", "ownership_role", roleName),
 					resource.TestCheckResourceAttr("materialize_sink_kafka.test_role", "comment", "Comment"),
+					testAccCheckSinkKafkaExists("materialize_sink_kafka.sink_kafka_headers"),
+					resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "name", sinkName+"_sink_headers"),
+					resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "topic", "topic1"),
+					resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "key.0", "column_1"),
+					resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "key_not_enforced", "true"),
+					resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "snapshot", "true"),
+					resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "headers", "column_1"),
+					resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "envelope.0.upsert", "true"),
 				),
 			},
 			{
@@ -182,6 +190,18 @@ func testAccSinkKafkaResource(roleName, connName, tableName, sinkName, sink2Name
 		size = "3xsmall"
 	}
 
+	resource "materialize_table" "simple_table" {
+		name = "%[4]s_simple_table"
+
+		column {
+			name = "column_1"
+			type = "map[text => text]"
+		}
+		lifecycle {
+			ignore_changes = [column]
+		}
+	}
+
 	resource "materialize_connection_kafka" "test" {
 		name = "%[2]s"
 		kafka_broker {
@@ -248,6 +268,32 @@ func testAccSinkKafkaResource(roleName, connName, tableName, sinkName, sink2Name
 
 		depends_on = [materialize_role.test]
 	}
+
+	resource "materialize_sink_kafka" "sink_kafka_headers" {
+		name             = "%[4]s_sink_headers"
+		cluster_name     = materialize_cluster.test.name
+		topic            = "topic1"
+		key              = ["column_1"]
+		key_not_enforced = true
+		snapshot         = true
+		headers          = "column_1"
+		from {
+			name          = materialize_table.simple_table.name
+			database_name = materialize_table.simple_table.database_name
+			schema_name   = materialize_table.simple_table.schema_name
+		}
+		kafka_connection {
+			name          = materialize_connection_kafka.test.name
+			database_name = materialize_connection_kafka.test.database_name
+			schema_name   = materialize_connection_kafka.test.schema_name
+		}
+		format {
+			json = true
+		}
+		envelope {
+			upsert = true
+		}
+	}
 	`, roleName, connName, tableName, sinkName, sink2Name, sinkOwner, comment)
 }
diff --git a/pkg/resources/resource_sink_kafka.go b/pkg/resources/resource_sink_kafka.go
index 6b9c283c..f0d06be8 100644
--- a/pkg/resources/resource_sink_kafka.go
+++ b/pkg/resources/resource_sink_kafka.go
@@ -93,6 +93,12 @@ var sinkKafkaSchema = map[string]*schema.Schema{
 		ForceNew: true,
 		Default:  false,
 	},
+	"headers": {
+		Description: "The name of a column containing additional headers to add to each message emitted by the sink. The column must be of type map[text => text] or map[text => bytea].",
+		Type:        schema.TypeString,
+		Optional:    true,
+		ForceNew:    true,
+	},
 	"region": RegionSchema(),
 }
 
@@ -173,6 +179,10 @@ func sinkKafkaCreate(ctx context.Context, d *schema.ResourceData, meta any) diag
 		b.Snapshot(v.(bool))
 	}
 
+	if v, ok := d.GetOk("headers"); ok {
+		b.Headers(v.(string))
+	}
+
 	// create resource
 	if err := b.Create(); err != nil {
 		return diag.FromErr(err)
diff --git a/pkg/resources/resource_sink_kafka_test.go b/pkg/resources/resource_sink_kafka_test.go
index 4f3bb388..eb1b4dd7 100644
--- a/pkg/resources/resource_sink_kafka_test.go
+++ b/pkg/resources/resource_sink_kafka_test.go
@@ -29,6 +29,7 @@ var inSinkKafka = map[string]interface{}{
 	"compression_type": "gzip",
 	"key":              []interface{}{"key_1", "key_2"},
 	"key_not_enforced": true,
+	"headers":          "headers",
 	"format": []interface{}{
 		map[string]interface{}{
 			"avro": []interface{}{
@@ -100,7 +101,7 @@ func TestResourceSinkKafkaCreate(t *testing.T) {
 		IN CLUSTER "cluster" FROM "database"."public"."item"
 		INTO KAFKA CONNECTION "materialize"."public"."kafka_conn" \(TOPIC 'topic', COMPRESSION TYPE = gzip\) KEY \(key_1, key_2\)
-		NOT ENFORCED FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn"
+		NOT ENFORCED HEADERS headers FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn"
 		\(AVRO KEY FULLNAME 'avro_key_fullname' AVRO VALUE FULLNAME 'avro_value_fullname', DOC ON TYPE "database"."public"."item" = 'top-level comment', KEY DOC ON COLUMN "database"."public"."item"."c1" = 'comment on column only in key schema',
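
To spot-check the attribute end to end after a `terraform apply`, inspecting the created sink from a SQL shell should show the new clause (a sketch; the database, schema, and sink names are placeholders for whatever your config creates):

    SHOW CREATE SINK "database"."schema"."sink_kafka_headers";

The emitted statement should contain `HEADERS kafka_header` between the key clause and the FORMAT clause, matching the unit-test expectations above.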