# Copyright 2019, 2020, 2021, 2022 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module that implements a custom Kafka publisher."""

import json
import logging
from json import JSONDecodeError

from confluent_kafka import KafkaException, Producer
from insights_messaging.publishers import Publisher

from ccx_messaging.error import CCXMessagingError

LOG = logging.getLogger(__name__)


class RuleProcessingPublisher(Publisher):
    """
    RuleProcessingPublisher will handle the results of the applied rules and publish them to Kafka.

    The results of the data analysis are received as a JSON (string),
    wrapped in a report envelope, turned into a byte array using UTF-8
    encoding and sent to the output Kafka topic.

    Custom error handling for the whole pipeline is implemented here.
    """

    def __init__(self, outgoing_topic, kafka_broker_config=None, **kwargs):
        """Construct a new `RuleProcessingPublisher` given `kwargs` from the config YAML.

        Raises:
            CCXMessagingError: `outgoing_topic` is not a string.
            KafkaException: no Kafka broker is configured.
        """
        self.topic = outgoing_topic
        # isinstance instead of `type(...) is not str`: idiomatic, and
        # backward compatible (everything rejected before is still rejected).
        if not isinstance(self.topic, str):
            raise CCXMessagingError("outgoing_topic should be a str")

        # The dedicated broker configuration takes precedence over generic kwargs.
        if kafka_broker_config:
            kwargs.update(kafka_broker_config)

        if "bootstrap.servers" not in kwargs:
            raise KafkaException("Broker not configured")

        self.producer = Producer(kwargs)
        LOG.info(
            "Producing to topic '%s' on brokers %s", self.topic, kwargs.get("bootstrap.servers")
        )
        # Schema version of the outgoing message envelope.
        self.outdata_schema_version = 2

    def publish(self, input_msg, response):
        """
        Publish an EOL-terminated JSON message to the output Kafka topic.

        The response is assumed to be a string representing a valid JSON
        object; it is parsed, embedded in the output envelope together with
        identity fields extracted from `input_msg`, serialized, terminated
        with a newline, UTF-8 encoded and produced to the output Kafka topic.

        Raises:
            CCXMessagingError: the input message misses expected keys, or the
                response cannot be parsed/encoded.
        """
        output_msg = {}
        try:
            org_id = int(input_msg["identity"]["identity"]["internal"]["org_id"])
        except (ValueError, KeyError, TypeError) as err:
            raise CCXMessagingError(f"Error extracting the OrgID: {err}") from err

        try:
            account_number = int(input_msg["identity"]["identity"]["account_number"])
        except (ValueError, KeyError, TypeError) as err:
            raise CCXMessagingError(f"Error extracting the Account number: {err}") from err

        try:
            msg_timestamp = input_msg["timestamp"]
            output_msg = {
                "OrgID": org_id,
                "AccountNumber": account_number,
                "ClusterName": input_msg["cluster_name"],
                "Report": json.loads(response),
                "LastChecked": msg_timestamp,
                "Version": self.outdata_schema_version,
                "RequestId": input_msg.get("request_id"),
            }

            message = json.dumps(output_msg) + "\n"

            LOG.debug("Sending response to the %s topic.", self.topic)
            # Convert message string into a byte array.
            self.producer.produce(self.topic, message.encode("utf-8"))
            LOG.debug("Message has been sent successfully.")
            # BUG FIX: the closing quote after LastChecked was misplaced in
            # the original format string ('LastChecked="%s, Version=%d"');
            # now only the timestamp is quoted.
            LOG.debug(
                "Message context: OrgId=%s, AccountNumber=%s, "
                'ClusterName="%s", LastChecked="%s", Version=%d',
                output_msg["OrgID"],
                output_msg["AccountNumber"],
                output_msg["ClusterName"],
                output_msg["LastChecked"],
                output_msg["Version"],
            )

            LOG.info(
                "Status: Success; "
                "Topic: %s; "
                "Partition: %s; "
                "Offset: %s; "
                "LastChecked: %s",
                input_msg.get("topic"),
                input_msg.get("partition"),
                input_msg.get("offset"),
                msg_timestamp,
            )

        except KeyError as err:
            raise CCXMessagingError("Missing expected keys in the input message") from err

        except (TypeError, UnicodeEncodeError, JSONDecodeError) as err:
            raise CCXMessagingError(f"Error encoding the response to publish: {response}") from err

    def error(self, input_msg, ex):
        """Handle pipeline errors by logging them."""
        # The super call is probably unnecessary because the default behavior
        # is to do nothing, but let's call it in case it ever does anything.
        super().error(input_msg, ex)

        if not isinstance(ex, CCXMessagingError):
            ex = CCXMessagingError(ex)

        LOG.error(ex.format(input_msg))
# Copyright 2023 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the RuleProcessingPublisher class."""

import json
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka import KafkaException

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.publishers.rule_processing_publisher import RuleProcessingPublisher


def test_init():
    """Check that init creates a valid object."""
    # TYPO FIX: was `kakfa_config`.
    kafka_config = {
        "bootstrap.servers": "kafka:9092",
    }
    RuleProcessingPublisher(outgoing_topic="topic name", **kafka_config)


# Values that are not acceptable as a Kafka topic name.
INVALID_TOPIC_NAMES = [
    None,
    b"Topic name",
    [],
    {},
    4,
    5.5,
    5 + 2j,
]


@pytest.mark.parametrize("topic_name", INVALID_TOPIC_NAMES)
def test_init_invalid_topic(topic_name):
    """Check what happens when the outgoing_topic parameter is not valid."""
    with pytest.raises(CCXMessagingError):
        RuleProcessingPublisher(topic_name)


# Broker configurations that must make the constructor fail: either no
# "bootstrap.servers" at all, or an option confluent-kafka rejects.
INVALID_KWARGS = [
    {},
    {"bootstrap_servers": "kafka:9092"},
    {"bootstrap.servers": "kafka:9092", "unknown_option": "value"},
]


@pytest.mark.parametrize("kwargs", INVALID_KWARGS)
def test_bad_initialization(kwargs):
    """Check that init fails when using invalid kwargs."""
    with pytest.raises(KafkaException):
        RuleProcessingPublisher(outgoing_topic="topic", **kwargs)


@pytest.mark.parametrize("kafka_broker_cfg", INVALID_KWARGS)
@pytest.mark.parametrize("kwargs", INVALID_KWARGS)
def test_bad_init_with_kafka_config(kafka_broker_cfg, kwargs):
    """Check that init fails when using an invalid kafka_broker_config."""
    # BUG FIX: the original never used `kafka_broker_cfg`, which made this
    # test an exact duplicate of test_bad_initialization. Every combination
    # still lacks "bootstrap.servers" or carries an unknown option, so the
    # constructor must raise in all cases.
    with pytest.raises(KafkaException):
        RuleProcessingPublisher(
            outgoing_topic="topic", kafka_broker_config=kafka_broker_cfg, **kwargs
        )


INVALID_INPUT_MSGS = [
    None,
    "",
    1,
    2.0,
    3 + 1j,
    [],
    {},  # right type, missing identity
    {"identity": {}},  # missing identity-identity
    {"identity": {"identity": {}}},  # missing identity-identity-internal
    {"identity": {"identity": {"internal": {}}}},  # missing identity-identity-internal-org_id
    {"identity": {"identity": {"internal": {"org_id": 15.2}}}},  # incorrect org_id type
    {"identity": {"identity": {"internal": {"org_id": 10}}}},  # missing "account_number"
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1 + 2j,  # incorrect account number type
            },
        },
    },
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1,
            },
        },
    },  # missing timestamp
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1,
            },
        },
        "timestamp": "a timestamp",
    },  # missing cluster_name
]


@pytest.mark.parametrize("wrong_input_msg", INVALID_INPUT_MSGS)
def test_publish_bad_argument(wrong_input_msg):
    """Check that invalid messages passed by the framework are handled gracefully."""
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    sut.producer = MagicMock()

    with pytest.raises(CCXMessagingError):
        sut.publish(wrong_input_msg, {})
    # NOTE(review): the assert belongs OUTSIDE the pytest.raises block —
    # inside, it would never run once publish() raises.
    assert not sut.producer.produce.called


VALID_INPUT_MSG = {
    "identity": {
        "identity": {
            "internal": {"org_id": 10},
            "account_number": 1,
        },
    },
    "timestamp": "a timestamp",
    "cluster_name": "uuid",
    "request_id": "a request id",
    "topic": "incoming_topic",
    "partition": 0,
    "offset": 100,
}


def test_publish_valid():
    """Check that Kafka producer is called with an expected message."""
    report = "{}"

    expected_output = (
        json.dumps(
            {
                "OrgID": 10,
                "AccountNumber": 1,
                "ClusterName": "uuid",
                "Report": {},
                "LastChecked": "a timestamp",
                "Version": 2,
                "RequestId": "a request id",
            }
        )
        + "\n"
    )
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    sut.producer = MagicMock()

    sut.publish(VALID_INPUT_MSG, report)
    sut.producer.produce.assert_called_with("outgoing_topic", expected_output.encode())


# Reports that json.loads cannot parse (wrong type or empty string).
INVALID_REPORTS = [
    None,
    1,
    2.0,
    1 + 3j,
    [],
    {},
    "",
]


@pytest.mark.parametrize("invalid_report", INVALID_REPORTS)
def test_publish_invalid_report(invalid_report):
    """Check the behaviour of publish when an invalid report is received."""
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    sut.producer = MagicMock()

    with pytest.raises(CCXMessagingError):
        sut.publish(VALID_INPUT_MSG, invalid_report)
    assert not sut.producer.produce.called


def test_error():
    """Check that error just prints a log."""
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})

    with patch("ccx_messaging.publishers.rule_processing_publisher.LOG") as log_mock:
        sut.error(VALID_INPUT_MSG, None)
        assert log_mock.error.called
b/ccx_messaging/publishers/rule_processing_publisher.py index 7c3dd9d..e394296 100644 --- a/ccx_messaging/publishers/rule_processing_publisher.py +++ b/ccx_messaging/publishers/rule_processing_publisher.py @@ -41,7 +41,7 @@ def __init__(self, outgoing_topic, kafka_broker_config=None, **kwargs): """Construct a new `RuleProcessingPublisher` given `kwargs` from the config YAML.""" self.topic = outgoing_topic if type(self.topic) is not str: - raise CCXMessagingError("outgoing_topic should be an str") + raise CCXMessagingError("outgoing_topic should be a str") if kafka_broker_config: kwargs.update(kafka_broker_config)