Merge pull request #19 from databendcloud/feat/more-kafka-config
feat: add more kafka config
hantmac authored Jun 5, 2024
2 parents 508d8af + c86f168 commit 6a05466
Showing 4 changed files with 35 additions and 7 deletions.
batch_reader.go: 10 additions & 3 deletions

```diff
@@ -3,6 +3,8 @@ package main
 import (
 	"context"
 	"encoding/json"
+	"fmt"
+	"runtime/debug"
 	"strings"
 	"time"

@@ -68,9 +70,13 @@ type KafkaBatchReader struct {

 func NewKafkaBatchReader(cfg *config.Config) *KafkaBatchReader {
 	kafkaReader := kafka.NewReader(kafka.ReaderConfig{
-		Brokers: parseKafkaServers(cfg.KafkaBootstrapServers),
-		GroupID: cfg.KafkaConsumerGroup,
-		Topic:   cfg.KafkaTopic,
+		Brokers:          parseKafkaServers(cfg.KafkaBootstrapServers),
+		GroupID:          cfg.KafkaConsumerGroup,
+		Topic:            cfg.KafkaTopic,
+		MinBytes:         cfg.MinBytes,
+		MaxBytes:         cfg.MaxBytes,
+		ReadBatchTimeout: 2 * time.Duration(cfg.BatchMaxInterval) * time.Second,
+		MaxWait:          time.Duration(cfg.MaxWait) * time.Second,
 	})
 	return &KafkaBatchReader{
 		batchSize: cfg.BatchSize,

@@ -95,6 +101,7 @@ func (br *KafkaBatchReader) fetchMessageWithTimeout(ctx context.Context, timeout
 		if ctx.Err() == context.Canceled {
 			logrus.Errorf("Failed to fetch message, attempt %d: %v", i+1, err)
 			time.Sleep(1 * time.Second)
+			fmt.Printf("Stack trace: %s\n", debug.Stack())
 			continue
 		}
 		return nil, err
```
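For context, the three new knobs map directly onto fields of segmentio/kafka-go's `kafka.ReaderConfig`. Below is a minimal, self-contained sketch of a reader tuned this way; the broker address, group, and topic names are placeholders rather than values from this repo, and `ReadBatchTimeout` is left at its library default:

```go
package main

import (
	"context"
	"fmt"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"}, // hypothetical broker
		GroupID:  "example-group",            // hypothetical consumer group
		Topic:    "example-topic",            // hypothetical topic
		MinBytes: 1024,                       // broker holds the fetch until at least 1KB is available...
		MaxBytes: 1048576,                    // ...returns at most 1MB per fetch...
		MaxWait:  10 * time.Second,           // ...or gives up waiting after 10s, whichever comes first
	})
	defer r.Close()

	msg, err := r.FetchMessage(context.Background())
	if err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
}
```

A larger `MinBytes` reduces round trips on busy topics at the cost of latency on quiet ones; `MaxWait` bounds how long that trade-off can delay delivery.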
config/conf.json: 4 additions & 1 deletion

```diff
@@ -12,5 +12,8 @@
   "workers": 1,
   "copyPurge": false,
   "copyForce": false,
-  "disableVariantCheck": true
+  "disableVariantCheck": true,
+  "minBytes": 1024,
+  "maxBytes": 1048576,
+  "maxWait": 10
 }
```
config/config.go: 18 additions & 0 deletions

```diff
@@ -23,6 +23,24 @@ type Config struct {
 	CopyPurge           bool `json:"copyPurge" default:"false"`
 	CopyForce           bool `json:"copyForce" default:"false"`
 	DisableVariantCheck bool `json:"disableVariantCheck" default:"false"`
+	// MinBytes indicates to the broker the minimum batch size that the consumer
+	// will accept. Setting a high minimum when consuming from a low-volume topic
+	// may result in delayed delivery when the broker does not have enough data to
+	// satisfy the defined minimum.
+	//
+	// Default: 1KB
+	MinBytes int `json:"minBytes" default:"1024"`
+	// MaxBytes indicates to the broker the maximum batch size that the consumer
+	// will accept. The broker will truncate a message to satisfy this maximum, so
+	// choose a value that is high enough for your largest message size.
+	//
+	// Default: 20MB
+	MaxBytes int `json:"maxBytes" default:"20 * 1024 * 1024"`
+	// MaxWait is the maximum amount of time to wait for new data to arrive
+	// when fetching batches of messages from kafka.
+	//
+	// Default: 10s
+	MaxWait int `json:"maxWait" default:"10"`
 }

 func LoadConfig() (*Config, error) {
```
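The body of `LoadConfig` is outside this diff, so the following is only a sketch of how the new fields could be decoded from conf.json with encoding/json; the trimmed struct, the `loadTuning` helper name, and the defaults-before-unmarshal approach are illustrative assumptions, not the repo's actual loader:

```go
package config

import (
	"encoding/json"
	"os"
)

// kafkaTuning is an illustrative subset of the Config struct in this diff.
type kafkaTuning struct {
	MinBytes int `json:"minBytes"`
	MaxBytes int `json:"maxBytes"`
	MaxWait  int `json:"maxWait"` // seconds; converted to time.Duration at the reader
}

func loadTuning(path string) (*kafkaTuning, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	// Seed the documented defaults, then let the file override them.
	cfg := &kafkaTuning{MinBytes: 1024, MaxBytes: 20 * 1024 * 1024, MaxWait: 10}
	if err := json.Unmarshal(data, cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}
```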
consume_worker.go: 3 additions & 3 deletions

```diff
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"runtime/debug"
 	"time"

 	"github.com/sirupsen/logrus"

@@ -54,13 +55,12 @@ func (c *ConsumeWorker) stepBatch(ctx context.Context) error {
 	logrus.Debug("DEBUG: commit")
 	maxRetries := 5
 	for i := 0; i < maxRetries; i++ {
 		ctx, cancel := context.WithTimeout(ctx, 2*time.Duration(c.cfg.BatchMaxInterval)*time.Second)
 		err = batch.CommitFunc(ctx)
 		cancel()
 		if err != nil {
-			if err == context.Canceled || err == context.DeadlineExceeded {
+			if err == context.Canceled {
 				logrus.Errorf("Failed to commit messages at %d, attempt %d: %v", batch.LastMessageOffset, i+1, err)
 				time.Sleep(1 * time.Second)
+				fmt.Printf("Stack trace: %s\n", debug.Stack())
 				continue
 			}
 			fmt.Fprintf(os.Stderr, "Failed to commit messages at %d: %v\n", batch.LastMessageOffset, err)
```
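The loop above gives each commit attempt its own deadline so one stalled commit cannot wedge the worker indefinitely. A self-contained sketch of that retry pattern, using a stand-in commit callback rather than the repo's `batch.CommitFunc`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryCommit retries commit up to maxRetries times, deriving a fresh
// timeout context for every attempt from the parent ctx.
func retryCommit(ctx context.Context, commit func(context.Context) error, timeout time.Duration, maxRetries int) error {
	var err error
	for i := 0; i < maxRetries; i++ {
		attemptCtx, cancel := context.WithTimeout(ctx, timeout)
		err = commit(attemptCtx)
		cancel()
		if err == nil {
			return nil
		}
		if errors.Is(err, context.Canceled) {
			// Treated as transient here, mirroring the diff: back off briefly and retry.
			time.Sleep(1 * time.Second)
			continue
		}
		return err // non-retryable failure
	}
	return fmt.Errorf("commit failed after %d attempts: %w", maxRetries, err)
}
```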
