Merge branch 'tickets/DM-40434'
kfindeisen committed Jul 22, 2024
2 parents bb8b3d3 + 48bbde1 commit 7a1fb03
Showing 5 changed files with 220 additions and 92 deletions.
5 changes: 4 additions & 1 deletion Dockerfile
@@ -5,11 +5,14 @@ ENV APP_HOME /app
ENV PROMPT_PROCESSING_DIR $APP_HOME
# Normally defined in the Kubernetes config.
ENV WORKER_RESTART_FREQ ${WORKER_RESTART_FREQ:-0}
ENV WORKER_TIMEOUT ${WORKER_TIMEOUT:-0}
ENV WORKER_GRACE_PERIOD ${WORKER_GRACE_PERIOD:-30}
ARG PORT
WORKDIR $APP_HOME
COPY python/activator activator/
COPY pipelines pipelines/
CMD source /opt/lsst/software/stack/loadLSST.bash \
&& setup lsst_distrib \
&& exec gunicorn --workers 1 --threads 1 --timeout 0 --max-requests $WORKER_RESTART_FREQ \
&& exec gunicorn --workers 1 --threads 1 --timeout $WORKER_TIMEOUT --max-requests $WORKER_RESTART_FREQ \
--graceful-timeout $WORKER_GRACE_PERIOD \
--bind :$PORT activator.activator:app
217 changes: 141 additions & 76 deletions python/activator/activator.py
@@ -21,12 +21,13 @@

__all__ = ["check_for_snap", "next_visit_handler"]

import collections.abc
import json
import logging
import os
import sys
import time
from typing import Optional, Tuple
import signal
import uuid

import boto3
@@ -37,7 +38,7 @@
from werkzeug.exceptions import ServiceUnavailable

from .config import PipelinesConfig
from .exception import NonRetriableError, RetriableError
from .exception import GracefulShutdownInterrupt, NonRetriableError, RetriableError
from .logger import setup_usdf_logger
from .middleware_interface import get_central_butler, flush_local_repo, \
make_local_repo, make_local_cache, MiddlewareInterface
@@ -125,9 +126,58 @@ def find_local_repos(base_path):
sys.exit(3)


def _graceful_shutdown(signum: int, stack_frame):
    """Signal handler for cases where the service should gracefully shut down.

    Parameters
    ----------
    signum : `int`
        The signal received.
    stack_frame : `frame` or `None`
        The "current" stack frame.

    Raises
    ------
    GracefulShutdownInterrupt
        Raised unconditionally.
    """
    signame = signal.Signals(signum).name
    _log.info("Signal %s detected, cleaning up and shutting down.", signame)
    # TODO DM-45339: raising in signal handlers is dangerous; can we get a way
    # for pipeline processing to check for interrupts?
    raise GracefulShutdownInterrupt(f"Received signal {signame}.")
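
(Editor's note, not part of the commit: the TODO above exists because raising from a signal handler can unwind arbitrary code mid-operation. A minimal sketch of the flag-based alternative the TODO alludes to is below; the names `_shutdown_requested` and `_flag_shutdown` are invented for illustration.)

```python
import signal
import threading

# Hypothetical alternative for DM-45339: record the request and let pipeline
# processing poll for it at points where stopping is known to be safe.
_shutdown_requested = threading.Event()


def _flag_shutdown(signum, stack_frame):
    # Setting an Event from a handler is safe; nothing is unwound here.
    _shutdown_requested.set()


signal.signal(signal.SIGTERM, _flag_shutdown)

# Inside processing, at a safe checkpoint:
# if _shutdown_requested.is_set():
#     raise GracefulShutdownInterrupt("Shutdown requested.")
```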


def with_signal(signum: int,
                handler: collections.abc.Callable | signal.Handlers,
                ) -> collections.abc.Callable:
    """A decorator that registers a signal handler for the duration of a
    function call.

    Parameters
    ----------
    signum : `int`
        The signal for which to register a handler; see `signal.signal`.
    handler : callable or `signal.Handlers`
        The handler to register.
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            old_handler = signal.signal(signum, handler)
            try:
                return func(*args, **kwargs)
            finally:
                if old_handler is not None:
                    signal.signal(signum, old_handler)
                else:
                    signal.signal(signum, signal.SIG_DFL)
        return wrapper
    return decorator
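
(Editor's note, not part of the commit: a small self-contained sketch of how the decorator behaves, assuming `with_signal` above is in scope. The handler and function names are invented, and SIGUSR1 is used only because it is harmless to raise on POSIX.)

```python
import signal

caught = []


def _record(signum, frame):
    caught.append(signal.Signals(signum).name)


@with_signal(signal.SIGUSR1, _record)
def task():
    # While task() runs, SIGUSR1 is routed to _record.
    signal.raise_signal(signal.SIGUSR1)


task()
assert caught == ["SIGUSR1"]
# After task() returns, the previous disposition (SIG_DFL here) is restored.
assert signal.getsignal(signal.SIGUSR1) is signal.SIG_DFL
```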


def check_for_snap(
instrument: str, group: int, snap: int, detector: int
) -> Optional[str]:
) -> str | None:
"""Search for new raw files matching a particular data ID.
The search is performed in the active image bucket.
@@ -266,7 +316,9 @@ def _try_export(mwi: MiddlewareInterface, exposures: set[int], log: logging.Logg


@app.route("/next-visit", methods=["POST"])
def next_visit_handler() -> Tuple[str, int]:
@with_signal(signal.SIGHUP, _graceful_shutdown)
@with_signal(signal.SIGTERM, _graceful_shutdown)
def next_visit_handler() -> tuple[str, int]:
"""A Flask view function for handling next-visit events.
Like all Flask handlers, this function accepts input through the
@@ -302,76 +354,83 @@ def next_visit_handler() -> Tuple[str, int]:
survey=expected_visit.survey,
detector=expected_visit.detector,
):
expid_set = set()

# Create a fresh MiddlewareInterface object to avoid accidental
# "cross-talk" between different visits.
mwi = MiddlewareInterface(central_butler,
image_bucket,
expected_visit,
pre_pipelines,
main_pipelines,
skymap,
local_repo.name,
local_cache)
# Copy calibrations for this detector/visit
mwi.prep_butler()

# expected_visit.nimages == 0 means "not known in advance"; keep listening until timeout
expected_snaps = expected_visit.nimages if expected_visit.nimages else 100
# Heuristic: take the upcoming script's duration and multiply by 2 to
# include the currently executing script, then add time to transfer
# the last image.
timeout = expected_visit.duration * 2 + image_timeout
# Check to see if any snaps have already arrived
for snap in range(expected_snaps):
oid = check_for_snap(
expected_visit.instrument,
expected_visit.groupId,
snap,
expected_visit.detector,
)
if oid:
_log.debug("Found object %s already present", oid)
exp_id = mwi.ingest_image(oid)
expid_set.add(exp_id)

_log.debug("Waiting for snaps...")
start = time.time()
while len(expid_set) < expected_snaps and time.time() - start < timeout:
if startup_response:
response = startup_response
else:
time_remaining = max(0.0, timeout - (time.time() - start))
response = consumer.consume(num_messages=1, timeout=time_remaining + 1.0)
end = time.time()
messages = _filter_messages(response)
response = []
if len(messages) == 0 and end - start < timeout and not startup_response:
_log.debug(f"Empty consume after {end - start}s.")
continue
startup_response = []

# Not all notifications are for this group/detector
for received in messages:
for oid in _parse_bucket_notifications(received.value()):
try:
if is_path_consistent(oid, expected_visit):
_log.debug("Received %r", oid)
group_id = get_group_id_from_oid(oid)
if group_id == expected_visit.groupId:
# Ingest the snap
exp_id = mwi.ingest_image(oid)
expid_set.add(exp_id)
except ValueError:
_log.error(f"Failed to match object id '{oid}'")
# Commits are per-group, so this can't interfere with other
# workers. This may wipe messages associated with a next_visit
# that will later be assigned to this worker, but those cases
# should be caught by the "already arrived" check.
consumer.commit(message=received)
if len(expid_set) < expected_snaps:
_log.warning(f"Timed out waiting for image after receiving exposures {expid_set}.")
try:
expid_set = set()

# Create a fresh MiddlewareInterface object to avoid accidental
# "cross-talk" between different visits.
mwi = MiddlewareInterface(central_butler,
image_bucket,
expected_visit,
pre_pipelines,
main_pipelines,
skymap,
local_repo.name,
local_cache)
# Copy calibrations for this detector/visit
mwi.prep_butler()

# expected_visit.nimages == 0 means "not known in advance"; keep listening until timeout
expected_snaps = expected_visit.nimages if expected_visit.nimages else 100
# Heuristic: take the upcoming script's duration and multiply by 2 to
# include the currently executing script, then add time to transfer
# the last image.
timeout = expected_visit.duration * 2 + image_timeout
# Check to see if any snaps have already arrived
for snap in range(expected_snaps):
oid = check_for_snap(
expected_visit.instrument,
expected_visit.groupId,
snap,
expected_visit.detector,
)
if oid:
_log.debug("Found object %s already present", oid)
exp_id = mwi.ingest_image(oid)
expid_set.add(exp_id)

_log.debug("Waiting for snaps...")
start = time.time()
while len(expid_set) < expected_snaps and time.time() - start < timeout:
if startup_response:
response = startup_response
else:
time_remaining = max(0.0, timeout - (time.time() - start))
response = consumer.consume(num_messages=1, timeout=time_remaining + 1.0)
end = time.time()
messages = _filter_messages(response)
response = []
if len(messages) == 0 and end - start < timeout and not startup_response:
_log.debug(f"Empty consume after {end - start}s.")
continue
startup_response = []

# Not all notifications are for this group/detector
for received in messages:
for oid in _parse_bucket_notifications(received.value()):
try:
if is_path_consistent(oid, expected_visit):
_log.debug("Received %r", oid)
group_id = get_group_id_from_oid(oid)
if group_id == expected_visit.groupId:
# Ingest the snap
exp_id = mwi.ingest_image(oid)
expid_set.add(exp_id)
except ValueError:
_log.error(f"Failed to match object id '{oid}'")
# Commits are per-group, so this can't interfere with other
# workers. This may wipe messages associated with a next_visit
# that will later be assigned to this worker, but those cases
# should be caught by the "already arrived" check.
consumer.commit(message=received)
if len(expid_set) < expected_snaps:
_log.warning(f"Timed out waiting for image after receiving exposures {expid_set}.")
except GracefulShutdownInterrupt as e:
_log.exception("Processing interrupted before pipeline execution")
# Do not export, to leave room for the next attempt
# Service unavailable is not quite right, but no better standard response
raise ServiceUnavailable(f"The server aborted processing, but it can be retried: {e}",
retry_after=10) from None

if expid_set:
with log_factory.add_context(exposures=expid_set):
@@ -392,7 +451,7 @@ def next_visit_handler() -> Tuple[str, int]:
from e
except RetriableError as e:
error = e.nested if e.nested else e
_log.error("Processing failed: ", exc_info=error)
_log.error("Processing failed but can be retried: ", exc_info=error)
# Do not export, to leave room for the next attempt
# Service unavailable is not quite right, but no better standard response
raise ServiceUnavailable(f"A temporary error occurred during processing: {error}",
@@ -414,14 +473,20 @@ def next_visit_handler() -> Tuple[str, int]:
else:
_log.error("Timed out waiting for images.")
return "Timed out waiting for images", 500
except GracefulShutdownInterrupt:
# Safety net to minimize chance of interrupt propagating out of the worker.
# Ideally, this would be a Flask.errorhandler, but Flask ignores BaseExceptions.
_log.error("Service interrupted. Shutting down *without* syncing to the central repo.")
return "The worker was interrupted before it could complete the request. " \
"Retrying the request may not be safe.", 500
finally:
consumer.unsubscribe()
# Want to know when the handler exited for any reason.
_log.info("next_visit handling completed.")


@app.errorhandler(500)
def server_error(e) -> Tuple[str, int]:
def server_error(e) -> tuple[str, int]:
_log.exception("An error occurred during a request.")
return (
f"""
13 changes: 12 additions & 1 deletion python/activator/exception.py
@@ -20,7 +20,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.


__all__ = ["NonRetriableError", "RetriableError"]
__all__ = ["NonRetriableError", "RetriableError", "GracefulShutdownInterrupt"]


class NonRetriableError(Exception):
@@ -72,3 +72,14 @@ def nested(self):
return self.__context__
else:
return None


# See https://docs.python.org/3.11/library/exceptions.html#KeyboardInterrupt
# for why interrupts should not subclass Exception.
class GracefulShutdownInterrupt(BaseException):
    """An interrupt indicating that the service should shut down gracefully.

    Like all interrupts, ``GracefulShutdownInterrupt`` can be raised between
    *any* two bytecode instructions, and handling it requires special care. See
    `the Python docs <https://docs.python.org/3.11/library/signal.html#handlers-and-exceptions>`__.
    """