Skip to content

Commit

Permalink
Prevent Name Leaking
Browse files Browse the repository at this point in the history
  • Loading branch information
Dennis Hume committed Nov 9, 2023
1 parent 18f64b6 commit a4b4b75
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 64 deletions.
8 changes: 2 additions & 6 deletions pkg/materialize/identifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@ func GetIdentifierSchemaStruct(databaseName string, schemaName string, v interfa
if v, ok := u["name"]; ok {
conn.Name = v.(string)
}
if v, ok := u["schema_name"]; ok && v.(string) != "" {
if v, ok := u["schema_name"]; ok {
conn.SchemaName = v.(string)
} else {
conn.SchemaName = schemaName
}
if v, ok := u["database_name"]; ok && v.(string) != "" {
if v, ok := u["database_name"]; ok {
conn.DatabaseName = v.(string)
} else {
conn.DatabaseName = databaseName
}
return conn
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@ var inConfluentSchemaRegistry = map[string]interface{}{
"url": "http://localhost:8081",
"ssl_certificate_authority": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "ssl"}}}},
"ssl_certificate": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "ssl"}}}},
"ssl_key": []interface{}{map[string]interface{}{"name": "ssl"}},
"password": []interface{}{map[string]interface{}{"name": "password"}},
"username": []interface{}{map[string]interface{}{"text": "user"}},
"ssh_tunnel": []interface{}{map[string]interface{}{"name": "tunnel"}},
"aws_privatelink": []interface{}{map[string]interface{}{"name": "privatelink"}},
"ssl_key": []interface{}{
map[string]interface{}{
"name": "ssl",
"database_name": "ssl_key",
},
},
"password": []interface{}{map[string]interface{}{"name": "password"}},
"username": []interface{}{map[string]interface{}{"text": "user"}},
"ssh_tunnel": []interface{}{
map[string]interface{}{
"name": "tunnel",
"schema_name": "tunnel_schema",
},
},
"aws_privatelink": []interface{}{map[string]interface{}{"name": "privatelink"}},
}

func TestResourceConnectionConfluentSchemaRegistryCreate(t *testing.T) {
Expand All @@ -35,7 +45,7 @@ func TestResourceConnectionConfluentSchemaRegistryCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE CONNECTION "database"."schema"."conn" TO CONFLUENT SCHEMA REGISTRY \(URL 'http://localhost:8081', USERNAME = 'user', PASSWORD = SECRET "database"."schema"."password", SSL CERTIFICATE AUTHORITY = SECRET "database"."schema"."ssl", SSL CERTIFICATE = SECRET "database"."schema"."ssl", SSL KEY = SECRET "database"."schema"."ssl", AWS PRIVATELINK "database"."schema"."privatelink", SSH TUNNEL "database"."schema"."tunnel"\)`,
`CREATE CONNECTION "database"."schema"."conn" TO CONFLUENT SCHEMA REGISTRY \(URL 'http://localhost:8081', USERNAME = 'user', PASSWORD = SECRET "materialize"."public"."password", SSL CERTIFICATE AUTHORITY = SECRET "materialize"."public"."ssl", SSL CERTIFICATE = SECRET "materialize"."public"."ssl", SSL KEY = SECRET "ssl_key"."public"."ssl", AWS PRIVATELINK "materialize"."public"."privatelink", SSH TUNNEL "materialize"."tunnel_schema"."tunnel"\)`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/resource_connection_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestResourceConnectionKafkaCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE CONNECTION "database"."schema"."conn" TO KAFKA \(BROKERS \('b-1.hostname-1:9096' USING SSH TUNNEL "database"."schema"."tunnel"\), PROGRESS TOPIC 'topic', SSL CERTIFICATE AUTHORITY = 'key', SSL CERTIFICATE = SECRET "database"."schema"."cert", SSL KEY = SECRET "database"."schema"."key", SASL MECHANISMS = 'PLAIN', SASL USERNAME = 'username', SASL PASSWORD = SECRET "database"."schema"."password"\);`,
`CREATE CONNECTION "database"."schema"."conn" TO KAFKA \(BROKERS \('b-1.hostname-1:9096' USING SSH TUNNEL "materialize"."public"."tunnel"\), PROGRESS TOPIC 'topic', SSL CERTIFICATE AUTHORITY = 'key', SSL CERTIFICATE = SECRET "materialize"."public"."cert", SSL KEY = SECRET "materialize"."public"."key", SASL MECHANISMS = 'PLAIN', SASL USERNAME = 'username', SASL PASSWORD = SECRET "materialize"."public"."password"\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
Expand Down
43 changes: 28 additions & 15 deletions pkg/resources/resource_connection_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,33 @@ import (
)

var inPostgres = map[string]interface{}{
"name": "conn",
"schema_name": "schema",
"database_name": "database",
"database": "default",
"host": "postgres_host",
"port": 5432,
"user": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "user"}}}},
"password": []interface{}{map[string]interface{}{"name": "password"}},
"ssh_tunnel": []interface{}{map[string]interface{}{"name": "ssh_conn"}},
"ssl_certificate_authority": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "root"}}}},
"ssl_certificate": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "cert"}}}},
"ssl_key": []interface{}{map[string]interface{}{"name": "key"}},
"ssl_mode": "verify-full",
"aws_privatelink": []interface{}{map[string]interface{}{"name": "link"}},
"name": "conn",
"schema_name": "schema",
"database_name": "database",
"database": "default",
"host": "postgres_host",
"port": 5432,
"user": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "user"}}}},
"password": []interface{}{map[string]interface{}{"name": "password"}},
"ssh_tunnel": []interface{}{
map[string]interface{}{
"name": "ssh_conn",
"schema_name": "tunnel_schema",
"database_name": "tunnel_database",
},
},
"ssl_certificate_authority": []interface{}{
map[string]interface{}{
"secret": []interface{}{map[string]interface{}{
"name": "root",
"database_name": "ssl_database",
}},
},
},
"ssl_certificate": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "cert"}}}},
"ssl_key": []interface{}{map[string]interface{}{"name": "key"}},
"ssl_mode": "verify-full",
"aws_privatelink": []interface{}{map[string]interface{}{"name": "link"}},
}

func TestResourceConnectionPostgresCreate(t *testing.T) {
Expand All @@ -37,7 +50,7 @@ func TestResourceConnectionPostgresCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE CONNECTION "database"."schema"."conn" TO POSTGRES \(HOST 'postgres_host', PORT 5432, USER SECRET "database"."schema"."user", PASSWORD SECRET "database"."schema"."password", SSL MODE 'verify-full', SSH TUNNEL "database"."schema"."ssh_conn", SSL CERTIFICATE AUTHORITY SECRET "database"."schema"."root", SSL CERTIFICATE SECRET "database"."schema"."cert", SSL KEY SECRET "database"."schema"."key", AWS PRIVATELINK "database"."schema"."link", DATABASE 'default'\);`,
`CREATE CONNECTION "database"."schema"."conn" TO POSTGRES \(HOST 'postgres_host', PORT 5432, USER SECRET "materialize"."public"."user", PASSWORD SECRET "materialize"."public"."password", SSL MODE 'verify-full', SSH TUNNEL "tunnel_database"."tunnel_schema"."ssh_conn", SSL CERTIFICATE AUTHORITY SECRET "ssl_database"."public"."root", SSL CERTIFICATE SECRET "materialize"."public"."cert", SSL KEY SECRET "materialize"."public"."key", AWS PRIVATELINK "materialize"."public"."link", DATABASE 'default'\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
Expand Down
42 changes: 32 additions & 10 deletions pkg/resources/resource_sink_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,40 @@ import (
)

var inSinkKafka = map[string]interface{}{
"name": "sink",
"schema_name": "schema",
"database_name": "database",
"cluster_name": "cluster",
"size": "small",
"from": []interface{}{map[string]interface{}{"name": "item", "schema_name": "public", "database_name": "database"}},
"name": "sink",
"schema_name": "schema",
"database_name": "database",
"cluster_name": "cluster",
"size": "small",
"from": []interface{}{
map[string]interface{}{
"name": "item",
"schema_name": "public",
"database_name": "database",
},
},
"kafka_connection": []interface{}{map[string]interface{}{"name": "kafka_conn"}},
"topic": "topic",
"key": []interface{}{"key_1", "key_2"},
"format": []interface{}{map[string]interface{}{"avro": []interface{}{map[string]interface{}{"avro_key_fullname": "avro_key_fullname", "avro_value_fullname": "avro_value_fullname", "schema_registry_connection": []interface{}{map[string]interface{}{"name": "csr_conn", "database_name": "database", "schema_name": "schema"}}}}}},
"envelope": []interface{}{map[string]interface{}{"upsert": true}},
"snapshot": false,
"format": []interface{}{
map[string]interface{}{
"avro": []interface{}{
map[string]interface{}{
"avro_key_fullname": "avro_key_fullname",
"avro_value_fullname": "avro_value_fullname",
"schema_registry_connection": []interface{}{
map[string]interface{}{
"name": "csr_conn",
"database_name": "database",
"schema_name": "schema",
},
},
},
},
},
},
"envelope": []interface{}{map[string]interface{}{"upsert": true}},
"snapshot": false,
}

func TestResourceSinkKafkaCreate(t *testing.T) {
Expand All @@ -35,7 +57,7 @@ func TestResourceSinkKafkaCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE SINK "database"."schema"."sink" IN CLUSTER "cluster" FROM "database"."public"."item" INTO KAFKA CONNECTION "database"."schema"."kafka_conn" KEY \(key_1, key_2\) \(TOPIC 'topic'\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" WITH \(AVRO KEY FULLNAME 'avro_key_fullname' AVRO VALUE FULLNAME 'avro_value_fullname'\) ENVELOPE UPSERT WITH \( SIZE = 'small' SNAPSHOT = false\);`,
`CREATE SINK "database"."schema"."sink" IN CLUSTER "cluster" FROM "database"."public"."item" INTO KAFKA CONNECTION "materialize"."public"."kafka_conn" KEY \(key_1, key_2\) \(TOPIC 'topic'\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" WITH \(AVRO KEY FULLNAME 'avro_key_fullname' AVRO VALUE FULLNAME 'avro_value_fullname'\) ENVELOPE UPSERT WITH \( SIZE = 'small' SNAPSHOT = false\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
Expand Down
25 changes: 20 additions & 5 deletions pkg/resources/resource_source_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,25 @@ var inSourceKafka = map[string]interface{}{
"include_offset_alias": "offset",
"include_timestamp": true,
"include_timestamp_alias": "timestamp",
"format": []interface{}{map[string]interface{}{"avro": []interface{}{map[string]interface{}{"value_strategy": "avro_key_fullname", "schema_registry_connection": []interface{}{map[string]interface{}{"name": "csr_conn", "database_name": "database", "schema_name": "schema"}}}}}},
"envelope": []interface{}{map[string]interface{}{"upsert": true}},
"start_offset": []interface{}{1, 2, 3},
"start_timestamp": -1000,
"format": []interface{}{
map[string]interface{}{
"avro": []interface{}{
map[string]interface{}{
"value_strategy": "avro_key_fullname",
"schema_registry_connection": []interface{}{
map[string]interface{}{
"name": "csr_conn",
"database_name": "database",
"schema_name": "schema",
},
},
},
},
},
},
"envelope": []interface{}{map[string]interface{}{"upsert": true}},
"start_offset": []interface{}{1, 2, 3},
"start_timestamp": -1000,
}

func TestResourceSourceKafkaCreate(t *testing.T) {
Expand All @@ -45,7 +60,7 @@ func TestResourceSourceKafkaCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM KAFKA CONNECTION "database"."schema"."kafka_conn" \(TOPIC 'topic', START TIMESTAMP -1000\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" VALUE STRATEGY avro_key_fullname START OFFSET \[1, 2, 3\] INCLUDE KEY AS key, HEADERS AS headers, PARTITION AS partition, OFFSET AS offset, TIMESTAMP AS timestamp ENVELOPE UPSERT WITH \(SIZE = 'small'\);`,
`CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM KAFKA CONNECTION "materialize"."public"."kafka_conn" \(TOPIC 'topic', START TIMESTAMP -1000\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" VALUE STRATEGY avro_key_fullname START OFFSET \[1, 2, 3\] INCLUDE KEY AS key, HEADERS AS headers, PARTITION AS partition, OFFSET AS offset, TIMESTAMP AS timestamp ENVELOPE UPSERT WITH \(SIZE = 'small'\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
Expand Down
46 changes: 27 additions & 19 deletions pkg/resources/resource_source_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ import (
)

var inSourcePostgresTable = map[string]interface{}{
"name": "source",
"schema_name": "schema",
"database_name": "database",
"cluster_name": "cluster",
"size": "small",
"postgres_connection": []interface{}{map[string]interface{}{"name": "pg_connection"}},
"publication": "mz_source",
"text_columns": []interface{}{"table.unsupported_type_1"},
"name": "source",
"schema_name": "schema",
"database_name": "database",
"cluster_name": "cluster",
"size": "small",
"postgres_connection": []interface{}{
map[string]interface{}{
"name": "pg_connection",
},
},
"publication": "mz_source",
"text_columns": []interface{}{"table.unsupported_type_1"},
"table": []interface{}{
map[string]interface{}{"name": "name1", "alias": "alias"},
map[string]interface{}{"name": "name2"},
Expand All @@ -37,7 +41,7 @@ func TestResourceSourcePostgresCreateTable(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM POSTGRES CONNECTION "database"."schema"."pg_connection" \(PUBLICATION 'mz_source', TEXT COLUMNS \(table.unsupported_type_1\)\) FOR TABLES \(name1 AS alias, name2 AS name2\) WITH \(SIZE = 'small'\);`,
`CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM POSTGRES CONNECTION "materialize"."public"."pg_connection" \(PUBLICATION 'mz_source', TEXT COLUMNS \(table.unsupported_type_1\)\) FOR TABLES \(name1 AS alias, name2 AS name2\) WITH \(SIZE = 'small'\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
Expand All @@ -59,15 +63,19 @@ func TestResourceSourcePostgresCreateTable(t *testing.T) {
}

var inSourcePostgresSchema = map[string]interface{}{
"name": "source",
"schema_name": "schema",
"database_name": "database",
"cluster_name": "cluster",
"size": "small",
"postgres_connection": []interface{}{map[string]interface{}{"name": "pg_connection"}},
"publication": "mz_source",
"text_columns": []interface{}{"table.unsupported_type_1"},
"schema": []interface{}{"schema1", "schema2"},
"name": "source",
"schema_name": "schema",
"database_name": "database",
"cluster_name": "cluster",
"size": "small",
"postgres_connection": []interface{}{
map[string]interface{}{
"name": "pg_connection",
},
},
"publication": "mz_source",
"text_columns": []interface{}{"table.unsupported_type_1"},
"schema": []interface{}{"schema1", "schema2"},
}

func TestResourceSourcePostgresCreateSchema(t *testing.T) {
Expand All @@ -78,7 +86,7 @@ func TestResourceSourcePostgresCreateSchema(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM POSTGRES CONNECTION "database"."schema"."pg_connection" \(PUBLICATION 'mz_source', TEXT COLUMNS \(table.unsupported_type_1\)\) FOR SCHEMAS \(schema1, schema2\) WITH \(SIZE = 'small'\);`,
`CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM POSTGRES CONNECTION "materialize"."public"."pg_connection" \(PUBLICATION 'mz_source', TEXT COLUMNS \(table.unsupported_type_1\)\) FOR SCHEMAS \(schema1, schema2\) WITH \(SIZE = 'small'\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
Expand Down
6 changes: 4 additions & 2 deletions pkg/resources/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,16 @@ func IdentifierSchema(elem, description string, required bool) *schema.Schema {
Required: true,
},
"schema_name": {
Description: fmt.Sprintf("The %s schema name.", elem),
Description: fmt.Sprintf("The %s schema name. Defaults to `public`", elem),
Type: schema.TypeString,
Optional: true,
Default: defaultSchema,
},
"database_name": {
Description: fmt.Sprintf("The %s database name.", elem),
Description: fmt.Sprintf("The %s database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.", elem),
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("MZ_DATABASE", defaultDatabase),
},
},
},
Expand Down

0 comments on commit a4b4b75

Please sign in to comment.