Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log processing delay metric #327

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion napalm_logs/listener_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# Import pythond stdlib
import os
import time
import signal
import logging
import threading
Expand Down Expand Up @@ -102,6 +103,7 @@ def start(self):
while self.__up:
try:
log_message, log_source = self.listener.receive()
receive_timestamp = time.time()
except ListenerException as lerr:
if self.__up is False:
log.info('Exiting on process shutdown')
Expand All @@ -114,7 +116,7 @@ def start(self):
log.info('Empty message received from %s. Not queueing to the server.', log_source)
continue
c_logs_ingested.labels(listener_type=self._listener_type, address=self.address, port=self.port).inc()
self.pub.send(umsgpack.packb((log_message, log_source)))
self.pub.send(umsgpack.packb((log_message, log_source, receive_timestamp)))
c_messages_published.labels(listener_type=self._listener_type, address=self.address, port=self.port).inc()

def stop(self):
Expand Down
15 changes: 14 additions & 1 deletion napalm_logs/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# Import pythond stdlib
import os
import time
import signal
import logging
import threading
Expand All @@ -15,7 +16,7 @@
import umsgpack
import nacl.utils
import nacl.secret
from prometheus_client import Counter
from prometheus_client import Counter, Gauge

# Import napalm-logs pkgs
import napalm_logs.utils
Expand Down Expand Up @@ -149,6 +150,11 @@ def start(self):
"Count of published messages",
['publisher_type', 'address', 'port']
)
napalm_logs_processing_delay = Gauge(
'napalm_logs_processing_delay',
'The time difference in seconds between when the message arrive and published',
['publisher_type', 'address', 'port']
)
self._setup_ipc()
# Start suicide polling thread
thread = threading.Thread(target=self._suicide_when_without_parent, args=(os.getppid(),))
Expand All @@ -167,6 +173,7 @@ def start(self):
log.error(error, exc_info=True)
raise NapalmLogsExit(error)
obj = umsgpack.unpackb(bin_obj)
recv_ts = obj['message_details'].pop('_napalm_logs_received_timestamp')
if self._strip_message_details:
obj.pop('message_details', None)
bin_obj = self.serializer_fun(obj)
Expand Down Expand Up @@ -195,6 +202,12 @@ def start(self):
if not self.disable_security and self.__transport_encrypt:
# Encrypt only when needed.
serialized_obj = self._prepare(serialized_obj)
processing_delay = time.time() - float(recv_ts)
napalm_logs_processing_delay.labels(
publisher_type=self._transport_type,
address=self.address,
port=self.port
).set(processing_delay)
self.transport.publish(serialized_obj)
napalm_logs_publisher_messages_published.labels(
publisher_type=self._transport_type,
Expand Down
3 changes: 2 additions & 1 deletion napalm_logs/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def start(self):
# Take messages from the main queue
try:
bin_obj = self.sub.recv()
msg, address = umsgpack.unpackb(bin_obj, use_list=False)
msg, address, recv_ts = umsgpack.unpackb(bin_obj, use_list=False)
except zmq.ZMQError as error:
if self.__up is False:
log.info('Exiting on process shutdown')
Expand All @@ -283,6 +283,7 @@ def start(self):
os_list = self._identify_os(msg)

for dev_os, msg_dict in os_list:
msg_dict['_napalm_logs_received_timestamp'] = recv_ts
if dev_os and dev_os in self.started_os_proc:
# Identified the OS and the corresponding process is started.
# Then send the message in the right queue
Expand Down