diff --git a/docs/resources/sink_kafka.md b/docs/resources/sink_kafka.md
index b015b94c..826a4ed7 100644
--- a/docs/resources/sink_kafka.md
+++ b/docs/resources/sink_kafka.md
@@ -127,6 +127,8 @@ Required:
Optional:
+- `avro_doc_column` (Block List) **Private Preview** Add a column-level documentation comment to the generated Avro schemas. (see [below for nested schema](#nestedblock--format--avro--avro_doc_column))
+- `avro_doc_type` (Block List, Max: 1) **Private Preview** Add a top-level documentation comment to the generated Avro schemas. (see [below for nested schema](#nestedblock--format--avro--avro_doc_type))
- `avro_key_fullname` (String) The full name of the Avro key schema.
- `avro_value_fullname` (String) The full name of the Avro value schema.
@@ -142,6 +144,60 @@ Optional:
- `database_name` (String) The schema_registry_connection database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
- `schema_name` (String) The schema_registry_connection schema name. Defaults to `public`.
+
+
+### Nested Schema for `format.avro.avro_doc_column`
+
+Required:
+
+- `column` (String) Name of the column in the Avro schema to apply the documentation to.
+- `doc` (String) Documentation string.
+- `object` (Block List, Min: 1, Max: 1) The object to apply the Avro documentation to. (see [below for nested schema](#nestedblock--format--avro--avro_doc_column--object))
+
+Optional:
+
+- `key` (Boolean) Applies to the key schema.
+- `value` (Boolean) Applies to the value schema.
+
+
+### Nested Schema for `format.avro.avro_doc_column.object`
+
+Required:
+
+- `name` (String) The object name.
+
+Optional:
+
+- `database_name` (String) The object database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
+- `schema_name` (String) The object schema name. Defaults to `public`.
+
+
+### Nested Schema for `format.avro.avro_doc_type`
+
+Required:
+
+- `doc` (String) Documentation string.
+- `object` (Block List, Min: 1, Max: 1) The object to apply the Avro documentation to. (see [below for nested schema](#nestedblock--format--avro--avro_doc_type--object))
+
+Optional:
+
+- `key` (Boolean) Applies to the key schema.
+- `value` (Boolean) Applies to the value schema.
+
+
+### Nested Schema for `format.avro.avro_doc_type.object`
+
+Required:
+
+- `name` (String) The object name.
+
+Optional:
+
+- `database_name` (String) The object database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
+- `schema_name` (String) The object schema name. Defaults to `public`.
+
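+For example, the following configuration attaches a top-level comment and a key-only column comment (resource names here are placeholders):
+
+```terraform
+format {
+  avro {
+    schema_registry_connection {
+      name = materialize_connection_confluent_schema_registry.example.name
+    }
+    avro_doc_type {
+      object {
+        name = materialize_source_load_generator.example.name
+      }
+      doc = "top level comment"
+    }
+    avro_doc_column {
+      object {
+        name = materialize_source_load_generator.example.name
+      }
+      column = "counter"
+      doc    = "comment key"
+      key    = true
+    }
+  }
+}
+```
+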
## Import
Import is supported using the following syntax:
diff --git a/integration/sink.tf b/integration/sink.tf
index 55fbdd14..beb03fea 100644
--- a/integration/sink.tf
+++ b/integration/sink.tf
@@ -24,6 +24,34 @@ resource "materialize_sink_kafka" "sink_kafka" {
database_name = materialize_connection_confluent_schema_registry.schema_registry.database_name
schema_name = materialize_connection_confluent_schema_registry.schema_registry.schema_name
}
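+ # Add a top-level doc comment to the generated Avro schemas.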
+ avro_doc_type {
+ object {
+ name = materialize_source_load_generator.load_generator.name
+ database_name = materialize_source_load_generator.load_generator.database_name
+ schema_name = materialize_source_load_generator.load_generator.schema_name
+ }
+ doc = "top level comment"
+ }
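+ # Column-level doc comments can target the key and value schemas separately.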
+ avro_doc_column {
+ object {
+ name = materialize_source_load_generator.load_generator.name
+ database_name = materialize_source_load_generator.load_generator.database_name
+ schema_name = materialize_source_load_generator.load_generator.schema_name
+ }
+ column = "counter"
+ doc = "comment key"
+ key = true
+ }
+ avro_doc_column {
+ object {
+ name = materialize_source_load_generator.load_generator.name
+ database_name = materialize_source_load_generator.load_generator.database_name
+ schema_name = materialize_source_load_generator.load_generator.schema_name
+ }
+ column = "counter"
+ doc = "comment value"
+ value = true
+ }
}
}
envelope {
diff --git a/pkg/materialize/format_specs.go b/pkg/materialize/format_specs.go
index 321fe9bf..c9e54d4a 100644
--- a/pkg/materialize/format_specs.go
+++ b/pkg/materialize/format_specs.go
@@ -1,5 +1,20 @@
package materialize
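+
+// AvroDocType corresponds to the avro_doc_type block: a top-level
+// documentation comment applied to the generated Avro key and/or value schema.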
+type AvroDocType struct {
+ Object IdentifierSchemaStruct
+ Doc string
+ Key bool
+ Value bool
+}
+
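+// AvroDocColumn corresponds to an avro_doc_column block: a documentation
+// comment applied to a single column of the generated Avro key and/or value schema.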
+type AvroDocColumn struct {
+ Object IdentifierSchemaStruct
+ Column string
+ Doc string
+ Key bool
+ Value bool
+}
+
type AvroFormatSpec struct {
SchemaRegistryConnection IdentifierSchemaStruct
KeyStrategy string
@@ -30,6 +45,8 @@ type SinkAvroFormatSpec struct {
SchemaRegistryConnection IdentifierSchemaStruct
AvroKeyFullname string
AvroValueFullname string
+ DocType AvroDocType
+ DocColumn []AvroDocColumn
}
type SinkFormatSpecStruct struct {
@@ -89,10 +106,42 @@ func GetSinkFormatSpecStruc(v interface{}) SinkFormatSpecStruct {
if csr, ok := avro.([]interface{})[0].(map[string]interface{})["schema_registry_connection"]; ok {
key := avro.([]interface{})[0].(map[string]interface{})["avro_key_fullname"].(string)
value := avro.([]interface{})[0].(map[string]interface{})["avro_value_fullname"].(string)
+
+ var docType AvroDocType
+ if adt, ok := avro.([]interface{})[0].(map[string]interface{})["avro_doc_type"].([]interface{}); ok && len(adt) > 0 {
+ if v, ok := adt[0].(map[string]interface{})["object"]; ok {
+ docType.Object = GetIdentifierSchemaStruct(v)
+ }
+ if v, ok := adt[0].(map[string]interface{})["doc"]; ok {
+ docType.Doc = v.(string)
+ }
+ if v, ok := adt[0].(map[string]interface{})["key"]; ok {
+ docType.Key = v.(bool)
+ }
+ if v, ok := adt[0].(map[string]interface{})["value"]; ok {
+ docType.Value = v.(bool)
+ }
+ }
+
+ var docColumn []AvroDocColumn
+ if adc, ok := avro.([]interface{})[0].(map[string]interface{})["avro_doc_column"].([]interface{}); ok {
+ for _, column := range adc {
+ docColumn = append(docColumn, AvroDocColumn{
+ Object: GetIdentifierSchemaStruct(column.(map[string]interface{})["object"]),
+ Column: column.(map[string]interface{})["column"].(string),
+ Doc: column.(map[string]interface{})["doc"].(string),
+ Key: column.(map[string]interface{})["key"].(bool),
+ Value: column.(map[string]interface{})["value"].(bool),
+ })
+ }
+ }
+
format.Avro = &SinkAvroFormatSpec{
SchemaRegistryConnection: GetIdentifierSchemaStruct(csr),
AvroKeyFullname: key,
AvroValueFullname: value,
+ DocType: docType,
+ DocColumn: docColumn,
}
}
}
diff --git a/pkg/materialize/sink_kafka.go b/pkg/materialize/sink_kafka.go
index 04fc79ba..a8351ae3 100644
--- a/pkg/materialize/sink_kafka.go
+++ b/pkg/materialize/sink_kafka.go
@@ -130,8 +130,45 @@ func (b *SinkKafkaBuilder) Create() error {
if b.format.Avro.SchemaRegistryConnection.Name != "" {
q.WriteString(fmt.Sprintf(` FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION %s`, b.format.Avro.SchemaRegistryConnection.QualifiedName()))
}
+
+ // CSR Connection Options
+ v := []string{}
if b.format.Avro.AvroValueFullname != "" && b.format.Avro.AvroKeyFullname != "" {
- q.WriteString(fmt.Sprintf(` WITH (AVRO KEY FULLNAME %s AVRO VALUE FULLNAME %s)`, QuoteString(b.format.Avro.AvroKeyFullname), QuoteString(b.format.Avro.AvroValueFullname)))
+ v = append(v, fmt.Sprintf(`AVRO KEY FULLNAME %s AVRO VALUE FULLNAME %s`,
+ QuoteString(b.format.Avro.AvroKeyFullname),
+ QuoteString(b.format.Avro.AvroValueFullname)),
+ )
+ }
+
+ // Doc Type
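+ // Emits, e.g., KEY DOC ON TYPE "database"."schema"."table" = 'top-level comment'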
+ if b.format.Avro.DocType.Object.Name != "" {
+ c := strings.Builder{}
+ if b.format.Avro.DocType.Key {
+ c.WriteString("KEY ")
+ } else if b.format.Avro.DocType.Value {
+ c.WriteString("VALUE ")
+ }
+ c.WriteString(fmt.Sprintf("DOC ON TYPE %[1]s = %[2]s",
+ b.format.Avro.DocType.Object.QualifiedName(),
+ QuoteString(b.format.Avro.DocType.Doc),
+ ))
+ v = append(v, c.String())
+ }
+
+ // Doc Column
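+ // Emits, e.g., VALUE DOC ON COLUMN "database"."schema"."table"."c2" = 'column comment'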
+ for _, ac := range b.format.Avro.DocColumn {
+ c := strings.Builder{}
+ if ac.Key {
+ c.WriteString("KEY ")
+ } else if ac.Value {
+ c.WriteString("VALUE ")
+ }
+ f := ac.Object.QualifiedName() + "." + QuoteIdentifier(ac.Column)
+ c.WriteString(fmt.Sprintf("DOC ON COLUMN %[1]s = %[2]s", f, QuoteString(ac.Doc)))
+ v = append(v, c.String())
+ }
+ if len(v) > 0 {
+ q.WriteString(fmt.Sprintf(` (%s)`, strings.Join(v, ", ")))
}
}
@@ -154,6 +191,5 @@ func (b *SinkKafkaBuilder) Create() error {
q.WriteString(fmt.Sprintf(` WITH (%s)`, strings.Join(withOptions, ", ")))
}
- q.WriteString(`;`)
return b.ddl.exec(q.String())
}
diff --git a/pkg/materialize/sink_kafka_test.go b/pkg/materialize/sink_kafka_test.go
index 3ec6e060..2c2666dd 100644
--- a/pkg/materialize/sink_kafka_test.go
+++ b/pkg/materialize/sink_kafka_test.go
@@ -15,8 +15,10 @@ import (
func TestSinkKafkaCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
- `CREATE SINK "database"."schema"."sink" FROM "database"."schema"."src"
- INTO KAFKA CONNECTION "database"."schema"."kafka_conn" \(TOPIC 'testdrive-snk1-seed'\)
+ `CREATE SINK "database"."schema"."sink"
+ FROM "database"."schema"."src"
+ INTO KAFKA CONNECTION "database"."schema"."kafka_conn"
+ \(TOPIC 'testdrive-snk1-seed'\)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "materialize"."public"."csr_conn"
ENVELOPE DEBEZIUM;`,
).WillReturnResult(sqlmock.NewResult(1, 1))
@@ -48,7 +50,8 @@ func TestSinkKafkaCreate(t *testing.T) {
func TestSinkKafkaSnapshotCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
- `CREATE SINK "database"."schema"."sink" FROM "database"."schema"."src"
+ `CREATE SINK "database"."schema"."sink"
+ FROM "database"."schema"."src"
INTO KAFKA CONNECTION "database"."schema"."kafka_conn" \(TOPIC 'testdrive-snk1-seed'\)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "materialize"."public"."csr_conn"
ENVELOPE DEBEZIUM
@@ -83,7 +86,8 @@ func TestSinkKafkaSnapshotCreate(t *testing.T) {
func TestSinkKafkaSizeSnapshotCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
- `CREATE SINK "database"."schema"."sink" FROM "database"."schema"."src"
+ `CREATE SINK "database"."schema"."sink"
+ FROM "database"."schema"."src"
INTO KAFKA CONNECTION "database"."schema"."kafka_conn" \(TOPIC 'testdrive-snk1-seed'\)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "materialize"."public"."csr_conn"
ENVELOPE DEBEZIUM
@@ -119,7 +123,8 @@ func TestSinkKafkaSizeSnapshotCreate(t *testing.T) {
func TestSinkKafkaJsonCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
- `CREATE SINK "database"."schema"."sink" FROM "database"."schema"."src"
+ `CREATE SINK "database"."schema"."sink"
+ FROM "database"."schema"."src"
INTO KAFKA CONNECTION "database"."schema"."kafka_conn" \(TOPIC 'testdrive-snk1-seed'\)
FORMAT JSON
ENVELOPE DEBEZIUM;`,
@@ -142,7 +147,8 @@ func TestSinkKafkaJsonCreate(t *testing.T) {
func TestSinkKafkaKeyCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
- `CREATE SINK "database"."schema"."sink" FROM "database"."schema"."src"
+ `CREATE SINK "database"."schema"."sink"
+ FROM "database"."schema"."src"
INTO KAFKA CONNECTION "database"."schema"."kafka_conn" \(TOPIC 'testdrive-snk1-seed'\)
KEY \(b\)
FORMAT JSON
@@ -192,3 +198,164 @@ func TestSinkKafkaKeyNotEnforcedCreate(t *testing.T) {
}
})
}
+
+func TestSinkKafkaAvroDocsTypeCreate(t *testing.T) {
+ from := IdentifierSchemaStruct{Name: "table", SchemaName: "schema", DatabaseName: "database"}
+ testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
+ mock.ExpectExec(
+ `CREATE SINK "database"."schema"."sink"
+ FROM "database"."schema"."table"
+ INTO KAFKA CONNECTION "database"."schema"."kafka_connection"
+ \(TOPIC 'testdrive-snk1-seed'\)
+ FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."public"."csr_connection"
+ \(KEY DOC ON TYPE "database"."schema"."table" = 'top-level comment'\)
+ ENVELOPE UPSERT WITH \(SIZE = 'xsmall'\);`,
+ ).WillReturnResult(sqlmock.NewResult(1, 1))
+
+ o := MaterializeObject{Name: "sink", SchemaName: "schema", DatabaseName: "database"}
+ b := NewSinkKafkaBuilder(db, o)
+ b.Size("xsmall")
+ b.From(from)
+ b.KafkaConnection(IdentifierSchemaStruct{
+ Name: "kafka_connection",
+ SchemaName: "schema",
+ DatabaseName: "database",
+ })
+ b.Topic("testdrive-snk1-seed")
+ b.Format(SinkFormatSpecStruct{
+ Avro: &SinkAvroFormatSpec{
+ SchemaRegistryConnection: IdentifierSchemaStruct{
+ Name: "csr_connection",
+ DatabaseName: "database",
+ SchemaName: "public",
+ },
+ DocType: AvroDocType{
+ Object: from,
+ Doc: "top-level comment",
+ Key: true,
+ },
+ },
+ })
+ b.Envelope(KafkaSinkEnvelopeStruct{Upsert: true})
+
+ if err := b.Create(); err != nil {
+ t.Fatal(err)
+ }
+ })
+}
+
+func TestSinkKafkaAvroDocsColumnCreate(t *testing.T) {
+ from := IdentifierSchemaStruct{Name: "table", SchemaName: "schema", DatabaseName: "database"}
+ testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
+ mock.ExpectExec(
+ `CREATE SINK "database"."schema"."sink"
+ FROM "database"."schema"."table"
+ INTO KAFKA CONNECTION "database"."schema"."kafka_connection"
+ \(TOPIC 'testdrive-snk1-seed'\)
+ FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."public"."csr_connection"
+ \(KEY DOC ON COLUMN "database"."schema"."table"."c1" = 'comment on column only in key schema',
+ VALUE DOC ON COLUMN "database"."schema"."table"."c2" = 'comment on column only in value schema'\)
+ ENVELOPE UPSERT WITH \(SIZE = 'xsmall'\);`,
+ ).WillReturnResult(sqlmock.NewResult(1, 1))
+
+ o := MaterializeObject{Name: "sink", SchemaName: "schema", DatabaseName: "database"}
+ b := NewSinkKafkaBuilder(db, o)
+ b.Size("xsmall")
+ b.From(from)
+ b.KafkaConnection(IdentifierSchemaStruct{
+ Name: "kafka_connection",
+ SchemaName: "schema",
+ DatabaseName: "database",
+ })
+ b.Topic("testdrive-snk1-seed")
+ b.Format(SinkFormatSpecStruct{
+ Avro: &SinkAvroFormatSpec{
+ SchemaRegistryConnection: IdentifierSchemaStruct{
+ Name: "csr_connection",
+ DatabaseName: "database",
+ SchemaName: "public",
+ },
+ DocColumn: []AvroDocColumn{
+ {
+ Object: from,
+ Doc: "comment on column only in key schema",
+ Column: "c1",
+ Key: true,
+ },
+ {
+ Object: from,
+ Column: "c2",
+ Doc: "comment on column only in value schema",
+ Value: true,
+ },
+ },
+ },
+ })
+ b.Envelope(KafkaSinkEnvelopeStruct{Upsert: true})
+
+ if err := b.Create(); err != nil {
+ t.Fatal(err)
+ }
+ })
+}
+
+func TestSinkKafkaAvroDocsCreate(t *testing.T) {
+ from := IdentifierSchemaStruct{Name: "table", SchemaName: "schema", DatabaseName: "database"}
+ testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
+ mock.ExpectExec(
+ `CREATE SINK "database"."schema"."sink"
+ FROM "database"."schema"."table"
+ INTO KAFKA CONNECTION "database"."schema"."kafka_connection"
+ \(TOPIC 'testdrive-snk1-seed'\)
+ FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."public"."csr_connection"
+ \(VALUE DOC ON TYPE "database"."schema"."table" = 'top-level comment',
+ KEY DOC ON COLUMN "database"."schema"."table"."c1" = 'comment on column only in key schema',
+ VALUE DOC ON COLUMN "database"."schema"."table"."c2" = 'comment on column only in value schema'\)
+ ENVELOPE UPSERT WITH \(SIZE = 'xsmall'\);`,
+ ).WillReturnResult(sqlmock.NewResult(1, 1))
+
+ o := MaterializeObject{Name: "sink", SchemaName: "schema", DatabaseName: "database"}
+ b := NewSinkKafkaBuilder(db, o)
+ b.Size("xsmall")
+ b.From(from)
+ b.KafkaConnection(IdentifierSchemaStruct{
+ Name: "kafka_connection",
+ SchemaName: "schema",
+ DatabaseName: "database",
+ })
+ b.Topic("testdrive-snk1-seed")
+ b.Format(SinkFormatSpecStruct{
+ Avro: &SinkAvroFormatSpec{
+ SchemaRegistryConnection: IdentifierSchemaStruct{
+ Name: "csr_connection",
+ DatabaseName: "database",
+ SchemaName: "public",
+ },
+ DocType: AvroDocType{
+ Object: from,
+ Doc: "top-level comment",
+ Value: true,
+ },
+ DocColumn: []AvroDocColumn{
+ {
+ Object: from,
+ Doc: "comment on column only in key schema",
+ Column: "c1",
+ Key: true,
+ },
+ {
+ Object: from,
+ Column: "c2",
+ Doc: "comment on column only in value schema",
+ Value: true,
+ },
+ },
+ },
+ })
+ b.Envelope(KafkaSinkEnvelopeStruct{Upsert: true})
+
+ if err := b.Create(); err != nil {
+ t.Fatal(err)
+ }
+ })
+}
diff --git a/pkg/provider/acceptance_sink_kafka_test.go b/pkg/provider/acceptance_sink_kafka_test.go
index e201fd7d..a4baa15e 100644
--- a/pkg/provider/acceptance_sink_kafka_test.go
+++ b/pkg/provider/acceptance_sink_kafka_test.go
@@ -50,6 +50,55 @@ func TestAccSinkKafka_basic(t *testing.T) {
})
}
+func TestAccSinkKafkaAvro_basic(t *testing.T) {
+ sinkName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProviderFactories: testAccProviderFactories,
+ CheckDestroy: nil,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccSinkKafkaAvroResource(sinkName),
+ Check: resource.ComposeTestCheckFunc(
+ testAccCheckSinkKafkaExists("materialize_sink_kafka.test"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "name", sinkName+"_sink"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "database_name", "materialize"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "schema_name", "public"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, sinkName+"_sink")),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "cluster_name", sinkName+"_cluster"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "topic", "topic1"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "key.0", "counter"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "key_not_enforced", "true"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.schema_registry_connection.0.name", sinkName+"_conn_schema"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.schema_registry_connection.0.database_name", "materialize"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.schema_registry_connection.0.schema_name", "public"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_type.0.object.0.name", sinkName+"_load_gen"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_type.0.object.0.database_name", "materialize"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_type.0.object.0.schema_name", "public"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_type.0.doc", "top level comment"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.0.object.0.name", sinkName+"_load_gen"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.0.object.0.database_name", "materialize"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.0.object.0.schema_name", "public"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.0.column", "counter"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.0.doc", "comment key"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.0.key", "true"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.1.object.0.name", sinkName+"_load_gen"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.1.object.0.database_name", "materialize"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.1.object.0.schema_name", "public"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.1.column", "counter"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.1.doc", "comment value"),
+ resource.TestCheckResourceAttr("materialize_sink_kafka.test", "format.0.avro.0.avro_doc_column.1.value", "true"),
+ ),
+ },
+ {
+ ResourceName: "materialize_sink_kafka.test",
+ ImportState: true,
+ ImportStateVerify: false,
+ },
+ },
+ })
+}
+
func TestAccSinkKafka_update(t *testing.T) {
slug := acctest.RandStringFromCharSet(5, acctest.CharSetAlpha)
sinkName := fmt.Sprintf("old_%s", slug)
@@ -185,6 +234,7 @@ func testAccSinkKafkaResource(roleName, connName, tableName, sinkName, sink2Name
envelope {
debezium = true
}
+
ownership_role = "%[6]s"
comment = "%[7]s"
@@ -193,6 +243,93 @@ func testAccSinkKafkaResource(roleName, connName, tableName, sinkName, sink2Name
`, roleName, connName, tableName, sinkName, sink2Name, sinkOwner, comment)
}
+func testAccSinkKafkaAvroResource(sinkName string) string {
+ return fmt.Sprintf(`
+ resource "materialize_cluster" "test" {
+ name = "%[1]s_cluster"
+ size = "3xsmall"
+ }
+
+ resource "materialize_source_load_generator" "test" {
+ name = "%[1]s_load_gen"
+ size = "3xsmall"
+ load_generator_type = "COUNTER"
+ }
+
+ resource "materialize_connection_kafka" "test" {
+ name = "%[1]s_conn"
+ security_protocol = "PLAINTEXT"
+ kafka_broker {
+ broker = "redpanda:9092"
+ }
+ validate = true
+ }
+
+ resource "materialize_connection_confluent_schema_registry" "test" {
+ name = "%[1]s_conn_schema"
+ url = "http://redpanda:8081"
+ }
+
+ resource "materialize_sink_kafka" "test" {
+ name = "%[1]s_sink"
+ cluster_name = materialize_cluster.test.name
+ topic = "topic1"
+ key = ["counter"]
+ key_not_enforced = true
+ from {
+ name = materialize_source_load_generator.test.name
+ database_name = materialize_source_load_generator.test.database_name
+ schema_name = materialize_source_load_generator.test.schema_name
+ }
+ kafka_connection {
+ name = materialize_connection_kafka.test.name
+ database_name = materialize_connection_kafka.test.database_name
+ schema_name = materialize_connection_kafka.test.schema_name
+ }
+ format {
+ avro {
+ schema_registry_connection {
+ name = materialize_connection_confluent_schema_registry.test.name
+ database_name = materialize_connection_confluent_schema_registry.test.database_name
+ schema_name = materialize_connection_confluent_schema_registry.test.schema_name
+ }
+ avro_doc_type {
+ object {
+ name = materialize_source_load_generator.test.name
+ database_name = materialize_source_load_generator.test.database_name
+ schema_name = materialize_source_load_generator.test.schema_name
+ }
+ doc = "top level comment"
+ }
+ avro_doc_column {
+ object {
+ name = materialize_source_load_generator.test.name
+ database_name = materialize_source_load_generator.test.database_name
+ schema_name = materialize_source_load_generator.test.schema_name
+ }
+ column = "counter"
+ doc = "comment key"
+ key = true
+ }
+ avro_doc_column {
+ object {
+ name = materialize_source_load_generator.test.name
+ database_name = materialize_source_load_generator.test.database_name
+ schema_name = materialize_source_load_generator.test.schema_name
+ }
+ column = "counter"
+ doc = "comment value"
+ value = true
+ }
+ }
+ }
+ envelope {
+ debezium = true
+ }
+ }
+ `, sinkName)
+}
+
func testAccCheckSinkKafkaExists(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
db := testAccProvider.Meta().(*sqlx.DB)
diff --git a/pkg/resources/resource_sink_kafka_test.go b/pkg/resources/resource_sink_kafka_test.go
index f31724e9..a11a864d 100644
--- a/pkg/resources/resource_sink_kafka_test.go
+++ b/pkg/resources/resource_sink_kafka_test.go
@@ -42,6 +42,44 @@ var inSinkKafka = map[string]interface{}{
"schema_name": "schema",
},
},
+ "avro_doc_type": []interface{}{
+ map[string]interface{}{
+ "object": []interface{}{
+ map[string]interface{}{
+ "name": "item",
+ "schema_name": "public",
+ "database_name": "database",
+ },
+ },
+ "doc": "top-level comment",
+ },
+ },
+ "avro_doc_column": []interface{}{
+ map[string]interface{}{
+ "object": []interface{}{
+ map[string]interface{}{
+ "name": "item",
+ "schema_name": "public",
+ "database_name": "database",
+ },
+ },
+ "column": "c1",
+ "doc": "comment on column only in key schema",
+ "key": true,
+ },
+ map[string]interface{}{
+ "object": []interface{}{
+ map[string]interface{}{
+ "name": "item",
+ "schema_name": "public",
+ "database_name": "database",
+ },
+ },
+ "column": "c1",
+ "doc": "comment on column only in value schema",
+ "value": true,
+ },
+ },
},
},
},
@@ -63,7 +101,10 @@ func TestResourceSinkKafkaCreate(t *testing.T) {
INTO KAFKA CONNECTION "materialize"."public"."kafka_conn"
\(TOPIC 'topic'\) KEY \(key_1, key_2\)
NOT ENFORCED FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn"
- WITH \(AVRO KEY FULLNAME 'avro_key_fullname' AVRO VALUE FULLNAME 'avro_value_fullname'\)
+ \(AVRO KEY FULLNAME 'avro_key_fullname' AVRO VALUE FULLNAME 'avro_value_fullname',
+ DOC ON TYPE "database"."public"."item" = 'top-level comment',
+ KEY DOC ON COLUMN "database"."public"."item"."c1" = 'comment on column only in key schema',
+ VALUE DOC ON COLUMN "database"."public"."item"."c1" = 'comment on column only in value schema'\)
ENVELOPE UPSERT WITH \(SIZE = 'small'\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))
diff --git a/pkg/resources/schema.go b/pkg/resources/schema.go
index 4f47ef9a..2e219445 100644
--- a/pkg/resources/schema.go
+++ b/pkg/resources/schema.go
@@ -285,6 +285,66 @@ func SinkFormatSpecSchema(elem string, description string, required bool) *schem
Optional: true,
ForceNew: true,
},
+ "avro_doc_type": {
+ Description: "**Private Preview** Add top level documentation comment to the generated Avro schemas.",
+ Type: schema.TypeList,
+ MinItems: 1,
+ MaxItems: 1,
+ Optional: true,
+ ForceNew: true,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "object": IdentifierSchema("object", "The object to apply the Avro documentation.", true),
+ "doc": {
+ Description: "Documentation string.",
+ Type: schema.TypeString,
+ Required: true,
+ },
+ "key": {
+ Description: "Applies to the key schema.",
+ Type: schema.TypeBool,
+ Optional: true,
+ },
+ "value": {
+ Description: "Applies to the value schema.",
+ Type: schema.TypeBool,
+ Optional: true,
+ },
+ },
+ },
+ },
+ "avro_doc_column": {
+ Description: "**Private Preview** Add column level documentation comment to the generated Avro schemas.",
+ Type: schema.TypeList,
+ MinItems: 1,
+ Optional: true,
+ ForceNew: true,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "object": IdentifierSchema("object", "The object to apply the Avro documentation.", true),
+ "column": {
+ Description: "Name of the column in the Avro schema to apply to.",
+ Type: schema.TypeString,
+ Required: true,
+ },
+ "doc": {
+ Description: "Documentation string.",
+ Type: schema.TypeString,
+ Required: true,
+ },
+ "key": {
+ Description: "Applies to the key schema.",
+ Type: schema.TypeBool,
+ Optional: true,
+ },
+ "value": {
+ Description: "Applies to the value schema.",
+ Type: schema.TypeBool,
+ Optional: true,
+ },
+ },
+ },
+ },
},
},
},