diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cdc0424..2b58ada 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ exclude: "doc|research|demos" repos: - repo: https://github.com/psf/black - rev: 23.1.0 + rev: 23.3.0 hooks: - id: black args: [--safe, --quiet, --line-length, "100"] diff --git a/ccx_messaging/consumers/kafka_consumer.py b/ccx_messaging/consumers/kafka_consumer.py index 37f3b5e..2f2cc6e 100644 --- a/ccx_messaging/consumers/kafka_consumer.py +++ b/ccx_messaging/consumers/kafka_consumer.py @@ -4,13 +4,11 @@ import time from threading import Thread -from confluent_kafka import Consumer as ConfluentConsumer, KafkaException, Message +from confluent_kafka import Consumer as ConfluentConsumer, KafkaException, Message, Producer from insights_messaging.consumers import Consumer -from kafka import KafkaProducer from ccx_messaging.error import CCXMessagingError from ccx_messaging.ingress import parse_ingress_message -from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration LOG = logging.getLogger(__name__) @@ -87,9 +85,7 @@ def __init__( self.dead_letter_queue_topic = dead_letter_queue_topic if self.dead_letter_queue_topic is not None: - config = translate_kafka_configuration(kafka_broker_config) - dlq_producer_config = producer_config(config) - self.dlq_producer = KafkaProducer(**dlq_producer_config) + self.dlq_producer = Producer(kwargs) def get_url(self, input_msg: dict) -> str: """ @@ -237,18 +233,10 @@ def process_dead_letter(self, msg: Message) -> None: if not self.dlq_producer: return - if isinstance(msg, Message): - self.dlq_producer.send( - self.dead_letter_queue_topic, - msg.value(), - ) - - else: - # just add at least some record in case that the message is not of the expected type - self.dlq_producer.send( - self.dead_letter_queue_topic, - str(msg).encode("utf-8"), - ) + self.dlq_producer.send( + self.dead_letter_queue_topic, + msg.value(), + ) def get_stringfied_record(input_record: dict) -> str: diff --git a/test/consumers/kafka_consumer_test.py b/test/consumers/kafka_consumer_test.py index 1362688..3b166f8 100644 --- a/test/consumers/kafka_consumer_test.py +++ b/test/consumers/kafka_consumer_test.py @@ -17,7 +17,6 @@ import logging import io import time - from unittest.mock import MagicMock, patch import pytest @@ -393,3 +392,36 @@ def test_non_processed_to_dlq(value, expected): sut.process_msg(input_msg) process_dlq_mock.assert_called_with(input_msg) + + +@pytest.mark.parametrize("value,expected", _VALID_MESSAGES) +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) +def test_process_dead_letter_no_configured(value, expected): + """Check that process_dead_letter method works as expected.""" + sut = KafkaConsumer(None, None, None, None) + sut.dlq_producer = MagicMock() # inject a mock producer + sut.dlq_producer.__bool__.return_value = False + + input_message = KafkaMessage(value) + + sut.process_dead_letter(input_message) + assert not sut.dlq_producer.send.called + + +@pytest.mark.parametrize("value,expected", _VALID_MESSAGES) +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) +@patch("ccx_messaging.consumers.kafka_consumer.Producer") +def test_process_dead_letter_message(producer_init_mock, value, expected): + """Check behaviour when DLQ is properly configured.""" + dlq_topic_name = "dlq_topic" + producer_mock = MagicMock() + producer_init_mock.return_value = producer_mock + + sut = KafkaConsumer(None, None, None, None, dead_letter_queue_topic=dlq_topic_name) + assert producer_init_mock.called + + message_mock = MagicMock() + message_mock.value.return_value = value + + sut.process_dead_letter(message_mock) + producer_mock.send.assert_called_with(dlq_topic_name, value)