diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 0fe77a1c..eacb0ec1 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -354,6 +354,8 @@ def _run_once(self) -> None: try: start_poll = time.time() self.__message = self.__consumer.poll(timeout=1.0) + if self.__message: + self.__buffered_messages.append(self.__message) self.__metrics_buffer.incr_timing( "arroyo.consumer.poll.time", time.time() - start_poll ) @@ -377,8 +379,6 @@ def _run_once(self) -> None: message = ( Message(self.__message) if self.__message is not None else None ) - if not message_carried_over: - self.__buffered_messages.append(self.__message) self.__processing_strategy.submit(message) self.__metrics_buffer.incr_timing(