From 7932a39c468724e27d29985bcb262c77e32fc3f2 Mon Sep 17 00:00:00 2001 From: Bogdan Radko Date: Mon, 12 Aug 2024 16:58:24 +0200 Subject: [PATCH 1/2] Implement several adjustments to the `Topic` and `AvroTopic`. - `Topic.produce` now takes `key_serializer_kwargs` and `value_serializer_kwargs` kw arguments and passes them to the serializer init. - `Topic.consume` is not required to implement. - Introduce `AvroSerializer.schema_config` which is used to initialize serializers with `conf` kwargs. --- README.md | 2 +- django_kafka/tests/test_consumer.py | 6 +- django_kafka/tests/test_topic.py | 93 +++++++++++++----- django_kafka/topic.py | 144 +++++++++++++++++----------- 4 files changed, 162 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index 8908070..efc6f65 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ DJANGO_KAFKA = { } ``` -**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema. +**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema. ## Settings: diff --git a/django_kafka/tests/test_consumer.py b/django_kafka/tests/test_consumer.py index 89959e7..dd9e865 100644 --- a/django_kafka/tests/test_consumer.py +++ b/django_kafka/tests/test_consumer.py @@ -7,14 +7,14 @@ from django_kafka.consumer import Consumer, Topics -class StopWhileTrueException(Exception): +class StopWhileTrueError(Exception): pass class ConsumerTestCase(TestCase): @patch( "django_kafka.consumer.Consumer.process_message", - side_effect=StopWhileTrueException(), + side_effect=StopWhileTrueError(), ) @patch( "django_kafka.consumer.ConfluentConsumer", @@ -34,7 +34,7 @@ class SomeConsumer(Consumer): # hack to break infinite loop # `consumer.start` is using `while True:` loop which never ends # in order to test it we need to break it which is achievable with side_effect - with suppress(StopWhileTrueException): + with suppress(StopWhileTrueError): consumer.start() # subscribed to the topics defined by consumer class diff --git a/django_kafka/tests/test_topic.py b/django_kafka/tests/test_topic.py index dddd294..96c3523 100644 --- a/django_kafka/tests/test_topic.py +++ b/django_kafka/tests/test_topic.py @@ -19,6 +19,36 @@ class TopicTestCase(TestCase): def setUp(self): self.topic = SomeTopic() + @patch("django_kafka.topic.Topic.serialize") + @patch("django_kafka.DjangoKafka.producer") + def test_produce_serializer_kwargs(self, mock_kafka_producer, mock_topic_serialize): + key = "key" + value = "message value" + headers = None # default is None when not provided + key_serializer_kwargs = {"a": "b"} + value_serializer_kwargs = {"c": "d"} + + self.topic.produce( + value, + key=key, + key_serializer_kwargs=key_serializer_kwargs, + value_serializer_kwargs=value_serializer_kwargs, + ) + + self.assertEqual( + mock_topic_serialize.call_args_list, + [ + call(key, MessageField.KEY, headers, **key_serializer_kwargs), + call(value, MessageField.VALUE, headers, **value_serializer_kwargs), + ], + ) + + mock_kafka_producer.produce.assert_called_once_with( + self.topic.name, + mock_topic_serialize.return_value, + key=mock_topic_serialize.return_value, + ) + @patch("django_kafka.topic.Topic.serialize") @patch("django_kafka.DjangoKafka.producer") def test_produce_only_value(self, mock_kafka_producer, mock_topic_serialize): @@ -62,11 +92,13 @@ def test_produce_key_is_serialized(self, mock_kafka_producer, mock_topic_seriali def test_deserialize_key(self, mock_topic_context, mock_key_deserializer): value = b"some key" field = MessageField.KEY + kwargs = {"key": "value"} - self.topic.deserialize(value, field) + self.topic.deserialize(value, field, **kwargs) mock_topic_context.assert_called_once_with(field, None) - mock_key_deserializer.assert_called_once_with( + mock_key_deserializer.assert_called_once_with(**kwargs) + mock_key_deserializer.return_value.assert_called_once_with( value, mock_topic_context.return_value, ) @@ -76,11 +108,13 @@ def test_deserialize_key(self, mock_topic_context, mock_key_deserializer): def test_deserialize_value(self, mock_topic_context, mock_value_deserializer): value = b"some value" field = MessageField.VALUE + kwargs = {"key": "value"} - self.topic.deserialize(value, field) + self.topic.deserialize(value, field, **kwargs) mock_topic_context.assert_called_once_with(field, None) - mock_value_deserializer.assert_called_once_with( + mock_value_deserializer.assert_called_once_with(**kwargs) + mock_value_deserializer.return_value.assert_called_once_with( value, mock_topic_context.return_value, ) @@ -111,11 +145,13 @@ def test_deserialize_unknown_field( def test_serialize_key(self, mock_topic_context, mock_key_serializer): value = "some key" field = MessageField.KEY + kwargs = {"key": "value"} - self.topic.serialize(value, field) + self.topic.serialize(value, field, **kwargs) mock_topic_context.assert_called_once_with(field, None) - mock_key_serializer.assert_called_once_with( + mock_key_serializer.assert_called_once_with(**kwargs) + mock_key_serializer.return_value.assert_called_once_with( value, mock_topic_context.return_value, ) @@ -125,11 +161,13 @@ def test_serialize_key(self, mock_topic_context, mock_key_serializer): def test_serialize_value(self, mock_topic_context, mock_value_serializer): value = "some value" field = MessageField.VALUE + kwargs = {"key": "value"} - self.topic.serialize(value, field) + self.topic.serialize(value, field, **kwargs) mock_topic_context.assert_called_once_with(field, None) - mock_value_serializer.assert_called_once_with( + mock_value_serializer.assert_called_once_with(**kwargs) + mock_value_serializer.return_value.assert_called_once_with( value, mock_topic_context.return_value, ) @@ -187,51 +225,62 @@ def setUp(self): self.topic = ATopic() @patch("django_kafka.topic.AvroSerializer") - def test_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client): - key_serializer = self.topic.key_serializer + def test_get_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client): + kwargs = { + "schema_str": "", + "conf": {}, + } + key_serializer = self.topic.get_key_serializer(**kwargs) # returns AvroSerializer instance self.assertEqual(key_serializer, mock_avro_serializer.return_value) # instance was initialized with right arguments mock_avro_serializer.assert_called_once_with( mock_kafka_schema_client, - schema_str=self.topic.key_schema, - conf=self.topic.serializer_conf, + **kwargs, ) @patch("django_kafka.topic.AvroDeserializer") - def test_key_deserializer(self, mock_avro_deserializer, mock_kafka_schema_client): - key_deserializer = self.topic.key_deserializer + def test_get_key_deserializer( + self, mock_avro_deserializer, mock_kafka_schema_client): + kwargs = {} + key_deserializer = self.topic.get_key_deserializer(**kwargs) # returns mock_AvroDeserializer instance self.assertEqual(key_deserializer, mock_avro_deserializer.return_value) # instance was initialized with right arguments mock_avro_deserializer.assert_called_once_with( mock_kafka_schema_client, - schema_str=self.topic.key_schema, + **kwargs, ) @patch("django_kafka.topic.AvroSerializer") - def test_value_serializer(self, mock_avro_serializer, mock_kafka_schema_client): - value_serializer = self.topic.value_serializer + def test_get_value_serializer( + self, mock_avro_serializer, mock_kafka_schema_client): + kwargs = { + "schema_str": "", + "conf": {}, + } + value_serializer = self.topic.get_value_serializer(**kwargs) # returns AvroSerializer instance self.assertEqual(value_serializer, mock_avro_serializer.return_value) # instance was initialized with right arguments mock_avro_serializer.assert_called_once_with( mock_kafka_schema_client, - schema_str=self.topic.key_schema, - conf=self.topic.serializer_conf, + **kwargs, ) @patch("django_kafka.topic.AvroDeserializer") - def test_value_deserializer(self, mock_avro_deserializer, mock_kafka_schema_client): - value_deserializer = self.topic.value_deserializer + def test_get_value_deserializer( + self, mock_avro_deserializer, mock_kafka_schema_client): + kwargs = {} + value_deserializer = self.topic.get_value_deserializer(**kwargs) # returns mock_AvroDeserializer instance self.assertEqual(value_deserializer, mock_avro_deserializer.return_value) # instance was initialized with right arguments mock_avro_deserializer.assert_called_once_with( mock_kafka_schema_client, - schema_str=self.topic.key_schema, + **kwargs, ) diff --git a/django_kafka/topic.py b/django_kafka/topic.py index b27aa9e..1c5e5f7 100644 --- a/django_kafka/topic.py +++ b/django_kafka/topic.py @@ -1,9 +1,8 @@ import logging from abc import ABC, abstractmethod -from typing import Optional +from typing import Optional, Type from confluent_kafka import cimpl -from confluent_kafka.schema_registry import Schema from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer from confluent_kafka.serialization import ( Deserializer, @@ -21,66 +20,83 @@ class Topic(ABC): - key_serializer: Serializer = StringSerializer() - key_deserializer: Deserializer = StringDeserializer() + key_serializer: Type[Serializer] = StringSerializer + value_serializer: Type[Serializer] = StringSerializer - value_serializer: Serializer = StringSerializer() - value_deserializer: Deserializer = StringDeserializer() + key_deserializer: Type[Deserializer] = StringDeserializer + value_deserializer: Type[Deserializer] = StringDeserializer @property @abstractmethod def name(self) -> str: """Define Kafka topic name""" - @abstractmethod def consume(self, msg: cimpl.Message): """Implement message processing""" + raise NotImplementedError def produce(self, value: any, **kwargs): + key_serializer_kwargs = kwargs.pop("key_serializer_kwargs", {}) or {} + value_serializer_kwargs = kwargs.pop("value_serializer_kwargs", {}) or {} headers = kwargs.get("headers") if "key" in kwargs: - kwargs["key"] = self.serialize(kwargs["key"], MessageField.KEY, headers) + kwargs["key"] = self.serialize( + kwargs["key"], MessageField.KEY, headers, **key_serializer_kwargs) kafka.producer.produce( self.name, - self.serialize(value, MessageField.VALUE, headers), + self.serialize( + value, MessageField.VALUE, headers, **value_serializer_kwargs, + ), **kwargs, ) - def deserialize( + def serialize( self, value, field: MessageField, headers: Optional[dict | list] = None, + **kwargs, ): if field == MessageField.VALUE: - return self.value_deserializer( - value, - self.context(MessageField.VALUE, headers), - ) + serializer = self.get_value_serializer(**kwargs) + return serializer(value, self.context(MessageField.VALUE, headers)) if field == MessageField.KEY: - return self.key_deserializer(value, self.context(MessageField.KEY, headers)) + serializer = self.get_key_serializer(**kwargs) + return serializer(value, self.context(MessageField.KEY, headers)) - raise DjangoKafkaError(f"Unsupported deserialization field {field}.") + raise DjangoKafkaError(f"Unsupported serialization field {field}.") - def serialize( + def deserialize( self, value, field: MessageField, headers: Optional[dict | list] = None, + **kwargs, ): if field == MessageField.VALUE: - return self.value_serializer( - value, - self.context(MessageField.VALUE, headers), - ) + deserializer = self.get_value_deserializer(**kwargs) + return deserializer(value, self.context(MessageField.VALUE, headers)) if field == MessageField.KEY: - return self.key_serializer(value, self.context(MessageField.KEY, headers)) + deserializer = self.get_key_deserializer(**kwargs) + return deserializer(value, self.context(MessageField.KEY, headers)) - raise DjangoKafkaError(f"Unsupported serialization field {field}.") + raise DjangoKafkaError(f"Unsupported deserialization field {field}.") + + def get_key_serializer(self, **kwargs): + return self.key_serializer(**kwargs) + + def get_value_serializer(self, **kwargs): + return self.value_serializer(**kwargs) + + def get_key_deserializer(self, **kwargs): + return self.key_deserializer(**kwargs) + + def get_value_deserializer(self, **kwargs): + return self.value_deserializer(**kwargs) def context( self, @@ -90,37 +106,51 @@ def context( return SerializationContext(self.name, field, headers=headers) -class AvroTopic(Topic): - serializer_conf: dict = None - - @property - def key_schema(self) -> Optional[Schema | str]: - return None - - @property - def value_schema(self) -> Optional[Schema | str]: - return None - - @property - def key_serializer(self): - return AvroSerializer( - kafka.schema_client, - schema_str=self.key_schema, - conf=self.serializer_conf, - ) - - @property - def value_serializer(self): - return AvroSerializer( - kafka.schema_client, - schema_str=self.value_schema, - conf=self.serializer_conf, - ) - - @property - def key_deserializer(self): - return AvroDeserializer(kafka.schema_client, schema_str=self.key_schema) - - @property - def value_deserializer(self): - return AvroDeserializer(kafka.schema_client, schema_str=self.value_schema) +class AvroTopic(Topic, ABC): + """ + Consume. + Defining schemas is not necessary as it gets retrieved automatically from the Schema Registry. + + Produce. + Defining `value_schema` is required (`key_schema` is required when using keys). + It gets submitted to the Schema Registry + + Multiple schemas and one Topic: + `AvroTopic.produce` takes `serializer_kwargs` kw argument. + `AvroSerializer` then gets initialized with the provided kwargs. + When producing you can tell which schema to use for your message: + ```python + schema = { + "type": "record", + "name": "ValueTest", + "fields": [ + {"name": "value", "type": "string"}, + ] + } + topic.produce({"value": 1}, value_serializer_kwargs={"schema_str": json.dumps(schema)}) + ``` + + [Cofluent AvroSerializer Config](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#avroserializer) + [Avro schema definition](https://avro.apache.org/docs/1.11.1/specification/) + """ # noqa: E501 + key_schema: str + value_schema: str + schema_config: dict + + def get_key_serializer(self, **kwargs): + kwargs.setdefault("schema_str", getattr(self, "key_schema", None)) + kwargs.setdefault("conf", getattr(self, "schema_config", None)) + + return AvroSerializer(kafka.schema_client, **kwargs) + + def get_value_serializer(self, **kwargs): + kwargs.setdefault("schema_str", getattr(self, "value_schema", None)) + kwargs.setdefault("conf", getattr(self, "schema_config", None)) + + return AvroSerializer(kafka.schema_client, **kwargs) + + def get_key_deserializer(self, **kwargs): + return AvroDeserializer(kafka.schema_client, **kwargs) + + def get_value_deserializer(self, **kwargs): + return AvroDeserializer(kafka.schema_client, **kwargs) From 318f1656f6cd23b3f57f51ce810cb36985ea987d Mon Sep 17 00:00:00 2001 From: Bogdan Radko Date: Tue, 13 Aug 2024 08:15:11 +0000 Subject: [PATCH 2/2] =?UTF-8?q?Bump=20version:=200.0.4=20=E2=86=92=200.1.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.toml | 2 +- django_kafka/__init__.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bumpversion.toml b/.bumpversion.toml index f82c3c7..ab9b67a 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -1,5 +1,5 @@ [tool.bumpversion] -current_version = "0.0.4" +current_version = "0.1.0" parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)" serialize = ["{major}.{minor}.{patch}"] search = "{current_version}" diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py index ba3fa91..9f30a8c 100644 --- a/django_kafka/__init__.py +++ b/django_kafka/__init__.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) -__version__ = "0.0.4" +__version__ = "0.1.0" __all__ = [ "autodiscover", diff --git a/pyproject.toml b/pyproject.toml index c07f31d..b0a977d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "django-kafka" -version = "0.0.4" +version = "0.1.0" dependencies = [ "django>=4.0,<6.0", "confluent-kafka[avro, schema-registry]==2.4.0"