diff --git a/.github/workflows/code.yml b/.github/workflows/code.yml index 0063f51c..5daf65fb 100644 --- a/.github/workflows/code.yml +++ b/.github/workflows/code.yml @@ -16,7 +16,7 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: [3.7, 3.8, 3.9, 3.10.8] + python-version: [3.7, 3.8, 3.9, 3.10.13] steps: - uses: actions/checkout@v2 diff --git a/docs/index.rst b/docs/index.rst index 78730c0d..180b59bf 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -220,12 +220,26 @@ e.g.: the CLI option ``auth-address`` becomes ``auth_address`` in the - 10.10.10.1 - 10.10.10.2 - 10.10.10.3 + opts: + batch_size: 16384 + max_request_size: 1048576 + buffer_memory: 33554432 + send_buffer_bytes: 131072 + max_in_flight_requests_per_connection: 5 + retries: 0 + max_block_ms: 60000 + linger_ms: 1000 The configuration above listens to the syslog messages from the Kafka bootstrap servers ``10.10.10.1``, ``10.10.10.2`` and ``10.10.10.3`` then publishes the structured objects encrypted and serialized via ZeroMQ, serving them at the address ``172.17.17.2``, port ``49017``. +The opts listed there are the kafka producer options that the napalm-logs exposes +They are directly named in the same way as the kafka python3 package: +https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html +These opts are optional - you can chose to add them completely, only partially or not at all. + Check the complete list of configuration options under :ref:`configuration-options`. diff --git a/docs/options/index.rst b/docs/options/index.rst index ddd7f60f..60a7c393 100644 --- a/docs/options/index.rst +++ b/docs/options/index.rst @@ -344,6 +344,34 @@ Configuration file example: hwm: 0 +.. _configuration-backlog: + +``backlog``: 100 +------------- + +.. versionadded:: 0.11.0 + +The zmq backlog option shall set the maximum length of the queue of outstanding peer +connections for the specified socket; this only applies to connection-oriented transports. +This is used for both external zmq publishers but also but the internally defined zmq that +saves the messages before sending them to the configured publishers. + +This option can be used to tune the performances of the napalm-logs. +While the default limit should be generally enough, in environments with extremely high +density of syslog messages to be processed, it is recommended to increase this value. + +CLI usage example: + +.. code-block:: bash + + $ napalm-logs --backlog 0 + +Configuration file example: + +.. code-block:: yaml + + backlog: 0 + .. _configuration-options-keyfile: ``keyfile`` diff --git a/napalm_logs/base.py b/napalm_logs/base.py index 7d36f107..f727125e 100644 --- a/napalm_logs/base.py +++ b/napalm_logs/base.py @@ -75,6 +75,7 @@ def __init__( device_blacklist=[], device_whitelist=[], hwm=None, + backlog=None, device_worker_processes=1, serializer="msgpack", buffer=None, @@ -128,6 +129,7 @@ def __init__( self.serializer = serializer self.device_worker_processes = device_worker_processes self.hwm = hwm + self.backlog = backlog self._buffer_cfg = buffer self._buffer = None # Setup the environment @@ -209,6 +211,7 @@ def _post_preparation(self): already setup). """ self.opts["hwm"] = CONFIG.ZMQ_INTERNAL_HWM if self.hwm is None else self.hwm + self.opts["backlog"] = CONFIG.ZMQ_INTERNAL_BACKLOG if self.backlog is None else self.backlog self.opts["_server_send_unknown"] = False for pub in self.publisher: pub_name = list(pub.keys())[0] @@ -631,7 +634,7 @@ def _start_srv_proc(self, started_os_proc): def _start_pub_px_proc(self): """ """ - px = NapalmLogsPublisherProxy(self.opts["hwm"]) + px = NapalmLogsPublisherProxy(self.opts["hwm"], self.opts["backlog"]) proc = Process(target=px.start) proc.start() proc.description = "Publisher proxy process" diff --git a/napalm_logs/config/__init__.py b/napalm_logs/config/__init__.py index 67ca7f2d..e0f98b4f 100644 --- a/napalm_logs/config/__init__.py +++ b/napalm_logs/config/__init__.py @@ -29,6 +29,7 @@ LOG_FILE = os.path.join(ROOT_DIR, "var", "log", "napalm", "logs") LOG_FILE_CLI_OPTIONS = ("cli", "screen") ZMQ_INTERNAL_HWM = 1000 +ZMQ_INTERNAL_BACKLOG = 100 METRICS_ADDRESS = "0.0.0.0" METRICS_PORT = 9443 METRICS_DIR = "/tmp/napalm_logs_metrics" diff --git a/napalm_logs/device.py b/napalm_logs/device.py index 47f02e81..9b782cd0 100644 --- a/napalm_logs/device.py +++ b/napalm_logs/device.py @@ -177,13 +177,13 @@ def _parse(self, msg_dict): ret[key] = result return ret if error_present is True: - log.info( + log.debug( "Configured regex did not match for os: %s tag %s", self._name, msg_dict.get("tag", ""), ) else: - log.info( + log.debug( "Syslog message not configured for os: %s tag %s", self._name, msg_dict.get("tag", ""), @@ -248,7 +248,7 @@ def start(self): "Counter of failed OpenConfig object generations", ["device_os"], ) - if self.opts.get("metrics_include_attributes", True): + if self.opts.get("metrics_include_attributes", False): napalm_logs_device_published_messages_attrs = Counter( "napalm_logs_device_published_messages_attrs", "Counter of published messages, with more granular selection", @@ -355,7 +355,7 @@ def start(self): self.pub.send(umsgpack.packb(to_publish)) # self._publish(to_publish) napalm_logs_device_published_messages.labels(device_os=self._name).inc() - if self.opts.get("metrics_include_attributes", True): + if self.opts.get("metrics_include_attributes", False): napalm_logs_device_published_messages_attrs.labels( device_os=self._name, error=to_publish["error"], diff --git a/napalm_logs/listener/kafka.py b/napalm_logs/listener/kafka.py index 2232933b..14545cd4 100644 --- a/napalm_logs/listener/kafka.py +++ b/napalm_logs/listener/kafka.py @@ -70,13 +70,15 @@ def receive(self): log.error("Received kafka error: %s", error, exc_info=True) raise ListenerException(error) log_source = msg.key + if isinstance(log_source, bytes): + log_source = log_source.decode() try: decoded = json.loads(msg.value.decode("utf-8")) except ValueError: log.error("Not in json format: %s", msg.value.decode("utf-8")) return "", "" log_message = decoded.get("message") - log.debug("[%s] Received %s from %s", log_message, log_source, time.time()) + log.debug("[%s] Received from kafka %s from %s", log_message, log_source, time.time()) return log_message, log_source def stop(self): diff --git a/napalm_logs/listener/tcp.py b/napalm_logs/listener/tcp.py index 9f2989e4..65efd7c8 100644 --- a/napalm_logs/listener/tcp.py +++ b/napalm_logs/listener/tcp.py @@ -72,7 +72,7 @@ def _client_connection(self, conn, addr): # log.debug('Received empty message from %s', addr) # disabled ^ as it was too noisy continue - log.debug("[%s] Received %s from %s", time.time(), msg, addr) + log.debug("[%s] Received from tcp %s from %s", time.time(), msg, addr) messages = [] if isinstance(msg, bytes): msg = msg.decode("utf-8") diff --git a/napalm_logs/listener/udp.py b/napalm_logs/listener/udp.py index b8e161c0..1b6be699 100644 --- a/napalm_logs/listener/udp.py +++ b/napalm_logs/listener/udp.py @@ -74,7 +74,7 @@ def receive(self): except socket.error as error: log.error("Received listener socket error: %s", error, exc_info=True) raise ListenerException(error) - log.debug("[%s] Received %s from %s", msg, addr, time.time()) + log.debug("[%s] Received from udp %s from %s", msg, addr, time.time()) return msg, addr[0] def stop(self): diff --git a/napalm_logs/listener_proc.py b/napalm_logs/listener_proc.py index 727322e0..97a865e2 100644 --- a/napalm_logs/listener_proc.py +++ b/napalm_logs/listener_proc.py @@ -100,6 +100,8 @@ def start(self): while self.__up: try: log_message, log_source = self.listener.receive() + if isinstance(log_source, bytes): + log_source = log_source.decode() except ListenerException as lerr: if self.__up is False: log.info("Exiting on process shutdown") diff --git a/napalm_logs/pub_proxy.py b/napalm_logs/pub_proxy.py index 8a67f392..cbeb3b0d 100644 --- a/napalm_logs/pub_proxy.py +++ b/napalm_logs/pub_proxy.py @@ -29,8 +29,9 @@ class NapalmLogsPublisherProxy(NapalmLogsProc): Internal IPC proxy sub-process class. """ - def __init__(self, hwm): + def __init__(self, hwm, backlog): self.hwm = hwm + self.backlog = backlog self.__up = False def _exit_gracefully(self, signum, _): @@ -49,6 +50,7 @@ def _setup_ipc(self): self.sub.setsockopt(zmq.SUBSCRIBE, b"") log.debug("Setting HWM for the proxy frontend: %d", self.hwm) self.sub.setsockopt(zmq.SNDHWM, self.hwm) + self.sub.setsockopt(zmq.BACKLOG, self.backlog) # Backend self.pub = self.ctx.socket(zmq.PUB) self.pub.bind(PUB_IPC_URL) diff --git a/napalm_logs/server.py b/napalm_logs/server.py index 5281ae41..c6e899a6 100644 --- a/napalm_logs/server.py +++ b/napalm_logs/server.py @@ -254,7 +254,7 @@ def start(self): "napalm_logs_server_messages_unknown_queued", "Count of messages queued as unknown", ) - if self.opts.get("metrics_include_attributes", True): + if self.opts.get("metrics_include_attributes", False): napalm_logs_server_messages_attrs = Counter( "napalm_logs_server_messages_attrs", "Count of messages from the server process with their details", diff --git a/napalm_logs/transport/kafka.py b/napalm_logs/transport/kafka.py index 8969b548..e451ddaa 100644 --- a/napalm_logs/transport/kafka.py +++ b/napalm_logs/transport/kafka.py @@ -21,7 +21,16 @@ from napalm_logs.transport.base import TransportBase log = logging.getLogger(__name__) - +kafka_producer_opts = { + "batch_size": 16384, + "max_request_size": 1048576, + "buffer_memory": 33554432, #32MB + "send_buffer_bytes": 131072, + "max_in_flight_requests_per_connection": 5, + "retries": 0, + "max_block_ms": 60000, + "linger_ms": 0 +} class KafkaTransport(TransportBase): """ @@ -39,18 +48,32 @@ def __init__(self, address, port, **kwargs): self.bootstrap_servers = kwargs["bootstrap_servers"] else: self.bootstrap_servers = "{}:{}".format(address, port) + if kwargs.get("opts"): + self.opts = kwargs["opts"] + else: + self.opts = kafka_producer_opts self.kafka_topic = kwargs.get("topic", "napalm-logs") def start(self): try: self.producer = kafka.KafkaProducer( - bootstrap_servers=self.bootstrap_servers - ) + bootstrap_servers=self.bootstrap_servers, + batch_size=self.opts.get("batch_size", kafka_producer_opts["batch_size"]), + max_request_size=self.opts.get("max_request_size", kafka_producer_opts["max_request_size"]), + buffer_memory=self.opts.get("buffer_memory", kafka_producer_opts["buffer_memory"]), + send_buffer_bytes=self.opts.get("send_buffer_bytes", kafka_producer_opts["send_buffer_bytes"]), + max_in_flight_requests_per_connection=self.opts.get("max_in_flight_requests_per_connection", kafka_producer_opts["max_in_flight_requests_per_connection"]), + retries=self.opts.get("retries", kafka_producer_opts["retries"]), + max_block_ms=self.opts.get("max_block_ms", kafka_producer_opts["max_block_ms"]), + linger_ms=self.opts.get("linger_ms", kafka_producer_opts["linger_ms"]), + ) except kafka.errors.NoBrokersAvailable as err: log.error(err, exc_info=True) raise NapalmLogsException(err) def publish(self, obj): + if isinstance(obj, str): + obj = obj.encode() self.producer.send(self.kafka_topic, obj) def stop(self):