From 00498a4a2e23ab2432bea910cd52263427abf46e Mon Sep 17 00:00:00 2001 From: Sergio Troiano Date: Sat, 16 Mar 2024 08:19:10 +0100 Subject: [PATCH] offsetFetchResponse missing topic partitions --- kafka_exporter.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/kafka_exporter.go b/kafka_exporter.go index 36021149..0ce9324a 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -283,17 +283,17 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF }, nil } -//func (e *Exporter) fetchOffsetVersion() int16 { -// version := e.client.Config().Version -// if e.client.Config().Version.IsAtLeast(sarama.V2_0_0_0) { -// return 4 -// } else if version.IsAtLeast(sarama.V0_10_2_0) { -// return 2 -// } else if version.IsAtLeast(sarama.V0_8_2_2) { -// return 1 -// } -// return 0 -//} +func (e *Exporter) fetchOffsetVersion() int16 { + version := e.client.Config().Version + if e.client.Config().Version.IsAtLeast(sarama.V2_0_0_0) { + return 4 + } else if version.IsAtLeast(sarama.V0_10_2_0) { + return 2 + } else if version.IsAtLeast(sarama.V0_8_2_2) { + return 1 + } + return 0 +} // Describe describes all the metrics ever exported by the Kafka exporter. It // implements prometheus.Collector. @@ -569,7 +569,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { return } for _, group := range describeGroups.Groups { - offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1} + offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: e.fetchOffsetVersion()} if e.offsetShowAll { for topic, partitions := range offset { for partition := range partitions {