diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index c0001bad..17c805f1 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -338,7 +338,7 @@ def _run_once(self) -> None: message_carried_over = self.__message is not None - if self.__is_paused: + if message_carried_over: # If a message was carried over from the previous run, there are two reasons: # # * MessageRejected. the consumer should be paused and not @@ -346,11 +346,11 @@ def _run_once(self) -> None: # * InvalidMessage. the message should be resubmitted. # _handle_invalid_message is responsible for clearing out # self.__message if it was the invalid one. - if self.__consumer.poll(timeout=0) is not None: + if self.__is_paused and self.__consumer.poll(timeout=0) is not None: raise InvalidStateError( "received message when consumer was expected to be paused" ) - elif not message_carried_over: + else: # Otherwise, we need to try fetch a new message from the consumer, # even if there is no active assignment and/or processing strategy. try: