diff --git a/arroyo/processing/strategies/produce.py b/arroyo/processing/strategies/produce.py index 3b3b5a8a..cc9fef7d 100644 --- a/arroyo/processing/strategies/produce.py +++ b/arroyo/processing/strategies/produce.py @@ -87,7 +87,11 @@ def submit( future: Optional[Future[BrokerValue[TStrategyPayload]]] = None if not isinstance(message.payload, FilteredPayload): - future = self.__producer.produce(self.__topic, message.payload) + try: + future = self.__producer.produce(self.__topic, message.payload) + except BufferError as exc: + logger.exception(exc) + raise MessageRejected from exc self.__queue.append((message, future)) diff --git a/tests/processing/strategies/test_produce.py b/tests/processing/strategies/test_produce.py index 212ac70e..27959ce9 100644 --- a/tests/processing/strategies/test_produce.py +++ b/tests/processing/strategies/test_produce.py @@ -1,8 +1,11 @@ from unittest import mock +import pytest + from arroyo.backends.kafka import KafkaPayload from arroyo.backends.local.backend import LocalBroker from arroyo.backends.local.storages.memory import MemoryMessageStorage +from arroyo.processing.strategies.abstract import MessageRejected from arroyo.processing.strategies.produce import Produce from arroyo.types import Message, Partition, Topic, Value from arroyo.utils.clock import TestingClock @@ -19,7 +22,7 @@ def test_produce() -> None: producer = broker.get_producer() next_step = mock.Mock() - strategy = Produce(producer, result_topic, next_step) + strategy = Produce(producer, result_topic, next_step, 2) value = b'{"something": "something"}' data = KafkaPayload(None, value, []) @@ -41,4 +44,10 @@ def test_produce() -> None: strategy.poll() assert next_step.submit.call_count == 2 assert next_step.poll.call_count == 2 + + # Backpressure if buffer size = 2 exceeded + with pytest.raises(MessageRejected): + for _ in range(3): + strategy.submit(message) + strategy.join()