Skip to content

Commit

Permalink
Merge pull request #33 from lsst-sqre/tickets/DM-34864
Browse files Browse the repository at this point in the history
DM-34864: Harden JupyterLab lifecycle handling
  • Loading branch information
jonathansick authored May 24, 2022
2 parents 73cab95 + a39ae1b commit ff08698
Show file tree
Hide file tree
Showing 11 changed files with 473 additions and 340 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ jobs:
uses: pre-commit/[email protected]

- name: Install tox
run: pip install tox
run: |
pip install -U pip
pip install tox
- name: Cache tox environments
id: cache-tox
Expand Down
13 changes: 13 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@
Change log
##########

0.3.0 (2022-05-24)
==================

Improved handling of the JupyterLab pod for noteburst workers:

- If the JupyterLab pod goes away (such as if it is culled), the Noteburst worker shuts down so that Kubernetes creates a new worker with a new JupyterLab pod. A lost JupyterLab pod is detected by a 400-class response when submitting a notebook for execution.

- If a worker starts up and a JupyterLab pod already exists for an unclaimed identity, the Noteburst worker will continue to cycle through available worker identities until a JupyterLab start-up is successful. This handles cases where a Noteburst worker restarts, but the JupyterLab pod did not shut down and thus is "orphaned."

- Each JupyterLab worker runs a "keep alive" function that exercises the JupyterLab pod's Python kernel. This is meant to counter the "culler" that deletes dormant JupyterLab pods in the Rubin Science Platform. Currently the keep alive function runs every 30 seconds.

- The default arq job execution timeout is now configurable with the ``NOTEBURST_WORKER_JOB_TIMEOUT`` environment variable. By default it is 300 seconds (5 minutes).

0.2.0 (2022-03-14)
==================

Expand Down
368 changes: 183 additions & 185 deletions requirements/dev.txt

Large diffs are not rendered by default.

240 changes: 118 additions & 122 deletions requirements/main.txt

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/noteburst/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ class WorkerConfig(Config):
"redis://localhost:6379/1", env="NOTEBURST_WORKER_LOCK_REDIS_URL"
)

job_timeout: int = Field(
300,
env="NOTEBURST_WORKER_JOB_TIMEOUT",
description=(
"The timeout, in seconds, for a job until it is timed out."
),
)

@property
def aioredlock_redis_config(self) -> List[str]:
"""Redis configurations for aioredlock."""
Expand Down
2 changes: 0 additions & 2 deletions src/noteburst/jupyterclient/jupyterlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,6 @@ def __str__(self) -> str:
f"{self.username}: status {self.status} ({self.reason}) from"
f" {self.method} {self.url}"
)
if self.body:
result += f"\nBody:\n{self.body}\n"
return result


Expand Down
3 changes: 2 additions & 1 deletion src/noteburst/worker/functions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
__all__ = ["ping", "nbexec", "run_python"]
__all__ = ["ping", "nbexec", "run_python", "keep_alive"]

from .keepalive import keep_alive
from .nbexec import nbexec
from .ping import ping
from .runpython import run_python
42 changes: 42 additions & 0 deletions src/noteburst/worker/functions/keepalive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Execute Python in the JupyterLab kernel to prevent it from being culled."""

from __future__ import annotations

import sys
from typing import Any, Dict

from noteburst.jupyterclient.jupyterlab import JupyterError


async def keep_alive(ctx: Dict[Any, Any]) -> str:
    """Exercise the worker's JupyterLab pod so it is not culled for
    inactivity.

    Runs a trivial Python statement in a lab session so the pod registers
    recent kernel activity (countering the Science Platform's idle culler).

    Parameters
    ----------
    ctx
        Arq worker context. Must provide a ``logger`` and a
        ``jupyter_client``.

    Returns
    -------
    result : str
        The literal string ``"alive"``.
    """
    logger = ctx["logger"].bind(task="keep_alive")
    logger.info("Running keep_alive")

    jupyter_client = ctx["jupyter_client"]
    try:
        async with jupyter_client.open_lab_session(
            kernel_name="LSST"
        ) as session:
            await session.run_python("print('alive')")
    except JupyterError as e:
        logger.error("keep_alive error", jupyter_status=e.status)
        if 400 <= e.status < 500:
            # A 400-class response means the lab pod/auth is gone; exit so
            # Kubernetes restarts the worker with a fresh JupyterLab pod.
            logger.error(
                "Authentication error to Jupyter. Forcing worker shutdown",
                jupyter_status=e.status,
            )
            sys.exit("400 class error from Jupyter")
        # NOTE(review): 500-class errors are deliberately swallowed here so a
        # transient hub error does not kill the worker; "alive" is still
        # returned in that case.

    return "alive"
29 changes: 25 additions & 4 deletions src/noteburst/worker/functions/nbexec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,42 @@
from __future__ import annotations

import json
import sys
from typing import Any, Dict

from arq import Retry

from noteburst.jupyterclient.jupyterlab import JupyterError


async def nbexec(
    ctx: Dict[Any, Any], *, ipynb: str, kernel_name: str = "LSST"
) -> str:
    """Execute a Jupyter notebook in the worker's JupyterLab pod.

    Parameters
    ----------
    ctx
        Arq worker context. Must provide a ``logger`` and a
        ``jupyter_client``.
    ipynb
        The notebook to execute, as a JSON-encoded string.
    kernel_name
        Name of the Jupyter kernel to execute the notebook with.

    Returns
    -------
    str
        The executed notebook, as a JSON-encoded string.

    Raises
    ------
    arq.Retry
        On a non-400-class Jupyter error, to trigger a retry with an
        increasing back-off.
    """
    logger = ctx["logger"].bind(
        task="nbexec", job_attempt=ctx.get("job_try", -1)
    )
    logger.info("Running nbexec")

    jupyter_client = ctx["jupyter_client"]

    parsed_notebook = json.loads(ipynb)
    logger.debug("Got ipynb", ipynb=parsed_notebook)
    try:
        executed_notebook = await jupyter_client.execute_notebook(
            parsed_notebook, kernel_name=kernel_name
        )
        logger.debug("nbexec success")
    except JupyterError as e:
        logger.error("nbexec error", jupyter_status=e.status)
        if 400 <= e.status < 500:
            # The lab pod/auth is gone; exit so Kubernetes restarts the
            # worker with a fresh JupyterLab pod.
            logger.error(
                "Authentication error to Jupyter. Forcing worker shutdown",
                jupyter_status=e.status,
            )
            sys.exit("400 class error from Jupyter")
        else:
            # trigger re-try with increasing back-off; use .get() so a
            # missing job_try key cannot raise KeyError instead of Retry
            logger.warning("Triggering retry")
            raise Retry(defer=ctx.get("job_try", 1) * 5)

    return json.dumps(executed_notebook)
44 changes: 40 additions & 4 deletions src/noteburst/worker/identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,19 @@ def from_config(cls, config: WorkerConfig) -> IdentityManager:

async def close(self) -> None:
"""Release any claimed identity and connection to Redis."""
await self._release_identity()
await self.lock_manager.destroy()
self._logger.info("Shut down identity manager")

async def _release_identity(self) -> None:
if self._current_identity is not None:
await self._current_identity.release()
self._current_identity = None
self._logger.info("Released worker user identity")
await self.lock_manager.destroy()
self._logger.info("Shut down identity manager")

async def get_identity(self) -> IdentityClaim:
async def get_identity(
self, _identities: Optional[List[IdentityModel]] = None
) -> IdentityClaim:
"""Get a unique identity (either claiming a new identity or providing
the already-claimed identity).
Expand All @@ -138,13 +143,18 @@ async def get_identity(self) -> IdentityClaim:
`IdentityClaim`
Information about the Science Platform identity.
"""
if _identities:
identities = _identities
else:
identities = self.identities

if self._current_identity:
if self._current_identity.valid:
return self._current_identity
else:
self._current_identity = None

for identity in self.identities:
for identity in identities:
try:
# We don't set the timeout argument on lock; in doing so we
# use aioredlock's built-in watchdog that renews locks.
Expand All @@ -164,3 +174,29 @@ async def get_identity(self) -> IdentityClaim:
raise IdentityClaimError(
"Could not claim an Science Platform identity (none available)."
)

async def get_next_identity(
    self, prev_identity: IdentityClaim
) -> IdentityClaim:
    """Get the next available identity if the existing identity claim
    did not result in a successful JupyterLab launch.

    If a worker exits and the JupyterLab pod does not successfully close,
    it becomes orphaned. If a new worker picks up the identity of the
    orphaned JupyterLab pod, its start-up sequence will fail. This method
    provides a way for the worker to try the next available identity in
    that circumstance.

    Parameters
    ----------
    prev_identity : `IdentityClaim`
        The previously-claimed identity whose JupyterLab launch failed.

    Returns
    -------
    `IdentityClaim`
        A claim on an identity after ``prev_identity`` in the configured
        identity list.

    Raises
    ------
    IdentityClaimError
        If there is no identity after ``prev_identity`` to try (including
        when ``prev_identity`` is not in the configured list, or the list
        is empty).
    """
    await self._release_identity()

    # Sentinel meaning "no further identity to try"; also covers an empty
    # identity list (the original loop left the index unbound in that case).
    next_index = len(self.identities)
    for i, identity in enumerate(self.identities):
        if identity.username == prev_identity.username:
            next_index = i + 1
            break

    if next_index >= len(self.identities):
        raise IdentityClaimError(
            "Could not claim an Science Platform identity (none "
            "available)."
        )

    return await self.get_identity(_identities=self.identities[next_index:])
60 changes: 39 additions & 21 deletions src/noteburst/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@

import httpx
import structlog
from arq import cron
from safir.logging import configure_logging

from noteburst.config import WorkerConfig
from noteburst.jupyterclient.jupyterlab import (
JupyterClient,
JupyterConfig,
JupyterError,
JupyterImageSelector,
)
from noteburst.jupyterclient.user import User

from .functions import nbexec, ping, run_python
from .functions import keep_alive, nbexec, ping, run_python
from .identity import IdentityManager

config = WorkerConfig()
Expand Down Expand Up @@ -44,33 +46,40 @@ async def startup(ctx: Dict[Any, Any]) -> None:
identity_manager = IdentityManager.from_config(config)
ctx["identity_manager"] = identity_manager

identity = await identity_manager.get_identity()

logger = logger.bind(worker_username=identity.username)

http_client = httpx.AsyncClient()
ctx["http_client"] = http_client

user = User(username=identity.username, uid=identity.uid)
authed_user = await user.login(
scopes=["exec:notebook"], http_client=http_client
)
logger.info("Authenticated the worker's user.")

jupyter_config = JupyterConfig(
image_selector=JupyterImageSelector.RECOMMENDED
)
jupyter_client = JupyterClient(
user=authed_user, logger=logger, config=jupyter_config
)
await jupyter_client.log_into_hub()
image_info = await jupyter_client.spawn_lab()
logger = logger.bind(image_ref=image_info.reference)
async for progress in jupyter_client.spawn_progress():
continue
await jupyter_client.log_into_lab()
ctx["jupyter_client"] = jupyter_client

identity = await identity_manager.get_identity()

while True:
logger = logger.bind(worker_username=identity.username)

user = User(username=identity.username, uid=identity.uid)
authed_user = await user.login(
scopes=["exec:notebook"], http_client=http_client
)
logger.info("Authenticated the worker's user.")

jupyter_client = JupyterClient(
user=authed_user, logger=logger, config=jupyter_config
)
await jupyter_client.log_into_hub()
try:
image_info = await jupyter_client.spawn_lab()
logger = logger.bind(image_ref=image_info.reference)
async for progress in jupyter_client.spawn_progress():
continue
await jupyter_client.log_into_lab()
break
except JupyterError:
logger.warning("Error spawning pod, will re-try with new identity")
identity = await identity_manager.get_next_identity(identity)

ctx["jupyter_client"] = jupyter_client
ctx["logger"] = logger

logger.info("Start up complete")
Expand Down Expand Up @@ -104,6 +113,11 @@ async def shutdown(ctx: Dict[Any, Any]) -> None:
logger.info("Worker shutdown complete.")


# For info on ignoring the type checking here, see
# https://github.com/samuelcolvin/arq/issues/249
cron_jobs = [cron(keep_alive, second={0, 30}, unique=False)] # type: ignore


class WorkerSettings:
"""Configuration for a Noteburst worker.
Expand All @@ -112,10 +126,14 @@ class WorkerSettings:

functions = [ping, nbexec, run_python]

cron_jobs = cron_jobs

redis_settings = config.arq_redis_settings

queue_name = config.queue_name

on_startup = startup

on_shutdown = shutdown

job_timeout = config.job_timeout

0 comments on commit ff08698

Please sign in to comment.