Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async start hook to ExtensionApp API #1417

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions docs/source/developers/extensions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,29 @@ Then add this handler to Jupyter Server's Web Application through the ``_load_ju
serverapp.web_app.add_handlers(".*$", handlers)


Starting asynchronous tasks from an extension
---------------------------------------------

.. versionadded:: 2.15.0

Jupyter Server offers a simple API for starting asynchronous tasks from a server extension. This is useful for calling
async tasks after the event loop is running.

The function should be named ``_start_jupyter_server_extension`` and found next to the ``_load_jupyter_server_extension`` function.

Here is basic example:

.. code-block:: python

import asyncio

async def _start_jupyter_server_extension(serverapp: jupyter_server.serverapp.ServerApp):
"""
This function is called after the server's event loop is running.
"""
await asyncio.sleep(.1)


Making an extension discoverable
--------------------------------

Expand Down Expand Up @@ -117,6 +140,7 @@ An ExtensionApp:
- has an entrypoint, ``jupyter <name>``.
- can serve static content from the ``/static/<name>/`` endpoint.
- can add new endpoints to the Jupyter Server.
- can start asynchronous tasks after the server has started.

The basic structure of an ExtensionApp is shown below:

Expand Down Expand Up @@ -156,6 +180,11 @@ The basic structure of an ExtensionApp is shown below:
...
# Change the jinja templating environment

async def _start_jupyter_server_extension(self):
...
# Extend this method to start any (e.g. async) tasks
# after the main Server's Event Loop is running.

async def stop_extension(self):
...
# Perform any required shut down steps
Expand All @@ -171,6 +200,7 @@ Methods
* ``initialize_settings()``: adds custom settings to the Tornado Web Application.
* ``initialize_handlers()``: appends handlers to the Tornado Web Application.
* ``initialize_templates()``: initialize the templating engine (e.g. jinja2) for your frontend.
* ``_start_jupyter_server_extension()``: enables the extension to start (async) tasks _after_ the server's main Event Loop has started.
* ``stop_extension()``: called on server shut down.

Properties
Expand Down Expand Up @@ -320,6 +350,43 @@ pointing at the ``load_classic_server_extension`` method:
If the extension is enabled, the extension will be loaded when the server starts.


Starting asynchronous tasks from an ExtensionApp
------------------------------------------------

.. versionadded:: 2.15.0


An ``ExtensionApp`` can start asynchronous tasks after Jupyter Server's event loop is started by overriding its
``_start_jupyter_server_extension()`` method. This can be helpful for setting up e.g. background tasks.

Here is a basic (pseudo) code example:

.. code-block:: python

import asyncio
import time


async def log_time_periodically(log, dt=1):
"""Log the current time from a periodic loop."""
while True:
current_time = time.time()
log.info(current_time)
await sleep(dt)


class MyExtension(ExtensionApp):
...

async def _start_jupyter_server_extension(self):
self.my_background_task = asyncio.create_task(
log_time_periodically(self.log)
)

async def stop_extension(self):
self.my_background_task.cancel()


Distributing a server extension
===============================

Expand Down
12 changes: 12 additions & 0 deletions jupyter_server/extension/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,18 @@ def _load_jupyter_server_extension(cls, serverapp):
extension.initialize()
return extension

async def _start_jupyter_server_extension(self, serverapp):
"""
An async hook to start e.g. tasks from the extension after
the server's event loop is running.

Override this method (no need to call `super()`) to
start (async) tasks from an extension.

This is useful for starting e.g. background tasks from
an extension.
"""

@classmethod
def load_classic_server_extension(cls, serverapp):
"""Enables extension to be loaded as classic Notebook (jupyter/notebook) extension."""
Expand Down
60 changes: 59 additions & 1 deletion jupyter_server/extension/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ def _get_loader(self):
loader = get_loader(loc)
return loader

def _get_starter(self):
"""Get a starter function."""
if self.app:
linker = self.app._start_jupyter_server_extension
else:

async def _noop_start(serverapp):
return

linker = getattr(
self.module,
# Search for a _start_jupyter_extension
"_start_jupyter_server_extension",
# Otherwise return a no-op function.
_noop_start,
)
return linker

def validate(self):
"""Check that both a linker and loader exists."""
try:
Expand Down Expand Up @@ -150,6 +168,13 @@ def load(self, serverapp):
loader = self._get_loader()
return loader(serverapp)

def start(self, serverapp):
"""Call's the extensions 'start' hook where it can
start (possibly async) tasks _after_ the event loop is running.
"""
starter = self._get_starter()
return starter(serverapp)


class ExtensionPackage(LoggingConfigurable):
"""An API for interfacing with a Jupyter Server extension package.
Expand Down Expand Up @@ -222,6 +247,11 @@ def load_point(self, point_name, serverapp):
point = self.extension_points[point_name]
return point.load(serverapp)

def start_point(self, point_name, serverapp):
"""Load an extension point."""
point = self.extension_points[point_name]
return point.start(serverapp)

def link_all_points(self, serverapp):
"""Link all extension points."""
for point_name in self.extension_points:
Expand All @@ -231,9 +261,14 @@ def load_all_points(self, serverapp):
"""Load all extension points."""
return [self.load_point(point_name, serverapp) for point_name in self.extension_points]

async def start_all_points(self, serverapp):
"""Load all extension points."""
for point_name in self.extension_points:
await self.start_point(point_name, serverapp)


class ExtensionManager(LoggingConfigurable):
"""High level interface for findind, validating,
"""High level interface for finding, validating,
linking, loading, and managing Jupyter Server extensions.

Usage:
Expand Down Expand Up @@ -367,6 +402,22 @@ def load_extension(self, name):
else:
self.log.info("%s | extension was successfully loaded.", name)

async def start_extension(self, name):
"""Start an extension by name."""
extension = self.extensions.get(name)

if extension and extension.enabled:
try:
await extension.start_all_points(self.serverapp)
except Exception as e:
if self.serverapp and self.serverapp.reraise_server_extension_failures:
raise
self.log.warning(
"%s | extension failed starting with message: %r", name, e, exc_info=True
)
else:
self.log.debug("%s | extension was successfully started.", name)

async def stop_extension(self, name, apps):
"""Call the shutdown hooks in the specified apps."""
for app in apps:
Expand All @@ -392,6 +443,13 @@ def load_all_extensions(self):
for name in self.sorted_extensions:
self.load_extension(name)

async def start_all_extensions(self):
"""Start all enabled extensions."""
# Sort the extension names to enforce deterministic loading
# order.
for name in self.sorted_extensions:
await self.start_extension(name)

async def stop_all_extensions(self):
"""Call the shutdown hooks in all extensions."""
await multi(list(starmap(self.stop_extension, sorted(dict(self.extension_apps).items()))))
Expand Down
120 changes: 66 additions & 54 deletions jupyter_server/serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2999,6 +2999,72 @@ def start_app(self) -> None:
)
self.exit(1)

self.write_server_info_file()

if not self.no_browser_open_file:
self.write_browser_open_files()

# Handle the browser opening.
if self.open_browser and not self.sock:
self.launch_browser()

async def _cleanup(self) -> None:
"""General cleanup of files, extensions and kernels created
by this instance ServerApp.
"""
self.remove_server_info_file()
self.remove_browser_open_files()
await self.cleanup_extensions()
await self.cleanup_kernels()
try:
await self.kernel_websocket_connection_class.close_all() # type:ignore[attr-defined]
except AttributeError:
# This can happen in two different scenarios:
#
# 1. During tests, where the _cleanup method is invoked without
# the corresponding initialize method having been invoked.
# 2. If the provided `kernel_websocket_connection_class` does not
# implement the `close_all` class method.
#
# In either case, we don't need to do anything and just want to treat
# the raised error as a no-op.
pass
if getattr(self, "kernel_manager", None):
self.kernel_manager.__del__()
if getattr(self, "session_manager", None):
self.session_manager.close()
if hasattr(self, "http_server"):
# Stop a server if its set.
self.http_server.stop()

def start_ioloop(self) -> None:
"""Start the IO Loop."""
if sys.platform.startswith("win"):
# add no-op to wake every 5s
# to handle signals that may be ignored by the inner loop
pc = ioloop.PeriodicCallback(lambda: None, 5000)
pc.start()
try:
self.io_loop.add_callback(self._post_start)
self.io_loop.start()
except KeyboardInterrupt:
self.log.info(_i18n("Interrupted..."))

def init_ioloop(self) -> None:
"""init self.io_loop so that an extension can use it by io_loop.call_later() to create background tasks"""
self.io_loop = ioloop.IOLoop.current()

async def _post_start(self):
"""Add an async hook to start tasks after the event loop is running.

This will also attempt to start all tasks found in
the `start_extension` method in Extension Apps.
"""
try:
await self.extension_manager.start_all_extensions()
except Exception as err:
self.log.error(err)

info = self.log.info
for line in self.running_server_info(kernel_count=False).split("\n"):
info(line)
Expand All @@ -3017,15 +3083,6 @@ def start_app(self) -> None:
)
)

self.write_server_info_file()

if not self.no_browser_open_file:
self.write_browser_open_files()

# Handle the browser opening.
if self.open_browser and not self.sock:
self.launch_browser()

if self.identity_provider.token and self.identity_provider.token_generated:
# log full URL with generated token, so there's a copy/pasteable link
# with auth info.
Expand Down Expand Up @@ -3066,51 +3123,6 @@ def start_app(self) -> None:

self.log.critical("\n".join(message))

async def _cleanup(self) -> None:
"""General cleanup of files, extensions and kernels created
by this instance ServerApp.
"""
self.remove_server_info_file()
self.remove_browser_open_files()
await self.cleanup_extensions()
await self.cleanup_kernels()
try:
await self.kernel_websocket_connection_class.close_all() # type:ignore[attr-defined]
except AttributeError:
# This can happen in two different scenarios:
#
# 1. During tests, where the _cleanup method is invoked without
# the corresponding initialize method having been invoked.
# 2. If the provided `kernel_websocket_connection_class` does not
# implement the `close_all` class method.
#
# In either case, we don't need to do anything and just want to treat
# the raised error as a no-op.
pass
if getattr(self, "kernel_manager", None):
self.kernel_manager.__del__()
if getattr(self, "session_manager", None):
self.session_manager.close()
if hasattr(self, "http_server"):
# Stop a server if its set.
self.http_server.stop()

def start_ioloop(self) -> None:
"""Start the IO Loop."""
if sys.platform.startswith("win"):
# add no-op to wake every 5s
# to handle signals that may be ignored by the inner loop
pc = ioloop.PeriodicCallback(lambda: None, 5000)
pc.start()
try:
self.io_loop.start()
except KeyboardInterrupt:
self.log.info(_i18n("Interrupted..."))

def init_ioloop(self) -> None:
"""init self.io_loop so that an extension can use it by io_loop.call_later() to create background tasks"""
self.io_loop = ioloop.IOLoop.current()

def start(self) -> None:
"""Start the Jupyter server app, after initialization

Expand Down
6 changes: 5 additions & 1 deletion tests/extension/mockextensions/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from jupyter_events import EventLogger
from jupyter_events.schema_registry import SchemaRegistryException
from traitlets import List, Unicode
from traitlets import Bool, List, Unicode

from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.extension.application import ExtensionApp, ExtensionAppJinjaMixin
Expand Down Expand Up @@ -50,6 +50,7 @@ class MockExtensionApp(ExtensionAppJinjaMixin, ExtensionApp):
static_paths = [STATIC_PATH] # type:ignore[assignment]
mock_trait = Unicode("mock trait", config=True)
loaded = False
started = Bool(False)

serverapp_config = {"jpserver_extensions": {"tests.extension.mockextensions.mock1": True}}

Expand All @@ -71,6 +72,9 @@ def initialize_handlers(self):
self.handlers.append(("/mock_template", MockExtensionTemplateHandler))
self.loaded = True

async def _start_jupyter_server_extension(self, serverapp):
self.started = True


if __name__ == "__main__":
MockExtensionApp.launch_instance()
Loading
Loading