From cfe082b60c277c41d3bdba0c65dde321614d0f01 Mon Sep 17 00:00:00 2001
From: dttung2905
Date: Sun, 26 Dec 2021 18:51:24 +0800
Subject: [PATCH 1/5] add kafka_topic_list data source
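
The data source exposes the cluster's topic names through a single computed
`list` attribute. A minimal usage sketch (the `all` label and the output
name are arbitrary; the attribute name comes from the schema below):

    data "kafka_topic_list" "all" {
    }

    output "topic_names" {
      value = data.kafka_topic_list.all.list
    }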
---
 kafka/client.go                            |  8 +++++
 kafka/data_source_kafka_topic_list.go      | 34 ++++++++++++++++++++++
 kafka/data_source_kafka_topic_list_test.go | 30 +++++++++++++++++++
 kafka/lazy_client.go                       |  7 +++++
 kafka/provider.go                          |  1 +
 5 files changed, 80 insertions(+)
 create mode 100755 kafka/data_source_kafka_topic_list.go
 create mode 100755 kafka/data_source_kafka_topic_list_test.go

diff --git a/kafka/client.go b/kafka/client.go
index c4dd9b4b..94b78382 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -598,3 +598,11 @@ func (c *Client) getDeleteAclsRequestAPIVersion() int16 {
 func (c *Client) getDescribeConfigAPIVersion() int16 {
     return int16(c.versionForKey(32, 1))
 }
+
+func (c *Client) getKafkaTopicList() ([]string, error) {
+    topics, err := c.client.Topics()
+    if err != nil {
+        return nil, err
+    }
+    return topics, nil
+}
\ No newline at end of file
diff --git a/kafka/data_source_kafka_topic_list.go b/kafka/data_source_kafka_topic_list.go
new file mode 100755
index 00000000..2a9892f6
--- /dev/null
+++ b/kafka/data_source_kafka_topic_list.go
@@ -0,0 +1,34 @@
+package kafka
+
+import (
+    "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+    "time"
+)
+
+func kafkaTopicListDataSource() *schema.Resource {
+    return &schema.Resource{
+        Read: dataSourceTopicListRead,
+        Schema: map[string]*schema.Schema{
+            "list": {
+                Type:        schema.TypeList,
+                Computed:    true,
+                Description: "a list of kafka topics in the Kafka cluster",
+                Elem:        &schema.Schema{Type: schema.TypeString},
+            },
+        },
+    }
+}
+
+func dataSourceTopicListRead(d *schema.ResourceData, meta interface{}) error {
+    client := meta.(*LazyClient)
+    topicList, err := client.GetKafkaTopicList()
+    if err != nil {
+        return err
+    }
+    err = d.Set("list", topicList)
+    if err != nil {
+        return err
+    }
+    d.SetId(time.Now().UTC().String())
+    return nil
+}
diff --git a/kafka/data_source_kafka_topic_list_test.go b/kafka/data_source_kafka_topic_list_test.go
new file mode 100755
index 00000000..e9db1688
--- /dev/null
+++ b/kafka/data_source_kafka_topic_list_test.go
@@ -0,0 +1,30 @@
+package kafka
+
+import (
+    "testing"
+
+    r "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
+)
+
+func TestAcc_TopicList(t *testing.T) {
+    bs := testBootstrapServers[0]
+    // Should be only one topic in a brand new kafka cluster
+    expectedTopic := "__confluent.support.metrics"
+    r.Test(t, r.TestCase{
+        ProviderFactories: overrideProviderFactory(),
+        Steps: []r.TestStep{
+            {
+                Config: cfg(t, bs,testDataSourceKafkaTopicList),
+                Check: r.ComposeTestCheckFunc(
+                    r.TestCheckResourceAttr("data.kafka_topic_list.test", "list.0", expectedTopic),
+                    r.TestCheckResourceAttr("data.kafka_topic_list.test", "list.#", "1"),
+                ),
+            },
+        },
+    })
+}
+
+const testDataSourceKafkaTopicList = `
+data "kafka_topic_list" "test" {
+}
+`
\ No newline at end of file
diff --git a/kafka/lazy_client.go b/kafka/lazy_client.go
index 97d96df2..cfa0a96c 100644
--- a/kafka/lazy_client.go
+++ b/kafka/lazy_client.go
@@ -185,3 +185,10 @@ func (c *LazyClient) DeleteUserScramCredential(userScramCredential UserScramCred
     }
     return c.inner.DeleteUserScramCredential(userScramCredential)
 }
+func (c *LazyClient) GetKafkaTopics() ([]Topic, error) {
+    err := c.init()
+    if err != nil {
+        return nil, err
+    }
+    return c.inner.getKafkaTopics()
+}
diff --git a/kafka/provider.go b/kafka/provider.go
index d4239fc5..4b683fda 100644
--- a/kafka/provider.go
+++ b/kafka/provider.go
@@ -108,6 +108,7 @@ func Provider() *schema.Provider {
         },
         DataSourcesMap: map[string]*schema.Resource{
             "kafka_topic": kafkaTopicDataSource(),
+            "kafka_topic_list": kafkaTopicListDataSource(),
         },
     }
 }

From 6dfda824469c3182a8d9f8ecabf96bcebb655873 Mon Sep 17 00:00:00 2001
From: dttung2905
Date: Sun, 26 Dec 2021 20:52:40 +0800
Subject: [PATCH 2/5] add README.md file

---
 README.md | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 72 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index fef4636b..fd8c227e 100644
--- a/README.md
+++ b/README.md
@@ -225,12 +225,78 @@ resource "kafka_user_scram_credential" "test" {
 
 #### Properties
 
-| Property | Description |
-| -------------------- | ---------------------------------------------- |
-| `username` | The username |
-| `scram_mechanism` | The SCRAM mechanism (SCRAM-SHA-256 or SCRAM-SHA-512) |
-| `scram_iterations` | The number of SCRAM iterations (must be >= 4096). Default: 4096 |
-| `password` | The password for the user |
+| Property            | Description                                                     |
+|---------------------|-----------------------------------------------------------------|
+| `username`          | The username                                                    |
+| `scram_mechanism`   | The SCRAM mechanism (SCRAM-SHA-256 or SCRAM-SHA-512)            |
+| `scram_iterations`  | The number of SCRAM iterations (must be >= 4096). Default: 4096 |
+| `password`          | The password for the user                                       |
+
+## Data Sources
+### `kafka_topic`
+
+A data source for getting information about a Kafka topic.
+
+#### Example
+
+```hcl
+provider "kafka" {
+  bootstrap_servers = ["localhost:9092"]
+}
+data "kafka_topic" "test" {
+  name = "test_topic_name"
+}
+output "output_test" {
+  value = data.kafka_topic.test.partitions
+}
+```
+
+### `kafka_topics`
+
+A data source for listing all topics available in the Kafka cluster. Each
+topic in the returned `list` exposes the same properties as the `kafka_topic`
+data source.
+
+#### Example
+
+```hcl
+provider "kafka" {
+  bootstrap_servers = ["localhost:9092"]
+}
+data "kafka_topics" "test" {
+}
+output "output_test_topic_name" {
+  value = data.kafka_topics.test.list[0].topic_name
+}
+output "output_test_topic_partitions" {
+  value = data.kafka_topics.test.list[0].partitions
+}
+output "output_test_topic_replication_factor" {
+  value = data.kafka_topics.test.list[0].replication_factor
+}
+output "output_test_topic_config" {
+  value = data.kafka_topics.test.list[0].config["retention.ms"]
+}
+```
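+
+As a usage sketch (assuming the attribute names above; the output name is
+arbitrary), every topic name can be collected with a `for` expression:
+
+```hcl
+output "all_topic_names" {
+  value = [for t in data.kafka_topics.test.list : t.topic_name]
+}
+```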
+
+#### Properties
+
+| Property                    | Description                                           |
+|-----------------------------|-------------------------------------------------------|
+| `list`                      | A list of objects, one per topic in the cluster       |
+| `list.*.topic_name`         | The name of the topic                                 |
+| `list.*.partitions`         | The number of partitions                              |
+| `list.*.replication_factor` | The number of replicas                                |
+| `list.*.config`             | A map of topic config attributes (e.g. `retention.ms`) |
 
 ## Requirements
 * [>= Kafka 1.0.0][3]

From 8dd5bfa232867c61fdbb2a98eb5826b252497e06 Mon Sep 17 00:00:00 2001
From: dttung2905
Date: Wed, 5 Jan 2022 22:34:41 +0800
Subject: [PATCH 3/5] rename kafka_topic_list to kafka_topics data source

---
 ...fka_topic_list.go => data_source_kafka_topics.go}      |  6 +++---
 ...list_test.go => data_source_kafka_topics_test.go}      | 12 ++++++------
 kafka/provider.go                                         |  2 +-
 3 files changed, 10 insertions(+), 10 deletions(-)
 rename kafka/{data_source_kafka_topic_list.go => data_source_kafka_topics.go} (78%)
 rename kafka/{data_source_kafka_topic_list_test.go => data_source_kafka_topics_test.go} (55%)

diff --git a/kafka/data_source_kafka_topic_list.go b/kafka/data_source_kafka_topics.go
similarity index 78%
rename from kafka/data_source_kafka_topic_list.go
rename to kafka/data_source_kafka_topics.go
index 2a9892f6..fc4e946c 100755
--- a/kafka/data_source_kafka_topic_list.go
+++ b/kafka/data_source_kafka_topics.go
@@ -5,9 +5,9 @@ import (
     "time"
 )
 
-func kafkaTopicListDataSource() *schema.Resource {
+func kafkaTopicsDataSource() *schema.Resource {
     return &schema.Resource{
-        Read: dataSourceTopicListRead,
+        Read: dataSourceTopicsRead,
         Schema: map[string]*schema.Schema{
             "list": {
                 Type: schema.TypeList,
@@ -19,7 +19,7 @@ func kafkaTopicsDataSource() *schema.Resource {
     }
 }
 
-func dataSourceTopicListRead(d *schema.ResourceData, meta interface{}) error {
+func dataSourceTopicsRead(d *schema.ResourceData, meta interface{}) error {
     client := meta.(*LazyClient)
     topicList, err := client.GetKafkaTopicList()
     if err != nil {
diff --git a/kafka/data_source_kafka_topic_list_test.go b/kafka/data_source_kafka_topics_test.go
similarity index 55%
rename from kafka/data_source_kafka_topic_list_test.go
rename to kafka/data_source_kafka_topics_test.go
index e9db1688..d0998746 100755
--- a/kafka/data_source_kafka_topic_list_test.go
+++ b/kafka/data_source_kafka_topics_test.go
@@ -6,7 +6,7 @@ import (
     r "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
 )
 
-func TestAcc_TopicList(t *testing.T) {
+func TestAcc_Topics(t *testing.T) {
     bs := testBootstrapServers[0]
     // Should be only one topic in a brand new kafka cluster
     expectedTopic := "__confluent.support.metrics"
@@ -14,17 +14,17 @@ func TestAcc_Topics(t *testing.T) {
     ProviderFactories: overrideProviderFactory(),
     Steps: []r.TestStep{
         {
-            Config: cfg(t, bs,testDataSourceKafkaTopicList),
+            Config: cfg(t, bs,testDataSourceKafkaTopics),
             Check: r.ComposeTestCheckFunc(
-                r.TestCheckResourceAttr("data.kafka_topic_list.test", "list.0", expectedTopic),
-                r.TestCheckResourceAttr("data.kafka_topic_list.test", "list.#", "1"),
+                r.TestCheckResourceAttr("data.kafka_topics.test", "list.0", expectedTopic),
+                r.TestCheckResourceAttr("data.kafka_topics.test", "list.#", "1"),
             ),
         },
     },
 })
 }
 
-const testDataSourceKafkaTopicList = `
-data "kafka_topic_list" "test" {
+const testDataSourceKafkaTopics = `
+data "kafka_topics" "test" {
 }
 `
\ No newline at end of file
diff --git a/kafka/provider.go b/kafka/provider.go
index 4b683fda..761819cd 100644
--- a/kafka/provider.go
+++ b/kafka/provider.go
@@ -108,7 +108,7 @@ func Provider() *schema.Provider {
         },
         DataSourcesMap: map[string]*schema.Resource{
             "kafka_topic": kafkaTopicDataSource(),
-            "kafka_topic_list": kafkaTopicListDataSource(),
+            "kafka_topics": kafkaTopicsDataSource(),
         },
     }
 }

From 67f4628c22bc94f4e3bbd0a957f28a5a5864f48d Mon Sep 17 00:00:00 2001
From: dttung2905
Date: Fri, 7 Jan 2022 23:10:51 +0800
Subject: [PATCH 4/5] change data source name to kafka_topics and attribute
 returned
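
Each element of `list` is now an object rather than a plain topic name,
with `topic_name`, `partitions`, `replication_factor` and `config`
attributes. A sketch of reading the new shape (the output name is
arbitrary; attribute names are as defined in the schema below):

    output "first_topic_retention" {
      value = data.kafka_topics.test.list[0].config["retention.ms"]
    }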
---
 kafka/client.go                        | 13 +++--
 kafka/data_source_kafka_topics.go      | 67 ++++++++++++++++++++++----
 kafka/data_source_kafka_topics_test.go | 26 ++++++++--
 kafka/provider.go                      |  2 +-
 4 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/kafka/client.go b/kafka/client.go
index 94b78382..7c0d2612 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -599,10 +599,17 @@ func (c *Client) getDescribeConfigAPIVersion() int16 {
     return int16(c.versionForKey(32, 1))
 }
 
-func (c *Client) getKafkaTopicList() ([]string, error) {
+func (c *Client) getKafkaTopics() ([]Topic, error) {
     topics, err := c.client.Topics()
     if err != nil {
         return nil, err
     }
-    return topics, nil
-}
\ No newline at end of file
+    topicList := make([]Topic, len(topics))
+    for i, _ := range topicList {
+        topicList[i], err = c.ReadTopic(topics[i], true)
+        if err != nil {
+            return nil, err
+        }
+    }
+    return topicList, nil
+}
diff --git a/kafka/data_source_kafka_topics.go b/kafka/data_source_kafka_topics.go
index fc4e946c..6031c07c 100755
--- a/kafka/data_source_kafka_topics.go
+++ b/kafka/data_source_kafka_topics.go
@@ -1,34 +1,81 @@
 package kafka
 
 import (
+    "context"
+    "github.com/hashicorp/terraform-plugin-sdk/v2/diag"
     "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
     "time"
 )
 
 func kafkaTopicsDataSource() *schema.Resource {
     return &schema.Resource{
-        Read: dataSourceTopicsRead,
+        ReadContext: dataSourceTopicsRead,
         Schema: map[string]*schema.Schema{
-            "list": {
+            "list": &schema.Schema{
                 Type:        schema.TypeList,
                 Computed:    true,
-                Description: "a list of kafka topics in the Kafka cluster",
-                Elem:        &schema.Schema{Type: schema.TypeString},
+                Description: "A list containing all the topics.",
+                Elem: &schema.Resource{
+                    Schema: map[string]*schema.Schema{
+                        "topic_name": &schema.Schema{
+                            Type:        schema.TypeString,
+                            Computed:    true,
+                            Description: "The name of the topic.",
+                        },
+                        "partitions": &schema.Schema{
+                            Type:        schema.TypeInt,
+                            Computed:    true,
+                            Description: "Number of partitions.",
+                        },
+                        "replication_factor": &schema.Schema{
+                            Type:        schema.TypeInt,
+                            Computed:    true,
+                            Description: "Number of replicas.",
+                        },
+                        "config": &schema.Schema{
+                            Type:        schema.TypeMap,
+                            Computed:    true,
+                            Description: "A map of string k/v attributes.",
+                            Elem:        schema.TypeString,
+                        },
+                    },
+                },
             },
         },
     }
 }
 
-func dataSourceTopicsRead(d *schema.ResourceData, meta interface{}) error {
+func dataSourceTopicsRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+    var diags diag.Diagnostics
     client := meta.(*LazyClient)
-    topicList, err := client.GetKafkaTopicList()
+    topicList, err := client.GetKafkaTopics()
     if err != nil {
-        return err
+        return diag.FromErr(err)
+    }
+
+    topics := flattenTopicsData(&topicList)
+    if err := d.Set("list", topics); err != nil {
+        return diag.FromErr(err)
     }
-    err = d.Set("list", topicList)
     if err != nil {
-        return err
+        return diag.FromErr(err)
     }
     d.SetId(time.Now().UTC().String())
-    return nil
+    return diags
+}
+
+func flattenTopicsData(topicList *[]Topic) []interface{} {
+    if topicList != nil {
+        topics := make([]interface{}, len(*topicList), len(*topicList))
+        for i, topic := range *topicList {
+            topicObj := make(map[string]interface{})
+            topicObj["topic_name"] = topic.Name
+            topicObj["replication_factor"] = topic.ReplicationFactor
+            topicObj["partitions"] = topic.Partitions
+            topicObj["config"] = topic.Config
+            topics[i] = topicObj
+        }
+        return topics
+    }
+    return make([]interface{}, 0)
+}
diff --git a/kafka/data_source_kafka_topics_test.go b/kafka/data_source_kafka_topics_test.go
index d0998746..745b874f 100755
--- a/kafka/data_source_kafka_topics_test.go
+++ b/kafka/data_source_kafka_topics_test.go
@@ -14,10 +14,12 @@ func TestAcc_Topics(t *testing.T) {
     ProviderFactories: overrideProviderFactory(),
     Steps: []r.TestStep{
         {
-            Config: cfg(t, bs,testDataSourceKafkaTopics),
+            Config: cfg(t, bs, testDataSourceKafkaTopics),
             Check: r.ComposeTestCheckFunc(
-                r.TestCheckResourceAttr("data.kafka_topics.test", "list.0", expectedTopic),
-                r.TestCheckResourceAttr("data.kafka_topics.test", "list.#", "1"),
+                r.TestCheckOutput("partitions", "1"),
+                r.TestCheckOutput("replication_factor", "3"),
+                r.TestCheckOutput("topic_name", expectedTopic),
+                r.TestCheckOutput("retention_ms", "31536000000"),
             ),
         },
     },
@@ -27,4 +29,20 @@ func TestAcc_Topics(t *testing.T) {
 const testDataSourceKafkaTopics = `
 data "kafka_topics" "test" {
 }
-`
\ No newline at end of file
+
+output "partitions" {
+  value = data.kafka_topics.test.list[0].partitions
+}
+
+output "replication_factor" {
+  value = data.kafka_topics.test.list[0].replication_factor
+}
+
+output "topic_name" {
+  value = data.kafka_topics.test.list[0].topic_name
+}
+
+output "retention_ms" {
+  value = data.kafka_topics.test.list[0].config["retention.ms"]
+}
+`
diff --git a/kafka/provider.go b/kafka/provider.go
index 761819cd..bfad786e 100644
--- a/kafka/provider.go
+++ b/kafka/provider.go
@@ -107,7 +107,7 @@ func Provider() *schema.Provider {
         "kafka_user_scram_credential": kafkaUserScramCredentialResource(),
         },
         DataSourcesMap: map[string]*schema.Resource{
-            "kafka_topic": kafkaTopicDataSource(),
+            "kafka_topic":  kafkaTopicDataSource(),
             "kafka_topics": kafkaTopicsDataSource(),
         },
     }

From a6b39252623f750672c9aa431a3eef0b87cfcbcf Mon Sep 17 00:00:00 2001
From: dttung2905
Date: Sat, 8 Jan 2022 13:40:07 +0800
Subject: [PATCH 5/5] fix linting test and add a custom acc test func

---
 kafka/client.go                        |  2 +-
 kafka/data_source_kafka_topics.go      |  6 +--
 kafka/data_source_kafka_topics_test.go | 71 ++++++++++++++++++--------
 3 files changed, 54 insertions(+), 25 deletions(-)

diff --git a/kafka/client.go b/kafka/client.go
index 7c0d2612..943c5bb0 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -605,7 +605,7 @@ func (c *Client) getKafkaTopics() ([]Topic, error) {
         return nil, err
     }
     topicList := make([]Topic, len(topics))
-    for i, _ := range topicList {
+    for i := range topicList {
         topicList[i], err = c.ReadTopic(topics[i], true)
         if err != nil {
             return nil, err
diff --git a/kafka/data_source_kafka_topics.go b/kafka/data_source_kafka_topics.go
index 6031c07c..2f00f772 100755
--- a/kafka/data_source_kafka_topics.go
+++ b/kafka/data_source_kafka_topics.go
@@ -2,9 +2,9 @@ package kafka
 
 import (
     "context"
+    "fmt"
     "github.com/hashicorp/terraform-plugin-sdk/v2/diag"
     "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
-    "time"
 )
 
 func kafkaTopicsDataSource() *schema.Resource {
@@ -60,13 +60,13 @@ func dataSourceTopicsRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
     if err != nil {
         return diag.FromErr(err)
     }
-    d.SetId(time.Now().UTC().String())
+    d.SetId(fmt.Sprint(len(topics)))
     return diags
 }
 
 func flattenTopicsData(topicList *[]Topic) []interface{} {
     if topicList != nil {
-        topics := make([]interface{}, len(*topicList), len(*topicList))
+        topics := make([]interface{}, len(*topicList))
         for i, topic := range *topicList {
             topicObj := make(map[string]interface{})
             topicObj["topic_name"] = topic.Name
diff --git a/kafka/data_source_kafka_topics_test.go b/kafka/data_source_kafka_topics_test.go
index 745b874f..f809f884 100755
--- a/kafka/data_source_kafka_topics_test.go
+++ b/kafka/data_source_kafka_topics_test.go
@@ -1,25 +1,29 @@
 package kafka
 
 import (
+    "fmt"
+    "github.com/hashicorp/go-uuid"
+    "github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
     "testing"
 
     r "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
 )
 
 func TestAcc_Topics(t *testing.T) {
+    u, err := uuid.GenerateUUID()
+    if err != nil {
+        t.Fatal(err)
+    }
+    topicName := fmt.Sprintf("syslog-%s", u)
+
     bs := testBootstrapServers[0]
-    // Should be only one topic in a brand new kafka cluster
-    expectedTopic := "__confluent.support.metrics"
     r.Test(t, r.TestCase{
         ProviderFactories: overrideProviderFactory(),
         Steps: []r.TestStep{
             {
-                Config: cfg(t, bs, testDataSourceKafkaTopics),
+                Config: cfg(t, bs, fmt.Sprintf(testDataSourceKafkaTopics, topicName)),
                 Check: r.ComposeTestCheckFunc(
-                    r.TestCheckOutput("partitions", "1"),
-                    r.TestCheckOutput("replication_factor", "3"),
-                    r.TestCheckOutput("topic_name", expectedTopic),
-                    r.TestCheckOutput("retention_ms", "31536000000"),
+                    testDatasourceTopics,
                 ),
             },
         },
@@ -26,23 +30,48 @@ func TestAcc_Topics(t *testing.T) {
     })
 }
 
 const testDataSourceKafkaTopics = `
+resource "kafka_topic" "test" {
+  name               = "%[1]s"
+  replication_factor = 1
+  partitions         = 1
+  config = {
+    "retention.ms" = "22222"
+  }
+}
 data "kafka_topics" "test" {
+  depends_on = [kafka_topic.test]
 }
-
-output "partitions" {
-  value = data.kafka_topics.test.list[0].partitions
-}
-
-output "replication_factor" {
-  value = data.kafka_topics.test.list[0].replication_factor
-}
-
-output "topic_name" {
-  value = data.kafka_topics.test.list[0].topic_name
-}
-
-output "retention_ms" {
-  value = data.kafka_topics.test.list[0].config["retention.ms"]
-}
 `
+
+func testDatasourceTopics(s *terraform.State) error {
+    resourceState := s.Modules[0].Resources["data.kafka_topics.test"]
+    if resourceState == nil {
+        return fmt.Errorf("resource not found in state")
+    }
+    instanceState := resourceState.Primary
+    client := testProvider.Meta().(*LazyClient)
+    expectedTopics, err := client.GetKafkaTopics()
+    if err != nil {
+        return fmt.Errorf(err.Error())
+    }
+    for i := 0; i < len(expectedTopics); i++ {
+        expectedTopicName := instanceState.Attributes[fmt.Sprintf("list.%d.topic_name", i)]
+        expectedTopicOutput, err := client.ReadTopic(expectedTopicName, true)
+        if err != nil {
+            return fmt.Errorf(err.Error())
+        }
+
+        if instanceState.Attributes[fmt.Sprintf("list.%d.partitions", i)] != fmt.Sprint(expectedTopicOutput.Partitions) {
+            return fmt.Errorf("expected %d for topic %s partition, got %s", expectedTopicOutput.Partitions, expectedTopicOutput.Name, instanceState.Attributes[fmt.Sprintf("list.%d.partitions", i)])
+        }
+        if instanceState.Attributes[fmt.Sprintf("list.%d.replication_factor", i)] != fmt.Sprint(expectedTopicOutput.ReplicationFactor) {
+            return fmt.Errorf("expected %d for topic %s replication factor, got %s", expectedTopicOutput.ReplicationFactor, expectedTopicOutput.Name, instanceState.Attributes[fmt.Sprintf("list.%d.replication_factor", i)])
+        }
+        retentionMs := expectedTopicOutput.Config["retention.ms"]
+        if instanceState.Attributes[fmt.Sprintf("list.%d.config.retention.ms", i)] != *retentionMs {
+            return fmt.Errorf("expected %s for topic %s config retention.ms, got %s", *retentionMs, expectedTopicOutput.Name, instanceState.Attributes[fmt.Sprintf("list.%d.config.retention.ms", i)])
+        }
+    }
+    return nil
+}