Merge branch 'tickets/DM-43948'
kfindeisen committed May 13, 2024
2 parents 5ee6a39 + 86e1e1f commit 9863090
Showing 2 changed files with 143 additions and 65 deletions.
178 changes: 119 additions & 59 deletions python/activator/middleware_interface.py
@@ -35,6 +35,7 @@

import lsst.utils.timer
from lsst.resources import ResourcePath
import lsst.sphgeom
import lsst.afw.cameraGeom
import lsst.ctrl.mpexec
from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor
@@ -437,73 +438,18 @@ def prep_butler(self) -> None:
with lsst.utils.timer.time_this(_log, msg="prep_butler", level=logging.DEBUG):
_log.info(f"Preparing Butler for visit {self.visit!r}")

detector = self.camera[self.visit.detector]
wcs = self._predict_wcs(detector)
center, radius = self._detector_bounding_circle(detector, wcs)
self._write_region_time() # Must be done before preprocessing pipeline

# repos may have been modified by other MWI instances.
# TODO: get a proper synchronization API for Butler
self.central_butler.registry.refresh()
self.butler.registry.refresh()

with time_this_to_bundle(bundle, action_id, "prep_butlerSearchTime"):
with lsst.utils.timer.time_this(_log, msg="prep_butler (find refcats)",
level=logging.DEBUG):
refcat_datasets = set(self._export_refcats(center, radius))
with lsst.utils.timer.time_this(_log, msg="prep_butler (find templates)",
level=logging.DEBUG):
template_datasets = set(self._export_skymap_and_templates(
center, detector, wcs, self.visit.filters))
with lsst.utils.timer.time_this(_log, msg="prep_butler (find calibs)",
level=logging.DEBUG):
calib_datasets = set(self._export_calibs(self.visit.detector, self.visit.filters))
with lsst.utils.timer.time_this(_log, msg="prep_butler (find ML models)",
level=logging.DEBUG):
model_datasets = set(self._export_ml_models())
all_datasets, calib_datasets = self._find_data_to_preload()

with time_this_to_bundle(bundle, action_id, "prep_butlerTransferTime"):
with lsst.utils.timer.time_this(_log, msg="prep_butler (transfer datasets)",
level=logging.DEBUG):
all_datasets = refcat_datasets | template_datasets | calib_datasets | model_datasets
transferred = self.butler.transfer_from(self.central_butler,
all_datasets,
transfer="copy",
skip_missing=True,
register_dataset_types=True,
transfer_dimensions=True,
)
if len(transferred) != len(all_datasets):
_log.warning("Downloaded only %d datasets out of %d; missing %s.",
len(transferred), len(all_datasets),
all_datasets - set(transferred))

with lsst.utils.timer.time_this(_log, msg="prep_butler (transfer collections)",
level=logging.DEBUG):
# VALIDITY-HACK: ensure local <instrument>/calibs is a
# chain even if central collection isn't.
self.butler.registry.registerCollection(
self.instrument.makeCalibrationCollectionName(),
CollectionType.CHAINED,
)
self._export_collections(self._collection_template)
self._export_collections(self.instrument.makeUmbrellaCollectionName())
with lsst.utils.timer.time_this(_log, msg="prep_butler (transfer associations)",
level=logging.DEBUG):
self._export_calib_associations(self.instrument.makeCalibrationCollectionName(),
calib_datasets)

# Temporary workarounds until we have a prompt-processing default top-level collection
# in shared repos, and raw collection in dev repo, and then we can organize collections
# without worrying about DRP use cases.
self.butler.collection_chains.prepend_chain(
self.instrument.makeUmbrellaCollectionName(),
[self._collection_template,
self.instrument.makeDefaultRawIngestRunName(),
# VALIDITY-HACK: account for case where source
# collection was CALIBRATION or omitted from
# umbrella.
self.instrument.makeCalibrationCollectionName(),
])
self._transfer_data(all_datasets, calib_datasets)

# IMPORTANT: do not remove or rename entries in this list. New entries can be added as needed.
enforce_schema(bundle, {action_id: ["prep_butlerTotalTime",
@@ -523,6 +469,36 @@ def prep_butler(self) -> None:
detector=self.visit.detector,
group=self.visit.groupId)

def _find_data_to_preload(self):
"""Identify the datasets to export from the central repo.
The returned datasets are a superset of those needed by any pipeline,
but exclude any datasets that are already present in the local repo.
Returns
-------
datasets : set [`~lsst.daf.butler.DatasetRef`]
The datasets to be exported, after any filtering.
calibs : set [`~lsst.daf.butler.DatasetRef`]
The subset of ``datasets`` representing calibs.
"""
detector = self.camera[self.visit.detector]
wcs = self._predict_wcs(detector)
center, radius = self._detector_bounding_circle(detector, wcs)

with lsst.utils.timer.time_this(_log, msg="prep_butler (find refcats)", level=logging.DEBUG):
refcat_datasets = set(self._export_refcats(center, radius))
with lsst.utils.timer.time_this(_log, msg="prep_butler (find templates)", level=logging.DEBUG):
template_datasets = set(self._export_skymap_and_templates(
center, detector, wcs, self.visit.filters))
with lsst.utils.timer.time_this(_log, msg="prep_butler (find calibs)", level=logging.DEBUG):
calib_datasets = set(self._export_calibs(self.visit.detector, self.visit.filters))
with lsst.utils.timer.time_this(_log, msg="prep_butler (find ML models)", level=logging.DEBUG):
model_datasets = set(self._export_ml_models())
return (refcat_datasets | template_datasets | calib_datasets | model_datasets,
calib_datasets,
)
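
For orientation, a minimal sketch of the find-then-transfer pattern that prep_butler now follows (``mwi`` is a hypothetical, fully initialized MiddlewareInterface; the private methods are the ones defined in this file):

    # Identify everything the pipelines might need, then copy it locally.
    datasets, calibs = mwi._find_data_to_preload()
    assert calibs <= datasets  # the calibs are a subset of the full set
    mwi._transfer_data(datasets, calibs)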

def _export_refcats(self, center, radius):
"""Identify the refcats to export from the central butler.
@@ -678,6 +654,56 @@ def _export_ml_models(self):
_log.debug("Found %d new ML model datasets.", len(models))
return models

def _transfer_data(self, datasets, calibs):
"""Transfer datasets and all associated collections from the central
repo to the local repo.
Parameters
----------
datasets : set [`~lsst.daf.butler.DatasetRef`]
The datasets to transfer into the local repo.
calibs : set [`~lsst.daf.butler.DatasetRef`]
The calibs to re-certify into the local repo.
"""
with lsst.utils.timer.time_this(_log, msg="prep_butler (transfer datasets)", level=logging.DEBUG):
transferred = self.butler.transfer_from(self.central_butler,
datasets,
transfer="copy",
skip_missing=True,
register_dataset_types=True,
transfer_dimensions=True,
)
if len(transferred) != len(datasets):
_log.warning("Downloaded only %d datasets out of %d; missing %s.",
len(transferred), len(datasets),
datasets - set(transferred))

with lsst.utils.timer.time_this(_log, msg="prep_butler (transfer collections)", level=logging.DEBUG):
# VALIDITY-HACK: ensure local <instrument>/calibs is a
# chain even if central collection isn't.
self.butler.registry.registerCollection(
self.instrument.makeCalibrationCollectionName(),
CollectionType.CHAINED,
)
self._export_collections(self._collection_template)
self._export_collections(self.instrument.makeUmbrellaCollectionName())

with lsst.utils.timer.time_this(_log, msg="prep_butler (transfer associations)", level=logging.DEBUG):
self._export_calib_associations(self.instrument.makeCalibrationCollectionName(), calibs)

# Temporary workarounds until we have a prompt-processing default top-level collection
# in shared repos, and raw collection in dev repo, and then we can organize collections
# without worrying about DRP use cases.
self.butler.collection_chains.prepend_chain(
self.instrument.makeUmbrellaCollectionName(),
[self._collection_template,
self.instrument.makeDefaultRawIngestRunName(),
# VALIDITY-HACK: account for case where source
# collection was CALIBRATION or omitted from
# umbrella.
self.instrument.makeCalibrationCollectionName(),
])
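
To see the effect of the prepend above, a hedged sketch of inspecting the resulting umbrella chain (``mwi`` is again a hypothetical initialized MiddlewareInterface; getCollectionChain is existing daf_butler registry API, and the exact chain contents depend on the repo):

    # The prepended entries (template, raw, and calib collections) come
    # first, so they take precedence over the chain's prior contents.
    umbrella = mwi.instrument.makeUmbrellaCollectionName()
    for child in mwi.butler.registry.getCollectionChain(umbrella):
        print(child)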

def _export_collections(self, collection):
"""Export the collection and all its children.
@@ -786,6 +812,38 @@ def get_key(ref):
for k, g in itertools.groupby(ordered, key=get_key):
yield k, len(list(g))

def _write_region_time(self):
"""Store the approximate sky region and timespan for this
object's visit.
"""
detector = self.camera[self.visit.detector]
wcs = self._predict_wcs(detector)

# TODO: unify with other region estimates on DM-43712
center = wcs.pixelToSky(detector.getCenter(lsst.afw.cameraGeom.PIXELS))
corners = wcs.pixelToSky(detector.getCorners(lsst.afw.cameraGeom.PIXELS))
padded = [c.offset(center.bearingTo(c), self.padding) for c in corners]
region = lsst.sphgeom.ConvexPolygon.convexHull([c.getVector() for c in padded])

# Assume a padded interval that's centered on the most probable time
# TODO: replace with self.visit.startTime after DM-38635
start = astropy.time.Time(self.visit.private_sndStamp, format="unix_tai")
end = start + 3.0 * self.visit.duration * astropy.units.second
timespan = Timespan(start, end)

self.butler.registry.registerDatasetType(DatasetType(
"regionTimeInfo",
dimensions={"instrument", "group", "detector"},
storageClass="RegionTimeInfo",
universe=self.butler.dimensions,
))
self.butler.put(lsst.pipe.base.utils.RegionTimeInfo(region=region, timespan=timespan),
"regionTimeInfo",
run=self._get_preload_run(self._day_obs),
instrument=self.instrument.getName(),
detector=self.visit.detector,
group=self.visit.groupId)
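
For the consumer side, a sketch of reading the stored dataset back (assumes the same hypothetical ``mwi``, and that RegionTimeInfo exposes the ``region`` and ``timespan`` it was constructed with above):

    # Look up the preloaded region/timespan in the preload run.
    preload_run = mwi._get_preload_run(mwi._day_obs)
    info = mwi.butler.get("regionTimeInfo",
                          instrument=mwi.instrument.getName(),
                          detector=mwi.visit.detector,
                          group=mwi.visit.groupId,
                          collections=preload_run)
    print(info.region, info.timespan)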

def _get_output_chain(self,
date: str) -> str:
"""Generate a deterministic output chain name that avoids
@@ -1040,10 +1098,12 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
pipeline = self._prep_pipeline(pipeline_file)
except FileNotFoundError as e:
raise RuntimeError from e
preload_run = self._get_preload_run(self._day_obs)
init_output_run = self._get_init_output_run(pipeline_file, self._day_obs)
output_run = self._get_output_run(pipeline_file, self._day_obs)
exec_butler = Butler(butler=self.butler,
collections=[output_run, init_output_run] + list(self.butler.collections),
collections=[output_run, init_output_run, preload_run]
+ list(self.butler.collections),
run=output_run)
factory = lsst.ctrl.mpexec.TaskFactory()
executor = SeparablePipelineExecutor(
30 changes: 24 additions & 6 deletions tests/test_middleware_interface.py
@@ -213,7 +213,7 @@ def test_init(self):
self.assertEqual(self.interface.rawIngestTask.config.failFast, True)
self.assertEqual(self.interface.rawIngestTask.config.transfer, "copy")

def _check_imports(self, butler, detector, expected_shards, expected_date):
def _check_imports(self, butler, group, detector, expected_shards, expected_date):
"""Test that the butler has the expected supporting data.
"""
self.assertEqual(butler.get('camera',
@@ -276,6 +276,21 @@ def _check_imports(self, butler, detector, expected_shards, expected_date):
collections=self.umbrella)
)

# Check that preloaded datasets have been generated
date = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=-12)))
preload_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}/" \
"Preload/prompt-proto-service-042"
self.assertTrue(
butler.exists('promptPreload_metrics', instrument=instname, group=group, detector=detector,
full_check=True,
collections=preload_collection)
)
self.assertTrue(
butler.exists('regionTimeInfo', instrument=instname, group=group, detector=detector,
full_check=True,
collections=preload_collection)
)

def test_prep_butler(self):
"""Test that the butler has all necessary data for the next visit.
"""
@@ -286,7 +301,7 @@ def test_prep_butler(self):
# TODO DM-34112: check these shards again with some plots, once I've
# determined whether ci_hits2015 actually has enough shards.
expected_shards = {157394, 157401, 157405}
self._check_imports(self.interface.butler, detector=56,
self._check_imports(self.interface.butler, group="1", detector=56,
expected_shards=expected_shards, expected_date="20150218T000000Z")

def test_prep_butler_olddate(self):
@@ -305,9 +320,9 @@ def test_prep_butler_olddate(self):
expected_shards = {157394, 157401, 157405}
with self.assertRaises((AssertionError, lsst.daf.butler.registry.MissingCollectionError)):
# 20150218T000000Z run should not be imported
self._check_imports(self.interface.butler, detector=56,
self._check_imports(self.interface.butler, group="1", detector=56,
expected_shards=expected_shards, expected_date="20150218T000000Z")
self._check_imports(self.interface.butler, detector=56,
self._check_imports(self.interface.butler, group="1", detector=56,
expected_shards=expected_shards, expected_date="20150313T000000Z")

# TODO: prep_butler doesn't know what kinds of calibs to expect, so can't
@@ -345,7 +360,7 @@ def test_prep_butler_twice(self):

second_interface.prep_butler()
expected_shards = {157394, 157401, 157405}
self._check_imports(second_interface.butler, detector=56,
self._check_imports(second_interface.butler, group="2", detector=56,
expected_shards=expected_shards, expected_date="20150218T000000Z")

# Third visit with different detector and coordinates.
@@ -362,7 +377,7 @@ def test_prep_butler_twice(self):
prefix="file://")
third_interface.prep_butler()
expected_shards.update({157393, 157395})
self._check_imports(third_interface.butler, detector=5,
self._check_imports(third_interface.butler, group="3", detector=5,
expected_shards=expected_shards, expected_date="20150218T000000Z")

def test_ingest_image(self):
@@ -940,6 +955,9 @@ def _simulate_run(self):
butler_tests.addDatasetType(self.interface.central_butler, "promptPreload_metrics",
{"instrument", "group", "detector"},
"MetricMeasurementBundle")
butler_tests.addDatasetType(self.interface.central_butler, "regionTimeInfo",
{"instrument", "group", "detector"},
"RegionTimeInfo")
butler_tests.addDatasetType(self.interface.central_butler, "calexp",
{"instrument", "visit", "detector"},
"ExposureF")
