Skip to content

Commit

Permalink
Merge pull request #432 from sciclon2/OffsetFetchRequest-version-issue
Browse files Browse the repository at this point in the history
fix: offsetFetchResponse missing topic partitions
  • Loading branch information
danielqsj authored Aug 20, 2024
2 parents 0e4b91c + 43bdad6 commit 8eee667
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,17 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF
}, nil
}

//func (e *Exporter) fetchOffsetVersion() int16 {
// version := e.client.Config().Version
// if e.client.Config().Version.IsAtLeast(sarama.V2_0_0_0) {
// return 4
// } else if version.IsAtLeast(sarama.V0_10_2_0) {
// return 2
// } else if version.IsAtLeast(sarama.V0_8_2_2) {
// return 1
// }
// return 0
//}
func (e *Exporter) fetchOffsetVersion() int16 {
version := e.client.Config().Version
if e.client.Config().Version.IsAtLeast(sarama.V2_0_0_0) {
return 4
} else if version.IsAtLeast(sarama.V0_10_2_0) {
return 2
} else if version.IsAtLeast(sarama.V0_8_2_2) {
return 1
}
return 0
}

// Describe describes all the metrics ever exported by the Kafka exporter. It
// implements prometheus.Collector.
Expand Down Expand Up @@ -574,7 +574,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
klog.Errorf("Cannot describe for the group %s with error code %d", group.GroupId, group.Err)
continue
}
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: e.fetchOffsetVersion()}
if e.offsetShowAll {
for topic, partitions := range offset {
for partition := range partitions {
Expand Down

0 comments on commit 8eee667

Please sign in to comment.