Skip to content

Commit

Permalink
Implement several adjustments to the Topic and AvroTopic.
Browse files Browse the repository at this point in the history
- `Topic.produce` now takes `key_serializer_kwargs` and `value_serializer_kwargs` kw arguments and passes them to the serializer init.
- `Topic.consume` is not required to implement.
- Introduce `AvroSerializer.schema_config` which is used to initialize serializers with `conf` kwargs.
  • Loading branch information
Bogdan Radko committed Aug 12, 2024
1 parent 980e0ee commit 7932a39
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 83 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ DJANGO_KAFKA = {
}
```

**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.
**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.

## Settings:

Expand Down
6 changes: 3 additions & 3 deletions django_kafka/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from django_kafka.consumer import Consumer, Topics


class StopWhileTrueException(Exception):
class StopWhileTrueError(Exception):
pass


class ConsumerTestCase(TestCase):
@patch(
"django_kafka.consumer.Consumer.process_message",
side_effect=StopWhileTrueException(),
side_effect=StopWhileTrueError(),
)
@patch(
"django_kafka.consumer.ConfluentConsumer",
Expand All @@ -34,7 +34,7 @@ class SomeConsumer(Consumer):
# hack to break infinite loop
# `consumer.start` is using `while True:` loop which never ends
# in order to test it we need to break it which is achievable with side_effect
with suppress(StopWhileTrueException):
with suppress(StopWhileTrueError):
consumer.start()

# subscribed to the topics defined by consumer class
Expand Down
93 changes: 71 additions & 22 deletions django_kafka/tests/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,36 @@ class TopicTestCase(TestCase):
def setUp(self):
self.topic = SomeTopic()

@patch("django_kafka.topic.Topic.serialize")
@patch("django_kafka.DjangoKafka.producer")
def test_produce_serializer_kwargs(self, mock_kafka_producer, mock_topic_serialize):
key = "key"
value = "message value"
headers = None # default is None when not provided
key_serializer_kwargs = {"a": "b"}
value_serializer_kwargs = {"c": "d"}

self.topic.produce(
value,
key=key,
key_serializer_kwargs=key_serializer_kwargs,
value_serializer_kwargs=value_serializer_kwargs,
)

self.assertEqual(
mock_topic_serialize.call_args_list,
[
call(key, MessageField.KEY, headers, **key_serializer_kwargs),
call(value, MessageField.VALUE, headers, **value_serializer_kwargs),
],
)

mock_kafka_producer.produce.assert_called_once_with(
self.topic.name,
mock_topic_serialize.return_value,
key=mock_topic_serialize.return_value,
)

@patch("django_kafka.topic.Topic.serialize")
@patch("django_kafka.DjangoKafka.producer")
def test_produce_only_value(self, mock_kafka_producer, mock_topic_serialize):
Expand Down Expand Up @@ -62,11 +92,13 @@ def test_produce_key_is_serialized(self, mock_kafka_producer, mock_topic_seriali
def test_deserialize_key(self, mock_topic_context, mock_key_deserializer):
value = b"some key"
field = MessageField.KEY
kwargs = {"key": "value"}

self.topic.deserialize(value, field)
self.topic.deserialize(value, field, **kwargs)

mock_topic_context.assert_called_once_with(field, None)
mock_key_deserializer.assert_called_once_with(
mock_key_deserializer.assert_called_once_with(**kwargs)
mock_key_deserializer.return_value.assert_called_once_with(
value,
mock_topic_context.return_value,
)
Expand All @@ -76,11 +108,13 @@ def test_deserialize_key(self, mock_topic_context, mock_key_deserializer):
def test_deserialize_value(self, mock_topic_context, mock_value_deserializer):
value = b"some value"
field = MessageField.VALUE
kwargs = {"key": "value"}

self.topic.deserialize(value, field)
self.topic.deserialize(value, field, **kwargs)

mock_topic_context.assert_called_once_with(field, None)
mock_value_deserializer.assert_called_once_with(
mock_value_deserializer.assert_called_once_with(**kwargs)
mock_value_deserializer.return_value.assert_called_once_with(
value,
mock_topic_context.return_value,
)
Expand Down Expand Up @@ -111,11 +145,13 @@ def test_deserialize_unknown_field(
def test_serialize_key(self, mock_topic_context, mock_key_serializer):
value = "some key"
field = MessageField.KEY
kwargs = {"key": "value"}

self.topic.serialize(value, field)
self.topic.serialize(value, field, **kwargs)

mock_topic_context.assert_called_once_with(field, None)
mock_key_serializer.assert_called_once_with(
mock_key_serializer.assert_called_once_with(**kwargs)
mock_key_serializer.return_value.assert_called_once_with(
value,
mock_topic_context.return_value,
)
Expand All @@ -125,11 +161,13 @@ def test_serialize_key(self, mock_topic_context, mock_key_serializer):
def test_serialize_value(self, mock_topic_context, mock_value_serializer):
value = "some value"
field = MessageField.VALUE
kwargs = {"key": "value"}

self.topic.serialize(value, field)
self.topic.serialize(value, field, **kwargs)

mock_topic_context.assert_called_once_with(field, None)
mock_value_serializer.assert_called_once_with(
mock_value_serializer.assert_called_once_with(**kwargs)
mock_value_serializer.return_value.assert_called_once_with(
value,
mock_topic_context.return_value,
)
Expand Down Expand Up @@ -187,51 +225,62 @@ def setUp(self):
self.topic = ATopic()

@patch("django_kafka.topic.AvroSerializer")
def test_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
key_serializer = self.topic.key_serializer
def test_get_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
kwargs = {
"schema_str": "<some schema>",
"conf": {},
}
key_serializer = self.topic.get_key_serializer(**kwargs)

# returns AvroSerializer instance
self.assertEqual(key_serializer, mock_avro_serializer.return_value)
# instance was initialized with right arguments
mock_avro_serializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema,
conf=self.topic.serializer_conf,
**kwargs,
)

@patch("django_kafka.topic.AvroDeserializer")
def test_key_deserializer(self, mock_avro_deserializer, mock_kafka_schema_client):
key_deserializer = self.topic.key_deserializer
def test_get_key_deserializer(
self, mock_avro_deserializer, mock_kafka_schema_client):
kwargs = {}
key_deserializer = self.topic.get_key_deserializer(**kwargs)

# returns mock_AvroDeserializer instance
self.assertEqual(key_deserializer, mock_avro_deserializer.return_value)
# instance was initialized with right arguments
mock_avro_deserializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema,
**kwargs,
)

@patch("django_kafka.topic.AvroSerializer")
def test_value_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
value_serializer = self.topic.value_serializer
def test_get_value_serializer(
self, mock_avro_serializer, mock_kafka_schema_client):
kwargs = {
"schema_str": "<some schema>",
"conf": {},
}
value_serializer = self.topic.get_value_serializer(**kwargs)

# returns AvroSerializer instance
self.assertEqual(value_serializer, mock_avro_serializer.return_value)
# instance was initialized with right arguments
mock_avro_serializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema,
conf=self.topic.serializer_conf,
**kwargs,
)

@patch("django_kafka.topic.AvroDeserializer")
def test_value_deserializer(self, mock_avro_deserializer, mock_kafka_schema_client):
value_deserializer = self.topic.value_deserializer
def test_get_value_deserializer(
self, mock_avro_deserializer, mock_kafka_schema_client):
kwargs = {}
value_deserializer = self.topic.get_value_deserializer(**kwargs)

# returns mock_AvroDeserializer instance
self.assertEqual(value_deserializer, mock_avro_deserializer.return_value)
# instance was initialized with right arguments
mock_avro_deserializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema,
**kwargs,
)
Loading

0 comments on commit 7932a39

Please sign in to comment.