Skip to content

Commit

Permalink
Add ClientId and ClientHost to consumer group partition metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
theY4Kman committed Jun 16, 2020
1 parent 61a00f5 commit 23bfa09
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,24 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
return
}
for _, group := range describeGroups.Groups {
topicPartitionAssignments := make(map[string]map[int32]*sarama.GroupMemberDescription)
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}
for topic, partitions := range offset {
topicPartitionAssignments[topic] = make(map[int32]*sarama.GroupMemberDescription)
for partition := range partitions {
offsetFetchRequest.AddPartition(topic, partition)
}
}
for _, member := range group.Members {
assignment, err := member.GetMemberAssignment()
if err == nil {
for topic, partitions := range assignment.Topics {
for _, partition := range partitions {
topicPartitionAssignments[topic][partition] = member
}
}
}
}
ch <- prometheus.MustNewConstMetric(
consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
)
Expand All @@ -430,10 +442,16 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
plog.Errorf("Error for partition %d :%v", partition, err.Error())
continue
}
var clientId = ""
var clientHost = ""
if assignedMember, hasMember := topicPartitionAssignments[topic][partition]; hasMember {
clientId = assignedMember.ClientId
clientHost = assignedMember.ClientHost
}
currentOffset := offsetFetchResponseBlock.Offset
currentOffsetSum += currentOffset
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), clientId, clientHost,
)
e.mu.Lock()
if offset, ok := offset[topic][partition]; ok {
Expand All @@ -447,7 +465,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
lagSum += lag
}
ch <- prometheus.MustNewConstMetric(
consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), clientId, clientHost,
)
} else {
plog.Errorf("No offset of topic %s partition %d, cannot get consumer group lag", topic, partition)
Expand Down Expand Up @@ -583,7 +601,7 @@ func main() {
consumergroupCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset"),
"Current Offset of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
[]string{"consumergroup", "topic", "partition", "client_id", "client_host"}, labels,
)

consumergroupCurrentOffsetSum = prometheus.NewDesc(
Expand All @@ -595,7 +613,7 @@ func main() {
consumergroupLag = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag"),
"Current Approximate Lag of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
[]string{"consumergroup", "topic", "partition", "client_id", "client_host"}, labels,
)

consumergroupLagZookeeper = prometheus.NewDesc(
Expand Down

0 comments on commit 23bfa09

Please sign in to comment.