diff --git a/ccx_messaging/publishers/idp_rule_processing_publisher.py b/ccx_messaging/publishers/idp_rule_processing_publisher.py new file mode 100644 index 0000000..71e5a96 --- /dev/null +++ b/ccx_messaging/publishers/idp_rule_processing_publisher.py @@ -0,0 +1,80 @@ +# Copyright 2025 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module that implements a custom Kafka publisher.""" + +import json +import logging +from typing import Any + +import jsonschema + +from ccx_messaging.error import CCXMessagingError +from ccx_messaging.publishers.kafka_publisher import KafkaPublisher +from ccx_messaging.schemas import ARCHIVE_SYNCED_SCHEMA + + +log = logging.getLogger(__name__) + + + +class IDPRuleProcessingPublisher(KafkaPublisher): + """RuleProcessingPublisher handles the results of the applied rules and publish them to Kafka. + + The results of the data analysis are received as a JSON (string) + and turned into a byte array using UTF-8 encoding. + The bytes are then sent to the output Kafka topic. + + Custom error handling for the whole pipeline is implemented here. + """ + + def publish(self, input_msg: dict[str, Any], report: str | bytes) -> None: + """Publish an EOL-terminated JSON message to the output Kafka topic. + + The report is assumed to be a string representing a valid JSON object. + A newline character will be appended to it, it will be converted into + a byte array using UTF-8 encoding and the result of that will be sent + to the producer to produce a message in the output Kafka topic. + """ + try: + jsonschema.validate(input_msg, ARCHIVE_SYNCED_SCHEMA) + + except jsonschema.ValidationError as ex: + raise CCXMessagingError("Invalid JSON format in the input message.") from ex + + try: + report = json.loads(report) + + except (TypeError, json.decoder.JSONDecodeError): + raise CCXMessagingError("Could not parse report; report is not in JSON format") + + output_msg = { + "path": input_msg["path"], + "metadata": input_msg["metadata"], + "report": report, + } + + message = json.dumps(output_msg) + log.debug("Sending response to the %s topic.", self.topic) + # Convert message string into a byte array. + self.produce(message.encode("utf-8")) + log.debug("Message has been sent successfully.") + + def error(self, input_msg: dict, ex: Exception): + """Handle pipeline errors by logging them.""" + log.warning( + "An error has ocurred during the processing of %s: %s", + input_msg, + ex, + ) diff --git a/test/publishers/idp_rule_processing_publisher_test.py b/test/publishers/idp_rule_processing_publisher_test.py new file mode 100644 index 0000000..6a7eceb --- /dev/null +++ b/test/publishers/idp_rule_processing_publisher_test.py @@ -0,0 +1,186 @@ +# Copyright 2025 Red Hat, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the RuleProcessingPublisher class.""" + +import json +from unittest.mock import MagicMock, patch + +import pytest +from confluent_kafka import KafkaException + +from ccx_messaging.error import CCXMessagingError +from ccx_messaging.publishers.idp_rule_processing_publisher import IDPRuleProcessingPublisher + + +def test_init(): + """Check that init creates a valid object.""" + kakfa_config = { + "bootstrap.servers": "kafka:9092", + } + IDPRuleProcessingPublisher(outgoing_topic="topic name", **kakfa_config) + + +INVALID_TOPIC_NAMES = [ + None, + b"Topic name", + [], + {}, + 4, + 5.5, + 5 + 2j, +] + + +@pytest.mark.parametrize("topic_name", INVALID_TOPIC_NAMES) +def test_init_invalid_topic(topic_name): + """Check what happens when the output_topic parameter is not valid.""" + with pytest.raises(CCXMessagingError): + IDPRuleProcessingPublisher(topic_name) + + +INVALID_KWARGS = [ + {}, + {"bootstrap_servers": "kafka:9092"}, + {"bootstrap.servers": "kafka:9092", "unknown_option": "value"}, +] + + +@pytest.mark.parametrize("kwargs", INVALID_KWARGS) +def test_bad_initialization(kwargs): + """Check that init fails when using not valid kwargs.""" + with pytest.raises(KafkaException): + IDPRuleProcessingPublisher(outgoing_topic="topic", **kwargs) + + +@pytest.mark.parametrize("kafka_broker_cfg", INVALID_KWARGS) +@pytest.mark.parametrize("kwargs", INVALID_KWARGS) +def test_bad_init_with_kafka_config(kafka_broker_cfg, kwargs): + """Check that init fails when using not valid kwargs.""" + with pytest.raises(KafkaException): + IDPRuleProcessingPublisher(outgoing_topic="topic", **kwargs) + + +INVALID_INPUT_MSGS = [ + None, + "", + 1, + 2.0, + 3 + 1j, + [], + {}, # right type, missing path and metadata + {"path": ""}, # missing metadata + {"path": "", "metadata": {}}, # missing metadata-cluster_id +] + + +@pytest.mark.parametrize("wrong_input_msg", INVALID_INPUT_MSGS) +def test_publish_bad_argument(wrong_input_msg): + """Check that invalid messages passed by the framework are handled gracefully.""" + sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"}) + sut.producer = MagicMock() + + with pytest.raises(CCXMessagingError): + sut.publish(wrong_input_msg, {}) + assert not sut.producer.produce.called + + +VALID_INPUT_MSG = [ + pytest.param( + { + "path": "bucket/path/to/archive.tgz", + "metadata": { + "cluster_id": "uuid", + }, + }, + { + "path": "bucket/path/to/archive.tgz", + "metadata": { + "cluster_id": "uuid", + }, + "report": { + "reports": [], + }, + }, + id="minimal valid", + ), + pytest.param( + { + "path": "bucket/path/to/archive.tgz", + "original_path": "other_than_current_path", + "metadata": { + "cluster_id": "uuid", + "external_organization": "an organization" + }, + }, + { + "path": "bucket/path/to/archive.tgz", + "metadata": { + "cluster_id": "uuid", + "external_organization": "an organization" + }, + "report": { + "reports": [], + }, + }, + id="adding optional elements", + ), +] + + +@pytest.mark.parametrize("input, expected_output", VALID_INPUT_MSG) +def test_publish_valid(input, expected_output): + """Check that Kafka producer is called with an expected message.""" + report = '{"reports": []}' + + expected_output = json.dumps(expected_output) + sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"}) + sut.producer = MagicMock() + + sut.publish(input, report) + sut.producer.produce.assert_called_with("outgoing_topic", expected_output.encode()) + + +INVALID_REPORTS = [ + None, + 1, + 2.0, + 1 + 3j, + [], + {}, + "", +] + + +@pytest.mark.parametrize("invalid_report", INVALID_REPORTS) +def test_publish_invalid_report(invalid_report): + """Check the behaviour of publish when an invalid report is received.""" + sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"}) + sut.producer = MagicMock() + + with pytest.raises(CCXMessagingError): + sut.publish(VALID_INPUT_MSG, invalid_report) + assert not sut.producer.produce.called + + +@pytest.mark.parametrize("input,output", VALID_INPUT_MSG) +def test_error(input, output): + """Check that error just prints a log.""" + _ = output # output values are not needed + + sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"}) + + with patch("ccx_messaging.publishers.idp_rule_processing_publisher.log") as log_mock: + sut.error(input, None) + assert log_mock.warning.called