From edbd1f9cf457bbec80bc82f950b2dd595b37a6c3 Mon Sep 17 00:00:00 2001 From: Parham Alvani Date: Wed, 2 Oct 2024 14:01:29 +0000 Subject: [PATCH] fix: correct issue with metrics --- internal/client/jetstream.go | 10 ++++++++++ internal/client/metric.go | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/client/jetstream.go b/internal/client/jetstream.go index 6de7458..4b40520 100644 --- a/internal/client/jetstream.go +++ b/internal/client/jetstream.go @@ -208,11 +208,13 @@ func (client *Client) jetstreamSubscribe(h <-chan *Message, streamName string) { latency := time.Since(publishTime).Seconds() client.metrics.Latency.With(prometheus.Labels{ + "subject": msg.Subject, "stream": streamName, "cluster": clusterName, }).Observe(latency) client.metrics.SuccessCounter.With(prometheus.Labels{ + "subject": msg.Subject, "type": successfulSubscribe, "stream": streamName, "cluster": clusterName, @@ -246,10 +248,14 @@ func (client *Client) coreSubscribe(subject string) { latency := time.Since(publishTime).Seconds() client.metrics.Latency.With(prometheus.Labels{ + "subject": subject, + "stream": "-", "cluster": clusterName, }).Observe(latency) client.metrics.SuccessCounter.With(prometheus.Labels{ + "subject": subject, + "stream": "-", "type": successfulSubscribe, "cluster": clusterName, }).Add(1) @@ -269,6 +275,7 @@ func (client *Client) corePublish(subject string) { if err := client.connection.Publish(subject, t); err != nil { client.metrics.SuccessCounter.With(prometheus.Labels{ + "stream": "-", "subject": subject, "type": failedPublish, "cluster": clusterName, @@ -284,6 +291,7 @@ func (client *Client) corePublish(subject string) { } } else { client.metrics.SuccessCounter.With(prometheus.Labels{ + "stream": "-", "type": successfulPublish, "cluster": clusterName, "subject": subject, @@ -305,6 +313,7 @@ func (client *Client) jetstreamPublish(ctx context.Context, subject string, stre if ack, err := client.jetstream.Publish(ctx, subject, t); err != nil { client.metrics.SuccessCounter.With(prometheus.Labels{ + "subject": subject, "type": failedPublish, "stream": streamName, "cluster": clusterName, @@ -320,6 +329,7 @@ func (client *Client) jetstreamPublish(ctx context.Context, subject string, stre } } else { client.metrics.SuccessCounter.With(prometheus.Labels{ + "subject": subject, "type": successfulPublish, "stream": streamName, "cluster": clusterName, diff --git a/internal/client/metric.go b/internal/client/metric.go index a2c5f33..f662738 100644 --- a/internal/client/metric.go +++ b/internal/client/metric.go @@ -115,7 +115,7 @@ func NewMetrics(conn string) Metrics { "conn": conn, }, Buckets: latencyBuckets, - }, []string{"stream", "cluster"}), + }, []string{"stream", "cluster", "subject"}), SuccessCounter: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Subsystem: Subsystem, @@ -124,6 +124,6 @@ func NewMetrics(conn string) Metrics { ConstLabels: prometheus.Labels{ "conn": conn, }, - }, []string{"type", "stream", "cluster"}), + }, []string{"type", "stream", "cluster", "subject"}), } }