From 1baecc289d18872064d3e8dfdf3f62226e3761b4 Mon Sep 17 00:00:00 2001 From: Stephen Macke Date: Thu, 25 May 2023 08:26:43 -0700 Subject: [PATCH 1/5] go back to using nest_asyncio for run_sync utility --- jupyter_core/utils/__init__.py | 70 +++++----------------- jupyter_core/utils/patched_nest_asyncio.py | 54 +++++++++++++++++ pyproject.toml | 1 + 3 files changed, 71 insertions(+), 54 deletions(-) create mode 100644 jupyter_core/utils/patched_nest_asyncio.py diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index 68973e6..2ccfe68 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -2,12 +2,10 @@ # Distributed under the terms of the Modified BSD License. import asyncio -import atexit import errno import inspect import os import sys -import threading import warnings from pathlib import Path from types import FrameType @@ -92,43 +90,6 @@ def deprecation(message: str, internal: Union[str, List[str]] = "jupyter_core/") T = TypeVar("T") -class _TaskRunner: - """A task runner that runs an asyncio event loop on a background thread.""" - - def __init__(self): - self.__io_loop: Optional[asyncio.AbstractEventLoop] = None - self.__runner_thread: Optional[threading.Thread] = None - self.__lock = threading.Lock() - atexit.register(self._close) - - def _close(self): - if self.__io_loop: - self.__io_loop.stop() - - def _runner(self): - loop = self.__io_loop - assert loop is not None # noqa - try: - loop.run_forever() - finally: - loop.close() - - def run(self, coro): - """Synchronously run a coroutine on a background thread.""" - with self.__lock: - name = f"{threading.current_thread().name} - runner" - if self.__io_loop is None: - self.__io_loop = asyncio.new_event_loop() - self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name) - self.__runner_thread.start() - fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop) - return fut.result(None) - - -_runner_map = {} -_loop_map = {} - - def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: """Wraps coroutine in a function that blocks until it has executed. @@ -147,23 +108,24 @@ def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: raise AssertionError def wrapped(*args, **kwargs): - name = threading.current_thread().name - inner = coro(*args, **kwargs) try: - # If a loop is currently running in this thread, - # use a task runner. - asyncio.get_running_loop() - if name not in _runner_map: - _runner_map[name] = _TaskRunner() - return _runner_map[name].run(inner) + loop = asyncio.get_running_loop() except RuntimeError: - pass - - # Run the loop for this thread. - if name not in _loop_map: - _loop_map[name] = asyncio.new_event_loop() - loop = _loop_map[name] - return loop.run_until_complete(inner) + # Workaround for bugs.python.org/issue39529. + try: + loop = asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + from jupyter_core.utils import patched_nest_asyncio + + patched_nest_asyncio.apply(loop) + future = asyncio.ensure_future(coro(*args, **kwargs), loop=loop) + try: + return loop.run_until_complete(future) + except BaseException as e: + future.cancel() + raise e wrapped.__doc__ = coro.__doc__ return wrapped diff --git a/jupyter_core/utils/patched_nest_asyncio.py b/jupyter_core/utils/patched_nest_asyncio.py new file mode 100644 index 0000000..76e12fa --- /dev/null +++ b/jupyter_core/utils/patched_nest_asyncio.py @@ -0,0 +1,54 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. +# Note: copied from https://github.com/ipyflow/ipyflow/blob/8e4bc5cb8d4231b9b69f4f9dce867b8101164ac5/core/ipyflow/kernel/patched_nest_asyncio.py +import asyncio +import sys + + +def apply(loop=None): + import nest_asyncio + + # ref: https://github.com/erdewit/nest_asyncio/issues/14 + nest_asyncio._patch_task = _patched_patch_task + + # ref: https://github.com/erdewit/nest_asyncio + nest_asyncio.apply(loop=loop) + + +def _patched_patch_task(): + """Patch the Task's step and enter/leave methods to make it reentrant.""" + + def step(task, exc=None): + curr_task = curr_tasks.get(task._loop) + try: + step_orig(task, exc) + finally: + if curr_task is None: + curr_tasks.pop(task._loop, None) + else: + curr_tasks[task._loop] = curr_task + + Task = asyncio.Task + if sys.version_info >= (3, 7, 0): + + def enter_task(loop, task): + curr_tasks[loop] = task + + def leave_task(loop, task): + curr_tasks.pop(loop, None) + + asyncio.tasks._enter_task = enter_task + asyncio.tasks._leave_task = leave_task + curr_tasks = asyncio.tasks._current_tasks + else: + curr_tasks = Task._current_tasks + try: + step_orig = Task._Task__step + Task._Task__step = step + except AttributeError: + try: + step_orig = Task.__step + Task.__step = step + except AttributeError: + step_orig = Task._step + Task._step = step diff --git a/pyproject.toml b/pyproject.toml index dc33f54..5c537ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ classifiers = [ ] requires-python = ">=3.8" dependencies = [ + "nest_asyncio", "platformdirs>=2.5", "traitlets>=5.3", "pywin32>=300 ; sys_platform == 'win32' and platform_python_implementation != 'PyPy'" From 11107c78a159ae3290bfa12a7b3a7459a90ba4d6 Mon Sep 17 00:00:00 2001 From: Stephen Macke Date: Thu, 25 May 2023 08:39:46 -0700 Subject: [PATCH 2/5] add noqas to make linter happy --- jupyter_core/utils/patched_nest_asyncio.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/jupyter_core/utils/patched_nest_asyncio.py b/jupyter_core/utils/patched_nest_asyncio.py index 76e12fa..fee2530 100644 --- a/jupyter_core/utils/patched_nest_asyncio.py +++ b/jupyter_core/utils/patched_nest_asyncio.py @@ -39,16 +39,16 @@ def leave_task(loop, task): asyncio.tasks._enter_task = enter_task asyncio.tasks._leave_task = leave_task - curr_tasks = asyncio.tasks._current_tasks + curr_tasks = asyncio.tasks._current_tasks # noqa else: - curr_tasks = Task._current_tasks + curr_tasks = Task._current_tasks # noqa try: - step_orig = Task._Task__step - Task._Task__step = step + step_orig = Task._Task__step # noqa + Task._Task__step = step # noqa except AttributeError: try: - step_orig = Task.__step - Task.__step = step + step_orig = Task.__step # noqa + Task.__step = step # noqa except AttributeError: - step_orig = Task._step - Task._step = step + step_orig = Task._step # noqa + Task._step = step # noqa From 2649b726da71306c5158e813523b8c06a4597f50 Mon Sep 17 00:00:00 2001 From: Stephen Macke Date: Thu, 25 May 2023 08:49:01 -0700 Subject: [PATCH 3/5] replace noqa with type: ignore --- jupyter_core/utils/patched_nest_asyncio.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/jupyter_core/utils/patched_nest_asyncio.py b/jupyter_core/utils/patched_nest_asyncio.py index fee2530..8555847 100644 --- a/jupyter_core/utils/patched_nest_asyncio.py +++ b/jupyter_core/utils/patched_nest_asyncio.py @@ -39,16 +39,16 @@ def leave_task(loop, task): asyncio.tasks._enter_task = enter_task asyncio.tasks._leave_task = leave_task - curr_tasks = asyncio.tasks._current_tasks # noqa + curr_tasks = asyncio.tasks._current_tasks # type: ignore else: - curr_tasks = Task._current_tasks # noqa + curr_tasks = Task._current_tasks # type: ignore try: - step_orig = Task._Task__step # noqa - Task._Task__step = step # noqa + step_orig = Task._Task__step # type: ignore + Task._Task__step = step # type: ignore except AttributeError: try: - step_orig = Task.__step # noqa - Task.__step = step # noqa + step_orig = Task.__step # type: ignore + Task.__step = step # type: ignore except AttributeError: - step_orig = Task._step # noqa - Task._step = step # noqa + step_orig = Task._step # type: ignore + Task._step = step # type: ignore From 8bc645ff453920eca626d1a85440b83ab8d6c12c Mon Sep 17 00:00:00 2001 From: Stephen Macke Date: Thu, 25 May 2023 10:23:50 -0700 Subject: [PATCH 4/5] dont allow repeated patching --- jupyter_core/utils/patched_nest_asyncio.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/jupyter_core/utils/patched_nest_asyncio.py b/jupyter_core/utils/patched_nest_asyncio.py index 8555847..5c19c33 100644 --- a/jupyter_core/utils/patched_nest_asyncio.py +++ b/jupyter_core/utils/patched_nest_asyncio.py @@ -29,6 +29,8 @@ def step(task, exc=None): curr_tasks[task._loop] = curr_task Task = asyncio.Task + if hasattr(Task, '_nest_patched'): + return if sys.version_info >= (3, 7, 0): def enter_task(loop, task): @@ -52,3 +54,4 @@ def leave_task(loop, task): except AttributeError: step_orig = Task._step # type: ignore Task._step = step # type: ignore + Task._nest_patched = True # type: ignore From a9700a4dfd663d8c2b5d5d37656ba0c6e428c305 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 25 May 2023 17:24:24 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- jupyter_core/utils/patched_nest_asyncio.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/jupyter_core/utils/patched_nest_asyncio.py b/jupyter_core/utils/patched_nest_asyncio.py index 5c19c33..e24711a 100644 --- a/jupyter_core/utils/patched_nest_asyncio.py +++ b/jupyter_core/utils/patched_nest_asyncio.py @@ -2,7 +2,6 @@ # Distributed under the terms of the Modified BSD License. # Note: copied from https://github.com/ipyflow/ipyflow/blob/8e4bc5cb8d4231b9b69f4f9dce867b8101164ac5/core/ipyflow/kernel/patched_nest_asyncio.py import asyncio -import sys def apply(loop=None): @@ -31,19 +30,15 @@ def step(task, exc=None): Task = asyncio.Task if hasattr(Task, '_nest_patched'): return - if sys.version_info >= (3, 7, 0): + def enter_task(loop, task): + curr_tasks[loop] = task - def enter_task(loop, task): - curr_tasks[loop] = task + def leave_task(loop, task): + curr_tasks.pop(loop, None) - def leave_task(loop, task): - curr_tasks.pop(loop, None) - - asyncio.tasks._enter_task = enter_task - asyncio.tasks._leave_task = leave_task - curr_tasks = asyncio.tasks._current_tasks # type: ignore - else: - curr_tasks = Task._current_tasks # type: ignore + asyncio.tasks._enter_task = enter_task + asyncio.tasks._leave_task = leave_task + curr_tasks = asyncio.tasks._current_tasks # type: ignore try: step_orig = Task._Task__step # type: ignore Task._Task__step = step # type: ignore