Skip to content

Commit

Permalink
Add new kafk acolumns from mz_kafka_source_tables
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbyiliev committed Oct 28, 2024
1 parent ae709ec commit 79f725e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 35 deletions.
76 changes: 41 additions & 35 deletions pkg/materialize/source_table_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,50 @@ import (

type SourceTableKafkaParams struct {
SourceTableParams
EnvelopeType string `db:"envelope_type"`
KeyFormat string `db:"key_format"`
ValueFormat string `db:"value_format"`
}

var sourceTableKafkaQuery = `
SELECT
mz_tables.id,
mz_tables.name,
mz_schemas.name AS schema_name,
mz_databases.name AS database_name,
mz_sources.name AS source_name,
source_schemas.name AS source_schema_name,
source_databases.name AS source_database_name,
mz_kafka_source_tables.topic AS upstream_table_name,
mz_sources.type AS source_type,
comments.comment AS comment,
mz_roles.name AS owner_name,
mz_tables.privileges
FROM mz_tables
JOIN mz_schemas
ON mz_tables.schema_id = mz_schemas.id
JOIN mz_databases
ON mz_schemas.database_id = mz_databases.id
JOIN mz_sources
ON mz_tables.source_id = mz_sources.id
JOIN mz_schemas AS source_schemas
ON mz_sources.schema_id = source_schemas.id
JOIN mz_databases AS source_databases
ON source_schemas.database_id = source_databases.id
LEFT JOIN mz_internal.mz_kafka_source_tables
ON mz_tables.id = mz_kafka_source_tables.id
JOIN mz_roles
ON mz_tables.owner_id = mz_roles.id
LEFT JOIN (
SELECT id, comment
FROM mz_internal.mz_comments
WHERE object_type = 'table'
AND object_sub_id IS NULL
) comments
ON mz_tables.id = comments.id
SELECT
mz_tables.id,
mz_tables.name,
mz_schemas.name AS schema_name,
mz_databases.name AS database_name,
mz_sources.name AS source_name,
source_schemas.name AS source_schema_name,
source_databases.name AS source_database_name,
mz_kafka_source_tables.topic AS upstream_table_name,
mz_kafka_source_tables.envelope_type,
mz_kafka_source_tables.key_format,
mz_kafka_source_tables.value_format,
mz_sources.type AS source_type,
comments.comment AS comment,
mz_roles.name AS owner_name,
mz_tables.privileges
FROM mz_tables
JOIN mz_schemas
ON mz_tables.schema_id = mz_schemas.id
JOIN mz_databases
ON mz_schemas.database_id = mz_databases.id
JOIN mz_sources
ON mz_tables.source_id = mz_sources.id
JOIN mz_schemas AS source_schemas
ON mz_sources.schema_id = source_schemas.id
JOIN mz_databases AS source_databases
ON source_schemas.database_id = source_databases.id
LEFT JOIN mz_internal.mz_kafka_source_tables
ON mz_tables.id = mz_kafka_source_tables.id
JOIN mz_roles
ON mz_tables.owner_id = mz_roles.id
LEFT JOIN (
SELECT id, comment
FROM mz_internal.mz_comments
WHERE object_type = 'table'
AND object_sub_id IS NULL
) comments
ON mz_tables.id = comments.id
`

func SourceTableKafkaId(conn *sqlx.DB, obj MaterializeObject) (string, error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/resources/resource_source_table_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ func sourceTableKafkaRead(ctx context.Context, d *schema.ResourceData, meta inte
return diag.FromErr(err)
}

// TODO: include envelope_type, key_format and value_format from mz_internal.mz_kafka_source_tables

if err := d.Set("ownership_role", t.OwnerName.String); err != nil {
return diag.FromErr(err)
}
Expand Down

0 comments on commit 79f725e

Please sign in to comment.