Skip to content

Commit

Permalink
feat: add more kafka config
Browse files Browse the repository at this point in the history
  • Loading branch information
hantmac committed Jun 5, 2024
1 parent 508d8af commit b1beec1
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
10 changes: 7 additions & 3 deletions batch_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,13 @@ type KafkaBatchReader struct {

func NewKafkaBatchReader(cfg *config.Config) *KafkaBatchReader {
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: parseKafkaServers(cfg.KafkaBootstrapServers),
GroupID: cfg.KafkaConsumerGroup,
Topic: cfg.KafkaTopic,
Brokers: parseKafkaServers(cfg.KafkaBootstrapServers),
GroupID: cfg.KafkaConsumerGroup,
Topic: cfg.KafkaTopic,
MinBytes: cfg.MinBytes,
MaxBytes: cfg.MaxBytes,
ReadBatchTimeout: 2 * time.Duration(cfg.BatchMaxInterval) * time.Second,
MaxWait: time.Duration(cfg.MaxWait) * time.Second,
})
return &KafkaBatchReader{
batchSize: cfg.BatchSize,
Expand Down
5 changes: 4 additions & 1 deletion config/conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,8 @@
"workers": 1,
"copyPurge": false,
"copyForce": false,
"disableVariantCheck": true
"disableVariantCheck": true,
"minBytes": 1024,
"maxBytes": 1048576,
"maxWait": 10
}
18 changes: 18 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ type Config struct {
CopyPurge bool `json:"copyPurge" default:"false"`
CopyForce bool `json:"copyForce" default:"false"`
DisableVariantCheck bool `json:"disableVariantCheck" default:"false"`
// MinBytes indicates to the broker the minimum batch size that the consumer
// will accept. Setting a high minimum when consuming from a low-volume topic
// may result in delayed delivery when the broker does not have enough data to
// satisfy the defined minimum.
//
// Default: 1KB
MinBytes int `json:"minBytes" default:"1024"`
// MaxBytes indicates to the broker the maximum batch size that the consumer
// will accept. The broker will truncate a message to satisfy this maximum, so
// choose a value that is high enough for your largest message size.
//
// Default: 20MB
MaxBytes int `json:"maxBytes" default:"20 * 1024 * 1024"`
// Maximum amount of time to wait for new data to come when fetching batches
// of messages from kafka.
//
// Default: 10s
MaxWait int `json:"maxWait" default:"10"`
}

func LoadConfig() (*Config, error) {
Expand Down

0 comments on commit b1beec1

Please sign in to comment.