From 89cbfb99a47254b250525e5dc5a30bc0dfe3aca6 Mon Sep 17 00:00:00 2001
From: Conor Mongey
Date: Sat, 10 Aug 2024 01:02:14 +0100
Subject: [PATCH 1/6] increase producer retry backoff in acc test

---
 kafka/resource_kafka_topic_test.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/kafka/resource_kafka_topic_test.go b/kafka/resource_kafka_topic_test.go
index 459dcb99..2aab07c9 100644
--- a/kafka/resource_kafka_topic_test.go
+++ b/kafka/resource_kafka_topic_test.go
@@ -252,6 +252,8 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
 		kafkaConfig.Producer.Return.Successes = true
 		kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
 		kafkaConfig.Producer.Timeout = 90 * time.Second
+		kafkaConfig.Producer.Retry.Max = 5
+		kafkaConfig.Producer.Retry.Backoff = time.Second
 
 		producer, err := sarama.NewSyncProducer(*config.BootstrapServers, kafkaConfig)
 		if err != nil {

From c92ac08614483b1658ae194b2c1abc8f5cd94af7 Mon Sep 17 00:00:00 2001
From: Conor Mongey
Date: Sat, 10 Aug 2024 01:04:50 +0100
Subject: [PATCH 2/6] reorder, return err message

---
 kafka/resource_kafka_topic_test.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/kafka/resource_kafka_topic_test.go b/kafka/resource_kafka_topic_test.go
index 2aab07c9..ca1a4462 100644
--- a/kafka/resource_kafka_topic_test.go
+++ b/kafka/resource_kafka_topic_test.go
@@ -150,8 +150,9 @@ func TestAcc_TopicAlterReplicationFactor(t *testing.T) {
 			{
 				Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 1, 7)),
 				Check: r.ComposeTestCheckFunc(
+					testResourceTopic_initialCheck,
 					testResourceTopic_produceMessages(messages),
-					testResourceTopic_initialCheck),
+				),
 			},
 			{
 				Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 3, 7)),
@@ -266,8 +267,13 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
 			}
 		}()
 
-		if err := producer.SendMessages(messages); err != nil {
-			return err
+		if errs := producer.SendMessages(messages); errs != nil {
+			for _, err := range errs.(sarama.ProducerErrors) {
+				log.Println("[ERROR] Write to kafka failed: ", err)
+				return err
+			}
+			return errs
+
 		}
 
 		return nil

From 64d256ca2dee39cc0fa73c4e2d90fcc769765957 Mon Sep 17 00:00:00 2001
From: Conor Mongey
Date: Fri, 16 Aug 2024 11:04:54 +0100
Subject: [PATCH 3/6] linting

---
 kafka/client.go                                   |  6 +++---
 kafka/config.go                                   |  2 +-
 kafka/data_source_kafka_topic.go                  |  3 +--
 kafka/kafka_acls.go                               | 16 ++++++++--------
 kafka/kafka_quotas.go                             |  2 +-
 kafka/kafka_user_scram_credentials.go             |  2 +-
 kafka/migrate_kafka_acl_state.go                  |  2 +-
 kafka/resource_kafka_quota_test.go                |  4 ++--
 kafka/resource_kafka_topic.go                     |  4 ++--
 kafka/resource_kafka_user_scram_credential.go     |  2 +-
 .../resource_kafka_user_scram_credential_test.go  |  2 +-
 kafka/scram_client.go                             |  6 ++++--
 kafka/topic.go                                    |  5 ++---
 13 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/kafka/client.go b/kafka/client.go
index de533682..45bd17e5 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -57,17 +57,17 @@ type Client struct {
 
 func NewClient(config *Config) (*Client, error) {
 	if config == nil {
-		return nil, errors.New("Cannot create client without kafka config")
+		return nil, errors.New("cannot create client without kafka config")
 	}
 
 	log.Printf("[TRACE] configuring bootstrap_servers %v", config.copyWithMaskedSensitiveValues())
 	if config.BootstrapServers == nil {
-		return nil, fmt.Errorf("No bootstrap_servers provided")
+		return nil, fmt.Errorf("no bootstrap_servers provided")
 	}
 
 	bootstrapServers := *(config.BootstrapServers)
 	if bootstrapServers == nil {
-		return nil, fmt.Errorf("No bootstrap_servers provided")
+		return nil, fmt.Errorf("no bootstrap_servers provided")
 	}
 
 	log.Printf("[INFO] configuring kafka client with %v", config.copyWithMaskedSensitiveValues())
diff --git a/kafka/config.go b/kafka/config.go
index 306d2eb7..f3b57be9 100644
--- a/kafka/config.go
+++ b/kafka/config.go
@@ -282,7 +282,7 @@ func newTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase string) (*t
 	ok := caCertPool.AppendCertsFromPEM(caBytes)
 	log.Printf("[TRACE] set cert pool %v", ok)
 	if !ok {
-		return &tlsConfig, fmt.Errorf("Couldn't add the caPem")
+		return &tlsConfig, fmt.Errorf("couldn't add the caPem")
 	}
 	tlsConfig.RootCAs = caCertPool
 
diff --git a/kafka/data_source_kafka_topic.go b/kafka/data_source_kafka_topic.go
index 312b5d52..270efd54 100755
--- a/kafka/data_source_kafka_topic.go
+++ b/kafka/data_source_kafka_topic.go
@@ -43,13 +43,12 @@ func dataSourceTopicRead(d *schema.ResourceData, meta interface{}) error {
 	client := meta.(*LazyClient)
 	topic, err := client.ReadTopic(name, true)
-
 	if err != nil {
 		log.Printf("[ERROR] Error getting topic %s from Kafka: %s", name, err)
 		_, ok := err.(TopicMissingError)
 		if ok {
-			return fmt.Errorf("Could not find topic '%s'", name)
+			return fmt.Errorf("could not find topic '%s'", name)
 		}
 
 		return err
diff --git a/kafka/kafka_acls.go b/kafka/kafka_acls.go
index 3a241a60..a683e773 100644
--- a/kafka/kafka_acls.go
+++ b/kafka/kafka_acls.go
@@ -36,19 +36,19 @@ func tfToAclCreation(s StringlyTypedACL) (*sarama.AclCreation, error) {
 
 	op := stringToOperation(s.ACL.Operation)
 	if op == unknownConversion {
-		return acl, fmt.Errorf("Unknown operation: %s", s.ACL.Operation)
+		return acl, fmt.Errorf("unknown operation: %s", s.ACL.Operation)
 	}
 	pType := stringToAclPermissionType(s.ACL.PermissionType)
 	if pType == unknownConversion {
-		return acl, fmt.Errorf("Unknown permission type: %s", s.ACL.PermissionType)
+		return acl, fmt.Errorf("unknown permission type: %s", s.ACL.PermissionType)
 	}
 	rType := stringToACLResource(s.Resource.Type)
 	if rType == unknownConversion {
-		return acl, fmt.Errorf("Unknown resource type: %s", s.Resource.Type)
+		return acl, fmt.Errorf("unknown resource type: %s", s.Resource.Type)
 	}
 	patternType := stringToACLPrefix(s.Resource.PatternTypeFilter)
 	if patternType == unknownConversion {
-		return acl, fmt.Errorf("Unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
+		return acl, fmt.Errorf("unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
 	}
 
 	acl.Acl = sarama.Acl{
@@ -77,25 +77,25 @@ func tfToAclFilter(s StringlyTypedACL) (sarama.AclFilter, error) {
 
 	op := stringToOperation(s.ACL.Operation)
 	if op == unknownConversion {
-		return f, fmt.Errorf("Unknown operation: %s", s.ACL.Operation)
+		return f, fmt.Errorf("unknown operation: %s", s.ACL.Operation)
 	}
 	f.Operation = op
 
 	pType := stringToAclPermissionType(s.ACL.PermissionType)
 	if pType == unknownConversion {
-		return f, fmt.Errorf("Unknown permission type: %s", s.ACL.PermissionType)
+		return f, fmt.Errorf("unknown permission type: %s", s.ACL.PermissionType)
 	}
 	f.PermissionType = pType
 
 	rType := stringToACLResource(s.Resource.Type)
 	if rType == unknownConversion {
-		return f, fmt.Errorf("Unknown resource type: %s", s.Resource.Type)
+		return f, fmt.Errorf("unknown resource type: %s", s.Resource.Type)
 	}
 	f.ResourceType = rType
 
 	patternType := stringToACLPrefix(s.Resource.PatternTypeFilter)
 	if patternType == unknownConversion {
-		return f, fmt.Errorf("Unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
+		return f, fmt.Errorf("unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
fmt.Errorf("unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter) } f.ResourcePatternTypeFilter = patternType diff --git a/kafka/kafka_quotas.go b/kafka/kafka_quotas.go index bca25c9d..9d349303 100644 --- a/kafka/kafka_quotas.go +++ b/kafka/kafka_quotas.go @@ -113,7 +113,7 @@ func (c *Client) DescribeQuota(entityType string, entityName string) (*Quota, er if err == nil { if quotaR.ErrorCode != sarama.ErrNoError { - return nil, fmt.Errorf("Error describing quota %s", quotaR.ErrorCode) + return nil, fmt.Errorf("error describing quota %s", quotaR.ErrorCode) } } diff --git a/kafka/kafka_user_scram_credentials.go b/kafka/kafka_user_scram_credentials.go index 5e85581c..ba6aede7 100644 --- a/kafka/kafka_user_scram_credentials.go +++ b/kafka/kafka_user_scram_credentials.go @@ -74,7 +74,7 @@ func (c *Client) DescribeUserScramCredential(username string, mechanism string) num := len(results) if num != 1 { - return nil, fmt.Errorf("Got %d results (expected 1) when describing user scram credential %s", num, username) + return nil, fmt.Errorf("got %d results (expected 1) when describing user scram credential %s", num, username) } res := results[0] diff --git a/kafka/migrate_kafka_acl_state.go b/kafka/migrate_kafka_acl_state.go index d5ecbcee..d7ef631f 100644 --- a/kafka/migrate_kafka_acl_state.go +++ b/kafka/migrate_kafka_acl_state.go @@ -13,7 +13,7 @@ func migrateKafkaAclState(v int, is *terraform.InstanceState, meta interface{}) log.Println("[INFO] Found Kafka ACL v0 state; migrating to v1") return migrateKafkaAclV0toV1(is) default: - return is, fmt.Errorf("Unexpected schema version: %d", v) + return is, fmt.Errorf("unexpected schema version: %d", v) } } diff --git a/kafka/resource_kafka_quota_test.go b/kafka/resource_kafka_quota_test.go index 47871ac7..9b24900f 100644 --- a/kafka/resource_kafka_quota_test.go +++ b/kafka/resource_kafka_quota_test.go @@ -24,7 +24,7 @@ func TestAcc_BasicQuota(t *testing.T) { CheckDestroy: testAccCheckQuotaDestroy, Steps: []r.TestStep{ { - Config: cfgs(t, bs, fmt.Sprintf(testResourceQuota1, quotaEntityName, "4000000")), + Config: cfgs(bs, fmt.Sprintf(testResourceQuota1, quotaEntityName, "4000000")), Check: testResourceQuota_initialCheck, }, }, @@ -174,7 +174,7 @@ func testAccCheckQuotaDestroy(s *terraform.State) error { } // lintignore:AT004 -func cfgs(t *testing.T, bs string, extraCfg string) string { +func cfgs(bs string, extraCfg string) string { return fmt.Sprintf(` provider "kafka" { bootstrap_servers = ["%s"] diff --git a/kafka/resource_kafka_topic.go b/kafka/resource_kafka_topic.go index 1794246a..a8d1656b 100644 --- a/kafka/resource_kafka_topic.go +++ b/kafka/resource_kafka_topic.go @@ -221,7 +221,7 @@ func topicDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) stateConf := &retry.StateChangeConf{ Pending: []string{"Pending"}, Target: []string{"Deleted"}, - Refresh: topicDeleteFunc(c, d.Id(), t), + Refresh: topicDeleteFunc(c, t), Timeout: 300 * time.Second, Delay: 3 * time.Second, PollInterval: 2 * time.Second, @@ -237,7 +237,7 @@ func topicDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) return nil } -func topicDeleteFunc(client *LazyClient, id string, t Topic) retry.StateRefreshFunc { +func topicDeleteFunc(client *LazyClient, t Topic) retry.StateRefreshFunc { return func() (result interface{}, s string, err error) { topic, err := client.ReadTopic(t.Name, true) diff --git a/kafka/resource_kafka_user_scram_credential.go b/kafka/resource_kafka_user_scram_credential.go index 1d75519a..309591df 100644 --- 
+++ b/kafka/resource_kafka_user_scram_credential.go
@@ -69,7 +69,7 @@ func importSCRAM(ctx context.Context, d *schema.ResourceData, m interface{}) ([]
 			return nil, errSet.err
 		}
 	} else {
-		return nil, fmt.Errorf("Failed importing resource; expected format is username|scram_mechanism|password - got %v segments instead of 3", len(parts))
+		return nil, fmt.Errorf("failed importing resource; expected format is username|scram_mechanism|password - got %v segments instead of 3", len(parts))
 	}
 
 	return []*schema.ResourceData{d}, nil
diff --git a/kafka/resource_kafka_user_scram_credential_test.go b/kafka/resource_kafka_user_scram_credential_test.go
index 5dba611a..bfce1fad 100644
--- a/kafka/resource_kafka_user_scram_credential_test.go
+++ b/kafka/resource_kafka_user_scram_credential_test.go
@@ -27,7 +27,7 @@ func TestAcc_UserScramCredentialBasic(t *testing.T) {
 		CheckDestroy:      testAccCheckUserScramCredentialDestroy,
 		Steps: []r.TestStep{
 			{
-				Config: cfgs(t, bs, fmt.Sprintf(testResourceUserScramCredential_SHA256, username)),
+				Config: cfgs(bs, fmt.Sprintf(testResourceUserScramCredential_SHA256, username)),
 				Check:  testResourceUserScramCredentialCheck_withoutIterations,
 			},
 		},
diff --git a/kafka/scram_client.go b/kafka/scram_client.go
index f6aa9d6c..2d55d4c6 100644
--- a/kafka/scram_client.go
+++ b/kafka/scram_client.go
@@ -8,8 +8,10 @@ import (
 	"github.com/xdg/scram"
 )
 
-var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
-var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
+var (
+	SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
+	SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
+)
 
 type XDGSCRAMClient struct {
 	*scram.Client
diff --git a/kafka/topic.go b/kafka/topic.go
index d4712008..2ab99943 100644
--- a/kafka/topic.go
+++ b/kafka/topic.go
@@ -33,17 +33,16 @@ func ReplicaCount(c sarama.Client, topic string, partitions []int32) (int, error
 	for _, p := range partitions {
 		replicas, err := c.Replicas(topic, p)
 		if err != nil {
-			return -1, errors.New("Could not get replicas for partition")
+			return -1, errors.New("could not get replicas for partition")
 		}
 		if count == -1 {
 			count = len(replicas)
 		}
 		if count != len(replicas) {
-			return count, fmt.Errorf("The replica count isn't the same across partitions %d != %d", count, len(replicas))
+			return count, fmt.Errorf("replica count isn't the same across partitions %d != %d", count, len(replicas))
 		}
 	}
 	return count, nil
-
 }
 
 func configToResources(topic Topic) []*sarama.AlterConfigsResource {

From b46085d8214ec57e46537ab04551c1390aeddfd9 Mon Sep 17 00:00:00 2001
From: Conor Mongey
Date: Fri, 16 Aug 2024 11:07:31 +0100
Subject: [PATCH 4/6] Refresh client metadata after updating topic

---
 kafka/client.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/kafka/client.go b/kafka/client.go
index 45bd17e5..5d7301ee 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -369,7 +369,9 @@ func (c *Client) AlterReplicationFactor(t Topic) error {
 		return err
 	}
 
-	return admin.AlterPartitionReassignments(t.Name, *assignment)
+	err = admin.AlterPartitionReassignments(t.Name, *assignment)
+	c.client.RefreshMetadata(t.Name)
+	return err
 }
 
 func (c *Client) buildAssignment(t Topic) (*[][]int32, error) {

From 0caf1d94466f8f347d050eeb1afed1b431fd24f9 Mon Sep 17 00:00:00 2001
From: Conor Mongey
Date: Fri, 16 Aug 2024 11:27:46 +0100
Subject: [PATCH 5/6] retry

---
 kafka/resource_kafka_topic_test.go | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
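
Note: patches 5 and 6 rework testResourceTopic_produceMessages so that producer.SendMessages
is retried up to five times, with the per-attempt failures logged and only surfaced once the
final attempt also fails. The standalone sketch below illustrates that retry-and-aggregate
pattern outside the test; send() here is a hypothetical stand-in for the sarama producer call
and is not part of this change.

package main

import (
	"errors"
	"fmt"
	"log"
)

// send stands in for producer.SendMessages; it fails on the first two
// attempts so the retry path is exercised.
func send(attempt int) error {
	if attempt < 2 {
		return fmt.Errorf("broker not ready (attempt %d)", attempt)
	}
	return nil
}

// produceWithRetry calls send up to retries times, logging each failure,
// and joins every collected error if all attempts fail.
func produceWithRetry(retries int) error {
	var errs []error
	for i := 0; i < retries; i++ {
		if err := send(i); err != nil {
			log.Println("[ERROR] write failed:", err)
			errs = append(errs, err)
			continue
		}
		return nil // success, stop retrying
	}
	return errors.Join(errs...)
}

func main() {
	if err := produceWithRetry(5); err != nil {
		log.Fatal(err)
	}
	log.Println("produced after retrying")
}

errors.Join (Go 1.20+) keeps every attempt's failure visible in the final error rather than
only the last one, which is what the produceErrs slice introduced in patch 6 is for.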

diff --git a/kafka/resource_kafka_topic_test.go b/kafka/resource_kafka_topic_test.go
index ca1a4462..83fce534 100644
--- a/kafka/resource_kafka_topic_test.go
+++ b/kafka/resource_kafka_topic_test.go
@@ -267,13 +267,19 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
 			}
 		}()
 
-		if errs := producer.SendMessages(messages); errs != nil {
-			for _, err := range errs.(sarama.ProducerErrors) {
-				log.Println("[ERROR] Write to kafka failed: ", err)
-				return err
+		// retry 5 times
+		retries := 5
+		for i := 0; i < retries; i++ {
+			if errs := producer.SendMessages(messages); errs != nil {
+				for _, err := range errs.(sarama.ProducerErrors) {
+					log.Println("[ERROR] Write to kafka failed: ", err)
+					if i == retries-1 {
+						return err
+					}
+				}
+			} else {
+				return nil
 			}
-			return errs
-
 		}
 
 		return nil

From eae8e8d0375ea10a31e6bcc3db60ed5fe8ae49f4 Mon Sep 17 00:00:00 2001
From: Conor Mongey
Date: Fri, 16 Aug 2024 11:36:12 +0100
Subject: [PATCH 6/6] WIP

---
 kafka/resource_kafka_topic_test.go | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/kafka/resource_kafka_topic_test.go b/kafka/resource_kafka_topic_test.go
index 83fce534..47619ece 100644
--- a/kafka/resource_kafka_topic_test.go
+++ b/kafka/resource_kafka_topic_test.go
@@ -1,6 +1,7 @@
 package kafka
 
 import (
+	"errors"
 	"fmt"
 	"log"
 	"strconv"
@@ -144,8 +145,10 @@ func TestAcc_TopicAlterReplicationFactor(t *testing.T) {
 
 	r.Test(t, r.TestCase{
 		ProviderFactories: overrideProviderFactory(),
-		PreCheck:          func() { testAccPreCheck(t) },
-		CheckDestroy:      testAccCheckTopicDestroy,
+		PreCheck: func() {
+			testAccPreCheck(t)
+		},
+		CheckDestroy: testAccCheckTopicDestroy,
 		Steps: []r.TestStep{
 			{
 				Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 1, 7)),
@@ -251,6 +254,8 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
 		}
 		kafkaConfig.Producer.Return.Errors = true
 		kafkaConfig.Producer.Return.Successes = true
+		kafkaConfig.Metadata.Full = true
+
 		kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
 		kafkaConfig.Producer.Timeout = 90 * time.Second
 		kafkaConfig.Producer.Retry.Max = 5
@@ -269,12 +274,14 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
 
 		// retry 5 times
 		retries := 5
+		produceErrs := make([]error, 0, retries)
 		for i := 0; i < retries; i++ {
 			if errs := producer.SendMessages(messages); errs != nil {
+				produceErrs = append(produceErrs, errs)
 				for _, err := range errs.(sarama.ProducerErrors) {
 					log.Println("[ERROR] Write to kafka failed: ", err)
 					if i == retries-1 {
-						return err
+						return errors.Join(produceErrs...)
 					}
 				}
 			} else {