Skip to content

Commit

Permalink
Support external kernels (#961)
Browse files Browse the repository at this point in the history
* Support external kernels

* Ignore linter rule

* Check if connection directory exists

* Use load_connection_info()
  • Loading branch information
davidbrochart authored Aug 29, 2023
1 parent c08b87c commit b4f7d94
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 2 deletions.
14 changes: 13 additions & 1 deletion jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ async def wrapper(self, *args, **kwargs):
out = await method(self, *args, **kwargs)
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
self._ready.set_result(None)
if self.owns_kernel:
self._ready.set_result(None)
return out
except Exception as e:
self._ready.set_exception(e)
Expand All @@ -105,6 +106,7 @@ class KernelManager(ConnectionFileMixin):

def __init__(self, *args, **kwargs):
"""Initialize a kernel manager."""
self._owns_kernel = kwargs.pop("owns_kernel", True)
super().__init__(**kwargs)
self._shutdown_status = _ShutdownStatus.Unset
self._attempted_start = False
Expand Down Expand Up @@ -495,6 +497,9 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
if not self.owns_kernel:
return

self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand Down Expand Up @@ -558,6 +563,10 @@ async def _async_restart_kernel(

restart_kernel = run_sync(_async_restart_kernel)

@property
def owns_kernel(self) -> bool:
return self._owns_kernel

@property
def has_kernel(self) -> bool:
"""Has a kernel process been started that we are actively managing."""
Expand Down Expand Up @@ -646,6 +655,9 @@ async def _async_signal_kernel(self, signum: int) -> None:

async def _async_is_alive(self) -> bool:
"""Is the kernel process still running?"""
if not self.owns_kernel:
return True

if self.has_kernel:
assert self.provisioner is not None
ret = await self.provisioner.poll()
Expand Down
55 changes: 54 additions & 1 deletion jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
import json
import os
import socket
import typing as t
import uuid
from functools import wraps
from pathlib import Path

import zmq
from traitlets import Any, Bool, Dict, DottedObjectName, Instance, Unicode, default, observe
from traitlets.config.configurable import LoggingConfigurable
from traitlets.utils.importstring import import_item

from .connect import KernelConnectionInfo
from .kernelspec import NATIVE_KERNEL_NAME, KernelSpecManager
from .manager import KernelManager
from .utils import ensure_async, run_sync
from .utils import ensure_async, run_sync, utcnow


class DuplicateKernelError(Exception):
Expand Down Expand Up @@ -105,9 +108,14 @@ def _context_default(self) -> zmq.Context:
return zmq.Context()

connection_dir = Unicode("")
external_connection_dir = Unicode(None, allow_none=True)

_kernels = Dict()

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.kernel_id_to_connection_file = {}

def __del__(self):
"""Handle garbage collection. Destroy context if applicable."""
if self._created_context and self.context and not self.context.closed:
Expand All @@ -123,6 +131,51 @@ def __del__(self):

def list_kernel_ids(self) -> t.List[str]:
"""Return a list of the kernel ids of the active kernels."""
if self.external_connection_dir is not None:
external_connection_dir = Path(self.external_connection_dir)
if external_connection_dir.is_dir():
connection_files = [p for p in external_connection_dir.iterdir() if p.is_file()]

# remove kernels (whose connection file has disappeared) from our list
k = list(self.kernel_id_to_connection_file.keys())
v = list(self.kernel_id_to_connection_file.values())
for connection_file in list(self.kernel_id_to_connection_file.values()):
if connection_file not in connection_files:
kernel_id = k[v.index(connection_file)]
del self.kernel_id_to_connection_file[kernel_id]
del self._kernels[kernel_id]

# add kernels (whose connection file appeared) to our list
for connection_file in connection_files:
if connection_file in self.kernel_id_to_connection_file.values():
continue
try:
connection_info: KernelConnectionInfo = json.loads(
connection_file.read_text()
)
except Exception: # noqa: S112
continue
self.log.debug("Loading connection file %s", connection_file)
if not ("kernel_name" in connection_info and "key" in connection_info):
continue
# it looks like a connection file
kernel_id = self.new_kernel_id()
self.kernel_id_to_connection_file[kernel_id] = connection_file
km = self.kernel_manager_factory(
parent=self,
log=self.log,
owns_kernel=False,
)
km.load_connection_info(connection_info)
km.last_activity = utcnow()
km.execution_state = "idle"
km.connections = 1
km.kernel_id = kernel_id
km.kernel_name = connection_info["kernel_name"]
km.ready.set_result(None)

self._kernels[kernel_id] = km

# Create a copy so we can iterate over kernels in operations
# that delete keys.
return list(self._kernels.keys())
Expand Down
33 changes: 33 additions & 0 deletions jupyter_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- vendor functions from ipython_genutils that should be retired at some point.
"""
import os
from datetime import datetime, timedelta, tzinfo

from jupyter_core.utils import ensure_async, run_sync # noqa: F401 # noqa: F401

Expand Down Expand Up @@ -83,3 +84,35 @@ def _expand_path(s):
if os.name == "nt":
s = s.replace("IPYTHON_TEMP", "$\\")
return s


# constant for zero offset
ZERO = timedelta(0)


class tzUTC(tzinfo): # noqa
"""tzinfo object for UTC (zero offset)"""

def utcoffset(self, d):
"""Compute utcoffset."""
return ZERO

def dst(self, d):
"""Compute dst."""
return ZERO


UTC = tzUTC() # type:ignore


def utc_aware(unaware):
"""decorator for adding UTC tzinfo to datetime's utcfoo methods"""

def utc_method(*args, **kwargs):
dt = unaware(*args, **kwargs)
return dt.replace(tzinfo=UTC)

return utc_method


utcnow = utc_aware(datetime.utcnow)

0 comments on commit b4f7d94

Please sign in to comment.