Skip to content

Commit

Permalink
Merge pull request #52 from RedHatInsights/add_confluent_publisher
Browse files Browse the repository at this point in the history
Publisher of rule processing reports using Confluent
  • Loading branch information
joselsegura authored Mar 24, 2023
2 parents 339a65c + 30a8518 commit 981da31
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 0 deletions.
134 changes: 134 additions & 0 deletions ccx_messaging/publishers/rule_processing_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Copyright 2019, 2020, 2021, 2022 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module that implements a custom Kafka publisher."""

import json
import logging
from json import JSONDecodeError

from confluent_kafka import KafkaException, Producer
from insights_messaging.publishers import Publisher

from ccx_messaging.error import CCXMessagingError

LOG = logging.getLogger(__name__)


class RuleProcessingPublisher(Publisher):
    """
    RuleProcessingPublisher will handle the results of the applied rules and publish them to Kafka.

    The results of the data analysis are received as a JSON (string)
    and turned into a byte array using UTF-8 encoding.
    The bytes are then sent to the output Kafka topic.

    Custom error handling for the whole pipeline is implemented here.
    """

    def __init__(self, outgoing_topic, kafka_broker_config=None, **kwargs):
        """Construct a new `RuleProcessingPublisher` given `kwargs` from the config YAML.

        Raises:
            CCXMessagingError: if `outgoing_topic` is not a string.
            KafkaException: if no "bootstrap.servers" is configured.
        """
        self.topic = outgoing_topic
        # isinstance instead of `type(...) is not str`: idiomatic and also
        # accepts str subclasses, which is backward compatible.
        if not isinstance(self.topic, str):
            raise CCXMessagingError("outgoing_topic should be a str")

        # Broker-specific settings extend/override the generic kwargs.
        if kafka_broker_config:
            kwargs.update(kafka_broker_config)

        if "bootstrap.servers" not in kwargs:
            raise KafkaException("Broker not configured")

        self.producer = Producer(kwargs)
        LOG.info(
            "Producing to topic '%s' on brokers %s", self.topic, kwargs.get("bootstrap.servers")
        )
        # Version of the outgoing message schema ("Version" field).
        self.outdata_schema_version = 2

    def publish(self, input_msg, response):
        """
        Publish an EOL-terminated JSON message to the output Kafka topic.

        The response is assumed to be a string representing a valid JSON object.
        A newline character will be appended to it, it will be converted into
        a byte array using UTF-8 encoding and the result of that will be sent
        to the producer to produce a message in the output Kafka topic.

        Raises:
            CCXMessagingError: if the input message misses expected keys, if
                the org_id/account_number cannot be converted to int, or if
                the response cannot be parsed/encoded as JSON.
        """
        # Response is already a string, no need to JSON dump.
        output_msg = {}
        try:
            org_id = int(input_msg["identity"]["identity"]["internal"]["org_id"])
        except (ValueError, KeyError, TypeError) as err:
            raise CCXMessagingError(f"Error extracting the OrgID: {err}") from err

        try:
            account_number = int(input_msg["identity"]["identity"]["account_number"])
        except (ValueError, KeyError, TypeError) as err:
            raise CCXMessagingError(f"Error extracting the Account number: {err}") from err

        try:
            msg_timestamp = input_msg["timestamp"]
            output_msg = {
                "OrgID": org_id,
                "AccountNumber": account_number,
                "ClusterName": input_msg["cluster_name"],
                "Report": json.loads(response),
                "LastChecked": msg_timestamp,
                "Version": self.outdata_schema_version,
                "RequestId": input_msg.get("request_id"),
            }

            message = json.dumps(output_msg) + "\n"

            LOG.debug("Sending response to the %s topic.", self.topic)
            # Convert message string into a byte array.
            self.producer.produce(self.topic, message.encode("utf-8"))
            # Serve delivery callbacks and drain the producer's internal
            # queue; without periodic poll() buffered events accumulate and
            # delivery reports are never processed.
            self.producer.poll(0)
            LOG.debug("Message has been sent successfully.")
            # BUGFIX: the quotes in this format string were unbalanced
            # ('LastChecked="%s, Version=%d"').
            LOG.debug(
                "Message context: OrgId=%s, AccountNumber=%s, "
                'ClusterName="%s", LastChecked="%s", Version=%d',
                output_msg["OrgID"],
                output_msg["AccountNumber"],
                output_msg["ClusterName"],
                output_msg["LastChecked"],
                output_msg["Version"],
            )

            LOG.info(
                "Status: Success; "
                "Topic: %s; "
                "Partition: %s; "
                "Offset: %s; "
                "LastChecked: %s",
                input_msg.get("topic"),
                input_msg.get("partition"),
                input_msg.get("offset"),
                msg_timestamp,
            )

        except KeyError as err:
            raise CCXMessagingError("Missing expected keys in the input message") from err

        except (TypeError, UnicodeEncodeError, JSONDecodeError) as err:
            raise CCXMessagingError(f"Error encoding the response to publish: {response}") from err

    def error(self, input_msg, ex):
        """Handle pipeline errors by logging them."""
        # The super call is probably unnecessary because the default behavior
        # is to do nothing, but let's call it in case it ever does anything.
        super().error(input_msg, ex)

        # Wrap unexpected exceptions so format() is always available.
        if not isinstance(ex, CCXMessagingError):
            ex = CCXMessagingError(ex)

        LOG.error(ex.format(input_msg))
196 changes: 196 additions & 0 deletions test/publishers/rule_processing_publisher_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
# Copyright 2023 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the RuleProcessingPublisher class."""

import json
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka import KafkaException

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.publishers.rule_processing_publisher import RuleProcessingPublisher


def test_init():
    """Check that init creates a valid object."""
    # Typo fix: the local variable was misspelled "kakfa_config".
    kafka_config = {
        "bootstrap.servers": "kafka:9092",
    }
    RuleProcessingPublisher(outgoing_topic="topic name", **kafka_config)


# Topic values that the publisher must reject: anything that is not a `str`.
INVALID_TOPIC_NAMES = [
    None,
    b"Topic name",  # bytes, not str
    [],
    {},
    4,
    5.5,
    5 + 2j,
]


@pytest.mark.parametrize("topic_name", INVALID_TOPIC_NAMES)
def test_init_invalid_topic(topic_name):
    """Creating a publisher with a non-string topic must raise CCXMessagingError."""
    with pytest.raises(CCXMessagingError):
        RuleProcessingPublisher(topic_name)


# Producer configurations expected to make initialization fail:
# no brokers configured, a misspelled "bootstrap.servers" key, and an
# option unknown to the underlying Kafka client.
INVALID_KWARGS = [
    {},  # missing "bootstrap.servers"
    {"bootstrap_servers": "kafka:9092"},  # wrong key (underscore instead of dot)
    {"bootstrap.servers": "kafka:9092", "unknown_option": "value"},  # unknown option
]


@pytest.mark.parametrize("kwargs", INVALID_KWARGS)
def test_bad_initialization(kwargs):
    """An invalid producer configuration must make the constructor raise KafkaException."""
    with pytest.raises(KafkaException):
        RuleProcessingPublisher(outgoing_topic="topic", **kwargs)


@pytest.mark.parametrize("kafka_broker_cfg", INVALID_KWARGS)
@pytest.mark.parametrize("kwargs", INVALID_KWARGS)
def test_bad_init_with_kafka_config(kafka_broker_cfg, kwargs):
    """Check that init fails when using not valid kwargs."""
    # BUGFIX: the original body ignored `kafka_broker_cfg`, so this test was an
    # exact duplicate of test_bad_initialization. Pass it through as
    # `kafka_broker_config`, which the constructor merges on top of kwargs.
    # Skip the combinations whose merged config ends up being exactly a valid
    # {"bootstrap.servers": ...} mapping — those raise nothing by design.
    merged = {**kwargs, **kafka_broker_cfg}
    if set(merged) == {"bootstrap.servers"}:
        pytest.skip("merged configuration is valid; no exception expected")

    with pytest.raises(KafkaException):
        RuleProcessingPublisher(
            outgoing_topic="topic", kafka_broker_config=kafka_broker_cfg, **kwargs
        )


# Framework messages that publish() must reject with CCXMessagingError,
# covering wrong top-level types and every missing/invalid nested key.
INVALID_INPUT_MSGS = [
    None,
    "",
    1,
    2.0,
    3 + 1j,
    [],
    {},  # right type, missing identity
    {"identity": {}},  # missing identity-identity
    {"identity": {"identity": {}}},  # missing identity-identity-internal
    {"identity": {"identity": {"internal": {}}}},  # missing identity-identity-internal-org_id
    # NOTE(review): int(15.2) actually succeeds, so this entry fails later on
    # the missing "account_number" rather than on the org_id type — confirm
    # whether a truly non-convertible org_id (e.g. "abc") was intended.
    {"identity": {"identity": {"internal": {"org_id": 15.2}}}},  # incorrect org_id type
    {"identity": {"identity": {"internal": {"org_id": 10}}}},  # missing "account_number"
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1 + 2j,  # incorrect account number type
            },
        },
    },
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1,
            },
        },
    },  # missing timestamp
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1,
            },
        },
        "timestamp": "a timestamp",
    },  # missing cluster_name
]


@pytest.mark.parametrize("wrong_input_msg", INVALID_INPUT_MSGS)
def test_publish_bad_argument(wrong_input_msg):
    """Malformed input messages must raise CCXMessagingError and produce nothing."""
    publisher = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    publisher.producer = MagicMock()

    with pytest.raises(CCXMessagingError):
        publisher.publish(wrong_input_msg, {})

    publisher.producer.produce.assert_not_called()


# A minimal input message containing every key publish() reads:
# the nested identity (org_id, account_number), timestamp, cluster_name,
# request_id, and the Kafka coordinates logged on success.
VALID_INPUT_MSG = {
    "identity": {
        "identity": {
            "internal": {"org_id": 10},
            "account_number": 1,
        },
    },
    "timestamp": "a timestamp",
    "cluster_name": "uuid",
    "request_id": "a request id",
    "topic": "incoming_topic",
    "partition": 0,
    "offset": 100,
}


def test_publish_valid():
    """A valid message and report must reach the producer in the expected format."""
    report = "{}"

    # The outgoing payload publish() is expected to build, newline-terminated.
    expected_payload = {
        "OrgID": 10,
        "AccountNumber": 1,
        "ClusterName": "uuid",
        "Report": {},
        "LastChecked": "a timestamp",
        "Version": 2,
        "RequestId": "a request id",
    }
    expected_output = json.dumps(expected_payload) + "\n"

    publisher = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    publisher.producer = MagicMock()

    publisher.publish(VALID_INPUT_MSG, report)
    publisher.producer.produce.assert_called_with("outgoing_topic", expected_output.encode())


# Report values that json.loads cannot parse (wrong type or empty string),
# all of which publish() must surface as CCXMessagingError.
INVALID_REPORTS = [
    None,
    1,
    2.0,
    1 + 3j,
    [],
    {},
    "",  # empty string is not valid JSON
]


@pytest.mark.parametrize("invalid_report", INVALID_REPORTS)
def test_publish_invalid_report(invalid_report):
    """An unparseable report must raise CCXMessagingError and produce nothing."""
    publisher = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    publisher.producer = MagicMock()

    with pytest.raises(CCXMessagingError):
        publisher.publish(VALID_INPUT_MSG, invalid_report)

    publisher.producer.produce.assert_not_called()


def test_error():
    """The error handler must only emit a log record."""
    publisher = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})

    with patch("ccx_messaging.publishers.rule_processing_publisher.LOG") as log_mock:
        publisher.error(VALID_INPUT_MSG, None)

    log_mock.error.assert_called()

0 comments on commit 981da31

Please sign in to comment.