From caf80963547d4c3907f7ddc8979b2fceb2aaf283 Mon Sep 17 00:00:00 2001
From: Stefan Cardnell
Date: Thu, 17 Oct 2024 16:27:16 +0200
Subject: [PATCH] feat: add producer.suppress for global message suppression,
 refs #27

---
 CHANGELOG.md                              |   5 +
 README.md                                 |  87 ++++++++++----
 django_kafka/{ => connect}/models.py      |  28 ++---
 django_kafka/consumer.py                  |   7 +-
 django_kafka/producer.py                  |  52 ++++++++-
 django_kafka/retry/consumer.py            |   4 +-
 .../tests/{ => connect}/test_models.py    |   8 +-
 django_kafka/tests/retry/test_consumer.py |   3 +-
 django_kafka/tests/test_producers.py      | 107 ++++++++++++++++++
 django_kafka/tests/topic/test_model.py    |   8 +-
 django_kafka/topic/model.py               |   4 +-
 11 files changed, 265 insertions(+), 48 deletions(-)
 rename django_kafka/{ => connect}/models.py (68%)
 rename django_kafka/tests/{ => connect}/test_models.py (96%)
 create mode 100644 django_kafka/tests/test_producers.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ec7a25c..77fa219 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
 # Changelog
 
+## 0.5.2 (2024-10-17)
+* Added `producer.suppress` decorator.
+* Renamed `KafkaSkipModel` to `KafkaConnectSkipModel`.
+* Renamed `KafkaSkipQueryset` to `KafkaConnectSkipQueryset`.
+
 ## 0.5.1 (2024-10-16)
 * `ModelTopicConsumer.sync` returns now the results of the `update_or_create` method.
 * Add `days_from_epoch_to_date` function to convert `io.debezium.time.Date` to python `datetime.date`.

diff --git a/README.md b/README.md
index 4cec39f..da0c4a8 100644
--- a/README.md
+++ b/README.md
@@ -203,7 +203,7 @@ When the consumption of a message in a retryable topic fails, the message is re-
 When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic.
 
-## Connectors
+## Connectors:
 
 Connectors are auto-discovered and are expected to be located under the `some_django_app/kafka/connectors.py` or `some_django_app/connectors.py`.
 
@@ -378,56 +378,105 @@ Prefix which will be added to the connector name when publishing the connector.
 
 Used by `django_kafka.connect.connector.Connector` to initialize `django_kafka.connect.client.KafkaConnectClient`.
 
-## Bidirectional data sync with no infinite event loop.
+## Suppressing producers:
 
-**For example, you want to keep a User table in sync in multiple systems.**
+`django-kafka` provides two ways to suppress producers:
+
+### `producer.suppress`
 
-The idea is to send events from all systems to the same topic, and also consume events from the same topic, marking the record with `kafka_skip=True` at the consumption time.
-- Producer should respect `kafka_skip=True` and do not produce new events when `True`.
-- Any updates to the User table, which are happening outside the consumer, should set `kafka_skip=False` which will allow the producer to create an event again.
+Use the `producer.suppress` decorator and context manager to suppress the production of messages generated by the `Producer` class within a particular context.
 
-This way the chronology is strictly kept and the infinite events loop is avoided.
+```python
+from django_kafka import producer
 
-The disadvantage is that each system will still consume its own message.
 
-#### There are 2 classes for django Model and for QuerySet:
+@producer.suppress(["topic1"])  # suppress producers to topic1
+def my_function():
+    ...
+
+
+def my_function_two():
+    with producer.suppress(["topic1"]):  # suppress producers to topic1
+        ...
+```
+
+`producer.suppress` can take a list of topic names, or no arguments to suppress producers of all topics.
 
-#### KafkaSkipModel
-Adds the `kafka_skip` boolean field, defaulting to `False`. This also automatically resets `kafka_skip` to `False` when updating model instances (if not explicitly set).
+Use `producer.unsuppress` to deactivate any suppression in effect within a specific context.
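+
+For example (an illustrative sketch; `sync_users` is a hypothetical function):
+
+```python
+from django_kafka import producer
+
+
+@producer.suppress  # suppress producers of all topics
+def sync_users():
+    with producer.unsuppress():
+        ...  # producing is re-enabled within this block
+```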
+
+
+### `KafkaConnectSkipModel.kafka_skip`
+
+Pythonic suppression methods will not suffice when using Kafka Connect to produce events directly from database changes. In this scenario, it's more appropriate to add a flag to the model's database table indicating whether the connector should generate an event. Two classes are provided, subclassing Django's Model and QuerySet, to manage this flag:
+
+#### KafkaConnectSkipModel
+Adds the `kafka_skip` boolean field, defaulting to `False`. This also automatically resets `kafka_skip` to `False` when saving instances (if not explicitly set).
 
 Usage:
+
 ```python
 from django.contrib.auth.base_user import AbstractBaseUser
 from django.contrib.auth.models import PermissionsMixin
-from django_kafka.models import KafkaSkipModel
+from django_kafka.connect.models import KafkaConnectSkipModel
+
 
-class User(KafkaSkipModel, PermissionsMixin, AbstractBaseUser):
+class User(KafkaConnectSkipModel, PermissionsMixin, AbstractBaseUser):
     # ...
 ```
 
-#### KafkaSkipQueryset
-If you have defined a custom manager on your model then you should inherit it from `KafkaSkipQueryset`. It adds `kafka_skip=False` when using `update` method.
+#### KafkaConnectSkipQueryset
+If you have defined a custom manager on your model then you should inherit it from `KafkaConnectSkipQueryset`. It adds `kafka_skip=False` when using the `update` method.
 
 **Note:** `kafka_skip=False` is only set when it's not provided to the `update` kwargs. E.g. `User.objects.update(first_name="John", kafka_skip=True)` will not be changed to `kafka_skip=False`.
 
 Usage:
+
 ```python
 from django.contrib.auth.base_user import AbstractBaseUser
 from django.contrib.auth.base_user import BaseUserManager
 from django.contrib.auth.models import PermissionsMixin
-from django_kafka.models import KafkaSkipModel, KafkaSkipQueryset
+from django_kafka.connect.models import KafkaConnectSkipModel, KafkaConnectSkipQueryset
 
 
-class UserManager(BaseUserManager.from_queryset(KafkaSkipQueryset)):
-    # ...
+class UserManager(BaseUserManager.from_queryset(KafkaConnectSkipQueryset)):
+    # ...
 
-class User(KafkaSkipModel, PermissionsMixin, AbstractBaseUser):
+
+class User(KafkaConnectSkipModel, PermissionsMixin, AbstractBaseUser):
     # ...
     objects = UserManager()
 ```
 
+## Bidirectional data sync with no infinite event loop:
+
+**For example, you want to keep a User table in sync in multiple systems.**
+
+### Infinite loop
+
+You are likely to encounter infinite message generation when syncing data between multiple systems. Message suppression helps overcome this issue.
+
+For purely pythonic producers and consumers, the `producer.suppress` decorator can be used to suppress messages produced during consumption.
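+
+For instance, suppression can be scoped to a single topic consumer (a hypothetical sketch; `UserTopicConsumer` and the `"users"` topic are illustrative):
+
+```python
+from django_kafka import producer
+from django_kafka.topic import TopicConsumer
+
+
+class UserTopicConsumer(TopicConsumer):
+    name = "users"
+
+    @producer.suppress(["users"])  # don't produce back to "users" while consuming it
+    def consume(self, msg):
+        super().consume(msg)
+```
+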
+If you wish to do this globally for all consumed messages, use the decorator in your `Consumer` class:
+
+```python
+from django_kafka import producer
+from django_kafka.consumer import Consumer
+
+class MyConsumer(Consumer):
+
+    @producer.suppress
+    def consume(self, *args, **kwargs):
+        super().consume(*args, **kwargs)
+```
+
+When producing with Kafka Connect, the `KafkaConnectSkipModel` provides the `kafka_skip` flag; the record should be manually marked with `kafka_skip=True` at consumption time and the connector should be configured not to send events when this flag is set.
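+
+How flagged records are filtered out is connector-specific. As an illustrative sketch (assuming a Debezium source connector; the `Filter` SMT, its Groovy scripting dependency and the condition syntax are Debezium features, not part of `django-kafka`):
+
+```python
+config = {
+    # ... the usual source connector settings ...
+    "transforms": "skip",
+    "transforms.skip.type": "io.debezium.transforms.Filter",
+    "transforms.skip.language": "jsr223.groovy",
+    # keep deletes, drop change events for rows saved with kafka_skip=True
+    "transforms.skip.condition": "value.after == null || !value.after.kafka_skip",
+}
+```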
+
+### Global message ordering
+
+To maintain global message ordering between systems, all events for the same database table should be sent to the same topic. The disadvantage is that each system will still consume its own message.
+
 ## Making a new release
 
 - [bump-my-version](https://github.com/callowayproject/bump-my-version) is used to manage releases.
diff --git a/django_kafka/models.py b/django_kafka/connect/models.py
similarity index 68%
rename from django_kafka/models.py
rename to django_kafka/connect/models.py
index 411e344..f2c26e0 100644
--- a/django_kafka/models.py
+++ b/django_kafka/connect/models.py
@@ -2,36 +2,36 @@
 from django.utils.translation import gettext_lazy as _
 
 
-class KafkaSkipQueryset(models.QuerySet):
+class KafkaConnectSkipQueryset(models.QuerySet):
     def update(self, **kwargs) -> int:
         kwargs.setdefault("kafka_skip", False)
         return super().update(**kwargs)
 
 
-class KafkaSkipModel(models.Model):
+class KafkaConnectSkipModel(models.Model):
     """
-    For models (tables) which are synced with other database(s) in both directions.
+    For models (tables) which have Kafka Connect source connectors attached and require
+    a flag to suppress message production.
 
-    Every update which happens from within the system should set `kafka_skip=False`,
-    global producer (kafka connect, django post_save signal, etc.) will then create
-    a new event.
+    The Kafka Connect connector should filter out events based on the kafka_skip flag
+    provided in this model.
 
-    When db update comes from the consumed event, then the row should be manually
-    marked for skip `kafka_skip=True`, and kafka connector or global python producer
-    should not generate a new one by filtering it out based on `kafka_skip` field.
+    Any update to the model instance will reset the kafka_skip flag to False, if not
+    explicitly set.
+
+    This flag can help overcome infinite event loops during bidirectional data sync when
+    using Kafka. See README.md for more information.
     """
 
     kafka_skip = models.BooleanField(
         _("Kafka skip"),
         help_text=_(
-            "Wont generate an event if `True`."
-            "\nThis field is used to filter out the events to break the infinite loop"
-            " of message generation when synchronizing 2+ databases."
-            "\nGets reset to False on .save() method call.",
+            "Used by Kafka Connect to suppress event creation."
+            "\nGets reset to False on .save() method call, unless explicitly set.",
         ),
         default=False,
     )
 
-    objects = KafkaSkipQueryset.as_manager()
+    objects = KafkaConnectSkipQueryset.as_manager()
 
     class Meta:
         abstract = True
diff --git a/django_kafka/consumer.py b/django_kafka/consumer.py
index d9b24cd..5347786 100644
--- a/django_kafka/consumer.py
+++ b/django_kafka/consumer.py
@@ -21,7 +21,7 @@ class Topics:
 
     def __init__(self, *topic_consumers: "TopicConsumer"):
         self._topic_consumers = topic_consumers
-        self._match: dict[str, "TopicConsumer"] = {}
+        self._match: dict[str, TopicConsumer] = {}
 
     def get(self, topic_name: str) -> "TopicConsumer":
         if topic_name not in self._match:
@@ -123,13 +123,16 @@ def get_topic_consumer(self, msg: cimpl.Message) -> "TopicConsumer":
     def log_error(self, error):
         logger.error(error, exc_info=True)
 
+    def consume(self, msg):
+        self.get_topic_consumer(msg).consume(msg)
+
     def process_message(self, msg: cimpl.Message):
         if msg_error := msg.error():
             self.log_error(msg_error)
             return
 
         try:
-            self.get_topic_consumer(msg).consume(msg)
+            self.consume(msg)
         # ruff: noqa: BLE001 (we do not want consumer to stop if message consumption fails in any circumstances)
         except Exception as exc:
             self.handle_exception(msg, exc)
diff --git a/django_kafka/producer.py b/django_kafka/producer.py
index 9a91f2e..10ce1f8 100644
--- a/django_kafka/producer.py
+++ b/django_kafka/producer.py
@@ -1,6 +1,8 @@
 import logging
+from contextlib import ContextDecorator
+from contextvars import ContextVar
 from pydoc import locate
-from typing import Optional
+from typing import Callable, Optional
 
 from confluent_kafka import Producer as ConfluentProducer
 
@@ -41,6 +43,10 @@ def __init__(self, config: Optional[dict] = None, **kwargs):
             **kwargs,
         )
 
+    def produce(self, name, *args, **kwargs):
+        if not Suppression.active(name):
+            self._producer.produce(name, *args, **kwargs)
+
     def __getattr__(self, name):
         """
         proxy producer methods.
@@ -51,3 +57,47 @@ def __getattr__(self, name):
             # the initialization will fail because `_consumer` is not yet set.
             return getattr(self._producer, name)
         raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'")
+
+
+class Suppression(ContextDecorator):
+    """context manager to help suppress producing messages to desired Kafka topics"""
+
+    _var = ContextVar(f"{__name__}.suppression", default=[])
+
+    @classmethod
+    def active(cls, topic: str):
+        """returns whether suppression is enabled for the given topic"""
+        topics = cls._var.get()
+        if topics is None:
+            return True  # all topics
+        return topic in topics
+
+    def __init__(self, topics: Optional[list[str]], deactivate=False):
+        current = self._var.get()
+        if deactivate:
+            self.topics = []
+        elif topics is None or current is None:
+            self.topics = None  # indicates all topics
+        elif isinstance(topics, list):
+            self.topics = current + topics
+        else:
+            raise ValueError(f"invalid producer suppression setting {topics}")
+
+    def __enter__(self):
+        self.token = self._var.set(self.topics)
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self._var.reset(self.token)
+
+
+def suppress(topics: Optional[Callable | list[str]] = None):
+    if callable(topics):
+        return Suppression(None)(topics)
+    return Suppression(topics)
+
+
+def unsuppress(fn: Optional[Callable] = None):
+    if fn:
+        return Suppression(None, deactivate=True)(fn)
+    return Suppression(None, deactivate=True)
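
The context-variable implementation above composes across nested scopes. A sketch of the intended semantics, matching the tests added further below:

```python
from django_kafka import producer

with producer.suppress(["topic1"]):      # "topic1" is suppressed
    with producer.suppress(["topic2"]):  # topic lists combine: "topic1" and "topic2"
        ...
    with producer.suppress():            # no arguments: all topics are suppressed
        ...
    with producer.unsuppress():          # producing is re-enabled in this block
        ...
# each __exit__ resets the ContextVar token, restoring the outer state
```
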
return getattr(self._producer, name) raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'") + + +class Suppression(ContextDecorator): + """context manager to help suppress producing messages to desired Kafka topics""" + + _var = ContextVar(f"{__name__}.suppression", default=[]) + + @classmethod + def active(cls, topic: str): + """returns if suppression is enabled for the given topic""" + topics = cls._var.get() + if topics is None: + return True # all topics + return topic in topics + + def __init__(self, topics: Optional[list[str]], deactivate=False): + current = self._var.get() + if deactivate: + self.topics = [] + elif topics is None or current is None: + self.topics = None # indicates all topics + elif isinstance(topics, list): + self.topics = current + topics + else: + raise ValueError(f"invalid producer suppression setting {topics}") + + def __enter__(self): + self.token = self._var.set(self.topics) + return self + + def __exit__(self, *args, **kwargs): + self._var.reset(self.token) + + +def suppress(topics: Optional[Callable | list[str]] = None): + if callable(topics): + return Suppression(None)(topics) + return Suppression(topics) + + +def unsuppress(fn: Optional[Callable] = None): + if fn: + return Suppression(None, deactivate=True)(fn) + return Suppression(None, deactivate=True) diff --git a/django_kafka/retry/consumer.py b/django_kafka/retry/consumer.py index 8de1eb8..83b2a14 100644 --- a/django_kafka/retry/consumer.py +++ b/django_kafka/retry/consumer.py @@ -44,13 +44,13 @@ def build(cls, consumer_cls: Type["Consumer"]) -> Optional[Type["RetryConsumer"] return type[RetryConsumer]( f"{consumer_cls.__name__}Retry", - (cls,), + (consumer_cls, cls), { + "topics": RetryTopics(group_id, *retryable_tcs), "config": { **getattr(cls, "config", {}), "group.id": f"{group_id}.retry", }, - "topics": RetryTopics(group_id, *retryable_tcs), }, ) diff --git a/django_kafka/tests/test_models.py b/django_kafka/tests/connect/test_models.py similarity index 96% rename from django_kafka/tests/test_models.py rename to django_kafka/tests/connect/test_models.py index bac0dcf..813b262 100644 --- a/django_kafka/tests/test_models.py +++ b/django_kafka/tests/connect/test_models.py @@ -1,12 +1,12 @@ from unittest.mock import patch -from django_kafka.models import KafkaSkipModel +from django_kafka.connect.models import KafkaConnectSkipModel from django_kafka.tests.models import AbstractModelTestCase -class KafkaSkipModelTestCase(AbstractModelTestCase): - abstract_model = KafkaSkipModel - model: type[KafkaSkipModel] +class KafkaConnectSkipModelTestCase(AbstractModelTestCase): + abstract_model = KafkaConnectSkipModel + model: type[KafkaConnectSkipModel] def test_save__direct_instance_respects_set_kafka_skip(self): """test `save` on directly created instances will not ignore set kafka_skip""" diff --git a/django_kafka/tests/retry/test_consumer.py b/django_kafka/tests/retry/test_consumer.py index bf7d7cb..361fa93 100644 --- a/django_kafka/tests/retry/test_consumer.py +++ b/django_kafka/tests/retry/test_consumer.py @@ -99,10 +99,11 @@ class SomeRetryConsumer(RetryConsumer): def test_build(self): consumer_cls = self._get_retryable_consumer_cls() + retry_consumer_cls = RetryConsumer.build(consumer_cls) self.assertTrue(issubclass(retry_consumer_cls, RetryConsumer)) - + self.assertTrue(issubclass(retry_consumer_cls, consumer_cls)) self.assertEqual( retry_consumer_cls.config["group.id"], f"{consumer_cls.build_config()['group.id']}.retry", diff --git a/django_kafka/tests/test_producers.py 
diff --git a/django_kafka/tests/test_models.py b/django_kafka/tests/connect/test_models.py
similarity index 96%
rename from django_kafka/tests/test_models.py
rename to django_kafka/tests/connect/test_models.py
index bac0dcf..813b262 100644
--- a/django_kafka/tests/test_models.py
+++ b/django_kafka/tests/connect/test_models.py
@@ -1,12 +1,12 @@
 from unittest.mock import patch
 
-from django_kafka.models import KafkaSkipModel
+from django_kafka.connect.models import KafkaConnectSkipModel
 from django_kafka.tests.models import AbstractModelTestCase
 
 
-class KafkaSkipModelTestCase(AbstractModelTestCase):
-    abstract_model = KafkaSkipModel
-    model: type[KafkaSkipModel]
+class KafkaConnectSkipModelTestCase(AbstractModelTestCase):
+    abstract_model = KafkaConnectSkipModel
+    model: type[KafkaConnectSkipModel]
 
     def test_save__direct_instance_respects_set_kafka_skip(self):
         """test `save` on directly created instances will not ignore set kafka_skip"""
diff --git a/django_kafka/tests/retry/test_consumer.py b/django_kafka/tests/retry/test_consumer.py
index bf7d7cb..361fa93 100644
--- a/django_kafka/tests/retry/test_consumer.py
+++ b/django_kafka/tests/retry/test_consumer.py
@@ -99,10 +99,11 @@ class SomeRetryConsumer(RetryConsumer):
 
     def test_build(self):
         consumer_cls = self._get_retryable_consumer_cls()
+
         retry_consumer_cls = RetryConsumer.build(consumer_cls)
 
         self.assertTrue(issubclass(retry_consumer_cls, RetryConsumer))
-
+        self.assertTrue(issubclass(retry_consumer_cls, consumer_cls))
         self.assertEqual(
             retry_consumer_cls.config["group.id"],
             f"{consumer_cls.build_config()['group.id']}.retry",
diff --git a/django_kafka/tests/test_producers.py b/django_kafka/tests/test_producers.py
new file mode 100644
index 0000000..0a72df7
--- /dev/null
+++ b/django_kafka/tests/test_producers.py
@@ -0,0 +1,107 @@
+from unittest import mock
+from unittest.mock import call
+
+from django.test import TestCase
+
+from django_kafka.producer import Producer, suppress, unsuppress
+
+
+@mock.patch("django_kafka.producer.ConfluentProducer")
+class ProducerSuppressTestCase(TestCase):
+    def test_suppress_all(self, mock_confluent_producer):
+        producer = Producer()
+
+        with suppress():
+            producer.produce("topicA")
+
+        mock_confluent_producer.return_value.produce.assert_not_called()
+
+    def test_suppress_topic_list(self, mock_confluent_producer):
+        producer = Producer()
+
+        with suppress(["topicA"]):
+            producer.produce("topicA")
+            producer.produce("topicB")
+
+        mock_confluent_producer.return_value.produce.assert_called_once_with("topicB")
+
+    def test_suppress_nested_usage(self, mock_confluent_producer):
+        """tests that message suppression lists are combined across nested contexts"""
+        producer = Producer()
+
+        with suppress(["topicA"]):
+            with suppress(["topicB"]):
+                producer.produce("topicA")
+                producer.produce("topicB")
+                producer.produce("topicC")
+
+        mock_confluent_producer.return_value.produce.assert_called_once_with("topicC")
+
+    def test_suppress_nested_usage_all(self, mock_confluent_producer):
+        """test that global message suppression is maintained within nested contexts"""
+        producer = Producer()
+
+        with suppress():
+            with suppress(["topicA"]):
+                producer.produce("topicB")
+
+        mock_confluent_producer.return_value.produce.assert_not_called()
+
+    def test_unsuppress(self, mock_confluent_producer):
+        producer = Producer()
+
+        with suppress(["topicA"]):
+            with unsuppress():
+                producer.produce("topicA")
+
+        mock_confluent_producer.return_value.produce.assert_called_once_with("topicA")
+
+    def test_unsuppress__decorator(self, mock_confluent_producer):
+        producer = Producer()
+
+        @suppress(["topicA"])
+        @unsuppress()
+        def _produce_empty_args():
+            producer.produce("topicA")
+
+        @suppress(["topicA"])
+        @unsuppress
+        def _produce_no_args():
+            producer.produce("topicA")
+
+        _produce_empty_args()
+        _produce_no_args()
+
+        mock_confluent_producer.return_value.produce.assert_has_calls(
+            [call("topicA"), call("topicA")],
+        )
+
+    def test_suppress_resets(self, mock_confluent_producer):
+        producer = Producer()
+
+        with suppress(["topicA", "topicB"]):
+            producer.produce("topicA")
+
+        producer.produce("topicB")
+
+        mock_confluent_producer.return_value.produce.assert_called_once_with("topicB")
+
+    def test_suppress_usable_as_decorator(self, mock_confluent_producer):
+        producer = Producer()
+
+        @suppress(["topicA"])
+        def _produce_args():
+            producer.produce("topicA")
+
+        @suppress()
+        def _produce_empty_args():
+            producer.produce("topicA")
+
+        @suppress
+        def _produce_no_args():
+            producer.produce("topicA")
+
+        _produce_args()
+        _produce_empty_args()
+        _produce_no_args()
+
+        mock_confluent_producer.return_value.produce.assert_not_called()
diff --git a/django_kafka/tests/topic/test_model.py b/django_kafka/tests/topic/test_model.py
index 63f20f1..a3db550 100644
--- a/django_kafka/tests/topic/test_model.py
+++ b/django_kafka/tests/topic/test_model.py
@@ -3,8 +3,8 @@
 from django.db.models import Model
 from django.test import TestCase
 
+from django_kafka.connect.models import KafkaConnectSkipModel
 from django_kafka.exceptions import DjangoKafkaError
-from django_kafka.models import KafkaSkipModel
 from django_kafka.topic.model import ModelTopicConsumer
 
@@ -32,10 +32,12 @@ def test_get_defaults(self):
 
     def test_get_defaults__adds_kafka_skip(self):
         topic_consumer = self._get_model_topic_consumer()
 
-        class KafkaSkip(KafkaSkipModel):
+        class KafkaConnectSkip(KafkaConnectSkipModel):
             pass
 
-        defaults = topic_consumer.get_defaults(model=KafkaSkip, value={"name": 1})
+        defaults = topic_consumer.get_defaults(
+            model=KafkaConnectSkip, value={"name": 1}
+        )
 
         self.assertEqual(defaults, {"name": 1, "kafka_skip": True})
 
diff --git a/django_kafka/topic/model.py b/django_kafka/topic/model.py
index babb111..4f86cf9 100644
--- a/django_kafka/topic/model.py
+++ b/django_kafka/topic/model.py
@@ -6,8 +6,8 @@
 from django.core.exceptions import ObjectDoesNotExist
 from django.db.models import Model
 
+from django_kafka.connect.models import KafkaConnectSkipModel
 from django_kafka.exceptions import DjangoKafkaError
-from django_kafka.models import KafkaSkipModel
 from django_kafka.topic import TopicConsumer
 
 
@@ -30,7 +30,7 @@ def get_defaults(self, model, value) -> dict:
             else:
                 defaults[attr] = attr_value
 
-        if issubclass(model, KafkaSkipModel):
+        if issubclass(model, KafkaConnectSkipModel):
             defaults["kafka_skip"] = True
 
         return defaults
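
As a closing illustration of the renamed pieces working together: values synced by `ModelTopicConsumer` now default `kafka_skip` to `True`, so a source connector configured as sketched earlier will not echo consumed changes back. An assumed setup, with `User` being a hypothetical `KafkaConnectSkipModel` subclass:

```python
defaults = topic_consumer.get_defaults(model=User, value={"name": "John"})
assert defaults == {"name": "John", "kafka_skip": True}
```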