Skip to content

Commit

Permalink
[WIP] Expand Connections Test Coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbyiliev committed Nov 27, 2023
1 parent 4481ee9 commit b29946a
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 11 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ services:
'
redpanda:
container_name: redpanda
image: docker.vectorized.io/vectorized/redpanda:v21.11.2
command:
- redpanda start
Expand Down
206 changes: 195 additions & 11 deletions integration/connection.tf
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,84 @@ resource "materialize_connection_kafka" "kafka_connection" {
kafka_broker {
broker = "redpanda:9092"
}
validate = true
}

resource "materialize_connection_confluent_schema_registry" "schema_registry" {
name = "schema_registry_connection"
comment = "connection schema registry comment"
resource "materialize_connection_kafka" "kafka_conn_ssl_auth" {
name = "kafka_conn_ssl_auth"
security_protocol = "SSL"

url = "http://redpanda:8081"
kafka_broker {
broker = "redpanda:9092"
}

ssl_certificate {
text = "certificate-content"
}

ssl_key {
name = materialize_secret.kafka_password.name
database_name = materialize_secret.kafka_password.database_name
schema_name = materialize_secret.kafka_password.schema_name
}

ssl_certificate_authority {
text = "ca-content"
}

validate = false
}

resource "materialize_connection_ssh_tunnel" "ssh_connection" {
name = "ssh_connection"
schema_name = "public"
comment = "connection ssh tunnel comment"
resource "materialize_connection_kafka" "kafka_ssh_tunnel_connection" {
name = "kafka_ssh_tunnel_connection"
security_protocol = "PLAINTEXT"

host = "ssh_host"
user = "ssh_user"
port = 22
kafka_broker {
broker = "redpanda:9092"
}

ssh_tunnel {
name = materialize_connection_ssh_tunnel.ssh_connection.name
}

validate = false
}

resource "materialize_connection_kafka" "kafka_sasl_ssl" {
name = "kafka_sasl_ssl"
security_protocol = "SASL_SSL"

kafka_broker {
broker = "redpanda:9092"
}

sasl_mechanisms = "SCRAM-SHA-256"

sasl_username {
text = "sasl_username"
}

sasl_password {
name = materialize_secret.kafka_password.name
database_name = materialize_secret.kafka_password.database_name
schema_name = materialize_secret.kafka_password.schema_name
}

ssl_certificate {
text = "ssl_certificate_content"
}

ssl_key {
name = materialize_secret.kafka_password.name
database_name = materialize_secret.kafka_password.database_name
schema_name = materialize_secret.kafka_password.schema_name
}

ssl_certificate_authority {
text = "ssl_ca_content"
}

validate = false
}

resource "materialize_connection_kafka" "kafka_conn_multiple_brokers" {
Expand All @@ -47,6 +108,82 @@ resource "materialize_connection_kafka" "kafka_conn_multiple_brokers" {
validate = false
}

resource "materialize_connection_confluent_schema_registry" "schema_registry" {
name = "schema_registry_connection"
comment = "connection schema registry comment"

url = "http://redpanda:8081"
}

resource "materialize_connection_confluent_schema_registry" "csr_with_basic_auth" {
name = "csr_with_basic_auth"
url = "http://redpanda:8081"

username {
text = "username"
}

password {
name = materialize_secret.kafka_password.name
database_name = materialize_secret.kafka_password.database_name
schema_name = materialize_secret.kafka_password.schema_name
}

validate = false
}

resource "materialize_connection_confluent_schema_registry" "schema_registry_basic_auth_ssl" {
name = "schema_registry_basic_auth_ssl"
url = "http://redpanda:8081"

username {
text = "schema_registry_user"
}

password {
name = materialize_secret.kafka_password.name
database_name = materialize_secret.kafka_password.database_name
schema_name = materialize_secret.kafka_password.schema_name
}

ssl_certificate {
text = "ssl_certificate_content"
}

ssl_key {
name = materialize_secret.kafka_password.name
database_name = materialize_secret.kafka_password.database_name
schema_name = materialize_secret.kafka_password.schema_name
}

ssl_certificate_authority {
text = "ssl_ca_content" #
}

validate = false
}

resource "materialize_connection_confluent_schema_registry" "schema_registry_ssh_tunnel" {
name = "schema_registry_ssh_tunnel"
url = "http://redpanda:8081"

ssh_tunnel {
name = materialize_connection_ssh_tunnel.ssh_connection.name
}

validate = false
}

resource "materialize_connection_ssh_tunnel" "ssh_connection" {
name = "ssh_connection"
schema_name = "public"
comment = "connection ssh tunnel comment"

host = "ssh_host"
user = "ssh_user"
port = 22
}

resource "materialize_connection_postgres" "postgres_connection" {
name = "postgres_connection"
comment = "connection postgres comment"
Expand Down Expand Up @@ -84,6 +221,53 @@ resource "materialize_connection_postgres" "postgres_connection_with_secret" {
validate = false
}

resource "materialize_connection_postgres" "postgres_ssh_tunnel_connection" {
name = "postgres_ssh_tunnel_connection"
host = "postgres"
port = 5432
database = "postgres"
user {
text = "postgres"
}
password {
name = materialize_secret.postgres_password.name
database_name = materialize_secret.postgres_password.database_name
schema_name = materialize_secret.postgres_password.schema_name
}
ssh_tunnel {
name = materialize_connection_ssh_tunnel.ssh_connection.name
}
validate = false
}

resource "materialize_connection_postgres" "postgres_ssl_connection" {
name = "postgres_ssl_connection"
host = "postgres"
port = 5432
database = "postgres"
user {
text = "postgres"
}
password {
name = materialize_secret.postgres_password.name
database_name = materialize_secret.postgres_password.database_name
schema_name = materialize_secret.postgres_password.schema_name
}
ssl_mode = "require"
ssl_certificate {
text = "client_certificate_content"
}
ssl_key {
name = materialize_secret.postgres_password.name
database_name = materialize_secret.postgres_password.database_name
schema_name = materialize_secret.postgres_password.schema_name
}
ssl_certificate_authority {
text = "ca_certificate_content"
}
validate = false
}

resource "materialize_connection_grant" "connection_grant_usage" {
role_name = materialize_role.role_1.name
privilege = "USAGE"
Expand Down
2 changes: 2 additions & 0 deletions pkg/provider/acceptance_connection_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestAccConnKafka_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_connection_kafka.test", "schema_name", "public"),
resource.TestCheckResourceAttr("materialize_connection_kafka.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, connectionName)),
resource.TestCheckResourceAttr("materialize_connection_kafka.test", "ownership_role", "mz_system"),
resource.TestCheckResourceAttr("materialize_connection_kafka.test", "comment", "object comment"),
testAccCheckConnKafkaExists("materialize_connection_kafka.test_role"),
resource.TestCheckResourceAttr("materialize_connection_kafka.test_role", "name", connection2Name),
resource.TestCheckResourceAttr("materialize_connection_kafka.test_role", "ownership_role", roleName),
Expand Down Expand Up @@ -114,6 +115,7 @@ resource "materialize_connection_kafka" "test" {
broker = "redpanda:9092"
}
security_protocol = "PLAINTEXT"
comment = "object comment"
}
resource "materialize_connection_kafka" "test_role" {
Expand Down
2 changes: 2 additions & 0 deletions pkg/provider/acceptance_connection_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestAccConnPostgres_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_connection_postgres.test", "schema_name", "public"),
resource.TestCheckResourceAttr("materialize_connection_postgres.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, connectionName)),
resource.TestCheckResourceAttr("materialize_connection_postgres.test", "ownership_role", "mz_system"),
resource.TestCheckResourceAttr("materialize_connection_postgres.test", "comment", "object comment"),
testAccCheckConnPostgresExists("materialize_connection_postgres.test_role"),
resource.TestCheckResourceAttr("materialize_connection_postgres.test_role", "name", connection2Name),
resource.TestCheckResourceAttr("materialize_connection_postgres.test_role", "ownership_role", roleName),
Expand Down Expand Up @@ -134,6 +135,7 @@ resource "materialize_connection_postgres" "test" {
database_name = materialize_secret.postgres_password.database_name
}
database = "postgres"
comment = "object comment"
}
resource "materialize_connection_postgres" "test_role" {
Expand Down
2 changes: 2 additions & 0 deletions pkg/provider/acceptance_connection_ssh_tunnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestAccConnSshTunnel_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_connection_ssh_tunnel.test", "schema_name", "public"),
resource.TestCheckResourceAttr("materialize_connection_ssh_tunnel.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, connectionName)),
resource.TestCheckResourceAttr("materialize_connection_ssh_tunnel.test", "ownership_role", "mz_system"),
resource.TestCheckResourceAttr("materialize_connection_ssh_tunnel.test", "comment", "object comment"),
testAccCheckConnKafkaExists("materialize_connection_ssh_tunnel.test_role"),
resource.TestCheckResourceAttr("materialize_connection_ssh_tunnel.test_role", "name", connection2Name),
resource.TestCheckResourceAttr("materialize_connection_ssh_tunnel.test_role", "ownership_role", roleName),
Expand Down Expand Up @@ -115,6 +116,7 @@ resource "materialize_connection_ssh_tunnel" "test" {
host = "ssh_host"
user = "ssh_user"
port = 22
comment = "object comment"
}
resource "materialize_connection_ssh_tunnel" "test_role" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var inConfluentSchemaRegistry = map[string]interface{}{
},
},
"aws_privatelink": []interface{}{map[string]interface{}{"name": "privatelink"}},
"comment": "object comment",
}

func TestResourceConnectionConfluentSchemaRegistryCreate(t *testing.T) {
Expand All @@ -48,6 +49,9 @@ func TestResourceConnectionConfluentSchemaRegistryCreate(t *testing.T) {
`CREATE CONNECTION "database"."schema"."conn" TO CONFLUENT SCHEMA REGISTRY \(URL 'http://localhost:8081', USERNAME = 'user', PASSWORD = SECRET "materialize"."public"."password", SSL CERTIFICATE AUTHORITY = SECRET "materialize"."public"."ssl", SSL CERTIFICATE = SECRET "materialize"."public"."ssl", SSL KEY = SECRET "ssl_key"."public"."ssl", AWS PRIVATELINK "materialize"."public"."privatelink", SSH TUNNEL "materialize"."tunnel_schema"."tunnel"\)`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Comment
mock.ExpectExec(`COMMENT ON CONNECTION "database"."schema"."conn" IS 'object comment';`).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
ip := `WHERE mz_connections.name = 'conn' AND mz_databases.name = 'database' AND mz_schemas.name = 'schema'`
testhelpers.MockConnectionScan(mock, ip)
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/resource_connection_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var inKafka = map[string]interface{}{
"sasl_username": []interface{}{map[string]interface{}{"text": "username"}},
"sasl_password": []interface{}{map[string]interface{}{"name": "password"}},
"ssh_tunnel": []interface{}{map[string]interface{}{"name": "tunnel"}},
"comment": "object comment",
}

func TestResourceConnectionKafkaCreate(t *testing.T) {
Expand All @@ -41,6 +42,9 @@ func TestResourceConnectionKafkaCreate(t *testing.T) {
`CREATE CONNECTION "database"."schema"."conn" TO KAFKA \(BROKERS \('b-1.hostname-1:9096' USING SSH TUNNEL "materialize"."public"."tunnel"\), SECURITY PROTOCOL = 'SASL_PLAINTEXT', PROGRESS TOPIC 'topic', SSL CERTIFICATE AUTHORITY = 'key', SSL CERTIFICATE = SECRET "materialize"."public"."cert", SSL KEY = SECRET "materialize"."public"."key", SASL MECHANISMS = 'PLAIN', SASL USERNAME = 'username', SASL PASSWORD = SECRET "materialize"."public"."password"\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Comment
mock.ExpectExec(`COMMENT ON CONNECTION "database"."schema"."conn" IS 'object comment';`).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
ip := `WHERE mz_connections.name = 'conn' AND mz_databases.name = 'database' AND mz_schemas.name = 'schema'`
testhelpers.MockConnectionScan(mock, ip)
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/resource_connection_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var inPostgres = map[string]interface{}{
"ssl_key": []interface{}{map[string]interface{}{"name": "key"}},
"ssl_mode": "verify-full",
"aws_privatelink": []interface{}{map[string]interface{}{"name": "link"}},
"comment": "object comment",
}

func TestResourceConnectionPostgresCreate(t *testing.T) {
Expand All @@ -53,6 +54,9 @@ func TestResourceConnectionPostgresCreate(t *testing.T) {
`CREATE CONNECTION "database"."schema"."conn" TO POSTGRES \(HOST 'postgres_host', PORT 5432, USER SECRET "materialize"."public"."user", PASSWORD SECRET "materialize"."public"."password", SSL MODE 'verify-full', SSH TUNNEL "tunnel_database"."tunnel_schema"."ssh_conn", SSL CERTIFICATE AUTHORITY SECRET "ssl_database"."public"."root", SSL CERTIFICATE SECRET "materialize"."public"."cert", SSL KEY SECRET "materialize"."public"."key", AWS PRIVATELINK "materialize"."public"."link", DATABASE 'default'\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Comment
mock.ExpectExec(`COMMENT ON CONNECTION "database"."schema"."conn" IS 'object comment';`).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
ip := `WHERE mz_connections.name = 'conn' AND mz_databases.name = 'database' AND mz_schemas.name = 'schema'`
testhelpers.MockConnectionScan(mock, ip)
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/resource_connection_ssh_tunnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var inSshTunnel = map[string]interface{}{
"host": "localhost",
"port": 123,
"user": "user",
"comment": "object comment",
}

func TestResourceConnectionSshTunnelCreate(t *testing.T) {
Expand All @@ -34,6 +35,9 @@ func TestResourceConnectionSshTunnelCreate(t *testing.T) {
`CREATE CONNECTION "database"."schema"."conn" TO SSH TUNNEL \(HOST 'localhost', USER 'user', PORT 123\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Comment
mock.ExpectExec(`COMMENT ON CONNECTION "database"."schema"."conn" IS 'object comment';`).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
ip := `WHERE mz_connections.name = 'conn' AND mz_databases.name = 'database' AND mz_schemas.name = 'schema'`
testhelpers.MockConnectionScan(mock, ip)
Expand Down

0 comments on commit b29946a

Please sign in to comment.