Skip to content

Commit

Permalink
fix: Ensure carried over message is in buffer (#283)
Browse files Browse the repository at this point in the history
Since `_run_once` can early return (https://github.com/getsentry/arroyo/blob/6286c7921978065edeb5ce72d829031f952d2e5e/arroyo/processing/processor.py#L369) it was possible that a message was never placed in `self.buffered_messages`. If we try to retreive it later, it can crash the consumer.

This is suspected to be the cause of the `Invalid message not found in buffer` messages we saw in prod.
  • Loading branch information
lynnagara authored Aug 29, 2023
1 parent 6286c79 commit 542cb34
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ def _run_once(self) -> None:
try:
start_poll = time.time()
self.__message = self.__consumer.poll(timeout=1.0)
if self.__message:
self.__buffered_messages.append(self.__message)
self.__metrics_buffer.incr_timing(
"arroyo.consumer.poll.time", time.time() - start_poll
)
Expand All @@ -377,8 +379,6 @@ def _run_once(self) -> None:
message = (
Message(self.__message) if self.__message is not None else None
)
if not message_carried_over:
self.__buffered_messages.append(self.__message)
self.__processing_strategy.submit(message)

self.__metrics_buffer.incr_timing(
Expand Down

0 comments on commit 542cb34

Please sign in to comment.