Add GracefulShutdownInterrupt handler to prep work.
This handler lets us retry services that were shut down during
prep_butler or while waiting for raws. The outermost handler is still
needed to catch interrupts that slip through the cracks.
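
The pattern is easier to see in isolation. Below is a minimal sketch, not the activator's actual code: GracefulShutdownInterrupt and the ServiceUnavailable(..., retry_after=10) call mirror the diff, while the signal wiring and the prep_work() stub are hypothetical stand-ins (the real interrupt plumbing lives elsewhere in the service, and ServiceUnavailable is assumed to be Werkzeug's, which accepts a retry_after argument).

    import signal

    from werkzeug.exceptions import ServiceUnavailable


    class GracefulShutdownInterrupt(BaseException):
        """Assumed shape: raised from a signal handler when the pod must stop.

        Deriving from BaseException keeps it from being swallowed by blanket
        ``except Exception`` clauses in the processing code.
        """


    def _on_sigterm(signum, frame):
        raise GracefulShutdownInterrupt("SIGTERM received")


    signal.signal(signal.SIGTERM, _on_sigterm)


    def prep_work():
        """Hypothetical stand-in for prep_butler() and the wait-for-raws loop."""


    def handle_next_visit():
        try:
            prep_work()
        except GracefulShutdownInterrupt as e:
            # Nothing has been exported yet, so the visit can safely be retried;
            # a 503 with Retry-After asks the caller to resubmit the event.
            raise ServiceUnavailable(f"The server aborted processing, but it can be retried: {e}",
                                     retry_after=10) from None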
kfindeisen committed Jul 19, 2024
1 parent 9c7f795 commit ae00209
Showing 1 changed file with 77 additions and 70 deletions.
python/activator/activator.py (77 additions, 70 deletions)
@@ -354,76 +354,83 @@ def next_visit_handler() -> tuple[str, int]:
             survey=expected_visit.survey,
             detector=expected_visit.detector,
     ):
-        expid_set = set()
-
-        # Create a fresh MiddlewareInterface object to avoid accidental
-        # "cross-talk" between different visits.
-        mwi = MiddlewareInterface(central_butler,
-                                  image_bucket,
-                                  expected_visit,
-                                  pre_pipelines,
-                                  main_pipelines,
-                                  skymap,
-                                  local_repo.name,
-                                  local_cache)
-        # Copy calibrations for this detector/visit
-        mwi.prep_butler()
-
-        # expected_visit.nimages == 0 means "not known in advance"; keep listening until timeout
-        expected_snaps = expected_visit.nimages if expected_visit.nimages else 100
-        # Heuristic: take the upcoming script's duration and multiply by 2 to
-        # include the currently executing script, then add time to transfer
-        # the last image.
-        timeout = expected_visit.duration * 2 + image_timeout
-        # Check to see if any snaps have already arrived
-        for snap in range(expected_snaps):
-            oid = check_for_snap(
-                expected_visit.instrument,
-                expected_visit.groupId,
-                snap,
-                expected_visit.detector,
-            )
-            if oid:
-                _log.debug("Found object %s already present", oid)
-                exp_id = mwi.ingest_image(oid)
-                expid_set.add(exp_id)
-
-        _log.debug("Waiting for snaps...")
-        start = time.time()
-        while len(expid_set) < expected_snaps and time.time() - start < timeout:
-            if startup_response:
-                response = startup_response
-            else:
-                time_remaining = max(0.0, timeout - (time.time() - start))
-                response = consumer.consume(num_messages=1, timeout=time_remaining + 1.0)
-            end = time.time()
-            messages = _filter_messages(response)
-            response = []
-            if len(messages) == 0 and end - start < timeout and not startup_response:
-                _log.debug(f"Empty consume after {end - start}s.")
-                continue
-            startup_response = []
-
-            # Not all notifications are for this group/detector
-            for received in messages:
-                for oid in _parse_bucket_notifications(received.value()):
-                    try:
-                        if is_path_consistent(oid, expected_visit):
-                            _log.debug("Received %r", oid)
-                            group_id = get_group_id_from_oid(oid)
-                            if group_id == expected_visit.groupId:
-                                # Ingest the snap
-                                exp_id = mwi.ingest_image(oid)
-                                expid_set.add(exp_id)
-                    except ValueError:
-                        _log.error(f"Failed to match object id '{oid}'")
-                # Commits are per-group, so this can't interfere with other
-                # workers. This may wipe messages associated with a next_visit
-                # that will later be assigned to this worker, but those cases
-                # should be caught by the "already arrived" check.
-                consumer.commit(message=received)
-        if len(expid_set) < expected_snaps:
-            _log.warning(f"Timed out waiting for image after receiving exposures {expid_set}.")
+        try:
+            expid_set = set()
+
+            # Create a fresh MiddlewareInterface object to avoid accidental
+            # "cross-talk" between different visits.
+            mwi = MiddlewareInterface(central_butler,
+                                      image_bucket,
+                                      expected_visit,
+                                      pre_pipelines,
+                                      main_pipelines,
+                                      skymap,
+                                      local_repo.name,
+                                      local_cache)
+            # Copy calibrations for this detector/visit
+            mwi.prep_butler()
+
+            # expected_visit.nimages == 0 means "not known in advance"; keep listening until timeout
+            expected_snaps = expected_visit.nimages if expected_visit.nimages else 100
+            # Heuristic: take the upcoming script's duration and multiply by 2 to
+            # include the currently executing script, then add time to transfer
+            # the last image.
+            timeout = expected_visit.duration * 2 + image_timeout
+            # Check to see if any snaps have already arrived
+            for snap in range(expected_snaps):
+                oid = check_for_snap(
+                    expected_visit.instrument,
+                    expected_visit.groupId,
+                    snap,
+                    expected_visit.detector,
+                )
+                if oid:
+                    _log.debug("Found object %s already present", oid)
+                    exp_id = mwi.ingest_image(oid)
+                    expid_set.add(exp_id)
+
+            _log.debug("Waiting for snaps...")
+            start = time.time()
+            while len(expid_set) < expected_snaps and time.time() - start < timeout:
+                if startup_response:
+                    response = startup_response
+                else:
+                    time_remaining = max(0.0, timeout - (time.time() - start))
+                    response = consumer.consume(num_messages=1, timeout=time_remaining + 1.0)
+                end = time.time()
+                messages = _filter_messages(response)
+                response = []
+                if len(messages) == 0 and end - start < timeout and not startup_response:
+                    _log.debug(f"Empty consume after {end - start}s.")
+                    continue
+                startup_response = []
+
+                # Not all notifications are for this group/detector
+                for received in messages:
+                    for oid in _parse_bucket_notifications(received.value()):
+                        try:
+                            if is_path_consistent(oid, expected_visit):
+                                _log.debug("Received %r", oid)
+                                group_id = get_group_id_from_oid(oid)
+                                if group_id == expected_visit.groupId:
+                                    # Ingest the snap
+                                    exp_id = mwi.ingest_image(oid)
+                                    expid_set.add(exp_id)
+                        except ValueError:
+                            _log.error(f"Failed to match object id '{oid}'")
+                    # Commits are per-group, so this can't interfere with other
+                    # workers. This may wipe messages associated with a next_visit
+                    # that will later be assigned to this worker, but those cases
+                    # should be caught by the "already arrived" check.
+                    consumer.commit(message=received)
+            if len(expid_set) < expected_snaps:
+                _log.warning(f"Timed out waiting for image after receiving exposures {expid_set}.")
+        except GracefulShutdownInterrupt as e:
+            _log.exception("Processing interrupted before pipeline execution")
+            # Do not export, to leave room for the next attempt
+            # Service unavailable is not quite right, but no better standard response
+            raise ServiceUnavailable(f"The server aborted processing, but it can be retried: {e}",
+                                     retry_after=10) from None

         if expid_set:
             with log_factory.add_context(exposures=expid_set):
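To see why ServiceUnavailable with retry_after=10 makes the abort recoverable: any HTTP caller that honors the Retry-After header can resubmit the next_visit event and be served by a fresh (or restarted) worker. Below is a hedged illustration of such a caller; the endpoint and payload are hypothetical, and the real service's retries are presumably driven by its eventing infrastructure rather than a hand-written loop like this.

    import time

    import requests


    def post_next_visit(url, event, max_attempts=3):
        """Resubmit a (hypothetical) next_visit event while the server returns 503."""
        for _ in range(max_attempts):
            response = requests.post(url, json=event)
            if response.status_code != 503:
                return response
            # The handler above sets retry_after=10; fall back to 10 s if absent.
            time.sleep(float(response.headers.get("Retry-After", 10)))
        return response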
