Support Avro Doc Comments materialize_sink_kafka #373

Merged 7 commits on Dec 12, 2023
56 changes: 56 additions & 0 deletions docs/resources/sink_kafka.md
@@ -127,6 +127,8 @@ Required:

Optional:

- `avro_doc_column` (Block List) **Private Preview** Add column-level documentation comments to the generated Avro schemas. (see [below for nested schema](#nestedblock--format--avro--avro_doc_column))
- `avro_doc_type` (Block List, Max: 1) **Private Preview** Add a top-level documentation comment to the generated Avro schemas. (see [below for nested schema](#nestedblock--format--avro--avro_doc_type))
- `avro_key_fullname` (String) The full name of the Avro key schema.
- `avro_value_fullname` (String) The full name of the Avro value schema.

@@ -142,6 +144,60 @@ Optional:
- `database_name` (String) The schema_registry_connection database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
- `schema_name` (String) The schema_registry_connection schema name. Defaults to `public`.


<a id="nestedblock--format--avro--avro_doc_column"></a>
### Nested Schema for `format.avro.avro_doc_column`

Required:

- `column` (String) Name of the column in the Avro schema to apply the documentation comment to.
- `doc` (String) Documentation string.
- `object` (Block List, Min: 1, Max: 1) The object to apply the Avro documentation to. (see [below for nested schema](#nestedblock--format--avro--avro_doc_column--object))

Optional:

- `key` (Boolean) Applies to the key schema.
- `value` (Boolean) Applies to the value schema.

<a id="nestedblock--format--avro--avro_doc_column--object"></a>
### Nested Schema for `format.avro.avro_doc_column.object`

Required:

- `name` (String) The object name.

Optional:

- `database_name` (String) The object database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
- `schema_name` (String) The object schema name. Defaults to `public`.



<a id="nestedblock--format--avro--avro_doc_type"></a>
### Nested Schema for `format.avro.avro_doc_type`

Required:

- `doc` (String) Documentation string.
- `object` (Block List, Min: 1, Max: 1) The object to apply the Avro documentation to. (see [below for nested schema](#nestedblock--format--avro--avro_doc_type--object))

Optional:

- `key` (Boolean) Applies to the key schema.
- `value` (Boolean) Applies to the value schema.

<a id="nestedblock--format--avro--avro_doc_type--object"></a>
### Nested Schema for `format.avro.avro_doc_type.object`

Required:

- `name` (String) The object name.

Optional:

- `database_name` (String) The object database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
- `schema_name` (String) The object schema name. Defaults to `public`.
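
For orientation, these comments end up as Avro `doc` attributes on the schemas the sink registers with the schema registry: `avro_doc_type` adds a record-level `doc`, and `avro_doc_column` adds a `doc` to the matching field. A minimal sketch of roughly what a registered value schema could then look like, using the `counter` column from the integration example in this PR; the record name and field type are assumptions, not verified Materialize output:

```go
package example

// Illustrative only: approximate shape of a value schema after avro_doc_type
// and avro_doc_column comments are applied. The record name and field type
// are assumptions for the sketch.
const exampleValueSchema = `{
  "type": "record",
  "name": "envelope",
  "doc": "top level comment",
  "fields": [
    { "name": "counter", "type": "long", "doc": "comment value" }
  ]
}`
```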

## Import

Import is supported using the following syntax:
…
28 changes: 28 additions & 0 deletions integration/sink.tf
@@ -24,6 +24,34 @@ resource "materialize_sink_kafka" "sink_kafka" {
        database_name = materialize_connection_confluent_schema_registry.schema_registry.database_name
        schema_name   = materialize_connection_confluent_schema_registry.schema_registry.schema_name
      }
      avro_doc_type {
        object {
          name          = materialize_source_load_generator.load_generator.name
          database_name = materialize_source_load_generator.load_generator.database_name
          schema_name   = materialize_source_load_generator.load_generator.schema_name
        }
        doc = "top level comment"
      }
      avro_doc_column {
        object {
          name          = materialize_source_load_generator.load_generator.name
          database_name = materialize_source_load_generator.load_generator.database_name
          schema_name   = materialize_source_load_generator.load_generator.schema_name
        }
        column = "counter"
        doc    = "comment key"
        key    = true
      }
      avro_doc_column {
        object {
          name          = materialize_source_load_generator.load_generator.name
          database_name = materialize_source_load_generator.load_generator.database_name
          schema_name   = materialize_source_load_generator.load_generator.schema_name
        }
        column = "counter"
        doc    = "comment value"
        value  = true
      }
    }
  }
  envelope {
…
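
Per the builder change further down in this PR, the three `avro_doc_*` blocks above are collected into a single parenthesized option list appended after the `CONFLUENT SCHEMA REGISTRY CONNECTION` clause. A hedged sketch of the fragment this example is expected to produce, written as a Go constant; the qualified names assume the load generator resolves to `materialize.public.load_generator` and that the sink's `from` relation is the same object, which this truncated diff does not show:

```go
package example

// Illustrative only: approximate option list generated for the avro_doc_type
// and avro_doc_column blocks above. All qualified names are assumptions.
const expectedDocOptions = ` (DOC ON TYPE "materialize"."public"."load_generator" = 'top level comment',` +
    ` KEY DOC ON COLUMN "materialize"."public"."load_generator"."counter" = 'comment key',` +
    ` VALUE DOC ON COLUMN "materialize"."public"."load_generator"."counter" = 'comment value')`
```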
49 changes: 49 additions & 0 deletions pkg/materialize/format_specs.go
@@ -1,5 +1,20 @@
package materialize

type AvroDocType struct {
    Object IdentifierSchemaStruct
    Doc    string
    Key    bool
    Value  bool
}

type AvroDocColumn struct {
    Object IdentifierSchemaStruct
    Column string
    Doc    string
    Key    bool
    Value  bool
}

type AvroFormatSpec struct {
SchemaRegistryConnection IdentifierSchemaStruct
KeyStrategy string
@@ -30,6 +45,8 @@ type SinkAvroFormatSpec struct {
    SchemaRegistryConnection IdentifierSchemaStruct
    AvroKeyFullname          string
    AvroValueFullname        string
    DocType                  AvroDocType
    DocColumn                []AvroDocColumn
}

type SinkFormatSpecStruct struct {
@@ -89,10 +106,42 @@ func GetSinkFormatSpecStruc(v interface{}) SinkFormatSpecStruct {
        if csr, ok := avro.([]interface{})[0].(map[string]interface{})["schema_registry_connection"]; ok {
            key := avro.([]interface{})[0].(map[string]interface{})["avro_key_fullname"].(string)
            value := avro.([]interface{})[0].(map[string]interface{})["avro_value_fullname"].(string)

            var docType AvroDocType
            if adt, ok := avro.([]interface{})[0].(map[string]interface{})["avro_doc_type"].([]interface{}); ok && len(adt) > 0 {
                if v, ok := adt[0].(map[string]interface{})["object"]; ok {
                    docType.Object = GetIdentifierSchemaStruct(v)
                }
                if v, ok := adt[0].(map[string]interface{})["doc"]; ok {
                    docType.Doc = v.(string)
                }
                if v, ok := adt[0].(map[string]interface{})["key"]; ok {
                    docType.Key = v.(bool)
                }
                if v, ok := adt[0].(map[string]interface{})["value"]; ok {
                    docType.Value = v.(bool)
                }
            }

            var docColumn []AvroDocColumn
            if adc, ok := avro.([]interface{})[0].(map[string]interface{})["avro_doc_column"]; ok {
                for _, column := range adc.([]interface{}) {
                    docColumn = append(docColumn, AvroDocColumn{
                        Object: GetIdentifierSchemaStruct(column.(map[string]interface{})["object"]),
                        Column: column.(map[string]interface{})["column"].(string),
                        Doc:    column.(map[string]interface{})["doc"].(string),
                        Key:    column.(map[string]interface{})["key"].(bool),
                        Value:  column.(map[string]interface{})["value"].(bool),
                    })
                }
            }

            format.Avro = &SinkAvroFormatSpec{
                SchemaRegistryConnection: GetIdentifierSchemaStruct(csr),
                AvroKeyFullname:          key,
                AvroValueFullname:        value,
                DocType:                  docType,
                DocColumn:                docColumn,
            }
        }
    }
…
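
To make the decoding above concrete, here is a minimal sketch of the values `GetSinkFormatSpecStruc` should produce for the `avro_doc_type` and `avro_doc_column` blocks in the integration example earlier in this PR. The `IdentifierSchemaStruct` field names and the resolved object names are assumptions for illustration; they are not shown in this diff:

```go
package materialize

// Illustrative only: expected decode of the integration example's doc blocks.
// IdentifierSchemaStruct field names and resolved object names are assumed.
var exampleDocType = AvroDocType{
    Object: IdentifierSchemaStruct{DatabaseName: "materialize", SchemaName: "public", Name: "load_generator"},
    Doc:    "top level comment",
}

var exampleDocColumns = []AvroDocColumn{
    {
        Object: IdentifierSchemaStruct{DatabaseName: "materialize", SchemaName: "public", Name: "load_generator"},
        Column: "counter",
        Doc:    "comment key",
        Key:    true,
    },
    {
        Object: IdentifierSchemaStruct{DatabaseName: "materialize", SchemaName: "public", Name: "load_generator"},
        Column: "counter",
        Doc:    "comment value",
        Value:  true,
    },
}
```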
40 changes: 38 additions & 2 deletions pkg/materialize/sink_kafka.go
Original file line number Diff line number Diff line change
@@ -130,8 +130,45 @@ func (b *SinkKafkaBuilder) Create() error {
        if b.format.Avro.SchemaRegistryConnection.Name != "" {
            q.WriteString(fmt.Sprintf(` FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION %s`, b.format.Avro.SchemaRegistryConnection.QualifiedName()))
        }

        // CSR Connection Options
        var v = []string{}
        if b.format.Avro.AvroValueFullname != "" && b.format.Avro.AvroKeyFullname != "" {
            q.WriteString(fmt.Sprintf(` WITH (AVRO KEY FULLNAME %s AVRO VALUE FULLNAME %s)`, QuoteString(b.format.Avro.AvroKeyFullname), QuoteString(b.format.Avro.AvroValueFullname)))
            v = append(v, fmt.Sprintf(`AVRO KEY FULLNAME %s AVRO VALUE FULLNAME %s`,
                QuoteString(b.format.Avro.AvroKeyFullname),
                QuoteString(b.format.Avro.AvroValueFullname)),
            )
        }

        // Doc Type
        if b.format.Avro.DocType.Object.Name != "" {
            c := strings.Builder{}
            if b.format.Avro.DocType.Key {
                c.WriteString("KEY ")
            } else if b.format.Avro.DocType.Value {
                c.WriteString("VALUE ")
            }
            c.WriteString(fmt.Sprintf("DOC ON TYPE %[1]s = %[2]s",
                b.format.Avro.DocType.Object.QualifiedName(),
                // ^ inline review comment from the PR author: Based on docs, unsure if
                // we need to also support aliases for the object reference. Or if it
                // should just always make the connection.
                QuoteString(b.format.Avro.DocType.Doc),
            ))
            v = append(v, c.String())
        }

        // Doc Column
        for _, ac := range b.format.Avro.DocColumn {
            c := strings.Builder{}
            if ac.Key {
                c.WriteString("KEY")
            } else if ac.Value {
                c.WriteString("VALUE")
            }
            f := b.from.QualifiedName() + "." + QuoteIdentifier(ac.Column)
            c.WriteString(fmt.Sprintf(" DOC ON COLUMN %[1]s = %[2]s", f, QuoteString(ac.Doc)))
            v = append(v, c.String())
        }
        if len(v) > 0 {
            q.WriteString(fmt.Sprintf(` (%s)`, strings.Join(v[:], ", ")))
        }
    }
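
Putting the pieces above together: the fullname pair and each doc comment are appended to `v` and then emitted as one comma-separated list in parentheses after the schema registry clause. A hedged sketch of the resulting `FORMAT` clause when fullnames and a key-scoped type comment are both set; the connection and object names are placeholders, not values from this PR:

```go
package example

// Illustrative only: approximate FORMAT clause after the fullname and doc
// options are joined. Connection and object names are placeholders.
const exampleFormatClause = ` FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "materialize"."public"."csr_connection"` +
    ` (AVRO KEY FULLNAME 'com.example.key' AVRO VALUE FULLNAME 'com.example.value',` +
    ` KEY DOC ON TYPE "materialize"."public"."tbl" = 'documents the key schema')`
```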

@@ -154,6 +191,5 @@ func (b *SinkKafkaBuilder) Create() error {
        q.WriteString(fmt.Sprintf(` WITH (%s)`, strings.Join(withOptions, ", ")))
    }

    q.WriteString(`;`)
    return b.ddl.exec(q.String())
}