
Commit

feat: increase configuration options and fix bugs after the python3 upgrade
talves committed May 9, 2024
1 parent 1778d79 commit a7e57a0
Showing 12 changed files with 88 additions and 13 deletions.
14 changes: 14 additions & 0 deletions docs/index.rst
@@ -220,12 +220,26 @@ e.g.: the CLI option ``auth-address`` becomes ``auth_address`` in the
- 10.10.10.1
- 10.10.10.2
- 10.10.10.3
opts:
  batch_size: 16384
  max_request_size: 1048576
  buffer_memory: 33554432
  send_buffer_bytes: 131072
  max_in_flight_requests_per_connection: 5
  retries: 0
  max_block_ms: 60000
  linger_ms: 1000
The configuration above listens to the syslog messages from the Kafka bootstrap
servers ``10.10.10.1``, ``10.10.10.2`` and ``10.10.10.3``, then publishes the
structured objects encrypted and serialized via ZeroMQ, serving them at the
address ``172.17.17.2``, port ``49017``.

The opts listed there are the Kafka producer options that napalm-logs exposes.
They are named exactly as in the kafka-python package:
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
These opts are optional: you can choose to set all of them, only some, or none at all.
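
For example, to override only a couple of the producer options and keep the defaults for
the rest, a partial ``opts`` block like the following would be enough (the values shown
are purely illustrative):

.. code-block:: yaml

  opts:
    linger_ms: 1000
    retries: 2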

Check the complete list of configuration options under
:ref:`configuration-options`.

28 changes: 28 additions & 0 deletions docs/options/index.rst
@@ -344,6 +344,34 @@ Configuration file example:
hwm: 0
.. _configuration-backlog:

``backlog``: 100
----------------

.. versionadded:: 0.11.0

The ZeroMQ backlog option sets the maximum length of the queue of outstanding peer
connections for the specified socket; this only applies to connection-oriented transports.
It is used both for the external ZeroMQ publishers and for the internally defined ZeroMQ
sockets that queue the messages before sending them to the configured publishers.

This option can be used to tune the performance of napalm-logs.
While the default limit should generally be enough, in environments with an extremely high
density of syslog messages to process, it is recommended to increase this value.

CLI usage example:

.. code-block:: bash

  $ napalm-logs --backlog 0

Configuration file example:

.. code-block:: yaml

  backlog: 0
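
napalm-logs applies this value through the pyzmq ``zmq.BACKLOG`` socket option. Below is a
minimal, purely illustrative pyzmq sketch (not napalm-logs code; the address and values are
placeholders):

.. code-block:: python

  import zmq

  ctx = zmq.Context()
  sock = ctx.socket(zmq.PUB)
  # Allow at most 100 outstanding connection requests on this socket
  sock.setsockopt(zmq.BACKLOG, 100)
  sock.bind("tcp://127.0.0.1:49017")
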
.. _configuration-options-keyfile:

``keyfile``
5 changes: 4 additions & 1 deletion napalm_logs/base.py
@@ -75,6 +75,7 @@ def __init__(
device_blacklist=[],
device_whitelist=[],
hwm=None,
backlog=None,
device_worker_processes=1,
serializer="msgpack",
buffer=None,
@@ -128,6 +129,7 @@ def __init__(
self.serializer = serializer
self.device_worker_processes = device_worker_processes
self.hwm = hwm
self.backlog = backlog
self._buffer_cfg = buffer
self._buffer = None
# Setup the environment
@@ -209,6 +211,7 @@ def _post_preparation(self):
already setup).
"""
self.opts["hwm"] = CONFIG.ZMQ_INTERNAL_HWM if self.hwm is None else self.hwm
self.opts["backlog"] = CONFIG.ZMQ_INTERNAL_BACKLOG if self.backlog is None else self.backlog
self.opts["_server_send_unknown"] = False
for pub in self.publisher:
pub_name = list(pub.keys())[0]
@@ -631,7 +634,7 @@ def _start_srv_proc(self, started_os_proc):

def _start_pub_px_proc(self):
""" """
px = NapalmLogsPublisherProxy(self.opts["hwm"])
px = NapalmLogsPublisherProxy(self.opts["hwm"], self.opts["backlog"])
proc = Process(target=px.start)
proc.start()
proc.description = "Publisher proxy process"
1 change: 1 addition & 0 deletions napalm_logs/config/__init__.py
@@ -29,6 +29,7 @@
LOG_FILE = os.path.join(ROOT_DIR, "var", "log", "napalm", "logs")
LOG_FILE_CLI_OPTIONS = ("cli", "screen")
ZMQ_INTERNAL_HWM = 1000
ZMQ_INTERNAL_BACKLOG = 100
METRICS_ADDRESS = "0.0.0.0"
METRICS_PORT = 9443
METRICS_DIR = "/tmp/napalm_logs_metrics"
8 changes: 4 additions & 4 deletions napalm_logs/device.py
@@ -177,13 +177,13 @@ def _parse(self, msg_dict):
ret[key] = result
return ret
if error_present is True:
log.info(
log.debug(
"Configured regex did not match for os: %s tag %s",
self._name,
msg_dict.get("tag", ""),
)
else:
log.info(
log.debug(
"Syslog message not configured for os: %s tag %s",
self._name,
msg_dict.get("tag", ""),
@@ -248,7 +248,7 @@ def start(self):
"Counter of failed OpenConfig object generations",
["device_os"],
)
if self.opts.get("metrics_include_attributes", True):
if self.opts.get("metrics_include_attributes", False):
napalm_logs_device_published_messages_attrs = Counter(
"napalm_logs_device_published_messages_attrs",
"Counter of published messages, with more granular selection",
@@ -355,7 +355,7 @@ def start(self):
self.pub.send(umsgpack.packb(to_publish))
# self._publish(to_publish)
napalm_logs_device_published_messages.labels(device_os=self._name).inc()
if self.opts.get("metrics_include_attributes", True):
if self.opts.get("metrics_include_attributes", False):
napalm_logs_device_published_messages_attrs.labels(
device_os=self._name,
error=to_publish["error"],
4 changes: 3 additions & 1 deletion napalm_logs/listener/kafka.py
@@ -70,13 +70,15 @@ def receive(self):
log.error("Received kafka error: %s", error, exc_info=True)
raise ListenerException(error)
log_source = msg.key
if isinstance(log_source, bytes):
log_source = log_source.decode()
try:
decoded = json.loads(msg.value.decode("utf-8"))
except ValueError:
log.error("Not in json format: %s", msg.value.decode("utf-8"))
return "", ""
log_message = decoded.get("message")
log.debug("[%s] Received %s from %s", log_message, log_source, time.time())
log.debug("[%s] Received from kafka %s from %s", log_message, log_source, time.time())
return log_message, log_source

def stop(self):
2 changes: 1 addition & 1 deletion napalm_logs/listener/tcp.py
@@ -72,7 +72,7 @@ def _client_connection(self, conn, addr):
# log.debug('Received empty message from %s', addr)
# disabled ^ as it was too noisy
continue
log.debug("[%s] Received %s from %s", time.time(), msg, addr)
log.debug("[%s] Received from tcp %s from %s", time.time(), msg, addr)
messages = []
if isinstance(msg, bytes):
msg = msg.decode("utf-8")
2 changes: 1 addition & 1 deletion napalm_logs/listener/udp.py
@@ -74,7 +74,7 @@ def receive(self):
except socket.error as error:
log.error("Received listener socket error: %s", error, exc_info=True)
raise ListenerException(error)
log.debug("[%s] Received %s from %s", msg, addr, time.time())
log.debug("[%s] Received from udp %s from %s", msg, addr, time.time())
return msg, addr[0]

def stop(self):
2 changes: 2 additions & 0 deletions napalm_logs/listener_proc.py
@@ -100,6 +100,8 @@ def start(self):
while self.__up:
try:
log_message, log_source = self.listener.receive()
if isinstance(log_source, bytes):
log_source = log_source.decode()
except ListenerException as lerr:
if self.__up is False:
log.info("Exiting on process shutdown")
4 changes: 3 additions & 1 deletion napalm_logs/pub_proxy.py
@@ -29,8 +29,9 @@ class NapalmLogsPublisherProxy(NapalmLogsProc):
Internal IPC proxy sub-process class.
"""

def __init__(self, hwm):
def __init__(self, hwm, backlog):
self.hwm = hwm
self.backlog = backlog
self.__up = False

def _exit_gracefully(self, signum, _):
@@ -49,6 +50,7 @@ def _setup_ipc(self):
self.sub.setsockopt(zmq.SUBSCRIBE, b"")
log.debug("Setting HWM for the proxy frontend: %d", self.hwm)
self.sub.setsockopt(zmq.SNDHWM, self.hwm)
self.sub.setsockopt(zmq.BACKLOG, self.backlog)
# Backend
self.pub = self.ctx.socket(zmq.PUB)
self.pub.bind(PUB_IPC_URL)
2 changes: 1 addition & 1 deletion napalm_logs/server.py
@@ -254,7 +254,7 @@ def start(self):
"napalm_logs_server_messages_unknown_queued",
"Count of messages queued as unknown",
)
if self.opts.get("metrics_include_attributes", True):
if self.opts.get("metrics_include_attributes", False):
napalm_logs_server_messages_attrs = Counter(
"napalm_logs_server_messages_attrs",
"Count of messages from the server process with their details",
29 changes: 26 additions & 3 deletions napalm_logs/transport/kafka.py
@@ -21,7 +21,16 @@
from napalm_logs.transport.base import TransportBase

log = logging.getLogger(__name__)

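# Default KafkaProducer options, used as fallbacks when "opts" are not (fully) configured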
kafka_producer_opts = {
"batch_size": 16384,
"max_request_size": 1048576,
"buffer_memory": 33554432, #32MB
"send_buffer_bytes": 131072,
"max_in_flight_requests_per_connection": 5,
"retries": 0,
"max_block_ms": 60000,
"linger_ms": 0
}

class KafkaTransport(TransportBase):
"""
@@ -39,18 +48,32 @@ def __init__(self, address, port, **kwargs):
self.bootstrap_servers = kwargs["bootstrap_servers"]
else:
self.bootstrap_servers = "{}:{}".format(address, port)
if kwargs.get("opts"):
self.opts = kwargs["opts"]
else:
self.opts = kafka_producer_opts
self.kafka_topic = kwargs.get("topic", "napalm-logs")

def start(self):
try:
self.producer = kafka.KafkaProducer(
bootstrap_servers=self.bootstrap_servers
)
bootstrap_servers=self.bootstrap_servers,
batch_size=self.opts.get("batch_size", kafka_producer_opts["batch_size"]),
max_request_size=self.opts.get("max_request_size", kafka_producer_opts["max_request_size"]),
buffer_memory=self.opts.get("buffer_memory", kafka_producer_opts["buffer_memory"]),
send_buffer_bytes=self.opts.get("send_buffer_bytes", kafka_producer_opts["send_buffer_bytes"]),
max_in_flight_requests_per_connection=self.opts.get("max_in_flight_requests_per_connection", kafka_producer_opts["max_in_flight_requests_per_connection"]),
retries=self.opts.get("retries", kafka_producer_opts["retries"]),
max_block_ms=self.opts.get("max_block_ms", kafka_producer_opts["max_block_ms"]),
linger_ms=self.opts.get("linger_ms", kafka_producer_opts["linger_ms"]),
)
except kafka.errors.NoBrokersAvailable as err:
log.error(err, exc_info=True)
raise NapalmLogsException(err)

def publish(self, obj):
if isinstance(obj, str):
obj = obj.encode()
self.producer.send(self.kafka_topic, obj)

def stop(self):

