diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 17c805f1..5c91ab4c 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -195,6 +195,8 @@ def _close_strategy() -> None: ) self.__processing_strategy = None self.__message = None # avoid leaking buffered messages across assignments + self.__is_paused = False + self._clear_backpressure() self.__metrics_buffer.incr_timing( "arroyo.consumer.shutdown.time", time.time() - start_close @@ -262,7 +264,7 @@ def __commit(self, offsets: Mapping[Partition, int], force: bool = False) -> Non If force is passed, commit immediately and do not throttle. This should be used during consumer shutdown where we do not want to wait before committing. """ - for (partition, offset) in offsets.items(): + for partition, offset in offsets.items(): self.__buffered_messages.pop(partition, offset - 1) self.__consumer.stage_offsets(offsets) @@ -306,6 +308,14 @@ def run(self) -> None: logger.info("Processor terminated") raise + def _clear_backpressure(self) -> None: + if self.__backpressure_timestamp is not None: + self.__metrics_buffer.incr_timing( + "arroyo.consumer.paused.time", + time.time() - self.__backpressure_timestamp, + ) + self.__backpressure_timestamp = None + def _handle_invalid_message(self, exc: InvalidMessage) -> None: # Do not "carry over" message if it is the invalid one. Every other # message should be re-submitted to the strategy. @@ -419,12 +429,7 @@ def _run_once(self) -> None: self.__is_paused = False # Clear backpressure timestamp if it is set - if self.__backpressure_timestamp is not None: - self.__metrics_buffer.incr_timing( - "arroyo.consumer.paused.time", - time.time() - self.__backpressure_timestamp, - ) - self.__backpressure_timestamp = None + self._clear_backpressure() self.__message = None else: