Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP #353

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft

WIP #353

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ jobs:
restore-keys: |
${{ runner.os }}-go-
- name: Bring up kafka + zk
run: docker-compose up -d
uses: JarvusInnovations/background-action@v1
with:
run: docker compose up &
wait-on: |
tcp:localhost:9092
tail: true
log-output-if: true

- name: "Run tests"
run: make testacc
run: sleep 10 && make testacc
1 change: 0 additions & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ testacc:
KAFKA_CLIENT_KEY_PASSPHRASE=test-pass \
KAFKA_SKIP_VERIFY=false \
KAFKA_ENABLE_TLS=true \
TF_LOG=DEBUG \
TF_ACC=1 go test ./kafka -v $(TESTARGS) -timeout 9m -count=1

.PHONY: build test testacc
19 changes: 15 additions & 4 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,21 @@ func NewClient(config *Config) (*Client, error) {
return nil, err
}

c, err := sarama.NewClient(bootstrapServers, kc)
if err != nil {
log.Printf("[ERROR] Error connecting to kafka %s", err)
return nil, err
// warning: at this point sarama will attempt to connect to the kafka cluster

maxRetry := 5

var c sarama.Client
for attempt := 0; attempt < maxRetry; attempt++ {
c, err = sarama.NewClient(bootstrapServers, kc)
if err != nil {
log.Printf("[ERROR] [%d/%d] Error connecting to kafka %s", attempt+1, maxRetry, err)
if attempt >= maxRetry-1 {
return nil, err
}

time.Sleep(5 * time.Second)
}
}

client := &Client{
Expand Down
4 changes: 3 additions & 1 deletion kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) {
kafkaConfig.Admin.Timeout = time.Duration(c.Timeout) * time.Second
kafkaConfig.Metadata.Full = true // the default, but just being clear
kafkaConfig.Metadata.AllowAutoTopicCreation = false
kafkaConfig.Metadata.Retry.Max = 10
kafkaConfig.Metadata.Retry.Backoff = 250 * time.Millisecond

kafkaConfig.Net.Proxy.Enable = true
kafkaConfig.Net.Proxy.Dialer = proxy.FromEnvironment()
Expand Down Expand Up @@ -93,7 +95,7 @@ func parsePemOrLoadFromFile(input string) (*pem.Block, []byte, error) {

if inputBlock == nil {
//attempt to load from file
log.Printf("[INFO] Attempting to load from file '%s'", input)
log.Printf("[TRACE] Attempting to load from file '%s'", input)
var err error
inputBytes, err = os.ReadFile(input)
if err != nil {
Expand Down
30 changes: 24 additions & 6 deletions kafka/lazy_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func (c *LazyClient) init() error {
log.Printf("[TRACE] lazy client init %s", c.initErr)
}
if c.initErr == sarama.ErrBrokerNotAvailable || c.initErr == sarama.ErrOutOfBrokers {
log.Printf("[ERROR] Cannot connect to Kafka broker(s) %v", *(c.Config.BootstrapServers))
log.Printf("[ERROR] Check if Kafka broker(s) are up and running")
if c.Config.TLSEnabled {
log.Printf("[ERROR] Check if Kafka broker(s) are reachable from this machine using TLS")
tlsError := c.checkTLSConfig()
if tlsError != nil {
return fmt.Errorf("%w\n%s", tlsError, c.initErr)
Expand All @@ -48,14 +51,29 @@ func (c *LazyClient) checkTLSConfig() error {
}

brokers := *(c.Config.BootstrapServers)
broker := brokers[0]
tlsConf := kafkaConfig.Net.TLS.Config
conn, err := tls.Dial("tcp", broker, tlsConf)
if err != nil {
return err
errs := make([]error, 0, len(brokers))

for i := 0; i < len(brokers); i++ {
broker := brokers[i]
tlsConf := kafkaConfig.Net.TLS.Config
conn, err := tls.Dial("tcp", broker, tlsConf)
if err != nil {
errs = append(errs, err)
continue
}

err = conn.Handshake()
if err != nil {
errs = append(errs, err)
continue
}
}

if len(errs) > 0 {
return fmt.Errorf("TLS handshake failed for all brokers: %v", errs)
}

return conn.Handshake()
return nil
}

func (c *LazyClient) CreateTopic(t Topic) error {
Expand Down
Loading