From eb3afffc49350c9f9bc6168a587ab67717ef72ca Mon Sep 17 00:00:00 2001
From: Bobby Iliev
Date: Fri, 28 Jun 2024 12:03:14 +0300
Subject: [PATCH 1/4] Resource Kafka Source: Add VALUE DECODING ERRORS arg

---
 docs/resources/source_kafka.md               |  9 +++
 pkg/materialize/source_kafka.go              | 43 +++++++---
 pkg/materialize/source_kafka_test.go         | 31 ++++++++
 pkg/provider/acceptance_source_kafka_test.go | 84 ++++++++++++++++++++
 pkg/resources/config.go                      |  4 +
 pkg/resources/resource_source_kafka.go       | 25 +++++-
 pkg/resources/resource_source_kafka_test.go  | 27 ++++++-
 7 files changed, 207 insertions(+), 16 deletions(-)

diff --git a/docs/resources/source_kafka.md b/docs/resources/source_kafka.md
index c18f824d..f801268a 100644
--- a/docs/resources/source_kafka.md
+++ b/docs/resources/source_kafka.md
@@ -104,6 +104,15 @@ Optional:
 - `debezium` (Boolean) Use the Debezium envelope, which uses a diff envelope to handle CRUD operations.
 - `none` (Boolean) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
 - `upsert` (Boolean) Use the upsert envelope, which uses message keys to handle CRUD operations.
+- `upsert_options` (Block List, Max: 1) Options for the upsert envelope. (see [below for nested schema](#nestedblock--envelope--upsert_options))
+
+
+### Nested Schema for `envelope.upsert_options`
+
+Optional:
+
+- `value_decoding_errors` (String) Specify how to handle value decoding errors in the upsert envelope. Supported value: `INLINE`.
+
diff --git a/pkg/materialize/source_kafka.go b/pkg/materialize/source_kafka.go
index 3def45ca..bd91e6ad 100644
--- a/pkg/materialize/source_kafka.go
+++ b/pkg/materialize/source_kafka.go
@@ -9,22 +9,40 @@ import (
 )
 
 type KafkaSourceEnvelopeStruct struct {
-	Debezium bool
-	None     bool
-	Upsert   bool
+	Debezium      bool
+	None          bool
+	Upsert        bool
+	UpsertOptions *UpsertOptionsStruct
 }
 
-func GetSourceKafkaEnelopeStruct(v interface{}) KafkaSourceEnvelopeStruct {
+type UpsertOptionsStruct struct {
+	ValueDecodingErrors string
+}
+
+func GetSourceKafkaEnvelopeStruct(v interface{}) KafkaSourceEnvelopeStruct {
 	var envelope KafkaSourceEnvelopeStruct
-	if v, ok := v.([]interface{})[0].(map[string]interface{})["upsert"]; ok {
-		envelope.Upsert = v.(bool)
+
+	data := v.([]interface{})[0].(map[string]interface{})
+
+	if upsert, ok := data["upsert"].(bool); ok {
+		envelope.Upsert = upsert
+		if options, ok := data["upsert_options"].([]interface{}); ok && len(options) > 0 {
+			optionsData := options[0].(map[string]interface{})
+			envelope.UpsertOptions = &UpsertOptionsStruct{}
+			if valueDecodingErrors, ok := optionsData["value_decoding_errors"].(string); ok {
+				envelope.UpsertOptions.ValueDecodingErrors = valueDecodingErrors
+			}
+		}
 	}
-	if v, ok := v.([]interface{})[0].(map[string]interface{})["debezium"]; ok {
-		envelope.Debezium = v.(bool)
+
+	if debezium, ok := data["debezium"].(bool); ok {
+		envelope.Debezium = debezium
 	}
-	if v, ok := v.([]interface{})[0].(map[string]interface{})["none"]; ok {
-		envelope.None = v.(bool)
+
+	if none, ok := data["none"].(bool); ok {
+		envelope.None = none
 	}
+
 	return envelope
 }
 
@@ -409,6 +427,11 @@ func (b *SourceKafkaBuilder) Create() error {
 
 	if b.envelope.Upsert {
 		q.WriteString(` ENVELOPE UPSERT`)
+		if b.envelope.UpsertOptions != nil {
+			if b.envelope.UpsertOptions.ValueDecodingErrors != "" {
+				q.WriteString(fmt.Sprintf(` (VALUE DECODING ERRORS = (%s))`, b.envelope.UpsertOptions.ValueDecodingErrors))
+			}
+		}
 	}
 
 	if b.envelope.None {
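For review context, a minimal sketch of the HCL this change enables; the block shape follows the schema added in pkg/resources/resource_source_kafka.go further down, and the values mirror the acceptance and integration tests in this series:

    envelope {
      upsert = true
      upsert_options {
        value_decoding_errors = "INLINE"
      }
    }

With this block set, SourceKafkaBuilder.Create renders ` ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE))` into the generated CREATE SOURCE statement, as exercised by the unit test below.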
diff --git a/pkg/materialize/source_kafka_test.go b/pkg/materialize/source_kafka_test.go
index be61c761..ac4b15d8 100644
--- a/pkg/materialize/source_kafka_test.go
+++ b/pkg/materialize/source_kafka_test.go
@@ -37,3 +37,34 @@ func TestResourceSourceKafkaCreate(t *testing.T) {
 		}
 	})
 }
+
+func TestResourceSourceKafkaCreateWithUpsertOptions(t *testing.T) {
+	testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
+		mock.ExpectExec(
+			`CREATE SOURCE "database"."schema"."source"
+			FROM KAFKA CONNECTION "database"."schema"."kafka_connection"
+			\(TOPIC 'events'\) FORMAT AVRO
+			USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_connection"
+			INCLUDE KEY, HEADERS, PARTITION, OFFSET, TIMESTAMP ENVELOPE UPSERT
+			\(VALUE DECODING ERRORS = \(INLINE\)\)
+			EXPOSE PROGRESS AS "database"."schema"."progress";`,
+		).WillReturnResult(sqlmock.NewResult(1, 1))
+
+		o := MaterializeObject{Name: "source", SchemaName: "schema", DatabaseName: "database"}
+		b := NewSourceKafkaBuilder(db, o)
+		b.KafkaConnection(IdentifierSchemaStruct{Name: "kafka_connection", DatabaseName: "database", SchemaName: "schema"})
+		b.Topic("events")
+		b.Format(SourceFormatSpecStruct{Avro: &AvroFormatSpec{SchemaRegistryConnection: IdentifierSchemaStruct{Name: "csr_connection", DatabaseName: "database", SchemaName: "schema"}}})
+		b.IncludeKey()
+		b.IncludeHeaders()
+		b.IncludePartition()
+		b.IncludeOffset()
+		b.IncludeTimestamp()
+		b.Envelope(KafkaSourceEnvelopeStruct{Upsert: true, UpsertOptions: &UpsertOptionsStruct{ValueDecodingErrors: "INLINE"}})
+		b.ExposeProgress(IdentifierSchemaStruct{Name: "progress", DatabaseName: "database", SchemaName: "schema"})
+
+		if err := b.Create(); err != nil {
+			t.Fatal(err)
+		}
+	})
+}
diff --git a/pkg/provider/acceptance_source_kafka_test.go b/pkg/provider/acceptance_source_kafka_test.go
index e7a6ad3b..0f3bc7be 100644
--- a/pkg/provider/acceptance_source_kafka_test.go
+++ b/pkg/provider/acceptance_source_kafka_test.go
@@ -111,6 +111,48 @@ func TestAccSourceKafkaAvro_basic(t *testing.T) {
 	})
 }
 
+func TestAccSourceKafka_withUpsertOptions(t *testing.T) {
+	addTestTopic()
+	sourceName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
+	connName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:          func() { testAccPreCheck(t) },
+		ProviderFactories: testAccProviderFactories,
+		CheckDestroy:      nil,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccSourceKafkaResourceWithUpsertOptions(connName, sourceName),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckSourceKafkaExists("materialize_source_kafka.test"),
+					resource.TestMatchResourceAttr("materialize_source_kafka.test", "id", terraformObjectIdRegex),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "name", sourceName),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "database_name", "materialize"),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "schema_name", "public"),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, sourceName)),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "topic", "terraform"),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "envelope.0.upsert", "true"),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "envelope.0.upsert_options.0.value_decoding_errors", "INLINE"),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.name", connName),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.database_name", "materialize"),
"kafka_connection.0.database_name", "materialize"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.schema_name", "public"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offset.#", "1"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_timestamp_alias", "timestamp_alias"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_offset", "true"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_offset_alias", "offset_alias"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_partition", "true"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_partition_alias", "partition_alias"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_key_alias", "key_alias"), + ), + }, + { + ResourceName: "materialize_source_kafka.test", + ImportState: true, + ImportStateVerify: false, + }, + }, + }) +} + func TestAccSourceKafka_update(t *testing.T) { addTestTopic() slug := acctest.RandStringFromCharSet(5, acctest.CharSetAlpha) @@ -328,6 +370,48 @@ func testAccSourceKafkaResourceAvro(sourceName string) string { `, sourceName) } +func testAccSourceKafkaResourceWithUpsertOptions(connName, sourceName string) string { + return fmt.Sprintf(` + resource "materialize_connection_kafka" "test" { + name = "%[1]s" + kafka_broker { + broker = "redpanda:9092" + } + security_protocol = "PLAINTEXT" + } + + resource "materialize_source_kafka" "test" { + name = "%[2]s" + kafka_connection { + name = materialize_connection_kafka.test.name + } + + cluster_name = "quickstart" + topic = "terraform" + key_format { + text = true + } + value_format { + text = true + } + envelope { + upsert = true + upsert_options { + value_decoding_errors = "INLINE" + } + } + + start_offset = [0] + include_timestamp_alias = "timestamp_alias" + include_offset = true + include_offset_alias = "offset_alias" + include_partition = true + include_partition_alias = "partition_alias" + include_key_alias = "key_alias" + } +`, connName, sourceName) +} + func testAccCheckSourceKafkaExists(name string) resource.TestCheckFunc { return func(s *terraform.State) error { meta := testAccProvider.Meta() diff --git a/pkg/resources/config.go b/pkg/resources/config.go index 702e0e49..a61d1fee 100644 --- a/pkg/resources/config.go +++ b/pkg/resources/config.go @@ -108,3 +108,7 @@ var mysqlSSLMode = []string{ "verify-ca", "verify-identity", } + +var upsertValueDecodingErrors = []string{ + "INLINE", +} diff --git a/pkg/resources/resource_source_kafka.go b/pkg/resources/resource_source_kafka.go index ec606b7e..78cf8a8e 100644 --- a/pkg/resources/resource_source_kafka.go +++ b/pkg/resources/resource_source_kafka.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" ) var sourceKafkaSchema = map[string]*schema.Schema{ @@ -113,14 +114,32 @@ var sourceKafkaSchema = map[string]*schema.Schema{ Type: schema.TypeBool, Optional: true, ForceNew: true, - ConflictsWith: []string{"envelope.0.upsert", "envelope.0.none"}, + ConflictsWith: []string{"envelope.0.upsert", "envelope.0.none", "envelope.0.upsert_options"}, }, "none": { Description: "Use an append-only envelope. 
diff --git a/pkg/resources/resource_source_kafka_test.go b/pkg/resources/resource_source_kafka_test.go
index 94cde2f9..be0702f5 100644
--- a/pkg/resources/resource_source_kafka_test.go
+++ b/pkg/resources/resource_source_kafka_test.go
@@ -46,7 +46,16 @@ var inSourceKafka = map[string]interface{}{
 			},
 		},
 	},
-	"envelope": []interface{}{map[string]interface{}{"upsert": true}},
+	"envelope": []interface{}{
+		map[string]interface{}{
+			"upsert": true,
+			"upsert_options": []interface{}{
+				map[string]interface{}{
+					"value_decoding_errors": "INLINE",
+				},
+			},
+		},
+	},
 	"start_offset":    []interface{}{1, 2, 3},
 	"start_timestamp": -1000,
 }
@@ -67,7 +76,7 @@ func TestResourceSourceKafkaCreate(t *testing.T) {
 			PARTITION AS partition,
 			OFFSET AS offset,
 			TIMESTAMP AS timestamp
-			ENVELOPE UPSERT;`,
+			ENVELOPE UPSERT \(VALUE DECODING ERRORS = \(INLINE\)\);`,
 		).WillReturnResult(sqlmock.NewResult(1, 1))
 
 		// Query Id
@@ -103,6 +112,12 @@ func TestResourceSourceKafkaCreateIncludeTrueNoAlias(t *testing.T) {
 	testInSourceKafka["include_timestamp"] = true
 	delete(testInSourceKafka, "include_timestamp_alias")
 
+	testInSourceKafka["envelope"] = []interface{}{
+		map[string]interface{}{
+			"upsert": true,
+		},
+	}
+
 	d := schema.TestResourceDataRaw(t, SourceKafka().Schema, testInSourceKafka)
 	r.NotNil(d)
 
@@ -148,6 +163,12 @@ func TestResourceSourceKafkaCreateIncludeFalseWithAlias(t *testing.T) {
 	testInSourceKafka["include_offset"] = false
 	testInSourceKafka["include_timestamp"] = false
 
+	testInSourceKafka["envelope"] = []interface{}{
+		map[string]interface{}{
+			"debezium": true,
+		},
+	}
+
 	d := schema.TestResourceDataRaw(t, SourceKafka().Schema, testInSourceKafka)
 	r.NotNil(d)
 
@@ -157,7 +178,7 @@ func TestResourceSourceKafkaCreateIncludeFalseWithAlias(t *testing.T) {
 		`CREATE SOURCE "database"."schema"."source"
 			IN CLUSTER "cluster" FROM KAFKA CONNECTION "materialize"."public"."kafka_conn"
 			\(TOPIC 'topic', START TIMESTAMP -1000, START OFFSET \(1,2,3\)\) FORMAT AVRO
 			USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn"
 			VALUE STRATEGY avro_key_fullname
-			ENVELOPE UPSERT;`,
+			ENVELOPE DEBEZIUM;`,
 		).WillReturnResult(sqlmock.NewResult(1, 1))
 
 		// Query Id

From ef4d21968a5446b2ce98dc901ed3d404a37674f8 Mon Sep 17 00:00:00 2001
From: Bobby Iliev
Date: Sun, 30 Jun 2024 20:12:34 +0300
Subject: [PATCH 2/4] Add an integration test

---
 integration/source.tf | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)
diff --git a/integration/source.tf b/integration/source.tf
index 84bcf084..df875665 100644
--- a/integration/source.tf
+++ b/integration/source.tf
@@ -242,6 +242,36 @@ resource "materialize_source_grant" "source_grant_select" {
   database_name = materialize_source_load_generator.load_generator.database_name
 }
 
+resource "materialize_source_kafka" "kafka_upsert_options_source" {
+  name = "kafka_upsert_options_source"
+  kafka_connection {
+    name = materialize_connection_kafka.kafka_connection.name
+  }
+
+  cluster_name = materialize_cluster.cluster_source.name
+  topic = "topic1"
+  key_format {
+    text = true
+  }
+  value_format {
+    text = true
+  }
+  envelope {
+    upsert = true
+    upsert_options {
+      value_decoding_errors = "INLINE"
+    }
+  }
+
+  start_offset = [0]
+  include_timestamp_alias = "timestamp_alias"
+  include_offset = true
+  include_offset_alias = "offset_alias"
+  include_partition = true
+  include_partition_alias = "partition_alias"
+  include_key_alias = "key_alias"
+}
+
 output "qualified_load_generator" {
   value = materialize_source_load_generator.load_generator.qualified_sql_name
 }

From 10916606b095be6cb8a8ae6e32b06158a179f13b Mon Sep 17 00:00:00 2001
From: Bobby Iliev
Date: Sun, 30 Jun 2024 20:47:48 +0300
Subject: [PATCH 3/4] Fix failing test

---
 integration/source.tf | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/integration/source.tf b/integration/source.tf
index df875665..ae735b4a 100644
--- a/integration/source.tf
+++ b/integration/source.tf
@@ -248,6 +248,9 @@ resource "materialize_source_kafka" "kafka_upsert_options_source" {
     name = materialize_connection_kafka.kafka_connection.name
   }
 
+  # depends on sink_kafka_cluster to ensure that the topic exists
+  depends_on = [materialize_sink_kafka.sink_kafka_cluster]
+
   cluster_name = materialize_cluster.cluster_source.name
   topic = "topic1"
   key_format {

From 8eda9a155e823b51cf3007972257d5a8471f2982 Mon Sep 17 00:00:00 2001
From: Bobby Iliev
Date: Sun, 30 Jun 2024 21:01:35 +0300
Subject: [PATCH 4/4] Fix failing test

---
 pkg/provider/acceptance_sink_kafka_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/provider/acceptance_sink_kafka_test.go b/pkg/provider/acceptance_sink_kafka_test.go
index 787fabf4..43eb1adc 100644
--- a/pkg/provider/acceptance_sink_kafka_test.go
+++ b/pkg/provider/acceptance_sink_kafka_test.go
@@ -231,7 +231,7 @@ func testAccSinkKafkaResource(roleName, connName, tableName, sinkName, sink2Name
 	}
 
 	resource "materialize_table" "test_2" {
-		name = "%[3]s_2"
+		name = "%[3]s__2"
 		column {
 			name = "column_1"
 			type = "text"