Skip to content

Commit

Permalink
Add alter source refresh to data source
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbyiliev committed Oct 28, 2024
1 parent 627fb7b commit ae709ec
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 24 deletions.
4 changes: 2 additions & 2 deletions docs/data-sources/source_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
page_title: "materialize_source_reference Data Source - terraform-provider-materialize"
subcategory: ""
description: |-
The materialize_source_reference data source retrieves information about a Materialize source's references, including details about namespaces, columns, and last update times.
The materialize_source_reference data source retrieves a list of available upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables.
---

# materialize_source_reference (Data Source)

The `materialize_source_reference` data source retrieves information about a Materialize source's references, including details about namespaces, columns, and last update times.
The `materialize_source_reference` data source retrieves a list of *available* upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables.

## Example Usage

Expand Down
3 changes: 2 additions & 1 deletion pkg/datasources/datasource_source_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func SourceReference() *schema.Resource {
return &schema.Resource{
ReadContext: sourceReferenceRead,
Description: "The `materialize_source_reference` data source retrieves information about a Materialize source's references, including details about namespaces, columns, and last update times.",
Description: "The `materialize_source_reference` data source retrieves a list of *available* upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables.",
Schema: map[string]*schema.Schema{
"source_id": {
Type: schema.TypeString,
Expand Down Expand Up @@ -79,6 +79,7 @@ func SourceReference() *schema.Resource {

func sourceReferenceRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
sourceID := d.Get("source_id").(string)
sourceID = utils.ExtractId(sourceID)

var diags diag.Diagnostics

Expand Down
38 changes: 20 additions & 18 deletions pkg/materialize/source_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,28 @@ func (b *SourcePostgresBuilder) Create() error {

q.WriteString(fmt.Sprintf(` (%s)`, p))

q.WriteString(` FOR TABLES (`)
for i, t := range b.table {
if t.UpstreamSchemaName == "" {
t.UpstreamSchemaName = b.SchemaName
}
if t.Name == "" {
t.Name = t.UpstreamName
}
if t.SchemaName == "" {
t.SchemaName = b.SchemaName
}
if t.DatabaseName == "" {
t.DatabaseName = b.DatabaseName
}
q.WriteString(fmt.Sprintf(`%s.%s AS %s.%s.%s`, QuoteIdentifier(t.UpstreamSchemaName), QuoteIdentifier(t.UpstreamName), QuoteIdentifier(t.DatabaseName), QuoteIdentifier(t.SchemaName), QuoteIdentifier(t.Name)))
if i < len(b.table)-1 {
q.WriteString(`, `)
if b.table != nil && len(b.table) > 0 {
q.WriteString(` FOR TABLES (`)
for i, t := range b.table {
if t.UpstreamSchemaName == "" {
t.UpstreamSchemaName = b.SchemaName
}
if t.Name == "" {
t.Name = t.UpstreamName
}
if t.SchemaName == "" {
t.SchemaName = b.SchemaName
}
if t.DatabaseName == "" {
t.DatabaseName = b.DatabaseName
}
q.WriteString(fmt.Sprintf(`%s.%s AS %s.%s.%s`, QuoteIdentifier(t.UpstreamSchemaName), QuoteIdentifier(t.UpstreamName), QuoteIdentifier(t.DatabaseName), QuoteIdentifier(t.SchemaName), QuoteIdentifier(t.Name)))
if i < len(b.table)-1 {
q.WriteString(`, `)
}
}
q.WriteString(`)`)
}
q.WriteString(`)`)

if b.exposeProgress.Name != "" {
q.WriteString(fmt.Sprintf(` EXPOSE PROGRESS AS %s`, b.exposeProgress.QualifiedName()))
Expand Down
18 changes: 16 additions & 2 deletions pkg/materialize/source_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package materialize

import (
"database/sql"
"fmt"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
Expand Down Expand Up @@ -61,9 +62,22 @@ func ScanSourceReference(conn *sqlx.DB, id string) (SourceReferenceParams, error
return s, nil
}

func ListSourceReferences(conn *sqlx.DB, sourceId string) ([]SourceReferenceParams, error) {
func refreshSourceReferences(conn *sqlx.DB, sourceName, schemaName, databaseName string) error {
query := fmt.Sprintf(`ALTER SOURCE %s REFRESH REFERENCES`, QualifiedName(databaseName, schemaName, sourceName))
_, err := conn.Exec(query)
return err
}

func ListSourceReferences(conn *sqlx.DB, id string) ([]SourceReferenceParams, error) {
source, err := ScanSource(conn, id)
if err == nil {
if err := refreshSourceReferences(conn, source.SourceName.String, source.SchemaName.String, source.DatabaseName.String); err != nil {
return nil, fmt.Errorf("error refreshing source references: %v", err)
}
}

p := map[string]string{
"sr.source_id": sourceId,
"sr.source_id": id,
}
q := sourceReferenceQuery.QueryPredicate(p)

Expand Down
95 changes: 95 additions & 0 deletions pkg/materialize/source_reference_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package materialize

import (
"testing"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/MaterializeInc/terraform-provider-materialize/pkg/testhelpers"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
)

func TestSourceReferenceId(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectQuery(
`SELECT sr\.source_id, sr\.namespace, sr\.name, sr\.updated_at, sr\.columns, s\.name AS source_name, ss\.name AS source_schema_name, sd\.name AS source_database_name, s\.type AS source_type
FROM mz_internal\.mz_source_references sr
JOIN mz_sources s ON sr\.source_id = s\.id
JOIN mz_schemas ss ON s\.schema_id = ss\.id
JOIN mz_databases sd ON ss\.database_id = sd\.id
WHERE sr\.source_id = 'test-source-id'`,
).
WillReturnRows(sqlmock.NewRows([]string{"source_id"}).AddRow("test-source-id"))

result, err := SourceReferenceId(db, "test-source-id")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result != "test-source-id" {
t.Errorf("expected source id to be 'test-source-id', got %v", result)
}
})
}

func TestScanSourceReference(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectQuery(
`SELECT sr\.source_id, sr\.namespace, sr\.name, sr\.updated_at, sr\.columns, s\.name AS source_name, ss\.name AS source_schema_name, sd\.name AS source_database_name, s\.type AS source_type
FROM mz_internal\.mz_source_references sr
JOIN mz_sources s ON sr\.source_id = s\.id
JOIN mz_schemas ss ON s\.schema_id = ss\.id
JOIN mz_databases sd ON ss\.database_id = sd\.id
WHERE sr\.source_id = 'test-source-id'`,
).
WillReturnRows(sqlmock.NewRows([]string{"source_id", "namespace", "name", "updated_at", "columns", "source_name", "source_schema_name", "source_database_name", "source_type"}).
AddRow("test-source-id", "test-namespace", "test-name", "2024-10-28", pq.StringArray{"col1", "col2"}, "source-name", "source-schema-name", "source-database-name", "source-type"))

result, err := ScanSourceReference(db, "test-source-id")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.SourceId.String != "test-source-id" {
t.Errorf("expected source id to be 'test-source-id', got %v", result.SourceId.String)
}
})
}

func TestRefreshSourceReferences(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
`ALTER SOURCE "test-database"\."test-schema"\."test-source" REFRESH REFERENCES`,
).
WillReturnResult(sqlmock.NewResult(1, 1))

err := refreshSourceReferences(db, "test-source", "test-schema", "test-database")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
})
}

func TestListSourceReferences(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectQuery(
`SELECT sr\.source_id, sr\.namespace, sr\.name, sr\.updated_at, sr\.columns, s\.name AS source_name, ss\.name AS source_schema_name, sd\.name AS source_database_name, s\.type AS source_type
FROM mz_internal\.mz_source_references sr
JOIN mz_sources s ON sr\.source_id = s\.id
JOIN mz_schemas ss ON s\.schema_id = ss\.id
JOIN mz_databases sd ON ss\.database_id = sd\.id
WHERE sr\.source_id = 'test-source-id'`,
).
WillReturnRows(sqlmock.NewRows([]string{"source_id", "namespace", "name", "updated_at", "columns", "source_name", "source_schema_name", "source_database_name", "source_type"}).
AddRow("test-source-id", "test-namespace", "test-name", "2024-10-28", pq.StringArray{"col1", "col2"}, "source-name", "source-schema-name", "source-database-name", "source-type"))

result, err := ListSourceReferences(db, "test-source-id")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result) != 1 {
t.Errorf("expected 1 result, got %d", len(result))
}
if result[0].SourceId.String != "test-source-id" {
t.Errorf("expected source id to be 'test-source-id', got %v", result[0].SourceId.String)
}
})
}
1 change: 0 additions & 1 deletion pkg/provider/acceptance_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ func testAccManagedClusterResourceAlterGraceful(clusterName, clusterSize string,
enabled = true
timeout = "10m"
on_timeout = "%[4]s"
}
}
`,
Expand Down
172 changes: 172 additions & 0 deletions pkg/provider/acceptance_datasource_source_reference_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package provider

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-testing/helper/acctest"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
)

func TestAccDataSourceSourceReference_basic(t *testing.T) {
addTestTopic()
nameSpace := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
Steps: []resource.TestStep{
{
Config: testAccDataSourceSourceReferenceConfig(nameSpace),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrSet("data.materialize_source_reference.kafka", "source_id"),
resource.TestCheckResourceAttrSet("data.materialize_source_reference.postgres", "source_id"),
resource.TestCheckResourceAttrSet("data.materialize_source_reference.mysql", "source_id"),

// Check total references
resource.TestCheckResourceAttr("data.materialize_source_reference.kafka", "references.#", "1"),
resource.TestCheckResourceAttr("data.materialize_source_reference.postgres", "references.#", "3"),
resource.TestCheckResourceAttr("data.materialize_source_reference.mysql", "references.#", "4"),

// Check Postgres reference attributes
resource.TestCheckResourceAttr("data.materialize_source_reference.postgres", "references.0.namespace", "public"),
resource.TestCheckResourceAttrSet("data.materialize_source_reference.postgres", "references.0.name"),
resource.TestCheckResourceAttr("data.materialize_source_reference.postgres", "references.0.source_name", fmt.Sprintf("%s_source_postgres", nameSpace)),
resource.TestCheckResourceAttr("data.materialize_source_reference.postgres", "references.0.source_type", "postgres"),
resource.TestCheckResourceAttrSet("data.materialize_source_reference.postgres", "references.0.updated_at"),

// Check MySQL reference attributes
resource.TestCheckResourceAttr("data.materialize_source_reference.mysql", "references.0.namespace", "shop"),
resource.TestCheckResourceAttrSet("data.materialize_source_reference.mysql", "references.0.name"),
resource.TestCheckResourceAttr("data.materialize_source_reference.mysql", "references.0.source_name", fmt.Sprintf("%s_source_mysql", nameSpace)),
resource.TestCheckResourceAttr("data.materialize_source_reference.mysql", "references.1.source_type", "mysql"),
resource.TestCheckResourceAttrSet("data.materialize_source_reference.mysql", "references.1.updated_at"),

// Check Kafka reference attributes
resource.TestCheckResourceAttr("data.materialize_source_reference.kafka", "references.0.name", "terraform"),
resource.TestCheckResourceAttr("data.materialize_source_reference.kafka", "references.0.source_name", fmt.Sprintf("%s_source_kafka", nameSpace)),
resource.TestCheckResourceAttr("data.materialize_source_reference.kafka", "references.0.source_type", "kafka"),
resource.TestCheckResourceAttrSet("data.materialize_source_reference.kafka", "references.0.updated_at"),
),
},
},
})
}

func testAccDataSourceSourceReferenceConfig(nameSpace string) string {
return fmt.Sprintf(`
// Postgres setup
resource "materialize_secret" "postgres_password" {
name = "%[1]s_secret_postgres"
value = "c2VjcmV0Cg=="
}
resource "materialize_connection_postgres" "postgres_connection" {
name = "%[1]s_connection_postgres"
host = "postgres"
port = 5432
user {
text = "postgres"
}
password {
name = materialize_secret.postgres_password.name
}
database = "postgres"
}
resource "materialize_source_postgres" "test_source_postgres" {
name = "%[1]s_source_postgres"
cluster_name = "quickstart"
postgres_connection {
name = materialize_connection_postgres.postgres_connection.name
}
publication = "mz_source"
}
resource "materialize_source_table_postgres" "table_from_source_pg" {
name = "%[1]s_table"
schema_name = "public"
database_name = "materialize"
source {
name = materialize_source_postgres.test_source_postgres.name
}
upstream_name = "table2"
upstream_schema_name = "public"
}
// MySQL setup
resource "materialize_secret" "mysql_password" {
name = "%[1]s_secret_mysql"
value = "c2VjcmV0Cg=="
}
resource "materialize_connection_mysql" "mysql_connection" {
name = "%[1]s_connection_mysql"
host = "mysql"
port = 3306
user {
text = "repluser"
}
password {
name = materialize_secret.mysql_password.name
}
}
resource "materialize_source_mysql" "test_source_mysql" {
name = "%[1]s_source_mysql"
cluster_name = "quickstart"
mysql_connection {
name = materialize_connection_mysql.mysql_connection.name
}
}
// Kafka setup
resource "materialize_connection_kafka" "kafka_connection" {
name = "%[1]s_connection_kafka"
kafka_broker {
broker = "redpanda:9092"
}
security_protocol = "PLAINTEXT"
}
resource "materialize_source_kafka" "test_source_kafka" {
name = "%[1]s_source_kafka"
cluster_name = "quickstart"
topic = "terraform"
kafka_connection {
name = materialize_connection_kafka.kafka_connection.name
}
value_format {
json = true
}
key_format {
json = true
}
}
data "materialize_source_reference" "kafka" {
source_id = materialize_source_kafka.test_source_kafka.id
depends_on = [
materialize_source_kafka.test_source_kafka
]
}
data "materialize_source_reference" "postgres" {
source_id = materialize_source_postgres.test_source_postgres.id
depends_on = [
materialize_source_postgres.test_source_postgres
]
}
data "materialize_source_reference" "mysql" {
source_id = materialize_source_mysql.test_source_mysql.id
depends_on = [
materialize_source_mysql.test_source_mysql
]
}
`, nameSpace)
}

0 comments on commit ae709ec

Please sign in to comment.