Skip to content

Commit

Permalink
Merge pull request #55 from RedHatInsights/dlq_producer_migration
Browse files Browse the repository at this point in the history
Migrate DLQ producer from Kafka Python to Confluent
  • Loading branch information
joselsegura authored Mar 31, 2023
2 parents cd48c7d + 8331864 commit 4ffb312
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
exclude: "doc|research|demos"
repos:
- repo: https://github.com/psf/black
rev: 23.1.0
rev: 23.3.0
hooks:
- id: black
args: [--safe, --quiet, --line-length, "100"]
Expand Down
24 changes: 6 additions & 18 deletions ccx_messaging/consumers/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
import time
from threading import Thread

from confluent_kafka import Consumer as ConfluentConsumer, KafkaException, Message
from confluent_kafka import Consumer as ConfluentConsumer, KafkaException, Message, Producer
from insights_messaging.consumers import Consumer
from kafka import KafkaProducer

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.ingress import parse_ingress_message
from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -87,9 +85,7 @@ def __init__(
self.dead_letter_queue_topic = dead_letter_queue_topic

if self.dead_letter_queue_topic is not None:
config = translate_kafka_configuration(kafka_broker_config)
dlq_producer_config = producer_config(config)
self.dlq_producer = KafkaProducer(**dlq_producer_config)
self.dlq_producer = Producer(kwargs)

def get_url(self, input_msg: dict) -> str:
"""
Expand Down Expand Up @@ -237,18 +233,10 @@ def process_dead_letter(self, msg: Message) -> None:
if not self.dlq_producer:
return

if isinstance(msg, Message):
self.dlq_producer.send(
self.dead_letter_queue_topic,
msg.value(),
)

else:
# just add at least some record in case that the message is not of the expected type
self.dlq_producer.send(
self.dead_letter_queue_topic,
str(msg).encode("utf-8"),
)
self.dlq_producer.send(
self.dead_letter_queue_topic,
msg.value(),
)


def get_stringfied_record(input_record: dict) -> str:
Expand Down
34 changes: 33 additions & 1 deletion test/consumers/kafka_consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import logging
import io
import time

from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -393,3 +392,36 @@ def test_non_processed_to_dlq(value, expected):

sut.process_msg(input_msg)
process_dlq_mock.assert_called_with(input_msg)


@pytest.mark.parametrize("value,expected", _VALID_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_process_dead_letter_no_configured(value, expected):
"""Check that process_dead_letter method works as expected."""
sut = KafkaConsumer(None, None, None, None)
sut.dlq_producer = MagicMock() # inject a mock producer
sut.dlq_producer.__bool__.return_value = False

input_message = KafkaMessage(value)

sut.process_dead_letter(input_message)
assert not sut.dlq_producer.send.called


@pytest.mark.parametrize("value,expected", _VALID_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
@patch("ccx_messaging.consumers.kafka_consumer.Producer")
def test_process_dead_letter_message(producer_init_mock, value, expected):
"""Check behaviour when DLQ is properly configured."""
dlq_topic_name = "dlq_topic"
producer_mock = MagicMock()
producer_init_mock.return_value = producer_mock

sut = KafkaConsumer(None, None, None, None, dead_letter_queue_topic=dlq_topic_name)
assert producer_init_mock.called

message_mock = MagicMock()
message_mock.value.return_value = value

sut.process_dead_letter(message_mock)
producer_mock.send.assert_called_with(dlq_topic_name, value)

0 comments on commit 4ffb312

Please sign in to comment.