Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).
Documentation and examples are available via godoc at http://godoc.org/github.com/mmchen/sarama-cluster
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/mmchen/sarama-cluster"
)

func main() {
	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume messages, watch errors and notifications
	for {
		select {
		case msg, more := <-consumer.Messages():
			if more {
				fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
			}
		case err, more := <-consumer.Errors():
			if more {
				log.Printf("Error: %s\n", err.Error())
			}
		case ntf, more := <-consumer.Notifications():
			if more {
				log.Printf("Rebalanced: %+v\n", ntf)
			}
		case <-signals:
			return
		}
	}
}
You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.
To run tests, call:
$ make test
By default, sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that a brand new consumer that has never committed any offsets to Kafka will only receive messages written after it starts consuming; messages already in the topic are skipped.
If you wish to receive all messages (from the start of the topic) when a consumer has no offsets committed to Kafka, you need to set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.