Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go back to using nest_asyncio for run_sync utility #353

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 16 additions & 54 deletions jupyter_core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
# Distributed under the terms of the Modified BSD License.

import asyncio
import atexit
import errno
import inspect
import os
import sys
import threading
import warnings
from pathlib import Path
from types import FrameType
Expand Down Expand Up @@ -92,43 +90,6 @@ def deprecation(message: str, internal: Union[str, List[str]] = "jupyter_core/")
T = TypeVar("T")


class _TaskRunner:
"""A task runner that runs an asyncio event loop on a background thread."""

def __init__(self):
self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
self.__runner_thread: Optional[threading.Thread] = None
self.__lock = threading.Lock()
atexit.register(self._close)

def _close(self):
if self.__io_loop:
self.__io_loop.stop()

def _runner(self):
loop = self.__io_loop
assert loop is not None # noqa
try:
loop.run_forever()
finally:
loop.close()

def run(self, coro):
"""Synchronously run a coroutine on a background thread."""
with self.__lock:
name = f"{threading.current_thread().name} - runner"
if self.__io_loop is None:
self.__io_loop = asyncio.new_event_loop()
self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name)
self.__runner_thread.start()
fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
return fut.result(None)


_runner_map = {}
_loop_map = {}


def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
"""Wraps coroutine in a function that blocks until it has executed.

Expand All @@ -147,23 +108,24 @@ def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
raise AssertionError

def wrapped(*args, **kwargs):
name = threading.current_thread().name
inner = coro(*args, **kwargs)
try:
# If a loop is currently running in this thread,
# use a task runner.
asyncio.get_running_loop()
if name not in _runner_map:
_runner_map[name] = _TaskRunner()
return _runner_map[name].run(inner)
loop = asyncio.get_running_loop()
except RuntimeError:
pass

# Run the loop for this thread.
if name not in _loop_map:
_loop_map[name] = asyncio.new_event_loop()
loop = _loop_map[name]
return loop.run_until_complete(inner)
# Workaround for bugs.python.org/issue39529.
try:
loop = asyncio.get_event_loop_policy().get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
from jupyter_core.utils import patched_nest_asyncio

patched_nest_asyncio.apply(loop)
future = asyncio.ensure_future(coro(*args, **kwargs), loop=loop)
try:
return loop.run_until_complete(future)
except BaseException as e:
future.cancel()
raise e

wrapped.__doc__ = coro.__doc__
return wrapped
Expand Down
52 changes: 52 additions & 0 deletions jupyter_core/utils/patched_nest_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
# Note: copied from https://github.com/ipyflow/ipyflow/blob/8e4bc5cb8d4231b9b69f4f9dce867b8101164ac5/core/ipyflow/kernel/patched_nest_asyncio.py
import asyncio


def apply(loop=None):
import nest_asyncio

# ref: https://github.com/erdewit/nest_asyncio/issues/14
nest_asyncio._patch_task = _patched_patch_task

# ref: https://github.com/erdewit/nest_asyncio
nest_asyncio.apply(loop=loop)


def _patched_patch_task():
"""Patch the Task's step and enter/leave methods to make it reentrant."""

def step(task, exc=None):
curr_task = curr_tasks.get(task._loop)
try:
step_orig(task, exc)
finally:
if curr_task is None:
curr_tasks.pop(task._loop, None)
else:
curr_tasks[task._loop] = curr_task

Task = asyncio.Task
if hasattr(Task, '_nest_patched'):
return
def enter_task(loop, task):
curr_tasks[loop] = task

def leave_task(loop, task):
curr_tasks.pop(loop, None)

asyncio.tasks._enter_task = enter_task
asyncio.tasks._leave_task = leave_task
curr_tasks = asyncio.tasks._current_tasks # type: ignore
try:
step_orig = Task._Task__step # type: ignore
Task._Task__step = step # type: ignore
except AttributeError:
try:
step_orig = Task.__step # type: ignore
Task.__step = step # type: ignore
except AttributeError:
step_orig = Task._step # type: ignore
Task._step = step # type: ignore
Task._nest_patched = True # type: ignore
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ classifiers = [
]
requires-python = ">=3.8"
dependencies = [
"nest_asyncio",
"platformdirs>=2.5",
"traitlets>=5.3",
"pywin32>=300 ; sys_platform == 'win32' and platform_python_implementation != 'PyPy'"
Expand Down