Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage asyncio loop creation manually #7591

Merged
merged 35 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e60c1fc
Manage asyncio loop creation manually
philippjfr Jan 6, 2025
95921e2
Handle async task scheduling manually
philippjfr Jan 6, 2025
40c8f72
Small fixes
philippjfr Jan 6, 2025
2bb86d9
Do not use async tests for threaded executions
philippjfr Jan 6, 2025
559f058
Do not set up global event loop
philippjfr Jan 6, 2025
de6bef1
Update async tests consistently
philippjfr Jan 6, 2025
4096d4a
Declare async tests
philippjfr Jan 6, 2025
a1c8af3
Ensure an event loop is set
philippjfr Jan 6, 2025
2682b26
More async tests
philippjfr Jan 6, 2025
44fee87
More changes
philippjfr Jan 6, 2025
821f9ef
Fix all async chat tests
philippjfr Jan 6, 2025
e23a297
Do not create Tornado IOLoop
philippjfr Jan 6, 2025
36977ce
Fix test
philippjfr Jan 6, 2025
76ed9d4
Small adjustments
philippjfr Jan 6, 2025
f620db9
Cleanup thread pool for ioloops
philippjfr Jan 6, 2025
3442e8d
Ensure streamz event loop gets shut down
philippjfr Jan 6, 2025
85d1ada
Revert changes to async_execute
philippjfr Jan 6, 2025
0e3c9a4
Make doc tests async
philippjfr Jan 6, 2025
6caabf9
Re-enable test
philippjfr Jan 6, 2025
bd21e64
fix types
philippjfr Jan 6, 2025
93ad2fc
Remove loop creation and cleanup
philippjfr Jan 7, 2025
933273c
Fixes
philippjfr Jan 7, 2025
9dabd29
Disable autouse on asyncio_loop
philippjfr Jan 7, 2025
78aaa36
Do not reuse ioloop
philippjfr Jan 7, 2025
bc4586a
Explicit asyncio loop for subprocess tests
philippjfr Jan 7, 2025
89fc247
Avoid creating Tornado IOLoop again
philippjfr Jan 7, 2025
50af057
Minor fixes
philippjfr Jan 7, 2025
e16a019
Updates to jupyterlite pins
philippjfr Jan 7, 2025
55c6a28
Bump lock version
philippjfr Jan 7, 2025
51a106d
Fix pins
philippjfr Jan 7, 2025
fda7cfd
Do not use custom bokeh wheel for now
philippjfr Jan 7, 2025
32ba68e
Do not piggyback on Tornado event loop for periodic callbacks
philippjfr Jan 7, 2025
ae6a11b
Make tests more robust
philippjfr Jan 7, 2025
62e3564
Use async test instead of explicit loop
philippjfr Jan 7, 2025
35ff450
Fix PeriodicCallback.stop()
philippjfr Jan 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions panel/chat/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ def _init_widgets(self):
self._input_layout = input_layout

def _wrap_callbacks(
self,
callback: Callable | None = None,
post_callback: Callable | None = None,
name: str = ""
):
self,
callback: Callable | None = None,
post_callback: Callable | None = None,
name: str = ""
):
"""
Wrap the callback and post callback around the default callback.
"""
Expand Down Expand Up @@ -654,7 +654,6 @@ async def _cleanup_response(self):
await super()._cleanup_response()
await self._update_input_disabled()


def send(
self,
value: ChatMessage | dict | Any,
Expand Down
18 changes: 6 additions & 12 deletions panel/io/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,16 @@ def start(self):
finally:
self._updating = False
self._start_time = time.time()
if state._is_pyodide:
self._cb = asyncio.create_task(
self._async_repeat(self._periodic_callback)
)
elif state.curdoc and state.curdoc.session_context:
if state.curdoc and state.curdoc.session_context and not state._is_pyodide:
self._doc = state.curdoc
if state._unblocked(state.curdoc):
self._cb = self._doc.add_periodic_callback(self._periodic_callback, self.period)
else:
self._doc.add_next_tick_callback(self.start)
else:
from tornado.ioloop import PeriodicCallback
self._cb = PeriodicCallback(lambda: asyncio.create_task(self._periodic_callback()), self.period)
self._cb.start()
self._cb = asyncio.create_task(
self._async_repeat(self._periodic_callback)
)

def stop(self):
"""
Expand All @@ -197,15 +193,13 @@ def stop(self):
with param.discard_events(self):
self.counter = 0
self._timeout = None
if state._is_pyodide and self._cb:
self._cb.cancel()
elif self._doc and self._cb:
if self._doc and self._cb and not state._is_pyodide:
if self._doc._session_context:
self._doc.callbacks.remove_session_callback(self._cb)
elif self._cb in self._doc.callbacks.session_callbacks:
self._doc.callbacks._session_callbacks.remove(self._cb)
elif self._cb:
self._cb.stop()
self._cb.cancel()
self._cb = None
doc = self._doc or curdoc_locked()
if doc:
Expand Down
29 changes: 24 additions & 5 deletions panel/io/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pathlib
import signal
import sys
import threading
import uuid

from collections.abc import Callable, Mapping
Expand Down Expand Up @@ -111,19 +112,37 @@ def _server_url(url: str, port: int) -> str:
else:
return 'http://%s:%d%s' % (url.split(':')[0], port, "/")

_tasks = set()

def async_execute(func: Callable[..., None]) -> None:
"""
Wrap async event loop scheduling to ensure that with_lock flag
is propagated from function to partial wrapping it.
"""
if not state.curdoc or not state.curdoc.session_context:
ioloop = IOLoop.current()
event_loop = ioloop.asyncio_loop # type: ignore
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Avoid creating IOLoop if one is not already associated
# with the asyncio loop or we're on a child thread
if hasattr(IOLoop, '_ioloop_for_asyncio') and loop in IOLoop._ioloop_for_asyncio:
ioloop = IOLoop._ioloop_for_asyncio[loop]
elif threading.current_thread() is not threading.main_thread():
ioloop = IOLoop.current()
else:
ioloop = None
wrapper = state._handle_exception_wrapper(func)
if event_loop.is_running():
ioloop.add_callback(wrapper)
if loop.is_running():
if ioloop is None:
task = asyncio.ensure_future(wrapper())
_tasks.add(task)
task.add_done_callback(_tasks.discard)
else:
ioloop.add_callback(wrapper)
else:
event_loop.run_until_complete(wrapper())
loop.run_until_complete(wrapper())
return

if isinstance(func, partial) and hasattr(func.func, 'lock'):
Expand Down
Loading
Loading