Skip to content

Commit

Permalink
fix: Revert change when consumer is paused (#297)
Browse files Browse the repository at this point in the history
This reverts part of the change made in #296,
which removed the check for the carried over message, when validating that
the consumer is paused. This change was inadvertently introduced and is not actually
needed to achieve th goal of throttling calls to consumer pause/resume.

This caused the consumer to crash with InvalidStateError so let's revert this part.
  • Loading branch information
lynnagara authored Oct 30, 2023
1 parent 44d2d93 commit 7ba97b4
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,19 +338,19 @@ def _run_once(self) -> None:

message_carried_over = self.__message is not None

if self.__is_paused:
if message_carried_over:
# If a message was carried over from the previous run, there are two reasons:
#
# * MessageRejected. the consumer should be paused and not
# returning any messages on ``poll``.
# * InvalidMessage. the message should be resubmitted.
# _handle_invalid_message is responsible for clearing out
# self.__message if it was the invalid one.
if self.__consumer.poll(timeout=0) is not None:
if self.__is_paused and self.__consumer.poll(timeout=0) is not None:
raise InvalidStateError(
"received message when consumer was expected to be paused"
)
elif not message_carried_over:
else:
# Otherwise, we need to try fetch a new message from the consumer,
# even if there is no active assignment and/or processing strategy.
try:
Expand Down

0 comments on commit 7ba97b4

Please sign in to comment.