diff --git a/surveyor/jetstream_configs.go b/surveyor/jetstream_configs.go index 694b107..431df4e 100644 --- a/surveyor/jetstream_configs.go +++ b/surveyor/jetstream_configs.go @@ -28,27 +28,30 @@ import ( var ( //JSStreamList = `$JS.API.STREAM.LIST` - streamConfigLabels = []string{"discard_policy", "storage_type", "replica_number", "stream_name"} - consumerConfigLabels = []string{"stream_name", "max_pending_ack", "ack_policy", "is_pull", "consumer_name"} - streamRaftInfoLabels = []string{"stream_name", "leader", "replica_count"} - streamRaftPeerInfoLabels = []string{"stream_name", "peer_name", "offline", "current", "leader", "lag"} - consumerRaftInfoLabels = []string{"consumer_name", "leader", "replica_count", "stream_name"} - consumerStateLabels = []string{"consumer_name", "stream_name", "last_delivered_message_consumer", "last_delivered_message_stream", "ack_floor_consumer", "ack_floor_stream"} - consumerRaftPeerInfoLabels = []string{"stream_name", "consumer_name", "peer_name", "offline", "current", "leader", "lag"} - - DefaultScrapeInterval = 10 * time.Second + streamConfigLabels = []string{"discard_policy", "storage_type", "replica_number", "stream_name"} + consumerConfigLabels = []string{"stream_name", "max_pending_ack", "ack_policy", "is_pull", "consumer_name"} + streamRaftInfoLabels = []string{"stream_name", "leader", "replica_count"} + streamRaftPeerInfoLabels = []string{"stream_name", "peer_name", "offline", "current", "leader", "lag"} + consumerRaftInfoLabels = []string{"consumer_name", "leader", "replica_count", "stream_name"} + consumerStateLabels = []string{"consumer_name", "stream_name", "last_delivered_message_consumer", "last_delivered_message_stream", "ack_floor_consumer", "ack_floor_stream"} + consumerRaftPeerInfoLabels = []string{"stream_name", "consumer_name", "peer_name", "offline", "current", "leader", "lag"} + streamReplicationLagLabels = []string{"stream_name", "peer_name"} + consumerReplicationLagLabels = []string{"stream_name", "consumer_name", "peer_name"} + DefaultScrapeInterval = 10 * time.Second //DefaultListenerID = "default_listener" ) type JSStreamConfigMetrics struct { - jsStreamConfig *prometheus.GaugeVec - jsStreamRaftInfo *prometheus.GaugeVec - jsStreamRaftPeerInfo *prometheus.GaugeVec + jsStreamConfig *prometheus.GaugeVec + jsStreamRaftInfo *prometheus.GaugeVec + jsStreamRaftPeerInfo *prometheus.GaugeVec + jsStreamReplicationLag *prometheus.GaugeVec - jsConsumerConfig *prometheus.GaugeVec - jsConsumerState *prometheus.GaugeVec - jsConsumerRaftInfo *prometheus.GaugeVec - jsConsumerRaftPeerInfo *prometheus.GaugeVec + jsConsumerConfig *prometheus.GaugeVec + jsConsumerState *prometheus.GaugeVec + jsConsumerRaftInfo *prometheus.GaugeVec + jsConsumerRaftPeerInfo *prometheus.GaugeVec + jsConsumerReplicationLag *prometheus.GaugeVec } func NewJetStreamConfigListMetrics(registry *prometheus.Registry, constLabels prometheus.Labels) *JSStreamConfigMetrics { @@ -89,6 +92,16 @@ func NewJetStreamConfigListMetrics(registry *prometheus.Registry, constLabels pr Help: "raft peer info for consumer", ConstLabels: constLabels, }, consumerRaftPeerInfoLabels), + jsStreamReplicationLag: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName("nats", "jetstream", "stream_replication_lag"), + Help: "replication lag of stream peers", + ConstLabels: constLabels, + }, streamReplicationLagLabels), + jsConsumerReplicationLag: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName("nats", "jetstream", "consumer_replication_lag"), + Help: "replication lag of consumer peers", + ConstLabels: constLabels, + }, consumerReplicationLagLabels), } registry.MustRegister(metrics.jsStreamConfig) @@ -98,6 +111,8 @@ func NewJetStreamConfigListMetrics(registry *prometheus.Registry, constLabels pr registry.MustRegister(metrics.jsStreamRaftPeerInfo) registry.MustRegister(metrics.jsConsumerRaftInfo) registry.MustRegister(metrics.jsConsumerRaftPeerInfo) + registry.MustRegister(metrics.jsStreamReplicationLag) + registry.MustRegister(metrics.jsConsumerReplicationLag) return metrics } @@ -209,6 +224,18 @@ func (o *jsConfigListListener) StreamHandler(streamInfo *nats.StreamInfo) { "lag": strconv.FormatUint(peer.Lag, 10), }, ).Set(1) + o.metrics.jsStreamReplicationLag.DeletePartialMatch( + prometheus.Labels{ + "stream_name": streamInfo.Config.Name, + "peer_name": peer.Name, + }, + ) + o.metrics.jsStreamReplicationLag.With( + prometheus.Labels{ + "stream_name": streamInfo.Config.Name, + "peer_name": peer.Name, + }, + ).Set(float64(peer.Lag)) } } func convertBoolToString(value bool) string { @@ -280,6 +307,20 @@ func (o *jsConfigListListener) ConsumerHandler(consumerInfo *nats.ConsumerInfo) "lag": strconv.FormatUint(peer.Lag, 10), }, ).Set(1) + o.metrics.jsConsumerReplicationLag.DeletePartialMatch( + prometheus.Labels{ + "stream_name": consumerInfo.Stream, + "consumer_name": consumerInfo.Name, + "peer_name": peer.Name, + }, + ) + o.metrics.jsConsumerReplicationLag.With( + prometheus.Labels{ + "stream_name": consumerInfo.Stream, + "consumer_name": consumerInfo.Name, + "peer_name": peer.Name, + }, + ).Set(float64(peer.Lag)) } } func IsPullBased(info *nats.ConsumerInfo) string {