From 420651c0f5c6ed09b55bb7b27962d3f4047425b7 Mon Sep 17 00:00:00 2001 From: Mircea Ulinic Date: Wed, 22 Apr 2020 14:33:30 +0100 Subject: [PATCH] Add log processing delay metric Expose a new metric ``napalm_logs_processing_delay`` that provides a (rough?) view on how long it takes a message to be processed through napalm-logs - from the moment the message is received, till published. This timing excludes the time spent effectively publishing (as it may depend of various external factors), but it includes the time spent in the internal queues. --- napalm_logs/listener_proc.py | 4 +++- napalm_logs/publisher.py | 15 ++++++++++++++- napalm_logs/server.py | 3 ++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/napalm_logs/listener_proc.py b/napalm_logs/listener_proc.py index 9020edec..210de1c2 100644 --- a/napalm_logs/listener_proc.py +++ b/napalm_logs/listener_proc.py @@ -6,6 +6,7 @@ # Import pythond stdlib import os +import time import signal import logging import threading @@ -102,6 +103,7 @@ def start(self): while self.__up: try: log_message, log_source = self.listener.receive() + receive_timestamp = time.time() except ListenerException as lerr: if self.__up is False: log.info('Exiting on process shutdown') @@ -114,7 +116,7 @@ def start(self): log.info('Empty message received from %s. Not queueing to the server.', log_source) continue c_logs_ingested.labels(listener_type=self._listener_type, address=self.address, port=self.port).inc() - self.pub.send(umsgpack.packb((log_message, log_source))) + self.pub.send(umsgpack.packb((log_message, log_source, receive_timestamp))) c_messages_published.labels(listener_type=self._listener_type, address=self.address, port=self.port).inc() def stop(self): diff --git a/napalm_logs/publisher.py b/napalm_logs/publisher.py index 576f6879..d313b910 100644 --- a/napalm_logs/publisher.py +++ b/napalm_logs/publisher.py @@ -6,6 +6,7 @@ # Import pythond stdlib import os +import time import signal import logging import threading @@ -15,7 +16,7 @@ import umsgpack import nacl.utils import nacl.secret -from prometheus_client import Counter +from prometheus_client import Counter, Gauge # Import napalm-logs pkgs import napalm_logs.utils @@ -149,6 +150,11 @@ def start(self): "Count of published messages", ['publisher_type', 'address', 'port'] ) + napalm_logs_processing_delay = Gauge( + 'napalm_logs_processing_delay', + 'The time difference in seconds between when the message arrive and published', + ['publisher_type', 'address', 'port'] + ) self._setup_ipc() # Start suicide polling thread thread = threading.Thread(target=self._suicide_when_without_parent, args=(os.getppid(),)) @@ -167,6 +173,7 @@ def start(self): log.error(error, exc_info=True) raise NapalmLogsExit(error) obj = umsgpack.unpackb(bin_obj) + recv_ts = obj['message_details'].pop('_napalm_logs_received_timestamp') if self._strip_message_details: obj.pop('message_details', None) bin_obj = self.serializer_fun(obj) @@ -195,6 +202,12 @@ def start(self): if not self.disable_security and self.__transport_encrypt: # Encrypt only when needed. serialized_obj = self._prepare(serialized_obj) + processing_delay = time.time() - float(recv_ts) + napalm_logs_processing_delay.labels( + publisher_type=self._transport_type, + address=self.address, + port=self.port + ).set(processing_delay) self.transport.publish(serialized_obj) napalm_logs_publisher_messages_published.labels( publisher_type=self._transport_type, diff --git a/napalm_logs/server.py b/napalm_logs/server.py index 92c98f9c..be37f65b 100644 --- a/napalm_logs/server.py +++ b/napalm_logs/server.py @@ -265,7 +265,7 @@ def start(self): # Take messages from the main queue try: bin_obj = self.sub.recv() - msg, address = umsgpack.unpackb(bin_obj, use_list=False) + msg, address, recv_ts = umsgpack.unpackb(bin_obj, use_list=False) except zmq.ZMQError as error: if self.__up is False: log.info('Exiting on process shutdown') @@ -283,6 +283,7 @@ def start(self): os_list = self._identify_os(msg) for dev_os, msg_dict in os_list: + msg_dict['_napalm_logs_received_timestamp'] = recv_ts if dev_os and dev_os in self.started_os_proc: # Identified the OS and the corresponding process is started. # Then send the message in the right queue