Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Oct 24, 2023
1 parent e7a3339 commit 3113027
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 26 deletions.
47 changes: 21 additions & 26 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def _run_once(self) -> None:
raise InvalidStateError(
"received message when consumer was expected to be paused"
)
if not message_carried_over:
elif not message_carried_over:
# Otherwise, we need to try fetch a new message from the consumer,
# even if there is no active assignment and/or processing strategy.
try:
Expand Down Expand Up @@ -391,20 +391,20 @@ def _run_once(self) -> None:
# If the processing strategy rejected our message, we need
# to pause the consumer and hold the message until it is
# accepted, at which point we can resume consuming.
if not message_carried_over:
if self.__backpressure_timestamp is None:
self.__backpressure_timestamp = time.time()

if not self.__is_paused and (
time.time() - self.__backpressure_timestamp > 1
):
logger.debug(
"Caught %r while submitting %r, pausing consumer...",
e,
self.__message,
)
self.__consumer.pause([*self.__consumer.tell().keys()])
self.__is_paused = True
# if not message_carried_over:
if self.__backpressure_timestamp is None:
self.__backpressure_timestamp = time.time()

elif not self.__is_paused and (
time.time() - self.__backpressure_timestamp > 1
):
logger.debug(
"Caught %r while submitting %r, pausing consumer...",
e,
self.__message,
)
self.__consumer.pause([*self.__consumer.tell().keys()])
self.__is_paused = True

else:
time.sleep(0.01)
Expand All @@ -413,22 +413,17 @@ def _run_once(self) -> None:
self._handle_invalid_message(e)

else:
# If we were trying to submit a message that failed to be
# submitted on a previous run, we can resume accepting new
# messages.
if (
message_carried_over
and self.__backpressure_timestamp is not None
):
if self.__is_paused:
self.__consumer.resume([*self.__consumer.tell().keys()])
self.__is_paused = False
# Resume if we are currently in a paused state
if self.__is_paused:
self.__consumer.resume([*self.__consumer.tell().keys()])
self.__is_paused = False

# Clear backpressure timestamp if it is set
if self.__backpressure_timestamp is not None:
self.__metrics_buffer.incr_timing(
"arroyo.consumer.paused.time",
time.time() - self.__backpressure_timestamp,
)

self.__backpressure_timestamp = None

self.__message = None
Expand Down
2 changes: 2 additions & 0 deletions tests/processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def test_stream_processor_lifecycle() -> None:
with assert_changes(lambda: int(consumer.pause.call_count), 0, 1):
processor._run_once()
assert strategy.submit.call_args_list[-1] == mock.call(message)
time.sleep(1)
processor._run_once() # Should pause now

# If ``Consumer.poll`` returns a message when we expect it to be paused,
# we should raise an exception.
Expand Down

0 comments on commit 3113027

Please sign in to comment.