Skip to content

Commit

Permalink
Manage asyncio loop creation manually (#7591)
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr authored Jan 7, 2025
1 parent d48c45f commit 400cc44
Show file tree
Hide file tree
Showing 18 changed files with 315 additions and 317 deletions.
11 changes: 5 additions & 6 deletions panel/chat/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ def _init_widgets(self):
self._input_layout = input_layout

def _wrap_callbacks(
self,
callback: Callable | None = None,
post_callback: Callable | None = None,
name: str = ""
):
self,
callback: Callable | None = None,
post_callback: Callable | None = None,
name: str = ""
):
"""
Wrap the callback and post callback around the default callback.
"""
Expand Down Expand Up @@ -654,7 +654,6 @@ async def _cleanup_response(self):
await super()._cleanup_response()
await self._update_input_disabled()


def send(
self,
value: ChatMessage | dict | Any,
Expand Down
18 changes: 6 additions & 12 deletions panel/io/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,16 @@ def start(self):
finally:
self._updating = False
self._start_time = time.time()
if state._is_pyodide:
self._cb = asyncio.create_task(
self._async_repeat(self._periodic_callback)
)
elif state.curdoc and state.curdoc.session_context:
if state.curdoc and state.curdoc.session_context and not state._is_pyodide:
self._doc = state.curdoc
if state._unblocked(state.curdoc):
self._cb = self._doc.add_periodic_callback(self._periodic_callback, self.period)
else:
self._doc.add_next_tick_callback(self.start)
else:
from tornado.ioloop import PeriodicCallback
self._cb = PeriodicCallback(lambda: asyncio.create_task(self._periodic_callback()), self.period)
self._cb.start()
self._cb = asyncio.create_task(
self._async_repeat(self._periodic_callback)
)

def stop(self):
"""
Expand All @@ -197,15 +193,13 @@ def stop(self):
with param.discard_events(self):
self.counter = 0
self._timeout = None
if state._is_pyodide and self._cb:
self._cb.cancel()
elif self._doc and self._cb:
if self._doc and self._cb and not state._is_pyodide:
if self._doc._session_context:
self._doc.callbacks.remove_session_callback(self._cb)
elif self._cb in self._doc.callbacks.session_callbacks:
self._doc.callbacks._session_callbacks.remove(self._cb)
elif self._cb:
self._cb.stop()
self._cb.cancel()
self._cb = None
doc = self._doc or curdoc_locked()
if doc:
Expand Down
29 changes: 24 additions & 5 deletions panel/io/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pathlib
import signal
import sys
import threading
import uuid

from collections.abc import Callable, Mapping
Expand Down Expand Up @@ -111,19 +112,37 @@ def _server_url(url: str, port: int) -> str:
else:
return 'http://%s:%d%s' % (url.split(':')[0], port, "/")

_tasks = set()

def async_execute(func: Callable[..., None]) -> None:
"""
Wrap async event loop scheduling to ensure that with_lock flag
is propagated from function to partial wrapping it.
"""
if not state.curdoc or not state.curdoc.session_context:
ioloop = IOLoop.current()
event_loop = ioloop.asyncio_loop # type: ignore
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Avoid creating IOLoop if one is not already associated
# with the asyncio loop or we're on a child thread
if hasattr(IOLoop, '_ioloop_for_asyncio') and loop in IOLoop._ioloop_for_asyncio:
ioloop = IOLoop._ioloop_for_asyncio[loop]
elif threading.current_thread() is not threading.main_thread():
ioloop = IOLoop.current()
else:
ioloop = None
wrapper = state._handle_exception_wrapper(func)
if event_loop.is_running():
ioloop.add_callback(wrapper)
if loop.is_running():
if ioloop is None:
task = asyncio.ensure_future(wrapper())
_tasks.add(task)
task.add_done_callback(_tasks.discard)
else:
ioloop.add_callback(wrapper)
else:
event_loop.run_until_complete(wrapper())
loop.run_until_complete(wrapper())
return

if isinstance(func, partial) and hasattr(func.func, 'lock'):
Expand Down
Loading

0 comments on commit 400cc44

Please sign in to comment.