Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-40434: Prompt Processing continues processing after timeout or termination #182

Merged
merged 9 commits into from
Jul 22, 2024
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ ENV APP_HOME /app
ENV PROMPT_PROCESSING_DIR $APP_HOME
# Normally defined in the Kubernetes config.
ENV WORKER_RESTART_FREQ ${WORKER_RESTART_FREQ:-0}
ENV WORKER_TIMEOUT ${WORKER_TIMEOUT:-0}
ENV WORKER_GRACE_PERIOD ${WORKER_GRACE_PERIOD:-30}
ARG PORT
WORKDIR $APP_HOME
COPY python/activator activator/
COPY pipelines pipelines/
CMD source /opt/lsst/software/stack/loadLSST.bash \
&& setup lsst_distrib \
&& exec gunicorn --workers 1 --threads 1 --timeout 0 --max-requests $WORKER_RESTART_FREQ \
&& exec gunicorn --workers 1 --threads 1 --timeout $WORKER_TIMEOUT --max-requests $WORKER_RESTART_FREQ \
--graceful-timeout $WORKER_GRACE_PERIOD \
--bind :$PORT activator.activator:app
217 changes: 141 additions & 76 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

__all__ = ["check_for_snap", "next_visit_handler"]

import collections.abc
import json
import logging
import os
import sys
import time
from typing import Optional, Tuple
import signal
import uuid

import boto3
Expand All @@ -37,7 +38,7 @@
from werkzeug.exceptions import ServiceUnavailable

from .config import PipelinesConfig
from .exception import NonRetriableError, RetriableError
from .exception import GracefulShutdownInterrupt, NonRetriableError, RetriableError
from .logger import setup_usdf_logger
from .middleware_interface import get_central_butler, flush_local_repo, \
make_local_repo, make_local_cache, MiddlewareInterface
Expand Down Expand Up @@ -125,9 +126,58 @@ def find_local_repos(base_path):
sys.exit(3)


def _graceful_shutdown(signum: int, stack_frame):
    """Signal handler that converts a shutdown signal into an interrupt.

    Intended to be registered (e.g. via `with_signal`) for signals such as
    SIGTERM/SIGHUP so that in-flight work can be unwound cleanly.

    Parameters
    ----------
    signum : `int`
        The signal received.
    stack_frame : `frame` or `None`
        The "current" stack frame.

    Raises
    ------
    GracefulShutdownInterrupt
        Raised unconditionally.
    """
    sig_name = signal.Signals(signum).name
    _log.info("Signal %s detected, cleaning up and shutting down.", sig_name)
    # TODO DM-45339: raising in signal handlers is dangerous; can we get a way
    # for pipeline processing to check for interrupts?
    raise GracefulShutdownInterrupt(f"Received signal {sig_name}.")


def with_signal(signum: int,
handler: collections.abc.Callable | signal.Handlers,
) -> collections.abc.Callable:
"""A decorator that registers a signal handler for the duration of a
function call.

Parameters
----------
signum : `int`
The signal for which to register a handler; see `signal.signal`.
handler : callable or `signal.Handlers`
The handler to register.
"""
def decorator(func):
def wrapper(*args, **kwargs):
old_handler = signal.signal(signum, handler)
try:
return func(*args, **kwargs)
finally:
if old_handler is not None:
signal.signal(signum, old_handler)
else:
signal.signal(signum, signal.SIG_DFL)
return wrapper
return decorator


def check_for_snap(
instrument: str, group: int, snap: int, detector: int
) -> Optional[str]:
) -> str | None:
"""Search for new raw files matching a particular data ID.

The search is performed in the active image bucket.
Expand Down Expand Up @@ -266,7 +316,9 @@ def _try_export(mwi: MiddlewareInterface, exposures: set[int], log: logging.Logg


@app.route("/next-visit", methods=["POST"])
def next_visit_handler() -> Tuple[str, int]:
@with_signal(signal.SIGHUP, _graceful_shutdown)
@with_signal(signal.SIGTERM, _graceful_shutdown)
def next_visit_handler() -> tuple[str, int]:
"""A Flask view function for handling next-visit events.

Like all Flask handlers, this function accepts input through the
Expand Down Expand Up @@ -302,76 +354,83 @@ def next_visit_handler() -> Tuple[str, int]:
survey=expected_visit.survey,
detector=expected_visit.detector,
):
expid_set = set()

# Create a fresh MiddlewareInterface object to avoid accidental
# "cross-talk" between different visits.
mwi = MiddlewareInterface(central_butler,
image_bucket,
expected_visit,
pre_pipelines,
main_pipelines,
skymap,
local_repo.name,
local_cache)
# Copy calibrations for this detector/visit
mwi.prep_butler()

# expected_visit.nimages == 0 means "not known in advance"; keep listening until timeout
expected_snaps = expected_visit.nimages if expected_visit.nimages else 100
# Heuristic: take the upcoming script's duration and multiply by 2 to
# include the currently executing script, then add time to transfer
# the last image.
timeout = expected_visit.duration * 2 + image_timeout
# Check to see if any snaps have already arrived
for snap in range(expected_snaps):
oid = check_for_snap(
expected_visit.instrument,
expected_visit.groupId,
snap,
expected_visit.detector,
)
if oid:
_log.debug("Found object %s already present", oid)
exp_id = mwi.ingest_image(oid)
expid_set.add(exp_id)

_log.debug("Waiting for snaps...")
start = time.time()
while len(expid_set) < expected_snaps and time.time() - start < timeout:
if startup_response:
response = startup_response
else:
time_remaining = max(0.0, timeout - (time.time() - start))
response = consumer.consume(num_messages=1, timeout=time_remaining + 1.0)
end = time.time()
messages = _filter_messages(response)
response = []
if len(messages) == 0 and end - start < timeout and not startup_response:
_log.debug(f"Empty consume after {end - start}s.")
continue
startup_response = []

# Not all notifications are for this group/detector
for received in messages:
for oid in _parse_bucket_notifications(received.value()):
try:
if is_path_consistent(oid, expected_visit):
_log.debug("Received %r", oid)
group_id = get_group_id_from_oid(oid)
if group_id == expected_visit.groupId:
# Ingest the snap
exp_id = mwi.ingest_image(oid)
expid_set.add(exp_id)
except ValueError:
_log.error(f"Failed to match object id '{oid}'")
# Commits are per-group, so this can't interfere with other
# workers. This may wipe messages associated with a next_visit
# that will later be assigned to this worker, but those cases
# should be caught by the "already arrived" check.
consumer.commit(message=received)
if len(expid_set) < expected_snaps:
_log.warning(f"Timed out waiting for image after receiving exposures {expid_set}.")
try:
expid_set = set()

# Create a fresh MiddlewareInterface object to avoid accidental
# "cross-talk" between different visits.
mwi = MiddlewareInterface(central_butler,
image_bucket,
expected_visit,
pre_pipelines,
main_pipelines,
skymap,
local_repo.name,
local_cache)
# Copy calibrations for this detector/visit
mwi.prep_butler()

# expected_visit.nimages == 0 means "not known in advance"; keep listening until timeout
expected_snaps = expected_visit.nimages if expected_visit.nimages else 100
# Heuristic: take the upcoming script's duration and multiply by 2 to
# include the currently executing script, then add time to transfer
# the last image.
timeout = expected_visit.duration * 2 + image_timeout
# Check to see if any snaps have already arrived
for snap in range(expected_snaps):
oid = check_for_snap(
expected_visit.instrument,
expected_visit.groupId,
snap,
expected_visit.detector,
)
if oid:
_log.debug("Found object %s already present", oid)
exp_id = mwi.ingest_image(oid)
expid_set.add(exp_id)

_log.debug("Waiting for snaps...")
start = time.time()
while len(expid_set) < expected_snaps and time.time() - start < timeout:
if startup_response:
response = startup_response
else:
time_remaining = max(0.0, timeout - (time.time() - start))
response = consumer.consume(num_messages=1, timeout=time_remaining + 1.0)
end = time.time()
messages = _filter_messages(response)
response = []
if len(messages) == 0 and end - start < timeout and not startup_response:
_log.debug(f"Empty consume after {end - start}s.")
continue
startup_response = []

# Not all notifications are for this group/detector
for received in messages:
for oid in _parse_bucket_notifications(received.value()):
try:
if is_path_consistent(oid, expected_visit):
_log.debug("Received %r", oid)
group_id = get_group_id_from_oid(oid)
if group_id == expected_visit.groupId:
# Ingest the snap
exp_id = mwi.ingest_image(oid)
expid_set.add(exp_id)
except ValueError:
_log.error(f"Failed to match object id '{oid}'")
# Commits are per-group, so this can't interfere with other
# workers. This may wipe messages associated with a next_visit
# that will later be assigned to this worker, but those cases
# should be caught by the "already arrived" check.
consumer.commit(message=received)
if len(expid_set) < expected_snaps:
_log.warning(f"Timed out waiting for image after receiving exposures {expid_set}.")
except GracefulShutdownInterrupt as e:
_log.exception("Processing interrupted before pipeline execution")
# Do not export, to leave room for the next attempt
# Service unavailable is not quite right, but no better standard response
raise ServiceUnavailable(f"The server aborted processing, but it can be retried: {e}",
retry_after=10) from None

if expid_set:
with log_factory.add_context(exposures=expid_set):
Expand All @@ -392,7 +451,7 @@ def next_visit_handler() -> Tuple[str, int]:
from e
except RetriableError as e:
error = e.nested if e.nested else e
_log.error("Processing failed: ", exc_info=error)
_log.error("Processing failed but can be retried: ", exc_info=error)
# Do not export, to leave room for the next attempt
# Service unavailable is not quite right, but no better standard response
raise ServiceUnavailable(f"A temporary error occurred during processing: {error}",
Expand All @@ -414,14 +473,20 @@ def next_visit_handler() -> Tuple[str, int]:
else:
_log.error("Timed out waiting for images.")
return "Timed out waiting for images", 500
except GracefulShutdownInterrupt:
# Safety net to minimize chance of interrupt propagating out of the worker.
# Ideally, this would be a Flask.errorhandler, but Flask ignores BaseExceptions.
_log.error("Service interrupted. Shutting down *without* syncing to the central repo.")
return "The worker was interrupted before it could complete the request. " \
"Retrying the request may not be safe.", 500
finally:
consumer.unsubscribe()
# Want to know when the handler exited for any reason.
_log.info("next_visit handling completed.")


@app.errorhandler(500)
def server_error(e) -> Tuple[str, int]:
def server_error(e) -> tuple[str, int]:
_log.exception("An error occurred during a request.")
return (
f"""
Expand Down
13 changes: 12 additions & 1 deletion python/activator/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.


__all__ = ["NonRetriableError", "RetriableError"]
__all__ = ["NonRetriableError", "RetriableError", "GracefulShutdownInterrupt"]


class NonRetriableError(Exception):
Expand Down Expand Up @@ -72,3 +72,14 @@ def nested(self):
return self.__context__
else:
return None


# See https://docs.python.org/3.11/library/exceptions.html#KeyboardInterrupt
# for why interrupts should not subclass Exception.
class GracefulShutdownInterrupt(BaseException):
    """An interrupt indicating that the service should shut down gracefully.

    This deliberately derives from `BaseException` rather than `Exception`,
    so that generic ``except Exception`` blocks do not swallow a shutdown
    request. Like all interrupts, ``GracefulShutdownInterrupt`` can be raised
    between *any* two bytecode instructions, and handling it requires special
    care. See `the Python docs
    <https://docs.python.org/3.11/library/signal.html#handlers-and-exceptions>`__.
    """
Loading
Loading