Commit

feat: add producer.suppress for global message suppression, refs #27

stefan-cardnell-rh committed Oct 18, 2024
1 parent 3a075b8 commit caf8096
Showing 11 changed files with 265 additions and 48 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## 0.5.2 (2024-10-17)
* Added `producer.suppress` decorator.
* Renamed `KafkaSkipModel` to `KafkaConnectSkipModel`.
* Renamed `KafkaSkipQueryset` to `KafkaConnectSkipQueryset`.

## 0.5.1 (2024-10-16)
* `ModelTopicConsumer.sync` now returns the results of the `update_or_create` method.
* Add `days_from_epoch_to_date` function to convert `io.debezium.time.Date` to python `datetime.date`.
87 changes: 68 additions & 19 deletions README.md
@@ -203,7 +203,7 @@ When the consumption of a message in a retryable topic fails, the message is re-

When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic.

## Connectors:

Connectors are auto-discovered and are expected to be located under `some_django_app/kafka/connectors.py` or `some_django_app/connectors.py`.
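For illustration, a minimal sketch of such a module — hedged: the `config` attribute follows the `django_kafka.connect.connector.Connector` pattern referenced below, and the Debezium settings shown are placeholders, not part of this commit:

```python
# some_django_app/kafka/connectors.py -- a hypothetical sketch
from django_kafka.connect.connector import Connector


class UsersConnector(Connector):
    # standard Kafka Connect / Debezium settings; values are placeholders
    config = {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "db",
        "topic.prefix": "users-app",
    }
```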

@@ -378,56 +378,105 @@ Prefix which will be added to the connector name when publishing the connector.
Used by `django_kafka.connect.connector.Connector` to initialize `django_kafka.connect.client.KafkaConnectClient`.

## Suppressing producers:

`django-kafka` provides two ways to suppress producers:

### `producer.suppress`

Use the `producer.suppress` decorator and context manager to suppress the producing of messages generated by the `Producer` class during a particular context.
```python
from django_kafka import producer


@producer.suppress(["topic1"])  # suppress producers to topic1
def my_function():
    ...


def my_function_two():
    with producer.suppress(["topic1"]):  # suppress producers to topic1
        ...
```
`producer.suppress` can take a list of topic names, or no arguments to suppress producers of all topics.
Use `producer.unsuppress` to deactivate any set suppression during a specific context.
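For example, a minimal sketch combining the two (function names here are illustrative):

```python
from django_kafka import producer


@producer.suppress  # bare decorator: suppresses producers of all topics
def import_users():
    ...


def import_users_with_events():
    with producer.suppress():  # suppress all topics...
        with producer.unsuppress():  # ...except within this inner block
            ...
```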
### `KafkaConnectSkipModel.kafka_skip`
Pythonic suppression methods will not suffice when using Kafka Connect to directly produce events from database changes. In this scenario, it's more appropriate to add a flag to the model database table which indicates if the connector should generate an event. Two classes are provided subclassing Django's Model and QuerySet to manage this flag:
#### KafkaConnectSkipModel
Adds the `kafka_skip` boolean field, defaulting to `False`. This also automatically resets `kafka_skip` to `False` when saving instances (if not explicitly set).
Usage:
```python
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.models import PermissionsMixin

from django_kafka.connect.models import KafkaConnectSkipModel


class User(KafkaConnectSkipModel, PermissionsMixin, AbstractBaseUser):
    # ...
```
#### KafkaConnectSkipQueryset
If you have defined a custom manager on your model then you should inherit it from `KafkaConnectSkipQueryset`. It adds `kafka_skip=False` when using the `update` method.
**Note:** `kafka_skip=False` is only set when it's not provided to the `update` kwargs. E.g. `User.objects.update(first_name="John", kafka_skip=True)` will not be changed to `kafka_skip=False`.
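A short sketch of that behavior (assuming the `User` model from the usage example below, and a connector configured to drop rows with `kafka_skip=True`):

```python
# kafka_skip is not passed: the queryset adds kafka_skip=False,
# so the connector will emit events for these rows.
User.objects.filter(is_active=True).update(first_name="John")

# kafka_skip is passed explicitly: it is left as True,
# so the connector will filter these rows out.
User.objects.filter(is_active=True).update(first_name="John", kafka_skip=True)
```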
Usage:
```python
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.base_user import BaseUserManager
from django.contrib.auth.models import PermissionsMixin

from django_kafka.connect.models import KafkaConnectSkipModel, KafkaConnectSkipQueryset


class UserManager(BaseUserManager.from_queryset(KafkaConnectSkipQueryset)):
    # ...


class User(KafkaConnectSkipModel, PermissionsMixin, AbstractBaseUser):
    # ...
    objects = UserManager()
```
## Bidirectional data sync with no infinite event loop:
**For example, you want to keep a User table in sync in multiple systems.**
### Infinite loop
You are likely to encounter infinite message generation when syncing data between multiple systems. Message suppression helps overcome this issue.
For purely pythonic producers and consumers, the `producer.suppress` decorator can be used to suppress messages produced during consumption. If you wish to do this globally for all consumption, apply the decorator in your `Consumer` class:
```python
from django_kafka import producer
from django_kafka.consumer import Consumer


class MyConsumer(Consumer):
    @producer.suppress
    def consume(self, *args, **kwargs):
        super().consume(*args, **kwargs)
```
When producing with Kafka Connect, the `KafkaConnectSkipModel` provides the `kafka_skip` flag; the record should be manually marked with `kafka_skip=True` at consumption time and the connector should be configured not to send events when this flag is set.
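For example, a minimal sketch of marking a record at consumption time (the handler and field names are illustrative; `kafka_skip=True` must be passed explicitly so `KafkaConnectSkipQueryset.update` does not reset it):

```python
from django.contrib.auth import get_user_model

User = get_user_model()


def handle_user_event(pk: int, fields: dict):
    # mark the row so the Kafka Connect connector skips the resulting change
    User.objects.filter(pk=pk).update(kafka_skip=True, **fields)
```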
### Global message ordering
To maintain global message ordering between systems, all events for the same database table should be sent to the same topic. The disadvantage is that each system will still consume its own message.
## Making a new release
- [bump-my-version](https://github.com/callowayproject/bump-my-version) is used to manage releases.
28 changes: 14 additions & 14 deletions django_kafka/models.py → django_kafka/connect/models.py
@@ -2,36 +2,36 @@
from django.utils.translation import gettext_lazy as _


class KafkaConnectSkipQueryset(models.QuerySet):
    def update(self, **kwargs) -> int:
        kwargs.setdefault("kafka_skip", False)
        return super().update(**kwargs)


class KafkaConnectSkipModel(models.Model):
"""
For models (tables) which are synced with other database(s) in both directions.
For models (tables) which have Kafka Connect source connectors attached and require
a flag to suppress message production.
Every update which happens from within the system should set `kafka_skip=False`,
global producer (kafka connect, django post_save signal, etc.) will then create
a new event.
The Kafka Connect connector should filter out events based on the kafka_skip flag
provided in this model.
When db update comes from the consumed event, then the row should be manually
marked for skip `kafka_skip=True`, and kafka connector or global python producer
should not generate a new one by filtering it out based on `kafka_skip` field.
Any update to the model instance will reset the kafka_skip flag to False, if not
explicitly set.
This flag can help overcome infinite event loops during bidirectional data sync when
using Kafka. See README.md for more information.
"""

    kafka_skip = models.BooleanField(
        _("Kafka skip"),
        help_text=_(
            "Won't generate an event if `True`."
            "\nUsed by Kafka Connect to suppress event creation."
            "\nGets reset to False on .save() method call, unless explicitly set.",
        ),
        default=False,
    )
    objects = KafkaConnectSkipQueryset.as_manager()

    class Meta:
        abstract = True
7 changes: 5 additions & 2 deletions django_kafka/consumer.py
@@ -21,7 +21,7 @@ class Topics:

    def __init__(self, *topic_consumers: "TopicConsumer"):
        self._topic_consumers = topic_consumers
        self._match: dict[str, TopicConsumer] = {}

    def get(self, topic_name: str) -> "TopicConsumer":
        if topic_name not in self._match:
@@ -123,13 +123,16 @@ def get_topic_consumer(self, msg: cimpl.Message) -> "TopicConsumer":
    def log_error(self, error):
        logger.error(error, exc_info=True)

    def consume(self, msg):
        self.get_topic_consumer(msg).consume(msg)

    def process_message(self, msg: cimpl.Message):
        if msg_error := msg.error():
            self.log_error(msg_error)
            return

        try:
            self.consume(msg)
        # ruff: noqa: BLE001 (we do not want consumer to stop if message consumption fails in any circumstances)
        except Exception as exc:
            self.handle_exception(msg, exc)
52 changes: 51 additions & 1 deletion django_kafka/producer.py
@@ -1,6 +1,8 @@
import logging
from contextlib import ContextDecorator
from contextvars import ContextVar
from pydoc import locate
from typing import Callable, Optional

from confluent_kafka import Producer as ConfluentProducer

@@ -41,6 +43,10 @@ def __init__(self, config: Optional[dict] = None, **kwargs):
            **kwargs,
        )

    def produce(self, name, *args, **kwargs):
        if not Suppression.active(name):
            self._producer.produce(name, *args, **kwargs)

    def __getattr__(self, name):
        """
        proxy producer methods.
@@ -51,3 +57,47 @@ def __getattr__(self, name):
            # the initialization will fail because `_consumer` is not yet set.
            return getattr(self._producer, name)
        raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'")


class Suppression(ContextDecorator):
    """context manager to help suppress producing messages to desired Kafka topics"""

    _var = ContextVar(f"{__name__}.suppression", default=[])

    @classmethod
    def active(cls, topic: str):
        """returns if suppression is enabled for the given topic"""
        topics = cls._var.get()
        if topics is None:
            return True  # all topics
        return topic in topics

    def __init__(self, topics: Optional[list[str]], deactivate=False):
        current = self._var.get()
        if deactivate:
            self.topics = []
        elif topics is None or current is None:
            self.topics = None  # indicates all topics
        elif isinstance(topics, list):
            self.topics = current + topics
        else:
            raise ValueError(f"invalid producer suppression setting {topics}")

    def __enter__(self):
        self.token = self._var.set(self.topics)
        return self

    def __exit__(self, *args, **kwargs):
        self._var.reset(self.token)


def suppress(topics: Optional[Callable | list[str]] = None):
    if callable(topics):
        return Suppression(None)(topics)
    return Suppression(topics)


def unsuppress(fn: Optional[Callable] = None):
    if fn:
        return Suppression(None, deactivate=True)(fn)
    return Suppression(None, deactivate=True)
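A short usage sketch of the semantics above (topic names illustrative): suppressed topic lists accumulate across nested contexts, and `deactivate` clears them for the inner scope only.

```python
from django_kafka import producer

with producer.suppress(["topic1"]):
    with producer.suppress(["topic2"]):
        # lists accumulate: Suppression.active("topic1") and
        # Suppression.active("topic2") are both True here
        ...
    with producer.unsuppress():
        # deactivate=True sets the list to [], so nothing is suppressed here
        ...
    # the ContextVar token restores the outer state on each __exit__
```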
4 changes: 2 additions & 2 deletions django_kafka/retry/consumer.py
@@ -44,13 +44,13 @@ def build(cls, consumer_cls: Type["Consumer"]) -> Optional[Type["RetryConsumer"]

        return type[RetryConsumer](
            f"{consumer_cls.__name__}Retry",
            (consumer_cls, cls),
            {
                "config": {
                    **getattr(cls, "config", {}),
                    "group.id": f"{group_id}.retry",
                },
                "topics": RetryTopics(group_id, *retryable_tcs),
            },
        )

@@ -1,12 +1,12 @@
from unittest.mock import patch

from django_kafka.connect.models import KafkaConnectSkipModel
from django_kafka.tests.models import AbstractModelTestCase


class KafkaConnectSkipModelTestCase(AbstractModelTestCase):
    abstract_model = KafkaConnectSkipModel
    model: type[KafkaConnectSkipModel]

    def test_save__direct_instance_respects_set_kafka_skip(self):
        """test `save` on directly created instances will not ignore set kafka_skip"""
3 changes: 2 additions & 1 deletion django_kafka/tests/retry/test_consumer.py
@@ -99,10 +99,11 @@ class SomeRetryConsumer(RetryConsumer):

    def test_build(self):
        consumer_cls = self._get_retryable_consumer_cls()

        retry_consumer_cls = RetryConsumer.build(consumer_cls)

        self.assertTrue(issubclass(retry_consumer_cls, RetryConsumer))
        self.assertTrue(issubclass(retry_consumer_cls, consumer_cls))
        self.assertEqual(
            retry_consumer_cls.config["group.id"],
            f"{consumer_cls.build_config()['group.id']}.retry",