diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index ce08c771b..39599a4e5 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -235,17 +235,20 @@ async def create_topic( ensure_created: bool = False, ) -> None: """Create/declare topic on server.""" - await self._thread.create_topic( - topic, - partitions, - replication, - config=config, - timeout=timeout, - retention=retention, - compacting=compacting, - deleting=deleting, - ensure_created=ensure_created, - ) + if self.app.conf.topic_allow_declare: + await self._thread.create_topic( + topic, + partitions, + replication, + config=config, + timeout=timeout, + retention=retention, + compacting=compacting, + deleting=deleting, + ensure_created=ensure_created, + ) + else: + logger.warning(f"Topic creation disabled! Can't create topic {topic}") def _new_topicpartition(self, topic: str, partition: int) -> TP: return cast(TP, _TopicPartition(topic, partition)) @@ -1031,27 +1034,30 @@ async def create_topic( ensure_created: bool = False, ) -> None: """Create/declare topic on server.""" - transport = cast(Transport, self.consumer.transport) - _consumer = self._ensure_consumer() - _retention = int(want_seconds(retention) * 1000.0) if retention else None - if len(topic) > TOPIC_LENGTH_MAX: - raise ValueError( - f"Topic name {topic!r} is too long (max={TOPIC_LENGTH_MAX})" + if self.app.conf.topic_allow_declare: + transport = cast(Transport, self.consumer.transport) + _consumer = self._ensure_consumer() + _retention = int(want_seconds(retention) * 1000.0) if retention else None + if len(topic) > TOPIC_LENGTH_MAX: + raise ValueError( + f"Topic name {topic!r} is too long (max={TOPIC_LENGTH_MAX})" + ) + await self.call_thread( + transport._create_topic, + self, + _consumer._client, + topic, + partitions, + replication, + config=config, + timeout=int(want_seconds(timeout) * 1000.0), + retention=_retention, + compacting=compacting, + deleting=deleting, + ensure_created=ensure_created, ) - await self.call_thread( - transport._create_topic, - self, - _consumer._client, - topic, - partitions, - replication, - config=config, - timeout=int(want_seconds(timeout) * 1000.0), - retention=_retention, - compacting=compacting, - deleting=deleting, - ensure_created=ensure_created, - ) + else: + logger.warning(f"Topic creation disabled! Can't create topic {topic}") def key_partition( self, topic: str, key: Optional[bytes], partition: Optional[int] = None @@ -1264,22 +1270,25 @@ async def create_topic( ensure_created: bool = False, ) -> None: """Create/declare topic on server.""" - _retention = int(want_seconds(retention) * 1000.0) if retention else None - producer = self._ensure_producer() - await cast(Transport, self.transport)._create_topic( - self, - producer.client, - topic, - partitions, - replication, - config=config, - timeout=int(want_seconds(timeout) * 1000.0), - retention=_retention, - compacting=compacting, - deleting=deleting, - ensure_created=ensure_created, - ) - await producer.client.force_metadata_update() # Fixes #499 + if self.app.conf.topic_allow_declare: + _retention = int(want_seconds(retention) * 1000.0) if retention else None + producer = self._ensure_producer() + await cast(Transport, self.transport)._create_topic( + self, + producer.client, + topic, + partitions, + replication, + config=config, + timeout=int(want_seconds(timeout) * 1000.0), + retention=_retention, + compacting=compacting, + deleting=deleting, + ensure_created=ensure_created, + ) + await producer.client.force_metadata_update() # Fixes #499 + else: + logger.warning(f"Topic creation disabled! Can't create topic {topic}") def _ensure_producer(self) -> aiokafka.AIOKafkaProducer: if self._producer is None: