Kafka Cronsumer is mainly used for retry/exception strategy management. It works based on a cron expression and consumes messages in a timely manner with the power of auto pause and concurrency.
For details, check our blog post.
If you need a whole consumer lifecycle with exception management, check out Kafka Konsumer.
Use it when:

- Iteration-based back-off strategies are applicable
- Messages can be processed in an eventually consistent state
- Messages that exceed max retry can be ignored or sent to a dead-letter topic
- You want to increase consumer resiliency
- You want to increase consumer performance with concurrency
Avoid it when:

- Messages must be processed in order
- Every message must be processed without loss (we discard messages if max retry is exceeded)
- Every message must be committed individually (we use an auto-commit interval to increase performance)
- Messages carry a TTL (time to live)
```sh
go get github.com/Trendyol/kafka-cronsumer@latest
```
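The snippets below elide their imports; based on the identifiers they use (`cronsumer.New`, `kafka.ConsumeFn`), the imports look like the following sketch (verify against the version you install):

```go
import (
	cronsumer "github.com/Trendyol/kafka-cronsumer" // cronsumer.New
	"github.com/Trendyol/kafka-cronsumer/pkg/kafka" // kafka.Config, kafka.ConsumeFn, kafka.Message
)
```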
You can find a number of ready-to-run examples in this directory. After running the `docker-compose up` command, you can run any example you want. Don't forget, it's cron based :)
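A typical session might look like this (the example path is illustrative; pick any application from the examples directory):

```sh
# start the local Kafka cluster defined in the examples' docker-compose file
docker-compose up -d

# then run one of the example applications
go run single-consumer/main.go
```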
```go
func main() {
	// ...
	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("consumer > Message received: %s\n", string(message.Value))
		return nil
	}

	c := cronsumer.New(kafkaConfig, consumeFn)
	c.Run()
}
```
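Note that `Run` blocks the calling goroutine, whereas `Start` (used in the multiple-consumers example below) schedules the cron job and returns immediately, which makes it suited to running several cronsumers side by side.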
```go
func main() {
	// ...
	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("consumer > Message received: %s\n", string(message.Value))
		return errors.New("error occurred")
	}

	c := cronsumer.New(kafkaConfig, consumeFn)
	c.Run()
}
```
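Returning an error from `consumeFn` signals the cronsumer to retry: the message is produced back to the exception topic and picked up again on the next cron run, until `consumer.maxRetry` is exceeded, after which the message is discarded or sent to a dead-letter topic as described above.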
```go
func main() {
	// ...
	var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("First consumer > Message received: %s\n", string(message.Value))
		return nil
	}
	first := cronsumer.New(firstCfg, firstConsumerFn)
	first.Start()

	var secondConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("Second consumer > Message received: %s\n", string(message.Value))
		return nil
	}
	second := cronsumer.New(secondCfg, secondConsumerFn)
	second.Start()
	// ...
}
```
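Since `Start` does not block, both cronsumers run concurrently; the trailing `// ...` is where the application would keep the main goroutine alive, for example with `select {}` or by waiting on a shutdown signal.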
```go
func main() {
	// ...
	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		return errors.New("err occurred")
	}

	c := cronsumer.New(config, consumeFn)

	StartAPI(*config, c.GetMetricCollectors()...)
	c.Start()
	// ...
}

func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
	// ...
	f := fiber.New(fiber.Config{})

	metricMiddleware, err := NewMetricMiddleware(cfg, f, metricCollectors...)
	if err == nil {
		f.Use(metricMiddleware)
	} else {
		fmt.Printf("metric middleware cannot be initialized: %v\n", err)
	}
	// ...
}
```
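If you are not running a fiber API, the same collectors can be exposed with the stock Prometheus HTTP handler. A minimal sketch, assuming only that `GetMetricCollectors` returns standard `prometheus.Collector` values as above (the `serveMetrics` helper and port are our own choices):

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics registers the given collectors on a fresh registry and
// exposes them on :2112/metrics from a background goroutine.
func serveMetrics(collectors ...prometheus.Collector) {
	registry := prometheus.NewRegistry()
	registry.MustRegister(collectors...)

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))

	go func() {
		_ = http.ListenAndServe(":2112", mux) // port is an arbitrary choice
	}()
}
```

With this in place, `serveMetrics(c.GetMetricCollectors()...)` would take the place of the `StartAPI` call above.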
| config | description | default | example |
|---|---|---|---|
| logLevel | Describes the log level; valid options are `debug`, `info`, `warn`, and `error` | info | |
| metricPrefix | Prefix used for the Prometheus fully-qualified metric names. If not provided, the default is `kafka_cronsumer`. Currently, two Prometheus metrics are exposed: `retried_messages_total_current` and `discarded_messages_total_current`. With the default prefix, the metric names become `kafka_cronsumer_retried_messages_total_current` and `kafka_cronsumer_discarded_messages_total_current` | kafka_cronsumer | |
| consumer.clientId | see doc | | |
| consumer.cron | Cron expression defining when the exception consumer starts to work | */1 * * * * | |
| consumer.backOffStrategy | Back-off strategy the consumer applies on retry topics | fixed | exponential, linear |
| consumer.duration | How long the exception consumer actively consumes messages per run | NonStopWork (zero duration) | 20s, 15m, 1h, NonStopWork (zero duration) |
| consumer.brokers | Broker addresses | | |
| consumer.topic | Exception topic name | exception-topic | |
| consumer.groupId | Exception consumer group id | exception-consumer-group | |
| consumer.maxRetry | Maximum number of times a message is retried | 3 | |
| consumer.concurrency | Number of goroutines used by the listeners | 1 | |
| consumer.verifyTopicOnStartup | Checks that the given retry topic exists on the Kafka cluster | false | |
| consumer.minBytes | see doc | 1 | |
| consumer.maxBytes | see doc | 1 MB | |
| consumer.maxWait | see doc | 10s | |
| consumer.commitInterval | see doc | 1s | |
| consumer.heartbeatInterval | see doc | 3s | |
| consumer.sessionTimeout | see doc | 30s | |
| consumer.rebalanceTimeout | see doc | 30s | |
| consumer.startOffset | see doc | earliest | |
| consumer.retentionTime | see doc | 24h | |
| consumer.queueCapacity | see doc | 100 | |
| consumer.skipMessageByHeaderFn | Function to filter messages based on their headers; return true to skip the message | nil | |
| producer.clientId | see doc | | |
| producer.brokers | Broker addresses; if not given, the consumer.brokers addresses are used | consumer.brokers addr | |
| producer.batchSize | see doc | 100 | |
| producer.batchTimeout | see doc | 1s | |
| producer.balancer | see doc | leastBytes | |
| sasl.enabled | Enables the SASL authentication mechanism | false | |
| sasl.authType | Currently, only `SCRAM` is supported | "" | |
| sasl.username | SCRAM username | "" | |
| sasl.password | SCRAM password | "" | |
| sasl.rootCAPath | see doc | "" | |
| sasl.intermediateCAPath | | "" | |
| sasl.rack | see doc | "" | |
| Metric Name | Description | Value Type |
|---|---|---|
| kafka_cronsumer_retried_messages_total | Total number of retried messages | Counter |
| kafka_cronsumer_discarded_messages_total | Total number of discarded messages | Counter |
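Since both metrics are counters, you would typically query their rate in Prometheus, e.g. `rate(kafka_cronsumer_retried_messages_total[5m])` to watch retry throughput over time.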
Use issues for everything
- For a small change, just send a PR.
- For bigger changes, open an issue for discussion before sending a PR.
- A PR should include:
  - A test case
  - Documentation
  - An example (if it makes sense)
- You can also contribute by:
  - Reporting issues
  - Suggesting new features or enhancements
  - Improving or fixing documentation
Please adhere to this project's Contributor Code of Conduct. By participating in this project you agree to abide by its terms.