From 3605e2ca9097fe43b7a2f07b99e362bcafda7948 Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 12:36:21 +0100 Subject: [PATCH 1/9] WIP --- kafka/config.go | 4 +++- kafka/lazy_client.go | 30 ++++++++++++++++++++++++------ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/kafka/config.go b/kafka/config.go index e0028f93..7e9502cb 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -34,6 +34,8 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) { kafkaConfig.Admin.Timeout = time.Duration(c.Timeout) * time.Second kafkaConfig.Metadata.Full = true // the default, but just being clear kafkaConfig.Metadata.AllowAutoTopicCreation = false + kafkaConfig.Metadata.Retry.Max = 10 + kafkaConfig.Metadata.Retry.Backoff = 250 * time.Millisecond kafkaConfig.Net.Proxy.Enable = true kafkaConfig.Net.Proxy.Dialer = proxy.FromEnvironment() @@ -93,7 +95,7 @@ func parsePemOrLoadFromFile(input string) (*pem.Block, []byte, error) { if inputBlock == nil { //attempt to load from file - log.Printf("[INFO] Attempting to load from file '%s'", input) + log.Printf("[TRACE] Attempting to load from file '%s'", input) var err error inputBytes, err = os.ReadFile(input) if err != nil { diff --git a/kafka/lazy_client.go b/kafka/lazy_client.go index 7840c1a5..7c661652 100644 --- a/kafka/lazy_client.go +++ b/kafka/lazy_client.go @@ -30,7 +30,10 @@ func (c *LazyClient) init() error { log.Printf("[TRACE] lazy client init %s", c.initErr) } if c.initErr == sarama.ErrBrokerNotAvailable || c.initErr == sarama.ErrOutOfBrokers { + log.Printf("[ERROR] Cannot connect to Kafka broker(s) %v", *(c.Config.BootstrapServers)) + log.Printf("[ERROR] Check if Kafka broker(s) are up and running") if c.Config.TLSEnabled { + log.Printf("[ERROR] Check if Kafka broker(s) are reachable from this machine using TLS") tlsError := c.checkTLSConfig() if tlsError != nil { return fmt.Errorf("%w\n%s", tlsError, c.initErr) @@ -48,14 +51,29 @@ func (c *LazyClient) checkTLSConfig() error { } brokers := *(c.Config.BootstrapServers) - broker := brokers[0] - tlsConf := kafkaConfig.Net.TLS.Config - conn, err := tls.Dial("tcp", broker, tlsConf) - if err != nil { - return err + errs := make([]error, 0, len(brokers)) + + for i := 0; i < len(brokers); i++ { + broker := brokers[i] + tlsConf := kafkaConfig.Net.TLS.Config + conn, err := tls.Dial("tcp", broker, tlsConf) + if err != nil { + errs = append(errs, err) + continue + } + + err = conn.Handshake() + if err != nil { + errs = append(errs, err) + continue + } + } + + if len(errs) > 0 { + return fmt.Errorf("TLS handshake failed for all brokers: %v", errs) } - return conn.Handshake() + return nil } func (c *LazyClient) CreateTopic(t Topic) error { From 3af9596f3e12593ece7c4e7fee72f8de2c77464d Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 12:41:54 +0100 Subject: [PATCH 2/9] Wait for kafka --- .github/workflows/test.yml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index feb1250f..ac262264 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -58,6 +58,14 @@ jobs: restore-keys: | ${{ runner.os }}-go- - name: Bring up kafka + zk - run: docker-compose up -d + uses: JarvusInnovations/background-action@v1 + with: + run: docker compose up & + wait-on: | + tcp:localhost:9092 + tcp:localhost:9093 + tail: true + log-output-if: true + - name: "Run tests" run: make testacc From 7de7e0aa253523f60fc4af4c0c96e707322f0b74 Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 13:38:34 +0100 Subject: [PATCH 3/9] sleep --- .github/workflows/test.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ac262264..5fcfde46 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -63,9 +63,8 @@ jobs: run: docker compose up & wait-on: | tcp:localhost:9092 - tcp:localhost:9093 tail: true log-output-if: true - name: "Run tests" - run: make testacc + run: sleep 10 && make testacc From 16cc6fd03d1d27a41677108699b0c223bb4df643 Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 14:01:16 +0100 Subject: [PATCH 4/9] Add retry loop --- kafka/client.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/kafka/client.go b/kafka/client.go index 93293a70..695b67f5 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -53,10 +53,20 @@ func NewClient(config *Config) (*Client, error) { return nil, err } - c, err := sarama.NewClient(bootstrapServers, kc) - if err != nil { - log.Printf("[ERROR] Error connecting to kafka %s", err) - return nil, err + // warning: at this point sarama will attempt to connect to the kafka cluster + + maxRetry := 5 + for i := 0; i < maxRetry; i++ { + + c, err := sarama.NewClient(bootstrapServers, kc) + if err != nil { + log.Printf("[ERROR] Error connecting to kafka %s", err) + if i == maxRetry-1 { + return nil, err + } + + time.Sleep(5 * time.Second) + } } client := &Client{ From 2c5cb25f5792b4505a47cb1c5d1474fb4d56c96d Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 14:03:56 +0100 Subject: [PATCH 5/9] Add retry loop --- kafka/client.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/client.go b/kafka/client.go index 695b67f5..51c7529e 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -56,9 +56,10 @@ func NewClient(config *Config) (*Client, error) { // warning: at this point sarama will attempt to connect to the kafka cluster maxRetry := 5 - for i := 0; i < maxRetry; i++ { - c, err := sarama.NewClient(bootstrapServers, kc) + var c sarama.Client + for i := 0; i < maxRetry; i++ { + c, err = sarama.NewClient(bootstrapServers, kc) if err != nil { log.Printf("[ERROR] Error connecting to kafka %s", err) if i == maxRetry-1 { From 51ed524ca1d0f931bc7ef78ab95fc4c3fe755c01 Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 14:08:21 +0100 Subject: [PATCH 6/9] Logging --- kafka/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/client.go b/kafka/client.go index 51c7529e..ae193f0e 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -61,7 +61,7 @@ func NewClient(config *Config) (*Client, error) { for i := 0; i < maxRetry; i++ { c, err = sarama.NewClient(bootstrapServers, kc) if err != nil { - log.Printf("[ERROR] Error connecting to kafka %s", err) + log.Printf("[ERROR] [%d/%d] Error connecting to kafka %s", i, maxRetry, err) if i == maxRetry-1 { return nil, err } From c413bf7872693255a13230cf6006f45766e50676 Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 14:13:09 +0100 Subject: [PATCH 7/9] rename --- kafka/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/client.go b/kafka/client.go index ae193f0e..52efb564 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -58,11 +58,11 @@ func NewClient(config *Config) (*Client, error) { maxRetry := 5 var c sarama.Client - for i := 0; i < maxRetry; i++ { + for attempt := 0; attempt < maxRetry; attempt++ { c, err = sarama.NewClient(bootstrapServers, kc) if err != nil { - log.Printf("[ERROR] [%d/%d] Error connecting to kafka %s", i, maxRetry, err) - if i == maxRetry-1 { + log.Printf("[ERROR] [%d/%d] Error connecting to kafka %s", attempt, maxRetry, err) + if attempt >= maxRetry-1 { return nil, err } From 50c1e1146384ef911d82c07e5c025e948d07f32b Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 14:13:20 +0100 Subject: [PATCH 8/9] rename --- kafka/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/client.go b/kafka/client.go index 52efb564..184af66d 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -61,7 +61,7 @@ func NewClient(config *Config) (*Client, error) { for attempt := 0; attempt < maxRetry; attempt++ { c, err = sarama.NewClient(bootstrapServers, kc) if err != nil { - log.Printf("[ERROR] [%d/%d] Error connecting to kafka %s", attempt, maxRetry, err) + log.Printf("[ERROR] [%d/%d] Error connecting to kafka %s", attempt+1, maxRetry, err) if attempt >= maxRetry-1 { return nil, err } From 7ab2c8607fa3d3849cfd03ea89f1e6f7f32179de Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Mon, 16 Oct 2023 14:43:45 +0100 Subject: [PATCH 9/9] Remove debug logs --- GNUmakefile | 1 - 1 file changed, 1 deletion(-) diff --git a/GNUmakefile b/GNUmakefile index 7d44c9f3..e2efe589 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -16,7 +16,6 @@ testacc: KAFKA_CLIENT_KEY_PASSPHRASE=test-pass \ KAFKA_SKIP_VERIFY=false \ KAFKA_ENABLE_TLS=true \ - TF_LOG=DEBUG \ TF_ACC=1 go test ./kafka -v $(TESTARGS) -timeout 9m -count=1 .PHONY: build test testacc