From 0ec2533328b69bf0e4932e92beb29b244d21898e Mon Sep 17 00:00:00 2001 From: Stefan Cardnell Date: Fri, 2 Aug 2024 17:34:11 +0200 Subject: [PATCH 1/2] fix: utilize automatic schema registration and retrieval, refs #4 --- django_kafka/tests/test_topic.py | 35 +++++--------------------- django_kafka/topic.py | 43 ++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 48 deletions(-) diff --git a/django_kafka/tests/test_topic.py b/django_kafka/tests/test_topic.py index d45435b..dddd294 100644 --- a/django_kafka/tests/test_topic.py +++ b/django_kafka/tests/test_topic.py @@ -186,31 +186,6 @@ class AvroTopicTestCase(TestCase): def setUp(self): self.topic = ATopic() - def test_key_schema(self, mock_kafka_schema_client): - schema = self.topic.key_schema - - # return value of the schema.get_latest_version method call - self.assertEqual( - schema, - mock_kafka_schema_client.get_latest_version.return_value, - ) - # get_latest_version called with right arguments - mock_kafka_schema_client.get_latest_version.assert_called_once_with( - f"{self.topic.name}-key", - ) - - def test_value_schema(self, mock_kafka_schema_client): - schema = self.topic.value_schema - # return value of the schema.get_latest_version method call - self.assertEqual( - schema, - mock_kafka_schema_client.get_latest_version.return_value, - ) - # called with right arguments - mock_kafka_schema_client.get_latest_version.assert_called_once_with( - f"{self.topic.name}-value", - ) - @patch("django_kafka.topic.AvroSerializer") def test_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client): key_serializer = self.topic.key_serializer @@ -220,7 +195,8 @@ def test_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client): # instance was initialized with right arguments mock_avro_serializer.assert_called_once_with( mock_kafka_schema_client, - schema_str=self.topic.key_schema.schema.schema_str, + schema_str=self.topic.key_schema, + conf=self.topic.serializer_conf, ) @patch("django_kafka.topic.AvroDeserializer") @@ -232,7 +208,7 @@ def test_key_deserializer(self, mock_avro_deserializer, mock_kafka_schema_client # instance was initialized with right arguments mock_avro_deserializer.assert_called_once_with( mock_kafka_schema_client, - schema_str=self.topic.key_schema.schema.schema_str, + schema_str=self.topic.key_schema, ) @patch("django_kafka.topic.AvroSerializer") @@ -244,7 +220,8 @@ def test_value_serializer(self, mock_avro_serializer, mock_kafka_schema_client): # instance was initialized with right arguments mock_avro_serializer.assert_called_once_with( mock_kafka_schema_client, - schema_str=self.topic.key_schema.schema.schema_str, + schema_str=self.topic.key_schema, + conf=self.topic.serializer_conf, ) @patch("django_kafka.topic.AvroDeserializer") @@ -256,5 +233,5 @@ def test_value_deserializer(self, mock_avro_deserializer, mock_kafka_schema_clie # instance was initialized with right arguments mock_avro_deserializer.assert_called_once_with( mock_kafka_schema_client, - schema_str=self.topic.key_schema.schema.schema_str, + schema_str=self.topic.key_schema, ) diff --git a/django_kafka/topic.py b/django_kafka/topic.py index 1fd43e9..b27aa9e 100644 --- a/django_kafka/topic.py +++ b/django_kafka/topic.py @@ -3,6 +3,7 @@ from typing import Optional from confluent_kafka import cimpl +from confluent_kafka.schema_registry import Schema from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer from confluent_kafka.serialization import ( Deserializer, @@ -48,7 +49,10 @@ def produce(self, value: any, **kwargs): ) def deserialize( - self, value, field: MessageField, headers: Optional[dict | list] = None, + self, + value, + field: MessageField, + headers: Optional[dict | list] = None, ): if field == MessageField.VALUE: return self.value_deserializer( @@ -62,7 +66,10 @@ def deserialize( raise DjangoKafkaError(f"Unsupported deserialization field {field}.") def serialize( - self, value, field: MessageField, headers: Optional[dict | list] = None, + self, + value, + field: MessageField, + headers: Optional[dict | list] = None, ): if field == MessageField.VALUE: return self.value_serializer( @@ -84,38 +91,36 @@ def context( class AvroTopic(Topic): + serializer_conf: dict = None + @property - def key_schema(self): - return kafka.schema_client.get_latest_version(f"{self.name}-key") + def key_schema(self) -> Optional[Schema | str]: + return None @property - def value_schema(self): - return kafka.schema_client.get_latest_version(f"{self.name}-value") + def value_schema(self) -> Optional[Schema | str]: + return None @property def key_serializer(self): return AvroSerializer( kafka.schema_client, - schema_str=self.key_schema.schema.schema_str, - ) - - @property - def key_deserializer(self): - return AvroDeserializer( - kafka.schema_client, - schema_str=self.key_schema.schema.schema_str, + schema_str=self.key_schema, + conf=self.serializer_conf, ) @property def value_serializer(self): return AvroSerializer( kafka.schema_client, - schema_str=self.value_schema.schema.schema_str, + schema_str=self.value_schema, + conf=self.serializer_conf, ) + @property + def key_deserializer(self): + return AvroDeserializer(kafka.schema_client, schema_str=self.key_schema) + @property def value_deserializer(self): - return AvroDeserializer( - kafka.schema_client, - schema_str=self.value_schema.schema.schema_str, - ) + return AvroDeserializer(kafka.schema_client, schema_str=self.value_schema) From 3a1a5607094cdaed3c5b5f31013a30b3ba818b4a Mon Sep 17 00:00:00 2001 From: Stefan Cardnell Date: Fri, 2 Aug 2024 17:38:12 +0200 Subject: [PATCH 2/2] =?UTF-8?q?Bump=20version:=200.0.3=20=E2=86=92=200.0.4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.toml | 2 +- django_kafka/__init__.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bumpversion.toml b/.bumpversion.toml index 3e5cbc9..f82c3c7 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -1,5 +1,5 @@ [tool.bumpversion] -current_version = "0.0.3" +current_version = "0.0.4" parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)" serialize = ["{major}.{minor}.{patch}"] search = "{current_version}" diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py index e792442..ba3fa91 100644 --- a/django_kafka/__init__.py +++ b/django_kafka/__init__.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) -__version__ = "0.0.3" +__version__ = "0.0.4" __all__ = [ "autodiscover", diff --git a/pyproject.toml b/pyproject.toml index a322730..c07f31d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "django-kafka" -version = "0.0.3" +version = "0.0.4" dependencies = [ "django>=4.0,<6.0", "confluent-kafka[avro, schema-registry]==2.4.0"