From 9ae311864941acffc650b91b0a68476735f8e5ae Mon Sep 17 00:00:00 2001 From: Andrey Polyakov Date: Thu, 26 Sep 2024 17:34:30 -0700 Subject: [PATCH] Report current_offset metric for incomplete partitions For consumers of low volume topic-partitions, it can take a very long time for enough offset commits to happen before a partition is marked as "complete". For the `burrow_kafka_consumer_current_offset` metric to be reported, only a single committed offset is required. --- core/internal/evaluator/caching.go | 2 +- core/internal/httpserver/prometheus.go | 4 +++- core/internal/httpserver/prometheus_test.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/internal/evaluator/caching.go b/core/internal/evaluator/caching.go index 0784f26c..dfa8cf42 100644 --- a/core/internal/evaluator/caching.go +++ b/core/internal/evaluator/caching.go @@ -281,7 +281,7 @@ func evaluatePartitionStatus(partition *protocol.ConsumerPartition, minimumCompl } // Slice the offsets to remove all nil entries (they'll be at the start) - firstOffset := len(partition.Offsets) - 1 + firstOffset := len(partition.Offsets) // defaults to the length, so if all offsets are nil we make an empty slice for i, offset := range partition.Offsets { if offset != nil { firstOffset = i diff --git a/core/internal/httpserver/prometheus.go b/core/internal/httpserver/prometheus.go index 1ba1059e..c1dbbdbf 100644 --- a/core/internal/httpserver/prometheus.go +++ b/core/internal/httpserver/prometheus.go @@ -137,8 +137,10 @@ func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc { consumerPartitionLagGauge.With(labels).Set(float64(partition.CurrentLag)) - if partition.Complete == 1.0 { + if partition.Complete > 0.0 { consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset)) + } + if partition.Complete == 1.0 { partitionStatusGauge.With(labels).Set(float64(partition.Status)) } } diff --git a/core/internal/httpserver/prometheus_test.go b/core/internal/httpserver/prometheus_test.go index 0b751ec7..633ea67d 100644 --- a/core/internal/httpserver/prometheus_test.go +++ b/core/internal/httpserver/prometheus_test.go @@ -158,7 +158,7 @@ func TestHttpServer_handlePrometheusMetrics(t *testing.T) { assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 22663`) assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 2488`) assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 99888`) - assert.NotContains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="incomplete"} 5335`) + assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="incomplete"} 5335`) assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="1",topic="incomplete"} 99888`) assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic"} 6556`)