Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

increase producer rety backoff in acc test #441

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@

func NewClient(config *Config) (*Client, error) {
if config == nil {
return nil, errors.New("Cannot create client without kafka config")
return nil, errors.New("cannot create client without kafka config")
}

log.Printf("[TRACE] configuring bootstrap_servers %v", config.copyWithMaskedSensitiveValues())
if config.BootstrapServers == nil {
return nil, fmt.Errorf("No bootstrap_servers provided")
return nil, fmt.Errorf("no bootstrap_servers provided")
}

bootstrapServers := *(config.BootstrapServers)
if bootstrapServers == nil {
return nil, fmt.Errorf("No bootstrap_servers provided")
return nil, fmt.Errorf("no bootstrap_servers provided")
}

log.Printf("[INFO] configuring kafka client with %v", config.copyWithMaskedSensitiveValues())
Expand Down Expand Up @@ -369,7 +369,9 @@
return err
}

return admin.AlterPartitionReassignments(t.Name, *assignment)
err = admin.AlterPartitionReassignments(t.Name, *assignment)
c.client.RefreshMetadata(t.Name)

Check failure on line 373 in kafka/client.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `c.client.RefreshMetadata` is not checked (errcheck)
return err
}

func (c *Client) buildAssignment(t Topic) (*[][]int32, error) {
Expand Down
2 changes: 1 addition & 1 deletion kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func newTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase string) (*t
ok := caCertPool.AppendCertsFromPEM(caBytes)
log.Printf("[TRACE] set cert pool %v", ok)
if !ok {
return &tlsConfig, fmt.Errorf("Couldn't add the caPem")
return &tlsConfig, fmt.Errorf("couldn't add the caPem")
}

tlsConfig.RootCAs = caCertPool
Expand Down
3 changes: 1 addition & 2 deletions kafka/data_source_kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ func dataSourceTopicRead(d *schema.ResourceData, meta interface{}) error {

client := meta.(*LazyClient)
topic, err := client.ReadTopic(name, true)

if err != nil {
log.Printf("[ERROR] Error getting topic %s from Kafka: %s", name, err)
_, ok := err.(TopicMissingError)

if ok {
return fmt.Errorf("Could not find topic '%s'", name)
return fmt.Errorf("could not find topic '%s'", name)
}

return err
Expand Down
16 changes: 8 additions & 8 deletions kafka/kafka_acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ func tfToAclCreation(s StringlyTypedACL) (*sarama.AclCreation, error) {

op := stringToOperation(s.ACL.Operation)
if op == unknownConversion {
return acl, fmt.Errorf("Unknown operation: %s", s.ACL.Operation)
return acl, fmt.Errorf("unknown operation: %s", s.ACL.Operation)
}
pType := stringToAclPermissionType(s.ACL.PermissionType)
if pType == unknownConversion {
return acl, fmt.Errorf("Unknown permission type: %s", s.ACL.PermissionType)
return acl, fmt.Errorf("unknown permission type: %s", s.ACL.PermissionType)
}
rType := stringToACLResource(s.Resource.Type)
if rType == unknownConversion {
return acl, fmt.Errorf("Unknown resource type: %s", s.Resource.Type)
return acl, fmt.Errorf("unknown resource type: %s", s.Resource.Type)
}
patternType := stringToACLPrefix(s.Resource.PatternTypeFilter)
if patternType == unknownConversion {
return acl, fmt.Errorf("Unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
return acl, fmt.Errorf("unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
}

acl.Acl = sarama.Acl{
Expand Down Expand Up @@ -77,25 +77,25 @@ func tfToAclFilter(s StringlyTypedACL) (sarama.AclFilter, error) {

op := stringToOperation(s.ACL.Operation)
if op == unknownConversion {
return f, fmt.Errorf("Unknown operation: %s", s.ACL.Operation)
return f, fmt.Errorf("unknown operation: %s", s.ACL.Operation)
}
f.Operation = op

pType := stringToAclPermissionType(s.ACL.PermissionType)
if pType == unknownConversion {
return f, fmt.Errorf("Unknown permission type: %s", s.ACL.PermissionType)
return f, fmt.Errorf("unknown permission type: %s", s.ACL.PermissionType)
}
f.PermissionType = pType

rType := stringToACLResource(s.Resource.Type)
if rType == unknownConversion {
return f, fmt.Errorf("Unknown resource type: %s", s.Resource.Type)
return f, fmt.Errorf("unknown resource type: %s", s.Resource.Type)
}
f.ResourceType = rType

patternType := stringToACLPrefix(s.Resource.PatternTypeFilter)
if patternType == unknownConversion {
return f, fmt.Errorf("Unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
return f, fmt.Errorf("unknown pattern type filter: '%s'", s.Resource.PatternTypeFilter)
}
f.ResourcePatternTypeFilter = patternType

Expand Down
2 changes: 1 addition & 1 deletion kafka/kafka_quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Client) DescribeQuota(entityType string, entityName string) (*Quota, er

if err == nil {
if quotaR.ErrorCode != sarama.ErrNoError {
return nil, fmt.Errorf("Error describing quota %s", quotaR.ErrorCode)
return nil, fmt.Errorf("error describing quota %s", quotaR.ErrorCode)
}
}

Expand Down
2 changes: 1 addition & 1 deletion kafka/kafka_user_scram_credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *Client) DescribeUserScramCredential(username string, mechanism string)

num := len(results)
if num != 1 {
return nil, fmt.Errorf("Got %d results (expected 1) when describing user scram credential %s", num, username)
return nil, fmt.Errorf("got %d results (expected 1) when describing user scram credential %s", num, username)
}

res := results[0]
Expand Down
2 changes: 1 addition & 1 deletion kafka/migrate_kafka_acl_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func migrateKafkaAclState(v int, is *terraform.InstanceState, meta interface{})
log.Println("[INFO] Found Kafka ACL v0 state; migrating to v1")
return migrateKafkaAclV0toV1(is)
default:
return is, fmt.Errorf("Unexpected schema version: %d", v)
return is, fmt.Errorf("unexpected schema version: %d", v)
}
}

Expand Down
4 changes: 2 additions & 2 deletions kafka/resource_kafka_quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestAcc_BasicQuota(t *testing.T) {
CheckDestroy: testAccCheckQuotaDestroy,
Steps: []r.TestStep{
{
Config: cfgs(t, bs, fmt.Sprintf(testResourceQuota1, quotaEntityName, "4000000")),
Config: cfgs(bs, fmt.Sprintf(testResourceQuota1, quotaEntityName, "4000000")),
Check: testResourceQuota_initialCheck,
},
},
Expand Down Expand Up @@ -174,7 +174,7 @@ func testAccCheckQuotaDestroy(s *terraform.State) error {
}

// lintignore:AT004
func cfgs(t *testing.T, bs string, extraCfg string) string {
func cfgs(bs string, extraCfg string) string {
return fmt.Sprintf(`
provider "kafka" {
bootstrap_servers = ["%s"]
Expand Down
4 changes: 2 additions & 2 deletions kafka/resource_kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func topicDelete(ctx context.Context, d *schema.ResourceData, meta interface{})
stateConf := &retry.StateChangeConf{
Pending: []string{"Pending"},
Target: []string{"Deleted"},
Refresh: topicDeleteFunc(c, d.Id(), t),
Refresh: topicDeleteFunc(c, t),
Timeout: 300 * time.Second,
Delay: 3 * time.Second,
PollInterval: 2 * time.Second,
Expand All @@ -237,7 +237,7 @@ func topicDelete(ctx context.Context, d *schema.ResourceData, meta interface{})
return nil
}

func topicDeleteFunc(client *LazyClient, id string, t Topic) retry.StateRefreshFunc {
func topicDeleteFunc(client *LazyClient, t Topic) retry.StateRefreshFunc {
return func() (result interface{}, s string, err error) {
topic, err := client.ReadTopic(t.Name, true)

Expand Down
31 changes: 26 additions & 5 deletions kafka/resource_kafka_topic_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"errors"
"fmt"
"log"
"strconv"
Expand Down Expand Up @@ -144,14 +145,17 @@ func TestAcc_TopicAlterReplicationFactor(t *testing.T) {

r.Test(t, r.TestCase{
ProviderFactories: overrideProviderFactory(),
PreCheck: func() { testAccPreCheck(t) },
CheckDestroy: testAccCheckTopicDestroy,
PreCheck: func() {
testAccPreCheck(t)
},
CheckDestroy: testAccCheckTopicDestroy,
Steps: []r.TestStep{
{
Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 1, 7)),
Check: r.ComposeTestCheckFunc(
testResourceTopic_initialCheck,
testResourceTopic_produceMessages(messages),
testResourceTopic_initialCheck),
),
},
{
Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 3, 7)),
Expand Down Expand Up @@ -250,8 +254,12 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
}
kafkaConfig.Producer.Return.Errors = true
kafkaConfig.Producer.Return.Successes = true
kafkaConfig.Metadata.Full = true

kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
kafkaConfig.Producer.Timeout = 90 * time.Second
kafkaConfig.Producer.Retry.Max = 5
kafkaConfig.Producer.Retry.Backoff = time.Second

producer, err := sarama.NewSyncProducer(*config.BootstrapServers, kafkaConfig)
if err != nil {
Expand All @@ -264,8 +272,21 @@ func testResourceTopic_produceMessages(messages []*sarama.ProducerMessage) r.Tes
}
}()

if err := producer.SendMessages(messages); err != nil {
return err
// rety 5 times
retries := 5
produceErrs := make([]error, 0, retries)
for i := 0; i < retries; i++ {
if errs := producer.SendMessages(messages); errs != nil {
produceErrs = append(produceErrs, errs)
for _, err := range errs.(sarama.ProducerErrors) {
log.Println("[ERROR] Write to kafka failed: ", err)
if i == retries-1 {
return errors.Join(produceErrs...)
}
}
} else {
return nil
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion kafka/resource_kafka_user_scram_credential.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func importSCRAM(ctx context.Context, d *schema.ResourceData, m interface{}) ([]
return nil, errSet.err
}
} else {
return nil, fmt.Errorf("Failed importing resource; expected format is username|scram_mechanism|password - got %v segments instead of 3", len(parts))
return nil, fmt.Errorf("failed importing resource; expected format is username|scram_mechanism|password - got %v segments instead of 3", len(parts))
}

return []*schema.ResourceData{d}, nil
Expand Down
2 changes: 1 addition & 1 deletion kafka/resource_kafka_user_scram_credential_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAcc_UserScramCredentialBasic(t *testing.T) {
CheckDestroy: testAccCheckUserScramCredentialDestroy,
Steps: []r.TestStep{
{
Config: cfgs(t, bs, fmt.Sprintf(testResourceUserScramCredential_SHA256, username)),
Config: cfgs(bs, fmt.Sprintf(testResourceUserScramCredential_SHA256, username)),
Check: testResourceUserScramCredentialCheck_withoutIterations,
},
},
Expand Down
6 changes: 4 additions & 2 deletions kafka/scram_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
var (
SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
)

type XDGSCRAMClient struct {
*scram.Client
Expand Down
5 changes: 2 additions & 3 deletions kafka/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@ func ReplicaCount(c sarama.Client, topic string, partitions []int32) (int, error
for _, p := range partitions {
replicas, err := c.Replicas(topic, p)
if err != nil {
return -1, errors.New("Could not get replicas for partition")
return -1, errors.New("could not get replicas for partition")
}
if count == -1 {
count = len(replicas)
}
if count != len(replicas) {
return count, fmt.Errorf("The replica count isn't the same across partitions %d != %d", count, len(replicas))
return count, fmt.Errorf("replica count isn't the same across partitions %d != %d", count, len(replicas))
}
}
return count, nil

}

func configToResources(topic Topic) []*sarama.AlterConfigsResource {
Expand Down
Loading