diff --git a/batch_reader.go b/batch_reader.go
index 71d1ff4..d5cc708 100644
--- a/batch_reader.go
+++ b/batch_reader.go
@@ -3,6 +3,8 @@ package main
 import (
 	"context"
 	"encoding/json"
+	"fmt"
+	"runtime/debug"
 	"strings"
 	"time"
 
@@ -68,9 +70,13 @@ type KafkaBatchReader struct {
 
 func NewKafkaBatchReader(cfg *config.Config) *KafkaBatchReader {
 	kafkaReader := kafka.NewReader(kafka.ReaderConfig{
-		Brokers: parseKafkaServers(cfg.KafkaBootstrapServers),
-		GroupID: cfg.KafkaConsumerGroup,
-		Topic:   cfg.KafkaTopic,
+		Brokers:          parseKafkaServers(cfg.KafkaBootstrapServers),
+		GroupID:          cfg.KafkaConsumerGroup,
+		Topic:            cfg.KafkaTopic,
+		MinBytes:         cfg.MinBytes,
+		MaxBytes:         cfg.MaxBytes,
+		ReadBatchTimeout: 2 * time.Duration(cfg.BatchMaxInterval) * time.Second,
+		MaxWait:          time.Duration(cfg.MaxWait) * time.Second,
 	})
 	return &KafkaBatchReader{
 		batchSize: cfg.BatchSize,
@@ -95,6 +101,7 @@ func (br *KafkaBatchReader) fetchMessageWithTimeout(ctx context.Context, timeout
 		if ctx.Err() == context.Canceled {
 			logrus.Errorf("Failed to fetch message, attempt %d: %v", i+1, err)
 			time.Sleep(1 * time.Second)
+			fmt.Printf("Stack trace: %s\n", debug.Stack())
 			continue
 		}
 		return nil, err
diff --git a/config/conf.json b/config/conf.json
index d01ef84..efea084 100644
--- a/config/conf.json
+++ b/config/conf.json
@@ -12,5 +12,8 @@
     "workers": 1,
     "copyPurge": false,
     "copyForce": false,
-    "disableVariantCheck": true
+    "disableVariantCheck": true,
+    "minBytes": 1024,
+    "maxBytes": 1048576,
+    "maxWait": 10
 }
\ No newline at end of file
diff --git a/config/config.go b/config/config.go
index 3b85178..306dd94 100644
--- a/config/config.go
+++ b/config/config.go
@@ -23,6 +23,24 @@ type Config struct {
 	CopyPurge           bool `json:"copyPurge" default:"false"`
 	CopyForce           bool `json:"copyForce" default:"false"`
 	DisableVariantCheck bool `json:"disableVariantCheck" default:"false"`
+	// MinBytes indicates to the broker the minimum batch size that the consumer
+	// will accept. Setting a high minimum when consuming from a low-volume topic
+	// may result in delayed delivery when the broker does not have enough data to
+	// satisfy the defined minimum.
+	//
+	// Default: 1KB
+	MinBytes int `json:"minBytes" default:"1024"`
+	// MaxBytes indicates to the broker the maximum batch size that the consumer
+	// will accept. The broker will truncate a message to satisfy this maximum, so
+	// choose a value that is high enough for your largest message size.
+	//
+	// Default: 20MB
+	MaxBytes int `json:"maxBytes" default:"20971520"`
+	// MaxWait is the maximum amount of time to wait for new data to come when
+	// fetching batches of messages from kafka.
+	//
+	// Default: 10s
+	MaxWait int `json:"maxWait" default:"10"`
 }
 
 func LoadConfig() (*Config, error) {
diff --git a/consume_worker.go b/consume_worker.go
index bc0a650..e9777f0 100644
--- a/consume_worker.go
+++ b/consume_worker.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"runtime/debug"
 	"time"
 
 	"github.com/sirupsen/logrus"
@@ -54,13 +55,12 @@ func (c *ConsumeWorker) stepBatch(ctx context.Context) error {
 	logrus.Debug("DEBUG: commit")
 	maxRetries := 5
 	for i := 0; i < maxRetries; i++ {
-		ctx, cancel := context.WithTimeout(ctx, 2*time.Duration(c.cfg.BatchMaxInterval)*time.Second)
 		err = batch.CommitFunc(ctx)
-		cancel()
 		if err != nil {
-			if err == context.Canceled || err == context.DeadlineExceeded {
+			if err == context.Canceled {
 				logrus.Errorf("Failed to commit messages at %d, attempt %d: %v", batch.LastMessageOffset, i+1, err)
 				time.Sleep(1 * time.Second)
+				fmt.Printf("Stack trace: %s\n", debug.Stack())
 				continue
 			}
 			fmt.Fprintf(os.Stderr, "Failed to commit messages at %d: %v\n", batch.LastMessageOffset, err)
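
For context on how the three new settings interact, below is a minimal, standalone sketch. It assumes the reader is built on github.com/segmentio/kafka-go, which the ReaderConfig fields above suggest; the broker address, group, and topic names are placeholders rather than values from this repository. The broker holds a fetch request until at least MinBytes of data are available or MaxWait elapses, and it returns at most MaxBytes per fetch.

// Illustrative only: a standalone consumer wired with the same knobs this
// patch adds. Broker, group, and topic values are placeholders.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"}, // placeholder broker address
		GroupID:  "example-group",            // placeholder consumer group
		Topic:    "example-topic",            // placeholder topic
		MinBytes: 1024,                       // same as conf.json "minBytes": 1024
		MaxBytes: 1048576,                    // same as conf.json "maxBytes": 1048576
		MaxWait:  10 * time.Second,           // same as conf.json "maxWait": 10 (seconds)
	})
	defer r.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// FetchMessage blocks until the broker can satisfy MinBytes, MaxWait
	// expires, or the context is done.
	msg, err := r.FetchMessage(ctx)
	if err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	fmt.Printf("partition %d offset %d: %s\n", msg.Partition, msg.Offset, msg.Value)

	// Offsets are committed explicitly, mirroring the CommitFunc retry loop
	// in consume_worker.go.
	if err := r.CommitMessages(ctx, msg); err != nil {
		fmt.Println("commit failed:", err)
	}
}

Note that ReaderConfig.MaxWait is a time.Duration, which is why the patch multiplies the integer cfg.MaxWait by time.Second before passing it to the reader.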