Skip to content

Commit

Permalink
fix(produce): Apply backpressure instead of crashing
Browse files Browse the repository at this point in the history
If we get local queue full, let's raise MessageRejected to slow down
the consumer.
  • Loading branch information
lynnagara committed Aug 18, 2023
1 parent 6286c79 commit 7a28935
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion arroyo/processing/strategies/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ def submit(
future: Optional[Future[BrokerValue[TStrategyPayload]]] = None

if not isinstance(message.payload, FilteredPayload):
future = self.__producer.produce(self.__topic, message.payload)
try:
future = self.__producer.produce(self.__topic, message.payload)
except BufferError as exc:
logger.exception(exc)
raise MessageRejected from exc

self.__queue.append((message, future))

Expand Down

0 comments on commit 7a28935

Please sign in to comment.