diff --git a/napalm_logs/listener_proc.py b/napalm_logs/listener_proc.py index 9020edec..210de1c2 100644 --- a/napalm_logs/listener_proc.py +++ b/napalm_logs/listener_proc.py @@ -6,6 +6,7 @@ # Import pythond stdlib import os +import time import signal import logging import threading @@ -102,6 +103,7 @@ def start(self): while self.__up: try: log_message, log_source = self.listener.receive() + receive_timestamp = time.time() except ListenerException as lerr: if self.__up is False: log.info('Exiting on process shutdown') @@ -114,7 +116,7 @@ def start(self): log.info('Empty message received from %s. Not queueing to the server.', log_source) continue c_logs_ingested.labels(listener_type=self._listener_type, address=self.address, port=self.port).inc() - self.pub.send(umsgpack.packb((log_message, log_source))) + self.pub.send(umsgpack.packb((log_message, log_source, receive_timestamp))) c_messages_published.labels(listener_type=self._listener_type, address=self.address, port=self.port).inc() def stop(self): diff --git a/napalm_logs/publisher.py b/napalm_logs/publisher.py index 576f6879..d313b910 100644 --- a/napalm_logs/publisher.py +++ b/napalm_logs/publisher.py @@ -6,6 +6,7 @@ # Import pythond stdlib import os +import time import signal import logging import threading @@ -15,7 +16,7 @@ import umsgpack import nacl.utils import nacl.secret -from prometheus_client import Counter +from prometheus_client import Counter, Gauge # Import napalm-logs pkgs import napalm_logs.utils @@ -149,6 +150,11 @@ def start(self): "Count of published messages", ['publisher_type', 'address', 'port'] ) + napalm_logs_processing_delay = Gauge( + 'napalm_logs_processing_delay', + 'The time difference in seconds between when the message arrive and published', + ['publisher_type', 'address', 'port'] + ) self._setup_ipc() # Start suicide polling thread thread = threading.Thread(target=self._suicide_when_without_parent, args=(os.getppid(),)) @@ -167,6 +173,7 @@ def start(self): log.error(error, exc_info=True) raise NapalmLogsExit(error) obj = umsgpack.unpackb(bin_obj) + recv_ts = obj['message_details'].pop('_napalm_logs_received_timestamp') if self._strip_message_details: obj.pop('message_details', None) bin_obj = self.serializer_fun(obj) @@ -195,6 +202,12 @@ def start(self): if not self.disable_security and self.__transport_encrypt: # Encrypt only when needed. serialized_obj = self._prepare(serialized_obj) + processing_delay = time.time() - float(recv_ts) + napalm_logs_processing_delay.labels( + publisher_type=self._transport_type, + address=self.address, + port=self.port + ).set(processing_delay) self.transport.publish(serialized_obj) napalm_logs_publisher_messages_published.labels( publisher_type=self._transport_type, diff --git a/napalm_logs/server.py b/napalm_logs/server.py index 92c98f9c..be37f65b 100644 --- a/napalm_logs/server.py +++ b/napalm_logs/server.py @@ -265,7 +265,7 @@ def start(self): # Take messages from the main queue try: bin_obj = self.sub.recv() - msg, address = umsgpack.unpackb(bin_obj, use_list=False) + msg, address, recv_ts = umsgpack.unpackb(bin_obj, use_list=False) except zmq.ZMQError as error: if self.__up is False: log.info('Exiting on process shutdown') @@ -283,6 +283,7 @@ def start(self): os_list = self._identify_os(msg) for dev_os, msg_dict in os_list: + msg_dict['_napalm_logs_received_timestamp'] = recv_ts if dev_os and dev_os in self.started_os_proc: # Identified the OS and the corresponding process is started. # Then send the message in the right queue