Skip to content

Commit

Permalink
Fix create sink with key statemets
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbyiliev committed Nov 14, 2023
1 parent c2fc25c commit 249730a
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
1 change: 1 addition & 0 deletions integration/sink.tf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ resource "materialize_sink_kafka" "sink_kafka" {
envelope {
debezium = true
}
key = ["counter"]
}

output "qualified_sink_kafka" {
Expand Down
8 changes: 4 additions & 4 deletions pkg/materialize/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ func (b *SinkKafkaBuilder) Create() error {
q.WriteString(fmt.Sprintf(` INTO KAFKA CONNECTION %s`, b.kafkaConnection.QualifiedName()))
}

if b.topic != "" {
q.WriteString(fmt.Sprintf(` (TOPIC %s)`, QuoteString(b.topic)))
}

if len(b.key) > 0 {
o := strings.Join(b.key[:], ", ")
q.WriteString(fmt.Sprintf(` KEY (%s)`, o))
}

if b.topic != "" {
q.WriteString(fmt.Sprintf(` (TOPIC %s)`, QuoteString(b.topic)))
}

if b.format.Json {
q.WriteString(` FORMAT JSON`)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/materialize/sink_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestSinkKafkaCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
`CREATE SINK "database"."schema"."sink" FROM "database"."schema"."table" INTO KAFKA CONNECTION "database"."schema"."kafka_connection" KEY \(key_1, key_2\) \(TOPIC 'test_avro_topic'\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."public"."csr_connection" ENVELOPE UPSERT WITH \( SIZE = 'xsmall' SNAPSHOT = false\);`,
`CREATE SINK "database"."schema"."sink" FROM "database"."schema"."table" INTO KAFKA CONNECTION "database"."schema"."kafka_connection" \(TOPIC 'test_avro_topic'\) KEY \(key_1, key_2\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."public"."csr_connection" ENVELOPE UPSERT WITH \( SIZE = 'xsmall' SNAPSHOT = false\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

o := MaterializeObject{Name: "sink", SchemaName: "schema", DatabaseName: "database"}
Expand Down

0 comments on commit 249730a

Please sign in to comment.