From 38e6299567379bc07b747a605dda52159ea6ec72 Mon Sep 17 00:00:00 2001 From: Bobby Iliev Date: Mon, 9 Sep 2024 14:41:49 +0300 Subject: [PATCH] Add source_id logic --- pkg/materialize/source_table.go | 43 ++++++++++- pkg/materialize/source_table_test.go | 47 ++++++++++++ pkg/provider/acceptance_source_table_test.go | 31 ++++++-- pkg/resources/resource_source_table.go | 13 +++- pkg/resources/resource_source_table_test.go | 49 ++++++++++-- pkg/testhelpers/mock_scans.go | 80 ++++++++++++++++++++ 6 files changed, 245 insertions(+), 18 deletions(-) diff --git a/pkg/materialize/source_table.go b/pkg/materialize/source_table.go index 3268d348..287660b5 100644 --- a/pkg/materialize/source_table.go +++ b/pkg/materialize/source_table.go @@ -17,6 +17,7 @@ type SourceTableParams struct { SourceName sql.NullString `db:"source_name"` SourceSchemaName sql.NullString `db:"source_schema_name"` SourceDatabaseName sql.NullString `db:"source_database_name"` + SourceType sql.NullString `db:"source_type"` UpstreamName sql.NullString `db:"upstream_name"` UpstreamSchemaName sql.NullString `db:"upstream_schema_name"` TextColumns pq.StringArray `db:"text_columns"` @@ -32,6 +33,10 @@ var sourceTableQuery = NewBaseQuery(` mz_tables.name, mz_schemas.name AS schema_name, mz_databases.name AS database_name, + mz_sources.name AS source_name, + source_schemas.name AS source_schema_name, + source_databases.name AS source_database_name, + mz_sources.type AS source_type, comments.comment AS comment, mz_roles.name AS owner_name, mz_tables.privileges @@ -40,6 +45,12 @@ var sourceTableQuery = NewBaseQuery(` ON mz_tables.schema_id = mz_schemas.id JOIN mz_databases ON mz_schemas.database_id = mz_databases.id + JOIN mz_sources + ON mz_tables.source_id = mz_sources.id + JOIN mz_schemas AS source_schemas + ON mz_sources.schema_id = source_schemas.id + JOIN mz_databases AS source_databases + ON source_schemas.database_id = source_databases.id JOIN mz_roles ON mz_tables.owner_id = mz_roles.id LEFT JOIN ( @@ -88,6 +99,8 @@ type SourceTableBuilder struct { upstreamSchemaName string textColumns []string ignoreColumns []string + sourceType string + conn *sqlx.DB } func NewSourceTableBuilder(conn *sqlx.DB, obj MaterializeObject) *SourceTableBuilder { @@ -96,9 +109,31 @@ func NewSourceTableBuilder(conn *sqlx.DB, obj MaterializeObject) *SourceTableBui tableName: obj.Name, schemaName: obj.SchemaName, databaseName: obj.DatabaseName, + conn: conn, } } +func (b *SourceTableBuilder) GetSourceType() (string, error) { + if b.sourceType != "" { + return b.sourceType, nil + } + + q := sourceQuery.QueryPredicate(map[string]string{ + "mz_sources.name": b.source.Name, + "mz_schemas.name": b.source.SchemaName, + "mz_databases.name": b.source.DatabaseName, + }) + + var s SourceParams + if err := b.conn.Get(&s, q); err != nil { + return "", err + } + + b.sourceType = s.SourceType.String + + return b.sourceType, nil +} + func (b *SourceTableBuilder) QualifiedName() string { return QualifiedName(b.databaseName, b.schemaName, b.tableName) } @@ -148,8 +183,12 @@ func (b *SourceTableBuilder) Create() error { options = append(options, fmt.Sprintf(`TEXT COLUMNS (%s)`, s)) } - // TODO: Implement logic to only use IGNORE COLUMNS if the source is a MySQL source - if len(b.ignoreColumns) > 0 { + sourceType, err := b.GetSourceType() + if err != nil { + return err + } + + if strings.EqualFold(sourceType, "mysql") && len(b.ignoreColumns) > 0 { s := strings.Join(b.ignoreColumns, ", ") options = append(options, fmt.Sprintf(`IGNORE COLUMNS (%s)`, s)) } diff --git a/pkg/materialize/source_table_test.go b/pkg/materialize/source_table_test.go index 384fe815..79360f07 100644 --- a/pkg/materialize/source_table_test.go +++ b/pkg/materialize/source_table_test.go @@ -12,6 +12,9 @@ var sourceTable = MaterializeObject{Name: "table", SchemaName: "schema", Databas func TestSourceTableCreate(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'` + testhelpers.MockSourceScanWithType(mock, sourceTypeQuery, "kafka") + mock.ExpectExec( `CREATE TABLE "database"."schema"."table" FROM SOURCE "materialize"."public"."source" @@ -31,6 +34,50 @@ func TestSourceTableCreate(t *testing.T) { }) } +func TestSourceTableCreateWithMySQLSource(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'` + testhelpers.MockSourceScanWithType(mock, sourceTypeQuery, "mysql") + + mock.ExpectExec( + `CREATE TABLE "database"."schema"."table" + FROM SOURCE "materialize"."public"."source" + \(REFERENCE "upstream_schema"."upstream_table"\) + WITH \(TEXT COLUMNS \(column1, column2\), IGNORE COLUMNS \(ignore1, ignore2\)\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceTableBuilder(db, sourceTable) + b.Source(IdentifierSchemaStruct{Name: "source", SchemaName: "public", DatabaseName: "materialize"}) + b.UpstreamName("upstream_table") + b.UpstreamSchemaName("upstream_schema") + b.TextColumns([]string{"column1", "column2"}) + b.IgnoreColumns([]string{"ignore1", "ignore2"}) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestGetSourceType(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'` + testhelpers.MockSourceScanWithType(mock, sourceTypeQuery, "KAFKA") + + b := NewSourceTableBuilder(db, sourceTable) + b.Source(IdentifierSchemaStruct{Name: "source", SchemaName: "public", DatabaseName: "materialize"}) + + sourceType, err := b.GetSourceType() + if err != nil { + t.Fatal(err) + } + + if sourceType != "KAFKA" { + t.Fatalf("Expected source type 'KAFKA', got '%s'", sourceType) + } + }) +} + func TestSourceTableRename(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { mock.ExpectExec( diff --git a/pkg/provider/acceptance_source_table_test.go b/pkg/provider/acceptance_source_table_test.go index a557adc7..de133107 100644 --- a/pkg/provider/acceptance_source_table_test.go +++ b/pkg/provider/acceptance_source_table_test.go @@ -31,6 +31,10 @@ func TestAccSourceTablePostgres_basic(t *testing.T) { resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "text_columns.0", "updated_at"), resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "upstream_name", "table2"), resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "upstream_schema_name", "public"), + resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "source.#", "1"), + resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "source.0.name", nameSpace+"_source_postgres"), + resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "source.0.schema_name", "public"), + resource.TestCheckResourceAttr("materialize_source_table.test_postgres", "source.0.database_name", "materialize"), ), }, }, @@ -56,6 +60,10 @@ func TestAccSourceTableMySQL_basic(t *testing.T) { resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "upstream_schema_name", "shop"), resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "ignore_columns.#", "1"), resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "ignore_columns.0", "banned"), + resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "source.#", "1"), + resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "source.0.name", nameSpace+"_source_mysql"), + resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "source.0.schema_name", "public"), + resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "source.0.database_name", "materialize"), // resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "text_columns.#", "1"), // resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "text_columns.0", "about"), ), @@ -80,6 +88,10 @@ func TestAccSourceTableLoadGen_basic(t *testing.T) { resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "database_name", "materialize"), resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "schema_name", "public"), resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "upstream_name", "bids"), + resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "source.#", "1"), + resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "source.0.name", nameSpace+"_loadgen"), + resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "source.0.schema_name", "public"), + resource.TestCheckResourceAttr("materialize_source_table.test_loadgen", "source.0.database_name", "materialize"), ), }, }, @@ -102,6 +114,10 @@ func TestAccSourceTable_update(t *testing.T) { resource.TestCheckResourceAttr("materialize_source_table.test", "text_columns.#", "2"), resource.TestCheckResourceAttr("materialize_source_table.test", "ownership_role", "mz_system"), resource.TestCheckResourceAttr("materialize_source_table.test", "comment", ""), + resource.TestCheckResourceAttr("materialize_source_table.test", "source.#", "1"), + resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.name", nameSpace+"_source"), + resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.schema_name", "public"), + resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.database_name", "materialize"), ), }, { @@ -113,6 +129,9 @@ func TestAccSourceTable_update(t *testing.T) { resource.TestCheckResourceAttr("materialize_source_table.test", "text_columns.#", "2"), resource.TestCheckResourceAttr("materialize_source_table.test", "ownership_role", nameSpace+"_role"), resource.TestCheckResourceAttr("materialize_source_table.test", "comment", "Updated comment"), + resource.TestCheckResourceAttr("materialize_source_table.test", "source.#", "1"), + resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.name", nameSpace+"_source"), + resource.TestCheckResourceAttr("materialize_source_table.test", "source.0.schema_name", "public"), ), }, }, @@ -153,8 +172,7 @@ func testAccSourceTablePostgresBasicResource(nameSpace string) string { resource "materialize_connection_postgres" "postgres_connection" { name = "%[1]s_connection_postgres" - // TODO: Change with container name once new image is available - host = "localhost" + host = "postgres" port = 5432 user { text = "postgres" @@ -207,8 +225,7 @@ func testAccSourceTableMySQLBasicResource(nameSpace string) string { resource "materialize_connection_mysql" "mysql_connection" { name = "%[1]s_connection_mysql" - // TODO: Change with container name once new image is available - host = "localhost" + host = "mysql" port = 3306 user { text = "repluser" @@ -283,8 +300,7 @@ func testAccSourceTableBasicResource(nameSpace string) string { resource "materialize_connection_postgres" "postgres_connection" { name = "%[1]s_connection" - // TODO: Change with container name once new image is available - host = "localhost" + host = "postgres" port = 5432 user { text = "postgres" @@ -343,8 +359,7 @@ func testAccSourceTableResource(nameSpace, upstreamName, ownershipRole, comment resource "materialize_connection_postgres" "postgres_connection" { name = "%[1]s_connection" - // TODO: Change with container name once new image is available - host = "localhost" + host = "postgres" port = 5432 user { text = "postgres" diff --git a/pkg/resources/resource_source_table.go b/pkg/resources/resource_source_table.go index 063fb103..93291025 100644 --- a/pkg/resources/resource_source_table.go +++ b/pkg/resources/resource_source_table.go @@ -47,6 +47,7 @@ var sourceTableSchema = map[string]*schema.Schema{ Type: schema.TypeList, Elem: &schema.Schema{Type: schema.TypeString}, Optional: true, + ForceNew: true, }, "comment": CommentSchema(false), "ownership_role": OwnershipRoleSchema(), @@ -169,8 +170,18 @@ func sourceTableRead(ctx context.Context, d *schema.ResourceData, meta interface return diag.FromErr(err) } - // TODO: Set source once the source_id is available in the mz_tables table + source := []interface{}{ + map[string]interface{}{ + "name": t.SourceName.String, + "schema_name": t.SourceSchemaName.String, + "database_name": t.SourceDatabaseName.String, + }, + } + if err := d.Set("source", source); err != nil { + return diag.FromErr(err) + } + // TODO: Set the upstream_name and upstream_schema_name once supported on the Materialize side // if err := d.Set("upstream_name", t.UpstreamName.String); err != nil { // return diag.FromErr(err) // } diff --git a/pkg/resources/resource_source_table_test.go b/pkg/resources/resource_source_table_test.go index b912b9e9..2262bc24 100644 --- a/pkg/resources/resource_source_table_test.go +++ b/pkg/resources/resource_source_table_test.go @@ -35,21 +35,56 @@ func TestResourceSourceTableCreate(t *testing.T) { r.NotNil(d) testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Expect source type query + sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'` + testhelpers.MockSourceScanWithType(mock, sourceTypeQuery, "mysql") + // Create mock.ExpectExec( `CREATE TABLE "database"."schema"."table" - FROM SOURCE "materialize"."public"."source" - \(REFERENCE "upstream_schema"."upstream_table"\) - WITH \(TEXT COLUMNS \(column1, column2\), IGNORE COLUMNS \(column3, column4\)\);`, + FROM SOURCE "materialize"."public"."source" + \(REFERENCE "upstream_schema"."upstream_table"\) + WITH \(TEXT COLUMNS \(column1, column2\), IGNORE COLUMNS \(column3, column4\)\);`, ).WillReturnResult(sqlmock.NewResult(1, 1)) // Query Id ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_tables.name = 'table'` - testhelpers.MockTableScan(mock, ip) + testhelpers.MockSourceTableScan(mock, ip) + + // Query Params + pp := `WHERE mz_tables.id = 'u1'` + testhelpers.MockSourceTableScan(mock, pp) + + if err := sourceTableCreate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceSourceTableCreateNonMySQL(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, SourceTable().Schema, inSourceTable) + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Expect source type query + sourceTypeQuery := `WHERE mz_databases.name = 'materialize' AND mz_schemas.name = 'public' AND mz_sources.name = 'source'` + testhelpers.MockSourceScan(mock, sourceTypeQuery) + + // Create (without IGNORE COLUMNS) + mock.ExpectExec(`CREATE TABLE "database"."schema"."table" + FROM SOURCE "materialize"."public"."source" + \(REFERENCE "upstream_schema"."upstream_table"\) + WITH \(TEXT COLUMNS \(column1, column2\)\);`). + WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Id + ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_tables.name = 'table'` + testhelpers.MockSourceTableScan(mock, ip) // Query Params pp := `WHERE mz_tables.id = 'u1'` - testhelpers.MockTableScan(mock, pp) + testhelpers.MockSourceTableScan(mock, pp) if err := sourceTableCreate(context.TODO(), d, db); err != nil { t.Fatal(err) @@ -66,7 +101,7 @@ func TestResourceSourceTableRead(t *testing.T) { testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { // Query Params pp := `WHERE mz_tables.id = 'u1'` - testhelpers.MockTableScan(mock, pp) + testhelpers.MockSourceTableScan(mock, pp) if err := sourceTableRead(context.TODO(), d, db); err != nil { t.Fatal(err) @@ -90,7 +125,7 @@ func TestResourceSourceTableUpdate(t *testing.T) { // Query Params pp := `WHERE mz_tables.id = 'u1'` - testhelpers.MockTableScan(mock, pp) + testhelpers.MockSourceTableScan(mock, pp) if err := sourceTableUpdate(context.TODO(), d, db); err != nil { t.Fatal(err) diff --git a/pkg/testhelpers/mock_scans.go b/pkg/testhelpers/mock_scans.go index 3f1cad5a..9b177598 100644 --- a/pkg/testhelpers/mock_scans.go +++ b/pkg/testhelpers/mock_scans.go @@ -569,6 +569,45 @@ func MockSourceScan(mock sqlmock.Sqlmock, predicate string) { mock.ExpectQuery(q).WillReturnRows(ir) } +func MockSourceScanWithType(mock sqlmock.Sqlmock, predicate string, sourceType string) { + b := ` + SELECT + mz_sources.id, + mz_sources.name, + mz_schemas.name AS schema_name, + mz_databases.name AS database_name, + mz_sources.type AS source_type, + COALESCE\(mz_sources.size, mz_clusters.size\) AS size, + mz_sources.envelope_type, + mz_connections.name as connection_name, + mz_clusters.name as cluster_name, + comments.comment AS comment, + mz_roles.name AS owner_name, + mz_sources.privileges + FROM mz_sources + JOIN mz_schemas + ON mz_sources.schema_id = mz_schemas.id + JOIN mz_databases + ON mz_schemas.database_id = mz_databases.id + LEFT JOIN mz_connections + ON mz_sources.connection_id = mz_connections.id + LEFT JOIN mz_clusters + ON mz_sources.cluster_id = mz_clusters.id + JOIN mz_roles + ON mz_sources.owner_id = mz_roles.id + LEFT JOIN \( + SELECT id, comment + FROM mz_internal.mz_comments + WHERE object_type = 'source' + \) comments + ON mz_sources.id = comments.id` + + q := mockQueryBuilder(b, predicate, "") + ir := mock.NewRows([]string{"id", "name", "schema_name", "database_name", "source_type", "size", "envelope_type", "connection_name", "cluster_name", "owner_name", "privileges"}). + AddRow("u1", "source", "schema", "database", sourceType, "small", "BYTES", "conn", "cluster", "joe", defaultPrivilege) + mock.ExpectQuery(q).WillReturnRows(ir) +} + func MockSubsourceScan(mock sqlmock.Sqlmock, predicate string) { b := ` WITH dependencies AS \( @@ -736,6 +775,47 @@ func MockTableScan(mock sqlmock.Sqlmock, predicate string) { mock.ExpectQuery(q).WillReturnRows(ir) } +func MockSourceTableScan(mock sqlmock.Sqlmock, predicate string) { + b := ` + SELECT + mz_tables.id, + mz_tables.name, + mz_schemas.name AS schema_name, + mz_databases.name AS database_name, + mz_sources.name AS source_name, + source_schemas.name AS source_schema_name, + source_databases.name AS source_database_name, + mz_sources.type AS source_type, + comments.comment AS comment, + mz_roles.name AS owner_name, + mz_tables.privileges + FROM mz_tables + JOIN mz_schemas + ON mz_tables.schema_id = mz_schemas.id + JOIN mz_databases + ON mz_schemas.database_id = mz_databases.id + JOIN mz_sources + ON mz_tables.source_id = mz_sources.id + JOIN mz_schemas AS source_schemas + ON mz_sources.schema_id = source_schemas.id + JOIN mz_databases AS source_databases + ON source_schemas.database_id = source_databases.id + JOIN mz_roles + ON mz_tables.owner_id = mz_roles.id + LEFT JOIN \( + SELECT id, comment + FROM mz_internal.mz_comments + WHERE object_type = 'table' + AND object_sub_id IS NULL + \) comments + ON mz_tables.id = comments.id` + + q := mockQueryBuilder(b, predicate, "") + ir := mock.NewRows([]string{"id", "name", "schema_name", "database_name", "source_name", "source_schema_name", "source_database_name", "source_type", "comment", "owner_name", "privileges"}). + AddRow("u1", "table", "schema", "database", "source", "public", "materialize", "KAFKA", "comment", "materialize", defaultPrivilege) + mock.ExpectQuery(q).WillReturnRows(ir) +} + func MockTypeScan(mock sqlmock.Sqlmock, predicate string) { b := ` SELECT