From 3abfdc97f9382ebf704b2295e3da8d77a6d56aad Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 25 Jul 2023 17:53:33 +0200 Subject: [PATCH 1/4] Support external kernels --- jupyter_client/manager.py | 14 ++++++- jupyter_client/multikernelmanager.py | 59 +++++++++++++++++++++++++++- jupyter_client/utils.py | 33 ++++++++++++++++ 3 files changed, 104 insertions(+), 2 deletions(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index f04bd987..564966ed 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -85,7 +85,8 @@ async def wrapper(self, *args, **kwargs): out = await method(self, *args, **kwargs) # Add a small sleep to ensure tests can capture the state before done await asyncio.sleep(0.01) - self._ready.set_result(None) + if self.owns_kernel: + self._ready.set_result(None) return out except Exception as e: self._ready.set_exception(e) @@ -105,6 +106,7 @@ class KernelManager(ConnectionFileMixin): def __init__(self, *args, **kwargs): """Initialize a kernel manager.""" + self._owns_kernel = kwargs.pop("owns_kernel", True) super().__init__(**kwargs) self._shutdown_status = _ShutdownStatus.Unset self._attempted_start = False @@ -495,6 +497,9 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ + if not self.owns_kernel: + return + self.shutting_down = True # Used by restarter to prevent race condition # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -558,6 +563,10 @@ async def _async_restart_kernel( restart_kernel = run_sync(_async_restart_kernel) + @property + def owns_kernel(self) -> bool: + return self._owns_kernel + @property def has_kernel(self) -> bool: """Has a kernel process been started that we are actively managing.""" @@ -646,6 +655,9 @@ async def _async_signal_kernel(self, signum: int) -> None: async def _async_is_alive(self) -> bool: """Is the kernel process still running?""" + if not self.owns_kernel: + return True + if self.has_kernel: assert self.provisioner is not None ret = await self.provisioner.poll() diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index c3ae1e7b..2f58ea6e 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -2,11 +2,13 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. import asyncio +import json import os import socket import typing as t import uuid from functools import wraps +from pathlib import Path import zmq from traitlets import Any, Bool, Dict, DottedObjectName, Instance, Unicode, default, observe @@ -15,7 +17,7 @@ from .kernelspec import NATIVE_KERNEL_NAME, KernelSpecManager from .manager import KernelManager -from .utils import ensure_async, run_sync +from .utils import ensure_async, run_sync, utcnow class DuplicateKernelError(Exception): @@ -105,9 +107,14 @@ def _context_default(self) -> zmq.Context: return zmq.Context() connection_dir = Unicode("") + external_connection_dir = Unicode("") _kernels = Dict() + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.kernel_id_to_connection_file = {} + def __del__(self): """Handle garbage collection. Destroy context if applicable.""" if self._created_context and self.context and not self.context.closed: @@ -123,6 +130,56 @@ def __del__(self): def list_kernel_ids(self) -> t.List[str]: """Return a list of the kernel ids of the active kernels.""" + if self.external_connection_dir: + connection_files = [ + p for p in Path(self.external_connection_dir).iterdir() if p.is_file() + ] + + # remove kernels (whose connection file has disappeared) from our list + k = list(self.kernel_id_to_connection_file.keys()) + v = list(self.kernel_id_to_connection_file.values()) + for connection_file in list(self.kernel_id_to_connection_file.values()): + if connection_file not in connection_files: + kernel_id = k[v.index(connection_file)] + del self.kernel_id_to_connection_file[kernel_id] + del self._kernels[kernel_id] + + # add kernels (whose connection file appeared) to our list + for connection_file in connection_files: + if connection_file in self.kernel_id_to_connection_file.values(): + continue + try: + connection_info = json.loads(connection_file.read_text()) + except Exception: + continue + if not ("kernel_name" in connection_info and "key" in connection_info): + continue + # it looks like a connection file + kernel_id = self.new_kernel_id() + self.kernel_id_to_connection_file[kernel_id] = connection_file + km = self.kernel_manager_factory( + parent=self, + log=self.log, + owns_kernel=False, + ) + km.last_activity = utcnow() + km.execution_state = "idle" + km.connections = 1 + km.kernel_id = kernel_id + km.shell_port = connection_info["shell_port"] + km.iopub_port = connection_info["iopub_port"] + km.stdin_port = connection_info["stdin_port"] + km.control_port = connection_info["control_port"] + km.hb_port = connection_info["hb_port"] + km.ip = connection_info["ip"] + km.transport = connection_info["transport"] + km.session.key = connection_info["key"].encode() + km.session.signature_scheme = connection_info["signature_scheme"] + km.kernel_name = connection_info["kernel_name"] + km.ready.set_result(None) + + self._kernels[kernel_id] = km + # Create a copy so we can iterate over kernels in operations # that delete keys. return list(self._kernels.keys()) diff --git a/jupyter_client/utils.py b/jupyter_client/utils.py index eafdd328..ab1cbcaa 100644 --- a/jupyter_client/utils.py +++ b/jupyter_client/utils.py @@ -4,6 +4,7 @@ - vendor functions from ipython_genutils that should be retired at some point. """ import os +from datetime import datetime, timedelta, tzinfo from jupyter_core.utils import ensure_async, run_sync # noqa: F401 # noqa: F401 @@ -83,3 +84,35 @@ def _expand_path(s): if os.name == "nt": s = s.replace("IPYTHON_TEMP", "$\\") return s + + +# constant for zero offset +ZERO = timedelta(0) + + +class tzUTC(tzinfo): # noqa + """tzinfo object for UTC (zero offset)""" + + def utcoffset(self, d): + """Compute utcoffset.""" + return ZERO + + def dst(self, d): + """Compute dst.""" + return ZERO + + +UTC = tzUTC() # type:ignore + + +def utc_aware(unaware): + """decorator for adding UTC tzinfo to datetime's utcfoo methods""" + + def utc_method(*args, **kwargs): + dt = unaware(*args, **kwargs) + return dt.replace(tzinfo=UTC) + + return utc_method + + +utcnow = utc_aware(datetime.utcnow) From 1553aa70ca3b1a09256bfd076ce443b6eab13e28 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 26 Jul 2023 09:21:12 +0200 Subject: [PATCH 2/4] Ignore linter rule --- jupyter_client/multikernelmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 2f58ea6e..d70286e0 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -150,7 +150,7 @@ def list_kernel_ids(self) -> t.List[str]: continue try: connection_info = json.loads(connection_file.read_text()) - except Exception: + except Exception: # noqa: S112 continue if not ("kernel_name" in connection_info and "key" in connection_info): continue From 6894d7eccfb7cb6d24f231dffcf92bbdf4989b58 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 26 Jul 2023 10:43:42 +0200 Subject: [PATCH 3/4] Check if connection directory exists --- jupyter_client/multikernelmanager.py | 100 +++++++++++++-------------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index d70286e0..df2fa015 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -107,7 +107,7 @@ def _context_default(self) -> zmq.Context: return zmq.Context() connection_dir = Unicode("") - external_connection_dir = Unicode("") + external_connection_dir = Unicode(None, allow_none=True) _kernels = Dict() @@ -130,55 +130,55 @@ def __del__(self): def list_kernel_ids(self) -> t.List[str]: """Return a list of the kernel ids of the active kernels.""" - if self.external_connection_dir: - connection_files = [ - p for p in Path(self.external_connection_dir).iterdir() if p.is_file() - ] - - # remove kernels (whose connection file has disappeared) from our list - k = list(self.kernel_id_to_connection_file.keys()) - v = list(self.kernel_id_to_connection_file.values()) - for connection_file in list(self.kernel_id_to_connection_file.values()): - if connection_file not in connection_files: - kernel_id = k[v.index(connection_file)] - del self.kernel_id_to_connection_file[kernel_id] - del self._kernels[kernel_id] - - # add kernels (whose connection file appeared) to our list - for connection_file in connection_files: - if connection_file in self.kernel_id_to_connection_file.values(): - continue - try: - connection_info = json.loads(connection_file.read_text()) - except Exception: # noqa: S112 - continue - if not ("kernel_name" in connection_info and "key" in connection_info): - continue - # it looks like a connection file - kernel_id = self.new_kernel_id() - self.kernel_id_to_connection_file[kernel_id] = connection_file - km = self.kernel_manager_factory( - parent=self, - log=self.log, - owns_kernel=False, - ) - km.last_activity = utcnow() - km.execution_state = "idle" - km.connections = 1 - km.kernel_id = kernel_id - km.shell_port = connection_info["shell_port"] - km.iopub_port = connection_info["iopub_port"] - km.stdin_port = connection_info["stdin_port"] - km.control_port = connection_info["control_port"] - km.hb_port = connection_info["hb_port"] - km.ip = connection_info["ip"] - km.transport = connection_info["transport"] - km.session.key = connection_info["key"].encode() - km.session.signature_scheme = connection_info["signature_scheme"] - km.kernel_name = connection_info["kernel_name"] - km.ready.set_result(None) - - self._kernels[kernel_id] = km + if self.external_connection_dir is not None: + external_connection_dir = Path(self.external_connection_dir) + if external_connection_dir.is_dir(): + connection_files = [p for p in external_connection_dir.iterdir() if p.is_file()] + + # remove kernels (whose connection file has disappeared) from our list + k = list(self.kernel_id_to_connection_file.keys()) + v = list(self.kernel_id_to_connection_file.values()) + for connection_file in list(self.kernel_id_to_connection_file.values()): + if connection_file not in connection_files: + kernel_id = k[v.index(connection_file)] + del self.kernel_id_to_connection_file[kernel_id] + del self._kernels[kernel_id] + + # add kernels (whose connection file appeared) to our list + for connection_file in connection_files: + if connection_file in self.kernel_id_to_connection_file.values(): + continue + try: + connection_info = json.loads(connection_file.read_text()) + except Exception: # noqa: S112 + continue + if not ("kernel_name" in connection_info and "key" in connection_info): + continue + # it looks like a connection file + kernel_id = self.new_kernel_id() + self.kernel_id_to_connection_file[kernel_id] = connection_file + km = self.kernel_manager_factory( + parent=self, + log=self.log, + owns_kernel=False, + ) + km.last_activity = utcnow() + km.execution_state = "idle" + km.connections = 1 + km.kernel_id = kernel_id + km.shell_port = connection_info["shell_port"] + km.iopub_port = connection_info["iopub_port"] + km.stdin_port = connection_info["stdin_port"] + km.control_port = connection_info["control_port"] + km.hb_port = connection_info["hb_port"] + km.ip = connection_info["ip"] + km.transport = connection_info["transport"] + km.session.key = connection_info["key"].encode() + km.session.signature_scheme = connection_info["signature_scheme"] + km.kernel_name = connection_info["kernel_name"] + km.ready.set_result(None) + + self._kernels[kernel_id] = km # Create a copy so we can iterate over kernels in operations # that delete keys. From f357194c2b8198143994422db0dd071b2d11412a Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 28 Jul 2023 09:12:29 +0200 Subject: [PATCH 4/4] Use load_connection_info() --- jupyter_client/multikernelmanager.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index df2fa015..16dbe410 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -15,6 +15,7 @@ from traitlets.config.configurable import LoggingConfigurable from traitlets.utils.importstring import import_item +from .connect import KernelConnectionInfo from .kernelspec import NATIVE_KERNEL_NAME, KernelSpecManager from .manager import KernelManager from .utils import ensure_async, run_sync, utcnow @@ -149,9 +150,12 @@ def list_kernel_ids(self) -> t.List[str]: if connection_file in self.kernel_id_to_connection_file.values(): continue try: - connection_info = json.loads(connection_file.read_text()) + connection_info: KernelConnectionInfo = json.loads( + connection_file.read_text() + ) except Exception: # noqa: S112 continue + self.log.debug("Loading connection file %s", connection_file) if not ("kernel_name" in connection_info and "key" in connection_info): continue # it looks like a connection file @@ -162,19 +166,11 @@ def list_kernel_ids(self) -> t.List[str]: log=self.log, owns_kernel=False, ) + km.load_connection_info(connection_info) km.last_activity = utcnow() km.execution_state = "idle" km.connections = 1 km.kernel_id = kernel_id - km.shell_port = connection_info["shell_port"] - km.iopub_port = connection_info["iopub_port"] - km.stdin_port = connection_info["stdin_port"] - km.control_port = connection_info["control_port"] - km.hb_port = connection_info["hb_port"] - km.ip = connection_info["ip"] - km.transport = connection_info["transport"] - km.session.key = connection_info["key"].encode() - km.session.signature_scheme = connection_info["signature_scheme"] km.kernel_name = connection_info["kernel_name"] km.ready.set_result(None)