Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asyncio.exceptions.CancelledError and consumer stops #772

Open
1 of 2 tasks
lachuta opened this issue Nov 29, 2022 · 1 comment
Open
1 of 2 tasks

asyncio.exceptions.CancelledError and consumer stops #772

lachuta opened this issue Nov 29, 2022 · 1 comment

Comments

@lachuta
Copy link

lachuta commented Nov 29, 2022

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Have not been able to reproduce

Expected behavior

Not have the consumers stop processing

Actual behavior

Have a consumer that is deployed in 4 hosts, consuming from a topic on 16 partitions (4 per host). Have seen the consumer get the exception below and stoped consuming messages, with the following showing in the logs:

Some consumer configurations:
broker_heartbeat_interval: 6
broker_request_timeout: 180
broker_session_timeout: 120
broker_max_poll_records: 300

Full traceback

2022-11-29T15:20:36.894Z mss_sa_refresh_stream_processor INFO [request_id={173e3da6-6ee3-4fa9-ba16-460a14961ba8} transaction_id={5443682e-7eb8-4012-8f8b-8efe6cca45cc} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6220141. Duration: 71 ms
2022-11-29T15:20:36.895Z mss_sa_refresh_stream_processor INFO [request_id={03336c53-750c-4520-a4d9-638493a068ec} transaction_id={739ee30b-d420-451e-9aba-b92f6b6597b9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6191034
2022-11-29T15:20:36.968Z mss_sa_refresh_stream_processor INFO [request_id={03336c53-750c-4520-a4d9-638493a068ec} transaction_id={739ee30b-d420-451e-9aba-b92f6b6597b9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6191034. Duration: 73 ms
2022-11-29T15:20:36.969Z mss_sa_refresh_stream_processor INFO [request_id={04195a4c-c944-44b0-8e4a-e9cdf91ecc9f} transaction_id={72d56cb5-a0df-4b7f-b775-c64ee024e2f9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6190948
2022-11-29T15:20:37.042Z mss_sa_refresh_stream_processor INFO [request_id={04195a4c-c944-44b0-8e4a-e9cdf91ecc9f} transaction_id={72d56cb5-a0df-4b7f-b775-c64ee024e2f9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6190948. Duration: 74 ms
2022-11-29T15:20:37.042Z mss_sa_refresh_stream_processor INFO [request_id={4f58da8f-4422-444b-9aca-2772e00b3778} transaction_id={f74748f1-845d-46b1-aa1c-8f90ae1956a3} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6221813
2022-11-29T15:20:37.138Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Exception in thread
2022-11-29T15:20:37.138Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Thread-1
2022-11-29T15:20:37.139Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] :
2022-11-29T15:20:37.139Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
2022-11-29T15:20:37.140Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/utils/locks.py", line 76, in wait
2022-11-29T15:20:37.140Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] await fut
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] asyncio.exceptions
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] .
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] CancelledError
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] The above exception was the direct cause of the following exception:
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.run()
2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 66, in run
2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.service._start_thread()
2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.thread_loop.run_until_complete(self._serve())
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 603, in run_until_complete
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.run_forever()
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 570, in run_forever
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self._run_once()
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 1859, in _run_once
2022-11-29T15:20:37.145Z mss_sa_refresh_stream_processor INFO [request_id={4f58da8f-4422-444b-9aca-2772e00b3778} transaction_id={f74748f1-845d-46b1-aa1c-8f90ae1956a3} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6221813. Duration: 102 ms
2022-11-29T15:20:37.146Z mss_sa_refresh_stream_processor INFO [request_id={d4ddb314-1b8f-4bc3-9508-a9ca6f5ebabd} transaction_id={8ecb1508-5719-4687-9b08-ed9018054797} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6434463
2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] handle._run()
2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] SystemError
2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] :
2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] PyEval_EvalFrameEx returned a result with an error set
2022-11-29T15:20:37.222Z mss_sa_refresh_stream_processor INFO [request_id={d4ddb314-1b8f-4bc3-9508-a9ca6f5ebabd} transaction_id={8ecb1508-5719-4687-9b08-ed9018054797} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6434463. Duration: 77 ms
2022-11-29T15:25:37.943Z mss_sa_refresh_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Warning: Task timed out!
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Please make sure it's hanging before restart.
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] (started at Tue Nov 29 15:20:37 2022) Replaying logs...
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] (Tue Nov 29 15:20:37 2022) +consumer.commit()
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] -End of log-
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] Task traceback
2022-11-29T15:25:37.946Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Stack for <Task pending name='<coroutine object Consumer._commit_handler at 0x7f88c9e183c0>' coro=<Service._execute_task() running at /opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py:779> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f88bacfd340>()]> cb=[Service._on_future_done()]> (most recent call last):
0>' coro=<Service._execute_task() running at /opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py:779> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f88bacfd340>()]> cb=[Service._on_future_done()]> (most recent call last):
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 783, in _commit_handler
    await self.commit()
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 822, in commit
    return await self.force_commit(
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 857, in force_commit
    did_commit = await self._commit_tps(
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 874, in _commit_tps
    return await self._commit_offsets(
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 943, in _commit_offsets
    did_commit = await self._commit(committable_offsets)
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1311, in _commit
    return await self._thread.commit(offsets)
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in commit
    return await self.call_thread(self._commit, offsets)
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise

Versions

  • Python version: 3.8.11
  • Faust version: 1.10.4
  • Operating system: Linux 3.10.0-693.el7.x86_64
  • Kafka version: 2.6.0
  • aiohttp version: 3.8.3
@wbarnha
Copy link

wbarnha commented Jan 13, 2023

Try https://github.com/faust-streaming/faust, we actively maintain that fork.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants