Skip to content

Commit

Permalink
Add source_id logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbyiliev committed Sep 9, 2024
1 parent 37c17c8 commit 38e6299
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 18 deletions.
43 changes: 41 additions & 2 deletions pkg/materialize/source_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type SourceTableParams struct {
SourceName sql.NullString `db:"source_name"`
SourceSchemaName sql.NullString `db:"source_schema_name"`
SourceDatabaseName sql.NullString `db:"source_database_name"`
SourceType sql.NullString `db:"source_type"`
UpstreamName sql.NullString `db:"upstream_name"`
UpstreamSchemaName sql.NullString `db:"upstream_schema_name"`
TextColumns pq.StringArray `db:"text_columns"`
Expand All @@ -32,6 +33,10 @@ var sourceTableQuery = NewBaseQuery(`
mz_tables.name,
mz_schemas.name AS schema_name,
mz_databases.name AS database_name,
mz_sources.name AS source_name,
source_schemas.name AS source_schema_name,
source_databases.name AS source_database_name,
mz_sources.type AS source_type,
comments.comment AS comment,
mz_roles.name AS owner_name,
mz_tables.privileges
Expand All @@ -40,6 +45,12 @@ var sourceTableQuery = NewBaseQuery(`
ON mz_tables.schema_id = mz_schemas.id
JOIN mz_databases
ON mz_schemas.database_id = mz_databases.id
JOIN mz_sources
ON mz_tables.source_id = mz_sources.id
JOIN mz_schemas AS source_schemas
ON mz_sources.schema_id = source_schemas.id
JOIN mz_databases AS source_databases
ON source_schemas.database_id = source_databases.id
JOIN mz_roles
ON mz_tables.owner_id = mz_roles.id
LEFT JOIN (
Expand Down Expand Up @@ -88,6 +99,8 @@ type SourceTableBuilder struct {
upstreamSchemaName string
textColumns []string
ignoreColumns []string
sourceType string
conn *sqlx.DB
}

func NewSourceTableBuilder(conn *sqlx.DB, obj MaterializeObject) *SourceTableBuilder {
Expand All @@ -96,9 +109,31 @@ func NewSourceTableBuilder(conn *sqlx.DB, obj MaterializeObject) *SourceTableBui
tableName: obj.Name,
schemaName: obj.SchemaName,
databaseName: obj.DatabaseName,
conn: conn,
}
}

func (b *SourceTableBuilder) GetSourceType() (string, error) {
if b.sourceType != "" {
return b.sourceType, nil
}

q := sourceQuery.QueryPredicate(map[string]string{
"mz_sources.name": b.source.Name,
"mz_schemas.name": b.source.SchemaName,
"mz_databases.name": b.source.DatabaseName,
})

var s SourceParams
if err := b.conn.Get(&s, q); err != nil {
return "", err
}

b.sourceType = s.SourceType.String

return b.sourceType, nil
}

func (b *SourceTableBuilder) QualifiedName() string {
return QualifiedName(b.databaseName, b.schemaName, b.tableName)
}
Expand Down Expand Up @@ -148,8 +183,12 @@ func (b *SourceTableBuilder) Create() error {
options = append(options, fmt.Sprintf(`TEXT COLUMNS (%s)`, s))
}

// TODO: Implement logic to only use IGNORE COLUMNS if the source is a MySQL source
if len(b.ignoreColumns) > 0 {
sourceType, err := b.GetSourceType()
if err != nil {
return err
}

if strings.EqualFold(sourceType, "mysql") && len(b.ignoreColumns) > 0 {
s := strings.Join(b.ignoreColumns, ", ")
options = append(options, fmt.Sprintf(`IGNORE COLUMNS (%s)`, s))
}
Expand Down
47 changes: 47 additions & 0 deletions pkg/materialize/source_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ var sourceTable = MaterializeObject{Name: "table", SchemaName: "schema", Databas

func TestSourceTableCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'`
testhelpers.MockSourceScanWithType(mock, sourceTypeQuery, "kafka")

mock.ExpectExec(
`CREATE TABLE "database"."schema"."table"
FROM SOURCE "materialize"."public"."source"
Expand All @@ -31,6 +34,50 @@ func TestSourceTableCreate(t *testing.T) {
})
}

func TestSourceTableCreateWithMySQLSource(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'`
testhelpers.MockSourceScanWithType(mock, sourceTypeQuery, "mysql")

mock.ExpectExec(
`CREATE TABLE "database"."schema"."table"
FROM SOURCE "materialize"."public"."source"
\(REFERENCE "upstream_schema"."upstream_table"\)
WITH \(TEXT COLUMNS \(column1, column2\), IGNORE COLUMNS \(ignore1, ignore2\)\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

b := NewSourceTableBuilder(db, sourceTable)
b.Source(IdentifierSchemaStruct{Name: "source", SchemaName: "public", DatabaseName: "materialize"})
b.UpstreamName("upstream_table")
b.UpstreamSchemaName("upstream_schema")
b.TextColumns([]string{"column1", "column2"})
b.IgnoreColumns([]string{"ignore1", "ignore2"})

if err := b.Create(); err != nil {
t.Fatal(err)
}
})
}

func TestGetSourceType(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'`
testhelpers.MockSourceScanWithType(mock, sourceTypeQuery, "KAFKA")

b := NewSourceTableBuilder(db, sourceTable)
b.Source(IdentifierSchemaStruct{Name: "source", SchemaName: "public", DatabaseName: "materialize"})

sourceType, err := b.GetSourceType()
if err != nil {
t.Fatal(err)
}

if sourceType != "KAFKA" {
t.Fatalf("Expected source type 'KAFKA', got '%s'", sourceType)
}
})
}

func TestSourceTableRename(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
Expand Down
31 changes: 23 additions & 8 deletions pkg/provider/acceptance_source_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func TestAccSourceTablePostgres_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "text_columns.0", "updated_at"),
resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "upstream_name", "table2"),
resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "upstream_schema_name", "public"),
resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "source.#", "1"),
resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "source.0.name", nameSpace+"_source_postgres"),
resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "source.0.schema_name", "public"),
resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "source.0.database_name", "materialize"),
),
},
},
Expand All @@ -56,6 +60,10 @@ func TestAccSourceTableMySQL_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "upstream_schema_name", "shop"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "ignore_columns.#", "1"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "ignore_columns.0", "banned"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "source.#", "1"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "source.0.name", nameSpace+"_source_mysql"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "source.0.schema_name", "public"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "source.0.database_name", "materialize"),
// resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "text_columns.#", "1"),
// resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "text_columns.0", "about"),
),
Expand All @@ -80,6 +88,10 @@ func TestAccSourceTableLoadGen_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "database_name", "materialize"),
resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "schema_name", "public"),
resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "upstream_name", "bids"),
resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "source.#", "1"),
resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "source.0.name", nameSpace+"_loadgen"),
resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "source.0.schema_name", "public"),
resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "source.0.database_name", "materialize"),
),
},
},
Expand All @@ -102,6 +114,10 @@ func TestAccSourceTable_update(t *testing.T) {
resource.TestCheckResourceAttr("materialize_source_table.test", "text_columns.#", "2"),
resource.TestCheckResourceAttr("materialize_source_table.test", "ownership_role", "mz_system"),
resource.TestCheckResourceAttr("materialize_source_table.test", "comment", ""),
resource.TestCheckResourceAttr("materialize_source_table.test", "source.#", "1"),
resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.name", nameSpace+"_source"),
resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.schema_name", "public"),
resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.database_name", "materialize"),
),
},
{
Expand All @@ -113,6 +129,9 @@ func TestAccSourceTable_update(t *testing.T) {
resource.TestCheckResourceAttr("materialize_source_table.test", "text_columns.#", "2"),
resource.TestCheckResourceAttr("materialize_source_table.test", "ownership_role", nameSpace+"_role"),
resource.TestCheckResourceAttr("materialize_source_table.test", "comment", "Updated comment"),
resource.TestCheckResourceAttr("materialize_source_table.test", "source.#", "1"),
resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.name", nameSpace+"_source"),
resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.schema_name", "public"),
),
},
},
Expand Down Expand Up @@ -153,8 +172,7 @@ func testAccSourceTablePostgresBasicResource(nameSpace string) string {
resource "materialize_connection_postgres" "postgres_connection" {
name = "%[1]s_connection_postgres"
// TODO: Change with container name once new image is available
host = "localhost"
host = "postgres"
port = 5432
user {
text = "postgres"
Expand Down Expand Up @@ -207,8 +225,7 @@ func testAccSourceTableMySQLBasicResource(nameSpace string) string {
resource "materialize_connection_mysql" "mysql_connection" {
name = "%[1]s_connection_mysql"
// TODO: Change with container name once new image is available
host = "localhost"
host = "mysql"
port = 3306
user {
text = "repluser"
Expand Down Expand Up @@ -283,8 +300,7 @@ func testAccSourceTableBasicResource(nameSpace string) string {
resource "materialize_connection_postgres" "postgres_connection" {
name = "%[1]s_connection"
// TODO: Change with container name once new image is available
host = "localhost"
host = "postgres"
port = 5432
user {
text = "postgres"
Expand Down Expand Up @@ -343,8 +359,7 @@ func testAccSourceTableResource(nameSpace, upstreamName, ownershipRole, comment
resource "materialize_connection_postgres" "postgres_connection" {
name = "%[1]s_connection"
// TODO: Change with container name once new image is available
host = "localhost"
host = "postgres"
port = 5432
user {
text = "postgres"
Expand Down
13 changes: 12 additions & 1 deletion pkg/resources/resource_source_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var sourceTableSchema = map[string]*schema.Schema{
Type: schema.TypeList,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
ForceNew: true,
},
"comment": CommentSchema(false),
"ownership_role": OwnershipRoleSchema(),
Expand Down Expand Up @@ -169,8 +170,18 @@ func sourceTableRead(ctx context.Context, d *schema.ResourceData, meta interface
return diag.FromErr(err)
}

// TODO: Set source once the source_id is available in the mz_tables table
source := []interface{}{
map[string]interface{}{
"name": t.SourceName.String,
"schema_name": t.SourceSchemaName.String,
"database_name": t.SourceDatabaseName.String,
},
}
if err := d.Set("source", source); err != nil {
return diag.FromErr(err)
}

// TODO: Set the upstream_name and upstream_schema_name once supported on the Materialize side
// if err := d.Set("upstream_name", t.UpstreamName.String); err != nil {
// return diag.FromErr(err)
// }
Expand Down
49 changes: 42 additions & 7 deletions pkg/resources/resource_source_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,56 @@ func TestResourceSourceTableCreate(t *testing.T) {
r.NotNil(d)

testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) {
// Expect source type query
sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'`
testhelpers.MockSourceScanWithType(mock, sourceTypeQuery, "mysql")

// Create
mock.ExpectExec(
`CREATE TABLE "database"."schema"."table"
FROM SOURCE "materialize"."public"."source"
\(REFERENCE "upstream_schema"."upstream_table"\)
WITH \(TEXT COLUMNS \(column1, column2\), IGNORE COLUMNS \(column3, column4\)\);`,
FROM SOURCE "materialize"."public"."source"
\(REFERENCE "upstream_schema"."upstream_table"\)
WITH \(TEXT COLUMNS \(column1, column2\), IGNORE COLUMNS \(column3, column4\)\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_tables.name = 'table'`
testhelpers.MockTableScan(mock, ip)
testhelpers.MockSourceTableScan(mock, ip)

// Query Params
pp := `WHERE mz_tables.id = 'u1'`
testhelpers.MockSourceTableScan(mock, pp)

if err := sourceTableCreate(context.TODO(), d, db); err != nil {
t.Fatal(err)
}
})
}

func TestResourceSourceTableCreateNonMySQL(t *testing.T) {
r := require.New(t)
d := schema.TestResourceDataRaw(t, SourceTable().Schema, inSourceTable)
r.NotNil(d)

testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) {
// Expect source type query
sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'`
testhelpers.MockSourceScan(mock, sourceTypeQuery)

// Create (without IGNORE COLUMNS)
mock.ExpectExec(`CREATE TABLE "database"."schema"."table"
FROM SOURCE "materialize"."public"."source"
\(REFERENCE "upstream_schema"."upstream_table"\)
WITH \(TEXT COLUMNS \(column1, column2\)\);`).
WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_tables.name = 'table'`
testhelpers.MockSourceTableScan(mock, ip)

// Query Params
pp := `WHERE mz_tables.id = 'u1'`
testhelpers.MockTableScan(mock, pp)
testhelpers.MockSourceTableScan(mock, pp)

if err := sourceTableCreate(context.TODO(), d, db); err != nil {
t.Fatal(err)
Expand All @@ -66,7 +101,7 @@ func TestResourceSourceTableRead(t *testing.T) {
testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) {
// Query Params
pp := `WHERE mz_tables.id = 'u1'`
testhelpers.MockTableScan(mock, pp)
testhelpers.MockSourceTableScan(mock, pp)

if err := sourceTableRead(context.TODO(), d, db); err != nil {
t.Fatal(err)
Expand All @@ -90,7 +125,7 @@ func TestResourceSourceTableUpdate(t *testing.T) {

// Query Params
pp := `WHERE mz_tables.id = 'u1'`
testhelpers.MockTableScan(mock, pp)
testhelpers.MockSourceTableScan(mock, pp)

if err := sourceTableUpdate(context.TODO(), d, db); err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 38e6299

Please sign in to comment.