Skip to content

Commit

Permalink
go back to using nest_asyncio for run_sync utility
Browse files Browse the repository at this point in the history
  • Loading branch information
smacke committed May 25, 2023
1 parent b1086f2 commit 1baecc2
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 54 deletions.
70 changes: 16 additions & 54 deletions jupyter_core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
# Distributed under the terms of the Modified BSD License.

import asyncio
import atexit
import errno
import inspect
import os
import sys
import threading
import warnings
from pathlib import Path
from types import FrameType
Expand Down Expand Up @@ -92,43 +90,6 @@ def deprecation(message: str, internal: Union[str, List[str]] = "jupyter_core/")
T = TypeVar("T")


class _TaskRunner:
"""A task runner that runs an asyncio event loop on a background thread."""

def __init__(self):
self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
self.__runner_thread: Optional[threading.Thread] = None
self.__lock = threading.Lock()
atexit.register(self._close)

def _close(self):
if self.__io_loop:
self.__io_loop.stop()

def _runner(self):
loop = self.__io_loop
assert loop is not None # noqa
try:
loop.run_forever()
finally:
loop.close()

def run(self, coro):
"""Synchronously run a coroutine on a background thread."""
with self.__lock:
name = f"{threading.current_thread().name} - runner"
if self.__io_loop is None:
self.__io_loop = asyncio.new_event_loop()
self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name)
self.__runner_thread.start()
fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
return fut.result(None)


_runner_map = {}
_loop_map = {}


def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
"""Wraps coroutine in a function that blocks until it has executed.
Expand All @@ -147,23 +108,24 @@ def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
raise AssertionError

def wrapped(*args, **kwargs):
name = threading.current_thread().name
inner = coro(*args, **kwargs)
try:
# If a loop is currently running in this thread,
# use a task runner.
asyncio.get_running_loop()
if name not in _runner_map:
_runner_map[name] = _TaskRunner()
return _runner_map[name].run(inner)
loop = asyncio.get_running_loop()
except RuntimeError:
pass

# Run the loop for this thread.
if name not in _loop_map:
_loop_map[name] = asyncio.new_event_loop()
loop = _loop_map[name]
return loop.run_until_complete(inner)
# Workaround for bugs.python.org/issue39529.
try:
loop = asyncio.get_event_loop_policy().get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
from jupyter_core.utils import patched_nest_asyncio

patched_nest_asyncio.apply(loop)
future = asyncio.ensure_future(coro(*args, **kwargs), loop=loop)
try:
return loop.run_until_complete(future)
except BaseException as e:
future.cancel()
raise e

wrapped.__doc__ = coro.__doc__
return wrapped
Expand Down
54 changes: 54 additions & 0 deletions jupyter_core/utils/patched_nest_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
# Note: copied from https://github.com/ipyflow/ipyflow/blob/8e4bc5cb8d4231b9b69f4f9dce867b8101164ac5/core/ipyflow/kernel/patched_nest_asyncio.py
import asyncio
import sys


def apply(loop=None):
import nest_asyncio

# ref: https://github.com/erdewit/nest_asyncio/issues/14
nest_asyncio._patch_task = _patched_patch_task

# ref: https://github.com/erdewit/nest_asyncio
nest_asyncio.apply(loop=loop)


def _patched_patch_task():
"""Patch the Task's step and enter/leave methods to make it reentrant."""

def step(task, exc=None):
curr_task = curr_tasks.get(task._loop)
try:
step_orig(task, exc)
finally:
if curr_task is None:
curr_tasks.pop(task._loop, None)
else:
curr_tasks[task._loop] = curr_task

Task = asyncio.Task
if sys.version_info >= (3, 7, 0):

def enter_task(loop, task):
curr_tasks[loop] = task

def leave_task(loop, task):
curr_tasks.pop(loop, None)

asyncio.tasks._enter_task = enter_task
asyncio.tasks._leave_task = leave_task
curr_tasks = asyncio.tasks._current_tasks
else:
curr_tasks = Task._current_tasks
try:
step_orig = Task._Task__step
Task._Task__step = step
except AttributeError:
try:
step_orig = Task.__step
Task.__step = step
except AttributeError:
step_orig = Task._step
Task._step = step
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ classifiers = [
]
requires-python = ">=3.8"
dependencies = [
"nest_asyncio",
"platformdirs>=2.5",
"traitlets>=5.3",
"pywin32>=300 ; sys_platform == 'win32' and platform_python_implementation != 'PyPy'"
Expand Down

0 comments on commit 1baecc2

Please sign in to comment.