Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro topic pass through kwargs #6

Merged
merged 2 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.0.4"
current_version = "0.1.0"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
serialize = ["{major}.{minor}.{patch}"]
search = "{current_version}"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ DJANGO_KAFKA = {
}
```

**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.
**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.

## Settings:

Expand Down
2 changes: 1 addition & 1 deletion django_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

logger = logging.getLogger(__name__)

__version__ = "0.0.4"
__version__ = "0.1.0"

__all__ = [
"autodiscover",
Expand Down
6 changes: 3 additions & 3 deletions django_kafka/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from django_kafka.consumer import Consumer, Topics


class StopWhileTrueException(Exception):
class StopWhileTrueError(Exception):
pass


class ConsumerTestCase(TestCase):
@patch(
"django_kafka.consumer.Consumer.process_message",
side_effect=StopWhileTrueException(),
side_effect=StopWhileTrueError(),
)
@patch(
"django_kafka.consumer.ConfluentConsumer",
Expand All @@ -34,7 +34,7 @@ class SomeConsumer(Consumer):
# hack to break infinite loop
# `consumer.start` is using `while True:` loop which never ends
# in order to test it we need to break it which is achievable with side_effect
with suppress(StopWhileTrueException):
with suppress(StopWhileTrueError):
consumer.start()

# subscribed to the topics defined by consumer class
Expand Down
93 changes: 71 additions & 22 deletions django_kafka/tests/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,36 @@ class TopicTestCase(TestCase):
def setUp(self):
self.topic = SomeTopic()

@patch("django_kafka.topic.Topic.serialize")
@patch("django_kafka.DjangoKafka.producer")
def test_produce_serializer_kwargs(self, mock_kafka_producer, mock_topic_serialize):
key = "key"
value = "message value"
headers = None # default is None when not provided
key_serializer_kwargs = {"a": "b"}
value_serializer_kwargs = {"c": "d"}

self.topic.produce(
value,
key=key,
key_serializer_kwargs=key_serializer_kwargs,
value_serializer_kwargs=value_serializer_kwargs,
)

self.assertEqual(
mock_topic_serialize.call_args_list,
[
call(key, MessageField.KEY, headers, **key_serializer_kwargs),
call(value, MessageField.VALUE, headers, **value_serializer_kwargs),
],
)

mock_kafka_producer.produce.assert_called_once_with(
self.topic.name,
mock_topic_serialize.return_value,
key=mock_topic_serialize.return_value,
)

@patch("django_kafka.topic.Topic.serialize")
@patch("django_kafka.DjangoKafka.producer")
def test_produce_only_value(self, mock_kafka_producer, mock_topic_serialize):
Expand Down Expand Up @@ -62,11 +92,13 @@ def test_produce_key_is_serialized(self, mock_kafka_producer, mock_topic_seriali
def test_deserialize_key(self, mock_topic_context, mock_key_deserializer):
value = b"some key"
field = MessageField.KEY
kwargs = {"key": "value"}

self.topic.deserialize(value, field)
self.topic.deserialize(value, field, **kwargs)

mock_topic_context.assert_called_once_with(field, None)
mock_key_deserializer.assert_called_once_with(
mock_key_deserializer.assert_called_once_with(**kwargs)
mock_key_deserializer.return_value.assert_called_once_with(
value,
mock_topic_context.return_value,
)
Expand All @@ -76,11 +108,13 @@ def test_deserialize_key(self, mock_topic_context, mock_key_deserializer):
def test_deserialize_value(self, mock_topic_context, mock_value_deserializer):
value = b"some value"
field = MessageField.VALUE
kwargs = {"key": "value"}

self.topic.deserialize(value, field)
self.topic.deserialize(value, field, **kwargs)

mock_topic_context.assert_called_once_with(field, None)
mock_value_deserializer.assert_called_once_with(
mock_value_deserializer.assert_called_once_with(**kwargs)
mock_value_deserializer.return_value.assert_called_once_with(
value,
mock_topic_context.return_value,
)
Expand Down Expand Up @@ -111,11 +145,13 @@ def test_deserialize_unknown_field(
def test_serialize_key(self, mock_topic_context, mock_key_serializer):
value = "some key"
field = MessageField.KEY
kwargs = {"key": "value"}

self.topic.serialize(value, field)
self.topic.serialize(value, field, **kwargs)

mock_topic_context.assert_called_once_with(field, None)
mock_key_serializer.assert_called_once_with(
mock_key_serializer.assert_called_once_with(**kwargs)
mock_key_serializer.return_value.assert_called_once_with(
value,
mock_topic_context.return_value,
)
Expand All @@ -125,11 +161,13 @@ def test_serialize_key(self, mock_topic_context, mock_key_serializer):
def test_serialize_value(self, mock_topic_context, mock_value_serializer):
value = "some value"
field = MessageField.VALUE
kwargs = {"key": "value"}

self.topic.serialize(value, field)
self.topic.serialize(value, field, **kwargs)

mock_topic_context.assert_called_once_with(field, None)
mock_value_serializer.assert_called_once_with(
mock_value_serializer.assert_called_once_with(**kwargs)
mock_value_serializer.return_value.assert_called_once_with(
value,
mock_topic_context.return_value,
)
Expand Down Expand Up @@ -187,51 +225,62 @@ def setUp(self):
self.topic = ATopic()

@patch("django_kafka.topic.AvroSerializer")
def test_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
key_serializer = self.topic.key_serializer
def test_get_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
kwargs = {
"schema_str": "<some schema>",
"conf": {},
}
key_serializer = self.topic.get_key_serializer(**kwargs)

# returns AvroSerializer instance
self.assertEqual(key_serializer, mock_avro_serializer.return_value)
# instance was initialized with right arguments
mock_avro_serializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema,
conf=self.topic.serializer_conf,
**kwargs,
)

@patch("django_kafka.topic.AvroDeserializer")
def test_key_deserializer(self, mock_avro_deserializer, mock_kafka_schema_client):
key_deserializer = self.topic.key_deserializer
def test_get_key_deserializer(
self, mock_avro_deserializer, mock_kafka_schema_client):
kwargs = {}
key_deserializer = self.topic.get_key_deserializer(**kwargs)

# returns mock_AvroDeserializer instance
self.assertEqual(key_deserializer, mock_avro_deserializer.return_value)
# instance was initialized with right arguments
mock_avro_deserializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema,
**kwargs,
)

@patch("django_kafka.topic.AvroSerializer")
def test_value_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
value_serializer = self.topic.value_serializer
def test_get_value_serializer(
self, mock_avro_serializer, mock_kafka_schema_client):
kwargs = {
"schema_str": "<some schema>",
"conf": {},
}
value_serializer = self.topic.get_value_serializer(**kwargs)

# returns AvroSerializer instance
self.assertEqual(value_serializer, mock_avro_serializer.return_value)
# instance was initialized with right arguments
mock_avro_serializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema,
conf=self.topic.serializer_conf,
**kwargs,
)

@patch("django_kafka.topic.AvroDeserializer")
def test_value_deserializer(self, mock_avro_deserializer, mock_kafka_schema_client):
value_deserializer = self.topic.value_deserializer
def test_get_value_deserializer(
self, mock_avro_deserializer, mock_kafka_schema_client):
kwargs = {}
value_deserializer = self.topic.get_value_deserializer(**kwargs)

# returns mock_AvroDeserializer instance
self.assertEqual(value_deserializer, mock_avro_deserializer.return_value)
# instance was initialized with right arguments
mock_avro_deserializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema,
**kwargs,
)
Loading
Loading