Skip to content

Commit

Permalink
Add more logic to aiokafka driver to prevent topics from being created (
Browse files Browse the repository at this point in the history
  • Loading branch information
wbarnha authored Nov 4, 2022
1 parent c1398b8 commit 85a649a
Showing 1 changed file with 56 additions and 47 deletions.
103 changes: 56 additions & 47 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,20 @@ async def create_topic(
ensure_created: bool = False,
) -> None:
"""Create/declare topic on server."""
await self._thread.create_topic(
topic,
partitions,
replication,
config=config,
timeout=timeout,
retention=retention,
compacting=compacting,
deleting=deleting,
ensure_created=ensure_created,
)
if self.app.conf.topic_allow_declare:
await self._thread.create_topic(
topic,
partitions,
replication,
config=config,
timeout=timeout,
retention=retention,
compacting=compacting,
deleting=deleting,
ensure_created=ensure_created,
)
else:
logger.warning(f"Topic creation disabled! Can't create topic {topic}")

def _new_topicpartition(self, topic: str, partition: int) -> TP:
return cast(TP, _TopicPartition(topic, partition))
Expand Down Expand Up @@ -1031,27 +1034,30 @@ async def create_topic(
ensure_created: bool = False,
) -> None:
"""Create/declare topic on server."""
transport = cast(Transport, self.consumer.transport)
_consumer = self._ensure_consumer()
_retention = int(want_seconds(retention) * 1000.0) if retention else None
if len(topic) > TOPIC_LENGTH_MAX:
raise ValueError(
f"Topic name {topic!r} is too long (max={TOPIC_LENGTH_MAX})"
if self.app.conf.topic_allow_declare:
transport = cast(Transport, self.consumer.transport)
_consumer = self._ensure_consumer()
_retention = int(want_seconds(retention) * 1000.0) if retention else None
if len(topic) > TOPIC_LENGTH_MAX:
raise ValueError(
f"Topic name {topic!r} is too long (max={TOPIC_LENGTH_MAX})"
)
await self.call_thread(
transport._create_topic,
self,
_consumer._client,
topic,
partitions,
replication,
config=config,
timeout=int(want_seconds(timeout) * 1000.0),
retention=_retention,
compacting=compacting,
deleting=deleting,
ensure_created=ensure_created,
)
await self.call_thread(
transport._create_topic,
self,
_consumer._client,
topic,
partitions,
replication,
config=config,
timeout=int(want_seconds(timeout) * 1000.0),
retention=_retention,
compacting=compacting,
deleting=deleting,
ensure_created=ensure_created,
)
else:
logger.warning(f"Topic creation disabled! Can't create topic {topic}")

def key_partition(
self, topic: str, key: Optional[bytes], partition: Optional[int] = None
Expand Down Expand Up @@ -1264,22 +1270,25 @@ async def create_topic(
ensure_created: bool = False,
) -> None:
"""Create/declare topic on server."""
_retention = int(want_seconds(retention) * 1000.0) if retention else None
producer = self._ensure_producer()
await cast(Transport, self.transport)._create_topic(
self,
producer.client,
topic,
partitions,
replication,
config=config,
timeout=int(want_seconds(timeout) * 1000.0),
retention=_retention,
compacting=compacting,
deleting=deleting,
ensure_created=ensure_created,
)
await producer.client.force_metadata_update() # Fixes #499
if self.app.conf.topic_allow_declare:
_retention = int(want_seconds(retention) * 1000.0) if retention else None
producer = self._ensure_producer()
await cast(Transport, self.transport)._create_topic(
self,
producer.client,
topic,
partitions,
replication,
config=config,
timeout=int(want_seconds(timeout) * 1000.0),
retention=_retention,
compacting=compacting,
deleting=deleting,
ensure_created=ensure_created,
)
await producer.client.force_metadata_update() # Fixes #499
else:
logger.warning(f"Topic creation disabled! Can't create topic {topic}")

def _ensure_producer(self) -> aiokafka.AIOKafkaProducer:
if self._producer is None:
Expand Down

0 comments on commit 85a649a

Please sign in to comment.