Merge pull request #9 from RegioHelden/8-retry-dead-letter-queue
feat: add retry and dead letter topic behaviour, refs #8
stefan-cardnell-rh authored Sep 12, 2024
2 parents 3cc697e + 61c9b35 commit a050244
Showing 30 changed files with 1,692 additions and 281 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.toml
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.1.0"
current_version = "0.2.0"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
serialize = ["{major}.{minor}.{patch}"]
search = "{current_version}"
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,5 @@
# Changelog

## 0.2.0 (2024-09-05)
* Add decorator for topic retry and dead letter topic, see `README.md`
* Separate the `Topic` class into `TopicProducer` and `TopicConsumer` classes.
31 changes: 29 additions & 2 deletions README.md
@@ -43,6 +43,8 @@ class Topic1(Topic):
# ... process values
```

`Topic` inherits from the `TopicProducer` and `TopicConsumer` classes. If you only need to consume or produce messages, inherit from one of these classes instead to avoid defining unnecessary abstract methods.
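
For example, a consume-only topic can skip the producer side entirely (a minimal sketch, assuming only the `name` attribute and `consume()` method shown above):

```python
from django_kafka.topic import TopicConsumer


class Topic1Consumer(TopicConsumer):  # hypothetical consume-only topic
    name = "topic1"

    def consume(self, msg):
        ...  # process the message value
```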

### Define a Consumer:

Consumers define which topics they take care of. Usually you want one consumer per project. If two consumers are defined, they will be started in parallel.
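
The consumer definition itself is collapsed in this diff. Based on the `Consumer` and `Topics` classes changed further down, it likely looks roughly like this (the `@kafka.consumers()` registration decorator, the group id, and the topic import are assumptions):

```python
from django_kafka import kafka
from django_kafka.consumer import Consumer, Topics

from .topics import Topic1  # hypothetical module holding the topic defined above


@kafka.consumers()  # assumed registration decorator backed by ConsumersRegistry
class MyAppConsumer(Consumer):
    # "group.id" is required; it is also used to build the retry/dead letter topic names
    config = {
        "group.id": "group",
    }
    topics = Topics(
        Topic1(),
    )
```
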
@@ -83,7 +85,7 @@ Or you can use `DjangoKafka` class API.
```python
from django_kafka import kafka

kafka.start_consumers()
kafka.run_consumers()
```
Check [Confluent Python Consumer](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#consumer) for API documentation.

Expand Down Expand Up @@ -115,6 +117,31 @@ DJANGO_KAFKA = {

**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.

## Non-Blocking Retries:

Add non-blocking retry behaviour to a topic by using the `retry` decorator:

```python
from django_kafka import kafka
from django_kafka.topic import Topic


@kafka.retry(max_retries=3, delay=120, include=[ValueError])
class RetryableTopic(Topic):
name = "topic"
...
```

When consumption of a message in a retryable topic fails, the message is re-sent to a retry topic whose name is composed of the consumer group id, the original topic name, a `.retry` suffix, and the retry number. Subsequent failures are sent to retry topics with an incrementing retry number until the maximum number of attempts is reached, after which the message is sent to a dead letter topic suffixed with `.dlt`. So for a message in topic `topic` that repeatedly fails for consumer group `group`, the expected topic sequence is:

1. `topic`
2. `group.topic.retry.1`
3. `group.topic.retry.2`
4. `group.topic.retry.3`
5. `group.topic.dlt`
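
For illustration, the naming can be reproduced with plain string composition; the `retry` and `dlt` suffixes come from the `RETRY_TOPIC_SUFFIX` and `DEAD_LETTER_TOPIC_SUFFIX` settings added in `django_kafka/conf.py` below (a sketch of the convention, not the library's implementation):

```python
# Sketch of the retry/dead letter topic naming convention described above.
group_id, topic, max_retries = "group", "topic", 3

retry_topics = [f"{group_id}.{topic}.retry.{n}" for n in range(1, max_retries + 1)]
dead_letter_topic = f"{group_id}.{topic}.dlt"

print(retry_topics)       # ['group.topic.retry.1', 'group.topic.retry.2', 'group.topic.retry.3']
print(dead_letter_topic)  # group.topic.dlt
```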

When consumers are started using the [start commands](#start-the-Consumers), an additional retry consumer is started in parallel for any consumer containing a retryable topic. This retry consumer is assigned to a consumer group whose id combines the original group id and a `.retry` suffix. It subscribes to the retry topics and manages the message retry and delay behaviour. Note that messages are retried directly by the retry consumer and are not sent back to the original topic.

## Settings:

**Defaults:**
@@ -230,7 +257,7 @@ docker compose run --rm app bump-my-version bump <major|minor|patch>
```
This will bump the major, minor, or patch version respectively and add a tag for the release.

- Push including new tag to publish the release to pypi.
- Once the changes are approved and merged, push the tag to publish the release to pypi.
```bash
git push origin tag <tag_name>
```
13 changes: 8 additions & 5 deletions django_kafka/__init__.py
@@ -10,10 +10,11 @@
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Producer
from django_kafka.registry import ConsumersRegistry
from django_kafka.retry import RetrySettings

logger = logging.getLogger(__name__)

__version__ = "0.1.0"
__version__ = "0.2.0"

__all__ = [
"autodiscover",
@@ -28,6 +29,7 @@ def autodiscover():

class DjangoKafka:
consumers = ConsumersRegistry()
retry = RetrySettings

@cached_property
def producer(self) -> Producer:
@@ -45,14 +47,15 @@ def schema_client(self) -> SchemaRegistryClient:

return SchemaRegistryClient(settings.SCHEMA_REGISTRY)

def start_consumer(self, consumer: str):
self.consumers[consumer]().start()
def run_consumer(self, consumer_key: str):
consumer = self.consumers[consumer_key]()
consumer.run()

def start_consumers(self, consumers: Optional[list[str]] = None):
def run_consumers(self, consumers: Optional[list[str]] = None):
consumers = consumers or list(self.consumers)
with Pool(processes=len(consumers)) as pool:
try:
pool.map(self.start_consumer, consumers)
pool.map(self.run_consumer, consumers)
except KeyboardInterrupt:
# Stops the worker processes immediately without completing
# outstanding work.
7 changes: 7 additions & 0 deletions django_kafka/conf.py
@@ -9,6 +9,13 @@
"GLOBAL_CONFIG": {},
"PRODUCER_CONFIG": {},
"CONSUMER_CONFIG": {},
"RETRY_CONSUMER_CONFIG": {
"auto.offset.reset": "earliest",
"enable.auto.offset.store": False,
"topic.metadata.refresh.interval.ms": 10000,
},
"RETRY_TOPIC_SUFFIX": "retry",
"DEAD_LETTER_TOPIC_SUFFIX": "dlt",
"POLLING_FREQUENCY": 1, # seconds
"SCHEMA_REGISTRY": {},
}
158 changes: 111 additions & 47 deletions django_kafka/consumer.py
@@ -1,20 +1,43 @@
import logging
import traceback
from pydoc import locate
from typing import Optional
from typing import TYPE_CHECKING, Optional

from confluent_kafka import Consumer as ConfluentConsumer
from confluent_kafka import cimpl

from django_kafka.conf import settings
from django_kafka.topic import Topic
from django_kafka.exceptions import DjangoKafkaError

if TYPE_CHECKING:
from django_kafka.topic import TopicConsumer

logger = logging.getLogger(__name__)


class Topics(dict):
def __init__(self, *topics: Topic):
for topic in topics:
self[topic.name] = topic
class Topics:
_topic_consumers: tuple["TopicConsumer", ...]
_match: dict[str, "TopicConsumer"]

def __init__(self, *topic_consumers: "TopicConsumer"):
self._topic_consumers = topic_consumers
self._match: dict[str, "TopicConsumer"] = {}

def get(self, topic_name: str) -> "TopicConsumer":
if topic_name not in self._match:
topic_consumer = next((t for t in self if t.matches(topic_name)), None)
if not topic_consumer:
raise DjangoKafkaError(f"No topic registered for `{topic_name}`")
self._match[topic_name] = topic_consumer

return self._match[topic_name]

@property
def names(self) -> list[str]:
return [topic.name for topic in self]

def __iter__(self):
yield from self._topic_consumers


class Consumer:
@@ -29,67 +52,108 @@ class Consumer:
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-consumer
"""

topics: Topics[str, Topic]
topics: Topics
config: dict

polling_freq = settings.POLLING_FREQUENCY
default_logger = logger
default_error_handler = settings.ERROR_HANDLER

def __init__(self, config: Optional[dict] = None, **kwargs):
kwargs.setdefault("logger", self.default_logger)
kwargs.setdefault("error_cb", locate(self.default_error_handler)())
def __init__(self):
self.config = self.build_config()
self._consumer = ConfluentConsumer(self.config)

self.config = {
def __getattr__(self, name):
"""proxy consumer methods."""
return getattr(self._consumer, name)

@classmethod
def build_config(cls):
return {
"client.id": settings.CLIENT_ID,
**settings.GLOBAL_CONFIG,
**settings.CONSUMER_CONFIG,
**getattr(self, "config", {}),
**(config or {}),
"logger": cls.default_logger,
"error_cb": locate(cls.default_error_handler)(),
**getattr(cls, "config", {}),
}

self._consumer = ConfluentConsumer(self.config, **kwargs)
@property
def group_id(self) -> str:
return self.config["group.id"]

def __getattr__(self, name):
"""proxy consumer methods."""
if name not in {"config"}:
# For cases when `Consumer.config` is not set and
# `getattr(self, "config", {})` is called on `__init__`,
# the initialization will fail because `_consumer` is not yet set.
return getattr(self._consumer, name)
raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'")
def commit_offset(self, msg: cimpl.Message):
if not self.config.get("enable.auto.offset.store"):
# Store the offset associated with msg to a local cache.
# Stored offsets are committed to Kafka by a background
# thread every 'auto.commit.interval.ms'.
# Explicitly storing offsets after processing gives at-least once semantics.
self.store_offsets(msg)

def start(self):
# define topics
self.subscribe(topics=list(self.topics))
while True:
# poll every self.polling_freq seconds
if msg := self.poll(timeout=self.polling_freq):
self.process_message(msg)
def retry_msg(self, msg: cimpl.Message, exc: Exception) -> bool:
from django_kafka.retry.topic import RetryTopicProducer

def stop(self):
self.close()
topic_consumer = self.get_topic_consumer(msg)
if not topic_consumer.retry_settings:
return False

return RetryTopicProducer(
group_id=self.group_id,
retry_settings=topic_consumer.retry_settings,
msg=msg,
).retry(exc=exc)

def dead_letter_msg(self, msg: cimpl.Message, exc: Exception):
from django_kafka.dead_letter.topic import DeadLetterTopicProducer

DeadLetterTopicProducer(group_id=self.group_id, msg=msg).produce_for(
header_message=str(exc),
header_detail=traceback.format_exc(),
)

def handle_exception(self, msg: cimpl.Message, exc: Exception):
retried = self.retry_msg(msg, exc)
if not retried:
self.dead_letter_msg(msg, exc)
self.log_error(exc)

def get_topic_consumer(self, msg: cimpl.Message) -> "TopicConsumer":
return self.topics.get(topic_name=msg.topic())

def log_error(self, error):
logger.error(error, exc_info=True)

def process_message(self, msg: cimpl.Message):
if msg_error := msg.error():
self.handle_error(msg_error)
self.log_error(msg_error)
return

try:
self.topics[msg.topic()].consume(msg)
# ruff: noqa: BLE001 (we do not want consumer to stop if message processing is failing in any circumstances)
except Exception as error:
self.handle_error(error)
else:
self.commit_offset(msg)
self.get_topic_consumer(msg).consume(msg)
# ruff: noqa: BLE001 (we do not want consumer to stop if message consumption fails in any circumstances)
except Exception as exc:
self.handle_exception(msg, exc)

def commit_offset(self, msg: cimpl.Message):
if not self.config.get("enable.auto.offset.store"):
# Store the offset associated with msg to a local cache.
# Stored offsets are committed to Kafka by a background
# thread every 'auto.commit.interval.ms'.
# Explicitly storing offsets after processing gives at-least once semantics.
self.store_offsets(msg)
self.commit_offset(msg)

def handle_error(self, error):
logger.error(error, exc_info=True)
def poll(self) -> Optional[cimpl.Message]:
# poll for self.polling_freq seconds
return self._consumer.poll(timeout=self.polling_freq)

def start(self):
self.subscribe(topics=self.topics.names)

def run(self):
try:
self.start()
while True:
if msg := self.poll():
self.process_message(msg)
except Exception as exc:
self.log_error(exc)
raise
finally:
self.stop()

def stop(self):
self.close()
Empty file.
6 changes: 6 additions & 0 deletions django_kafka/dead_letter/headers.py
@@ -0,0 +1,6 @@
from enum import StrEnum


class DeadLetterHeader(StrEnum):
    MESSAGE = "DEAD_LETTER_MESSAGE"
    DETAIL = "DEAD_LETTER_DETAIL"
44 changes: 44 additions & 0 deletions django_kafka/dead_letter/topic.py
@@ -0,0 +1,44 @@
import re
from typing import TYPE_CHECKING

from django_kafka import settings
from django_kafka.dead_letter.headers import DeadLetterHeader
from django_kafka.retry.topic import RetryTopicProducer
from django_kafka.serialization import NoOpSerializer
from django_kafka.topic import TopicProducer

if TYPE_CHECKING:
    from confluent_kafka import cimpl


class DeadLetterTopicProducer(TopicProducer):
    key_serializer = NoOpSerializer
    value_serializer = NoOpSerializer

    def __init__(self, group_id: str, msg: "cimpl.Message"):
        self.group_id = group_id
        self.msg = msg
        super().__init__()

    @classmethod
    def suffix(cls):
        return settings.DEAD_LETTER_TOPIC_SUFFIX

    @property
    def name(self) -> str:
        topic = self.msg.topic()

        if re.search(RetryTopicProducer.pattern(), topic):
            return re.sub(RetryTopicProducer.pattern(), self.suffix(), topic)
        return f"{self.group_id}.{topic}.{self.suffix()}"

    def produce_for(self, header_message, header_detail):
        headers = [
            (DeadLetterHeader.MESSAGE, header_message),
            (DeadLetterHeader.DETAIL, header_detail),
        ]
        self.produce(
            key=self.msg.key(),
            value=self.msg.value(),
            headers=headers,
        )
2 changes: 1 addition & 1 deletion django_kafka/management/commands/kafka_consume.py
Expand Up @@ -21,4 +21,4 @@ def add_arguments(self, parser):
)

def handle(self, consumers: Optional[list[str]] = None, *args, **options):
kafka.start_consumers(consumers)
kafka.run_consumers(consumers)