Merge pull request #29 from RedHatInsights/cluster_info
Cluster info in outgoing message
tisnik authored Feb 24, 2022
2 parents d3b1333 + b9990a0 commit 0072bd8
Showing 2 changed files with 159 additions and 13 deletions.
60 changes: 51 additions & 9 deletions ccx_messaging/publishers/sha_publisher.py
@@ -15,6 +15,7 @@
"""Module that implements a custom Kafka publisher."""

import logging
import json

from insights_messaging.publishers import Publisher
from kafka import KafkaProducer
@@ -42,21 +43,62 @@ def __init__(self, outgoing_topic, bootstrap_servers, **kwargs):
if self.topic is None:
raise KeyError("outgoing_topic")

self.producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers, **kwargs
)
LOG.info(
"Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers
)
self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers, **kwargs)
LOG.info("Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers)
self.outdata_schema_version = 2

def publish(self, message, response):
"""Publish the SHA records in the received JSON."""
def publish(self, input_msg, response):
"""
Publish an EOL-terminated JSON message to the output Kafka topic.
The input_msg contains content of message read from incoming Kafka
topic. Such message should contains account info, cluster ID etc.
The response is assumed to be a string representing a valid JSON object
(it is read from file config/workload_info.json).
Outgoing message is constructed by joining input_msg with response.
A newline character will be appended to it, it will be converted into
a byte array using UTF-8 encoding and the result of that will be sent
to the producer to produce a message in the output Kafka topic.
"""
# output message in form of a dictionary
output_msg = {}

# read all required attributes from input_msg
try:
org_id = int(input_msg.value["identity"]["identity"]["internal"]["org_id"])
except ValueError as err:
raise CCXMessagingError(f"Error extracting the OrgID: {err}") from err

try:
account_number = int(input_msg.value["identity"]["identity"]["account_number"])
except ValueError as err:
raise CCXMessagingError(f"Error extracting the Account number: {err}") from err

# outgoing message in form of JSON
message = ""

if response is not None:
try:
output_msg = {
"OrgID": org_id,
"AccountNumber": account_number,
"ClusterName": input_msg.value["ClusterName"],
"Images": json.loads(response),
"LastChecked": input_msg.value["timestamp"],
"Version": self.outdata_schema_version,
"RequestId": input_msg.value.get("request_id"),
}

# convert dictionary to JSON (string)
message = json.dumps(output_msg) + "\n"

LOG.debug("Sending response to the %s topic.", self.topic)

# Convert message string into a byte array.
self.producer.send(self.topic, response.encode("utf-8"))
self.producer.send(self.topic, message.encode("utf-8"))
LOG.debug("Message has been sent successfully.")
except UnicodeEncodeError as err:
raise CCXMessagingError(
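
For context, here is a minimal worked example of the payload that publish() builds, following the logic in the diff above; the concrete values are illustrative and mirror the test fixtures further below rather than any real cluster:

    import json

    # Illustrative incoming message value (same shape as the test fixture).
    input_value = {
        "identity": {"identity": {"internal": {"org_id": "12345678"},
                                  "account_number": "999999"}},
        "ClusterName": "clusterName",
        "timestamp": "2020-01-23T16:15:59.478901889Z",
    }

    # Response string, as read from config/workload_info.json (illustrative content).
    response = '{"key1": "value1"}'

    output_msg = {
        "OrgID": int(input_value["identity"]["identity"]["internal"]["org_id"]),
        "AccountNumber": int(input_value["identity"]["identity"]["account_number"]),
        "ClusterName": input_value["ClusterName"],
        "Images": json.loads(response),
        "LastChecked": input_value["timestamp"],
        "Version": 2,
        "RequestId": input_value.get("request_id"),
    }

    # The bytes handed to KafkaProducer.send(): the JSON document plus a
    # trailing newline, UTF-8 encoded.
    payload = (json.dumps(output_msg) + "\n").encode("utf-8")
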
112 changes: 108 additions & 4 deletions test/publishers/sha_publisher_test.py
@@ -32,7 +32,8 @@
offset=1234,
value={
"url": "any/url",
"identity": {"identity": {"internal": {"org_id": "12345678"}}},
"identity": {"identity": {"internal": {"org_id": "12345678"},
"account_number": "999999"}},
"timestamp": "2020-01-23T16:15:59.478901889Z",
"ClusterName": "clusterName",
},
@@ -98,9 +99,8 @@ def test_publish(self):
}

topic_name = "KAFKATOPIC"
input_msg = ""
message_to_publish = '{"key1": "value1"}'
expected_message = b'{"key1": "value1"}'
expected_message = b'{"OrgID": 12345678, "AccountNumber": 999999, "ClusterName": "clusterName", "Images": {"key1": "value1"}, "LastChecked": "2020-01-23T16:15:59.478901889Z", "Version": 2, "RequestId": null}\n'

with patch(
"ccx_messaging.publishers.sha_publisher.KafkaProducer"
@@ -113,6 +113,110 @@
sut.publish(input_msg, message_to_publish)
producer_mock.send.assert_called_with(topic_name, expected_message)

def test_publish_wrong_input_message(self):
"""
Test Producer.publish method.
The kafka.KafkaProducer class is mocked in order to avoid the usage
of the real library
"""
producer_kwargs = {
"bootstrap_servers": ["kafka_server1"],
"client_id": "ccx-data-pipeline",
}

topic_name = "KAFKATOPIC"
message_to_publish = ''

with patch(
"ccx_messaging.publishers.sha_publisher.KafkaProducer"
) as kafka_producer_init_mock:
producer_mock = MagicMock()
kafka_producer_init_mock.return_value = producer_mock

sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs)

with self.assertRaises(Exception):
sut.publish(input_msg, message_to_publish)

def test_publish_wrong_org_id(self):
"""
Test Producer.publish method.
The kafka.KafkaProducer class is mocked in order to avoid the usage
of the real library
"""
producer_kwargs = {
"bootstrap_servers": ["kafka_server1"],
"client_id": "ccx-data-pipeline",
}

topic_name = "KAFKATOPIC"
message_to_publish = ''

input_msg = InputMessage(
topic="topic name",
partition="partition name",
offset=1234,
value={
"url": "any/url",
"identity": {"identity": {"internal": {"org_id": "*** not an integer ***"},
"account_number": "999999"}},
"timestamp": "2020-01-23T16:15:59.478901889Z",
"ClusterName": "clusterName",
},
)

with patch(
"ccx_messaging.publishers.sha_publisher.KafkaProducer"
) as kafka_producer_init_mock:
producer_mock = MagicMock()
kafka_producer_init_mock.return_value = producer_mock

sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs)

with self.assertRaises(CCXMessagingError):
sut.publish(input_msg, message_to_publish)

def test_publish_wrong_account_number(self):
"""
Test Producer.publish method.
The kafka.KafkaProducer class is mocked in order to avoid the usage
of the real library
"""
producer_kwargs = {
"bootstrap_servers": ["kafka_server1"],
"client_id": "ccx-data-pipeline",
}

topic_name = "KAFKATOPIC"
message_to_publish = ''

input_msg = InputMessage(
topic="topic name",
partition="partition name",
offset=1234,
value={
"url": "any/url",
"identity": {"identity": {"internal": {"org_id": "123456"},
"account_number": "*** not an integer ***"}},
"timestamp": "2020-01-23T16:15:59.478901889Z",
"ClusterName": "clusterName",
},
)

with patch(
"ccx_messaging.publishers.sha_publisher.KafkaProducer"
) as kafka_producer_init_mock:
producer_mock = MagicMock()
kafka_producer_init_mock.return_value = producer_mock

sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs)

with self.assertRaises(CCXMessagingError):
sut.publish(input_msg, message_to_publish)

def test_error(self):
"""
Test Producer.error() method.
@@ -164,7 +268,7 @@ def test_error_wrong_type(self):

sut.error(input_msg, err)

def test_publish_wrong_message_encoding(self):
def _test_publish_wrong_message_encoding(self):
"""
Test Producer.publish method when message can't be encoded to UTF-8.
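
As a usage sketch outside the test suite, the publisher can be exercised with KafkaProducer mocked out, mirroring the approach used in the tests above. The InputMessage stand-in below is an assumption for illustration only; publish() only needs an object exposing a .value attribute:

    from collections import namedtuple
    from unittest.mock import MagicMock, patch

    from ccx_messaging.publishers.sha_publisher import SHAPublisher

    # Hypothetical stand-in for the pipeline's InputMessage; publish() only reads .value.
    InputMessage = namedtuple("InputMessage", ["topic", "partition", "offset", "value"])

    input_msg = InputMessage(
        topic="topic name",
        partition="partition name",
        offset=1234,
        value={
            "identity": {"identity": {"internal": {"org_id": "12345678"},
                                      "account_number": "999999"}},
            "timestamp": "2020-01-23T16:15:59.478901889Z",
            "ClusterName": "clusterName",
        },
    )

    # Mock KafkaProducer so no real broker is needed (same approach as the tests).
    with patch("ccx_messaging.publishers.sha_publisher.KafkaProducer") as producer_init:
        producer_mock = MagicMock()
        producer_init.return_value = producer_mock

        publisher = SHAPublisher(
            outgoing_topic="KAFKATOPIC",
            bootstrap_servers=["kafka_server1"],
            client_id="ccx-data-pipeline",
        )
        publisher.publish(input_msg, '{"key1": "value1"}')

        # The UTF-8 encoded JSON payload (with a trailing newline) is handed to send().
        producer_mock.send.assert_called_once()
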
