-
I have the following piece of code that I am currently using to find the total number of messages in topics:
topics, topicerr := client.Topics()
// For every topic, add up (newest offset - oldest offset) over its partitions.
for _, topic := range topics {
// Skip any topic whose name contains "__consumer_offsets".
if strings.Contains(topic, "__consumer_offsets") {
continue
}
// NOTE(review): partitionerr is never checked in this snippet.
partitions, partitionerr := client.Partitions(topic)
// count restarts at zero for each topic; nothing in the visible snippet reads it after the inner loop.
count := 0
for _, partition := range partitions {
// Two GetOffset calls are made one after the other for each partition,
// and the error result of both is discarded.
latestoffset, _ := client.GetOffset(topic, partition, sarama.OffsetNewest)
startoffset, _ := client.GetOffset(topic, partition, sarama.OffsetOldest)
count += int(latestoffset - startoffset)
}
}
This works, but it is very slow. Any ideas how to find the count faster? |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
What I meant by slowness is that I have 65 topics with a total of 761 partitions. Going over all of them takes time. |
Beta Was this translation helpful? Give feedback.
-
@eapache Any suggestions for me? |
Beta Was this translation helpful? Give feedback.
-
Not sure if this will help someone, but I ended up using "github.com/confluentinc/confluent-kafka-go/v2/kafka" to solve this. Here is the code that I used. The time difference is massive in my experience.
package main
import (
...
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func main() {
...
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
"broker.address.family": "v4",
"group.id": group,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest",
"enable.auto.offset.store": false,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}
fmt.Printf("Created Consumer %v\n", c)
m, e := c.GetMetadata(nil, true, 5000)
if e != nil {
fmt.Fprintf(os.Stderr, "Failed to get metadata: %s\n", e)
os.Exit(1)
}
var wg sync.WaitGroup
for _, topic := range m.Topics {
if topic.Topic[0] == '_' {
continue
}
wg.Add(1)
go func(topic kafka.TopicMetadata) {
defer wg.Done()
total := 0
for _, partition := range topic.Partitions {
high, low, _ := c.QueryWatermarkOffsets(topic.Topic, partition.ID, 5000)
total += int(low - high)
}
fmt.Printf("Topic %s partitions: %d total: %d\n", topic.Topic, len(topic.Partitions), total)
}(topic)
}
wg.Wait()
fmt.Printf("Closing consumer\n")
c.Close()
} |
Beta Was this translation helpful? Give feedback.
Not sure if this will help someone, but I ended up using
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
to solve this. Here is the code that I used. The time difference is massive in my experience.