Skip to content

Commit

Permalink
fix(run_task_in_threads): Commit offsets even when consumer is idle (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker authored Aug 10, 2023
1 parent 0f9a02c commit e6c6ec1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
9 changes: 9 additions & 0 deletions arroyo/processing/strategies/run_task_in_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ def submit(

def poll(self) -> None:
self.__forward_invalid_offsets()

if not self.__queue:
# specifically for the case where we are idling, poll the next step
# so that committing can occur. if there is stuff in the queue and
# we are waiting for a future to be finished, we do not really need
# to forward polls.
self.__next_step.poll()
return

while self.__queue:
message, future = self.__queue[0]
next_message: Message[TResult]
Expand Down
19 changes: 19 additions & 0 deletions tests/processing/strategies/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,22 @@ def test_join(strategy_factory: StrategyFactory) -> None:
assert next_step.close.call_args_list == [call()]

assert next_step.join.call_args_list == [call(timeout=None)]


@pytest.mark.parametrize("strategy_factory", FACTORIES)
def test_poll_next_step(
request: pytest.FixtureRequest, strategy_factory: StrategyFactory
) -> None:
next_step = Mock()

step = strategy_factory(next_step)
request.addfinalizer(step.terminate)

# Ensure that polling a strategy forwards the poll unconditionally even if
# there are no messages to process, or no progress at all. Otherwise there
# are weird effects where all messages on a test topic (such as in
# benchmarking/QE) have been processed but the last batch of offsets never
# gets committed.
step.poll()

assert next_step.poll.call_args_list == [call()]

0 comments on commit e6c6ec1

Please sign in to comment.