Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4 automatic schema registration retrieval #5

Merged
merged 2 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.0.3"
current_version = "0.0.4"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
serialize = ["{major}.{minor}.{patch}"]
search = "{current_version}"
Expand Down
2 changes: 1 addition & 1 deletion django_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

logger = logging.getLogger(__name__)

__version__ = "0.0.3"
__version__ = "0.0.4"

__all__ = [
"autodiscover",
Expand Down
35 changes: 6 additions & 29 deletions django_kafka/tests/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,31 +186,6 @@ class AvroTopicTestCase(TestCase):
def setUp(self):
self.topic = ATopic()

def test_key_schema(self, mock_kafka_schema_client):
schema = self.topic.key_schema

# return value of the schema.get_latest_version method call
self.assertEqual(
schema,
mock_kafka_schema_client.get_latest_version.return_value,
)
# get_latest_version called with right arguments
mock_kafka_schema_client.get_latest_version.assert_called_once_with(
f"{self.topic.name}-key",
)

def test_value_schema(self, mock_kafka_schema_client):
schema = self.topic.value_schema
# return value of the schema.get_latest_version method call
self.assertEqual(
schema,
mock_kafka_schema_client.get_latest_version.return_value,
)
# called with right arguments
mock_kafka_schema_client.get_latest_version.assert_called_once_with(
f"{self.topic.name}-value",
)

@patch("django_kafka.topic.AvroSerializer")
def test_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
key_serializer = self.topic.key_serializer
Expand All @@ -220,7 +195,8 @@ def test_key_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
# instance was initialized with right arguments
mock_avro_serializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema.schema.schema_str,
schema_str=self.topic.key_schema,
conf=self.topic.serializer_conf,
)

@patch("django_kafka.topic.AvroDeserializer")
Expand All @@ -232,7 +208,7 @@ def test_key_deserializer(self, mock_avro_deserializer, mock_kafka_schema_client
# instance was initialized with right arguments
mock_avro_deserializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema.schema.schema_str,
schema_str=self.topic.key_schema,
)

@patch("django_kafka.topic.AvroSerializer")
Expand All @@ -244,7 +220,8 @@ def test_value_serializer(self, mock_avro_serializer, mock_kafka_schema_client):
# instance was initialized with right arguments
mock_avro_serializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema.schema.schema_str,
schema_str=self.topic.key_schema,
conf=self.topic.serializer_conf,
)

@patch("django_kafka.topic.AvroDeserializer")
Expand All @@ -256,5 +233,5 @@ def test_value_deserializer(self, mock_avro_deserializer, mock_kafka_schema_clie
# instance was initialized with right arguments
mock_avro_deserializer.assert_called_once_with(
mock_kafka_schema_client,
schema_str=self.topic.key_schema.schema.schema_str,
schema_str=self.topic.key_schema,
)
43 changes: 24 additions & 19 deletions django_kafka/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Optional

from confluent_kafka import cimpl
from confluent_kafka.schema_registry import Schema
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import (
Deserializer,
Expand Down Expand Up @@ -48,7 +49,10 @@ def produce(self, value: any, **kwargs):
)

def deserialize(
self, value, field: MessageField, headers: Optional[dict | list] = None,
self,
value,
field: MessageField,
headers: Optional[dict | list] = None,
):
if field == MessageField.VALUE:
return self.value_deserializer(
Expand All @@ -62,7 +66,10 @@ def deserialize(
raise DjangoKafkaError(f"Unsupported deserialization field {field}.")

def serialize(
self, value, field: MessageField, headers: Optional[dict | list] = None,
self,
value,
field: MessageField,
headers: Optional[dict | list] = None,
):
if field == MessageField.VALUE:
return self.value_serializer(
Expand All @@ -84,38 +91,36 @@ def context(


class AvroTopic(Topic):
serializer_conf: dict = None

@property
def key_schema(self):
return kafka.schema_client.get_latest_version(f"{self.name}-key")
def key_schema(self) -> Optional[Schema | str]:
return None

@property
def value_schema(self):
return kafka.schema_client.get_latest_version(f"{self.name}-value")
def value_schema(self) -> Optional[Schema | str]:
return None

@property
def key_serializer(self):
return AvroSerializer(
kafka.schema_client,
schema_str=self.key_schema.schema.schema_str,
)

@property
def key_deserializer(self):
return AvroDeserializer(
kafka.schema_client,
schema_str=self.key_schema.schema.schema_str,
schema_str=self.key_schema,
conf=self.serializer_conf,
)

@property
def value_serializer(self):
return AvroSerializer(
kafka.schema_client,
schema_str=self.value_schema.schema.schema_str,
schema_str=self.value_schema,
conf=self.serializer_conf,
)

@property
def key_deserializer(self):
return AvroDeserializer(kafka.schema_client, schema_str=self.key_schema)

@property
def value_deserializer(self):
return AvroDeserializer(
kafka.schema_client,
schema_str=self.value_schema.schema.schema_str,
)
return AvroDeserializer(kafka.schema_client, schema_str=self.value_schema)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "django-kafka"
version = "0.0.3"
version = "0.0.4"
dependencies = [
"django>=4.0,<6.0",
"confluent-kafka[avro, schema-registry]==2.4.0"
Expand Down
Loading