From 4382aa3ec2561e7620ef12f99dcd5d9a44589ce4 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 25 Sep 2024 16:15:25 -0400 Subject: [PATCH 1/5] network: add algod_peer_p2p_broadcast_dropped_total metric --- network/metrics.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/network/metrics.go b/network/metrics.go index a1e92b2424..eabc94f7df 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -90,6 +90,8 @@ var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "a var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"}) +var networkP2PPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_p2p_broadcast_dropped_total", Description: "number of broadcast messages not sent to some p2p peer"}) + var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"}) var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"}) var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"}) @@ -197,7 +199,9 @@ func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { } // DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. 
-func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} +func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) { + networkP2PPeerBroadcastDropped.Inc(nil) +} // UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and // the pressure release mechanism trigger, dropping messages. From 50f78ed5e8436245a5f775daa716c93b4647c61c Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 25 Sep 2024 17:45:06 -0400 Subject: [PATCH 2/5] fix misplaced recvRPC metric --- network/metrics.go | 21 +++++++++++++-------- network/metrics_test.go | 2 +- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/network/metrics.go b/network/metrics.go index eabc94f7df..fe62434095 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -111,6 +111,8 @@ var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics. var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"}) var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"}) +// var networkP2PGossipSubSentMsgs = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_message_sent", Description: "Number of complete messages that were sent to the network through gossipsub"}) + var _ = pubsub.RawTracer(pubsubMetricsTracer{}) // pubsubMetricsTracer is a tracer for pubsub events used to track metrics. @@ -136,14 +138,6 @@ func (t pubsubMetricsTracer) Prune(p peer.ID, topic string) {} // ValidateMessage is invoked when a message first enters the validation pipeline. 
func (t pubsubMetricsTracer) ValidateMessage(msg *pubsub.Message) { - if msg != nil && msg.Topic != nil { - switch *msg.Topic { - case p2p.TXTopicName: - networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)), nil) - networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(msg.Data))) - networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1) - } - } } // DeliverMessage is invoked when a message is delivered @@ -180,6 +174,17 @@ func (t pubsubMetricsTracer) ThrottlePeer(p peer.ID) {} // RecvRPC is invoked when an incoming RPC is received. func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) { + for i := range rpc.GetPublish() { + if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil { + switch *rpc.Publish[i].Topic { + case p2p.TXTopicName: + networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data))) + networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1) + } + } + } + // service gossipsub traffic = networkP2PGossipSubReceivedBytesTotal - networkP2PReceivedBytesByTag_TX networkP2PGossipSubReceivedBytesTotal.AddUint64(uint64(rpc.Size()), nil) } diff --git a/network/metrics_test.go b/network/metrics_test.go index 857ab57051..ea0448ebc6 100644 --- a/network/metrics_test.go +++ b/network/metrics_test.go @@ -55,7 +55,7 @@ func TestMetrics_PubsubTracer_TagList(t *testing.T) { return true }) } - if stmt.Name.Name == "ValidateMessage" { + if stmt.Name.Name == "RecvRPC" { ast.Inspect(stmt.Body, func(n ast.Node) bool { if switchStmt, ok := n.(*ast.SwitchStmt); ok { for _, stmt := range switchStmt.Body.List { From 8fec6f8872bfd7c5ebad08e7a8af767bc02332c8 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 26 Sep 2024 18:36:52 -0400 Subject: [PATCH 3/5] fix metrics diff plotter --- test/heapwatch/metrics_lib.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/heapwatch/metrics_lib.py 
b/test/heapwatch/metrics_lib.py index 1ac0d85bd7..e6fafaeebb 100644 --- a/test/heapwatch/metrics_lib.py +++ b/test/heapwatch/metrics_lib.py @@ -267,8 +267,8 @@ def parse_metrics( if diff and metrics_names and len(metrics_names) == 2 and len(out) == 2: m = list(out.keys()) name = f'{m[0]}_-_{m[1]}' - metric = Metric(name, MetricType.GAUGE, out[m[0]].value - out[m[1]].value) - out = [{name: metric}] + metric = Metric(name, MetricType.GAUGE, out[m[0]][0].value - out[m[1]][0].value) + out = {name: [metric]} return out From 11137ea6b40dfe47a672710614c832ca2c141386 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 30 Sep 2024 17:27:21 -0400 Subject: [PATCH 4/5] wip: graphite metrics --- test/heapwatch/metrics_lib.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/test/heapwatch/metrics_lib.py b/test/heapwatch/metrics_lib.py index e6fafaeebb..647972f6f6 100644 --- a/test/heapwatch/metrics_lib.py +++ b/test/heapwatch/metrics_lib.py @@ -187,6 +187,8 @@ def __init__(self, metric_name: str, type: MetricType, value: Union[int, float]) tags = raw_tags.split(',') for tag in tags: key, value = tag.split('=') + if not value: + continue if value[0] == '"' and value[-1] == '"': value = value[1:-1] self.tags[key] = value @@ -206,6 +208,25 @@ def string(self, tags: Optional[set[str]]=None): result += '{' + ','.join([f'{k}={v}' for k, v in sorted(self.tags.items()) if k in tags]) + '}' return result + def graphite_string(self, with_role=False): + restricted_chars = ('"', '$', '(', ')', '*', '+', ',', '?', '[', ']', '\\', '^', '`', '{', '}', '|', ' ') + translate_table = str.maketrans({c: '_' for c in restricted_chars}) + result = self.name + if with_role: + node = self.tags.get('n') + if node: + role = 'relay' if node.startswith('r') else 'npn' if node.startswith('npn') else 'node' + self.add_tag('role', role) + + if self.tags: + tags = [] + for k, v in sorted(self.tags.items()): + v = v.translate(translate_table) + tags.append(f'{k}={v}') + result += 
';' + ';'.join(tags) + # result += ';' + ';'.join([f'{k}={v}' for k, v in sorted(self.tags.items())]) + return result + def add_tag(self, key: str, value: str): self.tags[key] = value self.tag_keys.add(key) From 5f255bebb4b9e31ca24f49491f6a4a52e61db844 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 4 Oct 2024 11:49:05 -0400 Subject: [PATCH 5/5] prometheus metrics for promtool --- test/heapwatch/metrics_aggs.py | 2 +- test/heapwatch/metrics_gra.py | 93 ++++++++++++++++++++++++++++++++++ test/heapwatch/metrics_lib.py | 36 ++++++------- 3 files changed, 110 insertions(+), 21 deletions(-) create mode 100644 test/heapwatch/metrics_gra.py diff --git a/test/heapwatch/metrics_aggs.py b/test/heapwatch/metrics_aggs.py index 33379766a6..6bc7967867 100644 --- a/test/heapwatch/metrics_aggs.py +++ b/test/heapwatch/metrics_aggs.py @@ -164,7 +164,7 @@ def main(): mmax = max(rw) mmin = min(rw) print(f'{nick}: {metric_name}: count {len(rw)}, max {mmax}, min {mmin}, min-max {mmax - mmin}') - metric = Metric(metric_name, 0, MetricType.COUNTER) + metric = Metric(metric_name, 0, '', MetricType.COUNTER) if metric.short_name() not in metric_names_nick_max_avg: metric_names_nick_max_avg[metric.short_name()] = [] if args.avg_max_min: diff --git a/test/heapwatch/metrics_gra.py b/test/heapwatch/metrics_gra.py new file mode 100644 index 0000000000..8e6cf8ce95 --- /dev/null +++ b/test/heapwatch/metrics_gra.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +# Copyright (C) 2019-2024 Algorand, Inc. +# This file is part of go-algorand +# +# go-algorand is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# go-algorand is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with go-algorand. If not, see <https://www.gnu.org/licenses/>. +# +### +# +# Convert metrics collected by heapWatch.py from prometheus format to prometheus + timestamp format. +# See https://prometheus.io/docs/prometheus/latest/storage/#backfilling-from-openmetrics-format +# +# Usage: +# python3 /data/go-algorand/test/heapwatch/metrics_gra.py -d metrics/500x15/ -o prom-metrics.txt +# +# Local Grafana setup: +# 1. Download standalone and unpack from https://grafana.com/grafana/download +# 2. Run ./grafana-v11.2.2/bin/grafana server -config ./grafana-v11.2.2/conf/defaults.ini -homepath ./grafana-v11.2.2 +# 3. Open http://localhost:3000/ in web browser +# +# Prometheus setup: +# 1. Download and unpack from https://prometheus.io/download/ +# +# Apply prom-metrics.txt to prometheus: +# (cd ./prometheus-2.54.1.linux-amd64 && ./promtool tsdb create-blocks-from openmetrics prom-metrics.txt) +# Start Prometheus +# ./prometheus-2.54.1.linux-amd64/prometheus --config.file=./prometheus-2.54.1.linux-amd64/prometheus.yml --storage.tsdb.path=./prometheus-2.54.1.linux-amd64/data --storage.tsdb.retention.time=60d --storage.tsdb.retention.size=500MB +# This should import the data into ./prometheus-2.54.1.linux-amd64/data and have them available for plotting. Use https://127.0.0.1:9090/ as Prometheus data source location in Grafana. +# Then create new or import dashboards from internal Grafana.
+### + +import argparse +import glob +import logging +import os +import sys + +from metrics_lib import gather_metrics_files_by_nick, parse_metrics + +logger = logging.getLogger(__name__) + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument('-d', '--dir', type=str, default=None, help='dir path to find /*.metrics in') + ap.add_argument('--nick-re', action='append', default=[], help='regexp to filter node names, may be repeated') + ap.add_argument('--nick-lre', action='append', default=[], help='label:regexp to filter node names, may be repeated') + ap.add_argument('-o', '--output', type=str, default=None, help='output file to write to') + ap.add_argument('--verbose', default=False, action='store_true') + args = ap.parse_args() + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + + if not args.dir: + logging.error('need at least one dir set with -d/--dir') + return 1 + + metrics_files = sorted(glob.glob(os.path.join(args.dir, '*.metrics'))) + metrics_files.extend(glob.glob(os.path.join(args.dir, 'terraform-inventory.host'))) + filesByNick = gather_metrics_files_by_nick(metrics_files, args.nick_re, args.nick_lre) + + outf = sys.stdout + if args.output: + outf = open(args.output, 'wt') + + for nick, files_by_ts in filesByNick.items(): + for ts, metrics_file in files_by_ts.items(): + with open(metrics_file, 'rt') as fin: + metrics = parse_metrics(fin, nick) + for metric_seq in metrics.values(): + for metric in metric_seq: + print('# TYPE', metric.short_name(), metric.type, file=outf) + print('# HELP', metric.short_name(), metric.desc, file=outf) + print(metric.string(with_role=True, quote=True), metric.value, int(ts.timestamp()), file=outf) + + print('# EOF', file=outf) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/test/heapwatch/metrics_lib.py b/test/heapwatch/metrics_lib.py index 647972f6f6..0f6f603cb6 100644 --- a/test/heapwatch/metrics_lib.py +++ 
b/test/heapwatch/metrics_lib.py @@ -166,13 +166,17 @@ class MetricType(Enum): GAUGE = 0 COUNTER = 1 + def __str__(self): + return self.name.lower() + class Metric: """Metric with tags""" - def __init__(self, metric_name: str, type: MetricType, value: Union[int, float]): + def __init__(self, metric_name: str, type: MetricType, desc: str, value: Union[int, float]): full_name = metric_name.strip() self.name = full_name self.value = value self.type = type + self.desc = desc self.tags: Dict[str, str] = {} self.tag_keys: set = set() @@ -200,31 +204,20 @@ def short_name(self): def __str__(self): return self.string() - def string(self, tags: Optional[set[str]]=None): + def string(self, tags: Optional[set[str]]=None, with_role=False, quote=False) -> str: result = self.name - if self.tags: - if not tags: - tags = self.tags - result += '{' + ','.join([f'{k}={v}' for k, v in sorted(self.tags.items()) if k in tags]) + '}' - return result - def graphite_string(self, with_role=False): - restricted_chars = ('"', '$', '(', ')', '*', '+', ',', '?', '[', ']', '\\', '^', '`', '{', '}', '|', ' ') - translate_table = str.maketrans({c: '_' for c in restricted_chars}) - result = self.name if with_role: node = self.tags.get('n') if node: role = 'relay' if node.startswith('r') else 'npn' if node.startswith('npn') else 'node' self.add_tag('role', role) - if self.tags: - tags = [] - for k, v in sorted(self.tags.items()): - v = v.translate(translate_table) - tags.append(f'{k}={v}') - result += ';' + ';'.join(tags) - # result += ';' + ';'.join([f'{k}={v}' for k, v in sorted(self.tags.items())]) + if self.tags or tags: + if not tags: + tags = self.tags + esc = '"' if quote else '' + result += '{' + ','.join([f'{k}={esc}{v}{esc}' for k, v in sorted(self.tags.items()) if k in tags]) + '}' return result def add_tag(self, key: str, value: str): @@ -252,6 +245,7 @@ def parse_metrics( out = {} try: last_type = None + last_desc = None for line in fin: if not line: continue @@ -265,6 +259,8 @@ def 
parse_metrics( last_type = MetricType.GAUGE elif tpe == 'counter': last_type = MetricType.COUNTER + elif line.startswith('# HELP'): + last_desc = line.split(None, 3)[-1] # skip first 3 words (#, HELP, metric name) continue m = metric_line_re.match(line) if m: @@ -275,7 +271,7 @@ def parse_metrics( name = ab[0] value = num(ab[1]) - metric = Metric(name, last_type, value) + metric = Metric(name, last_type, last_desc, value) metric.add_tag('n', nick) if not metrics_names or metric.name in metrics_names: if metric.name not in out: @@ -288,7 +284,7 @@ def parse_metrics( if diff and metrics_names and len(metrics_names) == 2 and len(out) == 2: m = list(out.keys()) name = f'{m[0]}_-_{m[1]}' - metric = Metric(name, MetricType.GAUGE, out[m[0]][0].value - out[m[1]][0].value) + metric = Metric(name, MetricType.GAUGE, f'Diff of {m[0]} and {m[1]}', out[m[0]][0].value - out[m[1]][0].value) out = {name: [metric]} return out