diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 4452d85ff..6d1f12a5b 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -435,6 +435,7 @@ class Consumer(Service, ConsumerT): flow_active: bool = True can_resume_flow: Event + suspend_flow: Event def __init__( self, @@ -475,6 +476,7 @@ def __init__( self._end_offset_monitor_interval = self.commit_interval * 2 self.randomly_assigned_topics = set() self.can_resume_flow = Event() + self.suspend_flow = Event() self._reset_state() super().__init__(loop=loop or self.transport.loop, **kwargs) self.transactions = self.transport.create_transaction_manager( @@ -497,6 +499,7 @@ def _reset_state(self) -> None: self._paused_partitions = set() self._buffered_partitions = set() self.can_resume_flow.clear() + self.suspend_flow.clear() self.flow_active = True self._time_start = monotonic() @@ -573,11 +576,13 @@ def stop_flow(self) -> None: """Block consumer from processing any more messages.""" self.flow_active = False self.can_resume_flow.clear() + self.suspend_flow.set() def resume_flow(self) -> None: """Allow consumer to process messages.""" self.flow_active = True self.can_resume_flow.set() + self.suspend_flow.clear() def pause_partitions(self, tps: Iterable[TP]) -> None: """Pause fetching from partitions.""" @@ -1120,7 +1125,9 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover if self._n_acked >= commit_every: self._n_acked = 0 await self.commit() - await callback(message) + await self.wait_first( + callback(message), self.suspend_flow.wait() + ) set_read_offset(tp, offset) else: self.log.dev(