Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: heapWatch metrics imported into Prometheus/Grafana #6147

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions network/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"})
var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"})

var networkP2PPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_p2p_broadcast_dropped_total", Description: "number of broadcast messages not sent to some p2p peer"})

var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"})
var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"})
var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"})
Expand All @@ -109,6 +111,8 @@
var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"})
var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"})

// var networkP2PGossipSubSentMsgs = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_message_sent", Description: "Number of complete messages that were sent to the network through gossipsub"})

var _ = pubsub.RawTracer(pubsubMetricsTracer{})

// pubsubMetricsTracer is a tracer for pubsub events used to track metrics.
Expand All @@ -134,14 +138,6 @@

// ValidateMessage is invoked when a message first enters the validation pipeline.
func (t pubsubMetricsTracer) ValidateMessage(msg *pubsub.Message) {
if msg != nil && msg.Topic != nil {
switch *msg.Topic {
case p2p.TXTopicName:
networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)), nil)
networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(msg.Data)))
networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1)
}
}
}

// DeliverMessage is invoked when a message is delivered
Expand Down Expand Up @@ -178,6 +174,17 @@

// RecvRPC is invoked when an incoming RPC is received.
func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) {
for i := range rpc.GetPublish() {
if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil {
switch *rpc.Publish[i].Topic {
case p2p.TXTopicName:
networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data)))
networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1)
}
}
}
// service gossipsub traffic = networkP2PGossipSubReceivedBytesTotal - networkP2PReceivedBytesByTag_TX
networkP2PGossipSubReceivedBytesTotal.AddUint64(uint64(rpc.Size()), nil)
}

Expand All @@ -197,7 +204,9 @@
}

// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}
func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
networkP2PPeerBroadcastDropped.Inc(nil)

Check warning on line 208 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L207-L208

Added lines #L207 - L208 were not covered by tests
}

// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism trigger, dropping messages.
Expand Down
2 changes: 1 addition & 1 deletion network/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestMetrics_PubsubTracer_TagList(t *testing.T) {
return true
})
}
if stmt.Name.Name == "ValidateMessage" {
if stmt.Name.Name == "RecvRPC" {
ast.Inspect(stmt.Body, func(n ast.Node) bool {
if switchStmt, ok := n.(*ast.SwitchStmt); ok {
for _, stmt := range switchStmt.Body.List {
Expand Down
2 changes: 1 addition & 1 deletion test/heapwatch/metrics_aggs.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def main():
mmax = max(rw)
mmin = min(rw)
print(f'{nick}: {metric_name}: count {len(rw)}, max {mmax}, min {mmin}, min-max {mmax - mmin}')
metric = Metric(metric_name, 0, MetricType.COUNTER)
metric = Metric(metric_name, 0, '', MetricType.COUNTER)
if metric.short_name() not in metric_names_nick_max_avg:
metric_names_nick_max_avg[metric.short_name()] = []
if args.avg_max_min:
Expand Down
93 changes: 93 additions & 0 deletions test/heapwatch/metrics_gra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3
# Copyright (C) 2019-2024 Algorand, Inc.
# This file is part of go-algorand
#
# go-algorand is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# go-algorand is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
#
###
#
# Convert metrics collected by heapWatch.py from prometheus format to prometheus + timestamp format.
# See https://prometheus.io/docs/prometheus/latest/storage/#backfilling-from-openmetrics-format
#
# Usage:
# python3 /data/go-algorand/test/heapwatch/metrics_gra.py -d metrics/500x15/ -o prom-metrics.txt
#
# Local Grafana setup:
# 1. Download standalone and unpack from https://grafana.com/grafana/download
# 2. Run ./grafana-v11.2.2/bin/grafana server -config ./grafana-v11.2.2/conf/defaults.ini -homepath ./grafana-v11.2.2
# 3. Open http://localhost:3000/ in web browser
#
# Prometheus setup:
# 1. Download and unpack from https://prometheus.io/download/
#
# Apply prom-metrics.txt to prometheus:
# (cd ./prometheus-2.54.1.linux-amd64 && ./promtool tsdb create-blocks-from openmetrics prom-metrics.txt)
# Start Prometheus
# ./prometheus-2.54.1.linux-amd64/prometheus --config.file=./prometheus-2.54.1.linux-amd64/prometheus.yml --storage.tsdb.path=./prometheus-2.54.1.linux-amd64/data --storage.tsdb.retention.time=60d --storage.tsdb.retention.size=500MB
# This should import the data into ./prometheus-2.54.1.linux-amd64/data and have them available for plotting. Use https://127.0.0.1:9090/ as Prometheus data source location in Grafana.
# Then create new or import dashboards from internal Grafana.
###

import argparse
import glob
import logging
import os
import sys

from metrics_lib import gather_metrics_files_by_nick, parse_metrics

logger = logging.getLogger(__name__)

def main():
ap = argparse.ArgumentParser()
ap.add_argument('-d', '--dir', type=str, default=None, help='dir path to find /*.metrics in')
ap.add_argument('--nick-re', action='append', default=[], help='regexp to filter node names, may be repeated')
ap.add_argument('--nick-lre', action='append', default=[], help='label:regexp to filter node names, may be repeated')
ap.add_argument('-o', '--output', type=str, default=None, help='output file to write to')
ap.add_argument('--verbose', default=False, action='store_true')
args = ap.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)

if not args.dir:
logging.error('need at least one dir set with -d/--dir')
return 1

metrics_files = sorted(glob.glob(os.path.join(args.dir, '*.metrics')))
metrics_files.extend(glob.glob(os.path.join(args.dir, 'terraform-inventory.host')))
filesByNick = gather_metrics_files_by_nick(metrics_files, args.nick_re, args.nick_lre)

outf = sys.stdout
if args.output:
outf = open(args.output, 'wt')

for nick, files_by_ts in filesByNick.items():
for ts, metrics_file in files_by_ts.items():
with open(metrics_file, 'rt') as fin:
metrics = parse_metrics(fin, nick)
for metric_seq in metrics.values():
for metric in metric_seq:
print('# TYPE', metric.short_name(), metric.type, file=outf)
print('# HELP', metric.short_name(), metric.desc, file=outf)
print(metric.string(with_role=True, quote=True), metric.value, int(ts.timestamp()), file=outf)

print('# EOF', file=outf)

return 0


if __name__ == '__main__':
sys.exit(main())
31 changes: 24 additions & 7 deletions test/heapwatch/metrics_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,17 @@ class MetricType(Enum):
GAUGE = 0
COUNTER = 1

def __str__(self):
return self.name.lower()

class Metric:
"""Metric with tags"""
def __init__(self, metric_name: str, type: MetricType, value: Union[int, float]):
def __init__(self, metric_name: str, type: MetricType, desc: str, value: Union[int, float]):
full_name = metric_name.strip()
self.name = full_name
self.value = value
self.type = type
self.desc = desc
self.tags: Dict[str, str] = {}
self.tag_keys: set = set()

Expand All @@ -187,6 +191,8 @@ def __init__(self, metric_name: str, type: MetricType, value: Union[int, float])
tags = raw_tags.split(',')
for tag in tags:
key, value = tag.split('=')
if not value:
continue
if value[0] == '"' and value[-1] == '"':
value = value[1:-1]
self.tags[key] = value
Expand All @@ -198,12 +204,20 @@ def short_name(self):
def __str__(self):
return self.string()

def string(self, tags: Optional[set[str]]=None):
def string(self, tags: Optional[set[str]]=None, with_role=False, quote=False) -> str:
result = self.name
if self.tags:

if with_role:
node = self.tags.get('n')
if node:
role = 'relay' if node.startswith('r') else 'npn' if node.startswith('npn') else 'node'
self.add_tag('role', role)

if self.tags or tags:
if not tags:
tags = self.tags
result += '{' + ','.join([f'{k}={v}' for k, v in sorted(self.tags.items()) if k in tags]) + '}'
esc = '"' if quote else ''
result += '{' + ','.join([f'{k}={esc}{v}{esc}' for k, v in sorted(self.tags.items()) if k in tags]) + '}'
return result

def add_tag(self, key: str, value: str):
Expand Down Expand Up @@ -231,6 +245,7 @@ def parse_metrics(
out = {}
try:
last_type = None
last_desc = None
for line in fin:
if not line:
continue
Expand All @@ -244,6 +259,8 @@ def parse_metrics(
last_type = MetricType.GAUGE
elif tpe == 'counter':
last_type = MetricType.COUNTER
elif line.startswith('# HELP'):
last_desc = line.split(None, 3)[-1] # skip first 3 words (#, HELP, metric name)
continue
m = metric_line_re.match(line)
if m:
Expand All @@ -254,7 +271,7 @@ def parse_metrics(
name = ab[0]
value = num(ab[1])

metric = Metric(name, last_type, value)
metric = Metric(name, last_type, last_desc, value)
metric.add_tag('n', nick)
if not metrics_names or metric.name in metrics_names:
if metric.name not in out:
Expand All @@ -267,8 +284,8 @@ def parse_metrics(
if diff and metrics_names and len(metrics_names) == 2 and len(out) == 2:
m = list(out.keys())
name = f'{m[0]}_-_{m[1]}'
metric = Metric(name, MetricType.GAUGE, out[m[0]].value - out[m[1]].value)
out = [{name: metric}]
metric = Metric(name, MetricType.GAUGE, f'Diff of {m[0]} and {m[1]}', out[m[0]][0].value - out[m[1]][0].value)
out = {name: [metric]}

return out

Expand Down
Loading