
Commit

Merge branch 'tickets/DM-43949'
kfindeisen committed May 7, 2024
2 parents 97b3864 + f21fdd1 commit ac8f25c
Showing 6 changed files with 158 additions and 75 deletions.
24 changes: 12 additions & 12 deletions bin/prompt_processing_upload_raws.sh
@@ -28,10 +28,10 @@ set -e # Abort on any error

shopt -s expand_aliases
alias aws="singularity exec /sdf/sw/s3/aws-cli_latest.sif aws \
--endpoint-url https://s3dfrgw.slac.stanford.edu"
--endpoint-url https://s3dfrgw.slac.stanford.edu"

RAW_DIR="/sdf/data/rubin/ddn/ncsa-datasets/hsc/raw/ssp_pdr2/2016-03-07"
UPLOAD_BUCKET=rubin-pp-users/unobserved
UPLOAD_BUCKET=rubin-pp-dev-users/unobserved


# Need to copy to a temp.fits first before uploading to the bucket because
@@ -44,27 +44,27 @@ fi
# Filename format is defined in activator/raw.py:
# instrument/detector/group/snap/exposureId/filter/instrument-group-snap-exposureId-filter-detector
cp "${RAW_DIR}/HSCA05913553.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/0/2016-03-07T00:00:00.000001/0/0059134/HSC-G/HSC-2016-03-07T00:00:00.000001-0-0059134-HSC-G-0.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/0/0059134/0/0059134/HSC-G/HSC-0059134-0-0059134-HSC-G-0.fits
cp "${RAW_DIR}/HSCA05913542.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/4/2016-03-07T00:00:00.000001/0/0059134/HSC-G/HSC-2016-03-07T00:00:00.000001-0-0059134-HSC-G-4.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/4/0059134/0/0059134/HSC-G/HSC-0059134-0-0059134-HSC-G-4.fits
cp "${RAW_DIR}/HSCA05913543.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/5/2016-03-07T00:00:00.000001/0/0059134/HSC-G/HSC-2016-03-07T00:00:00.000001-0-0059134-HSC-G-5.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/5/0059134/0/0059134/HSC-G/HSC-0059134-0-0059134-HSC-G-5.fits

cp "${RAW_DIR}/HSCA05914353.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/0/2016-03-07T00:00:00.000002/0/0059142/HSC-G/HSC-2016-03-07T00:00:00.000002-0-0059142-HSC-G-0.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/0/0059142/0/0059142/HSC-G/HSC-0059142-0-0059142-HSC-G-0.fits
cp "${RAW_DIR}/HSCA05914343.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/5/2016-03-07T00:00:00.000002/0/0059142/HSC-G/HSC-2016-03-07T00:00:00.000002-0-0059142-HSC-G-5.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/5/0059142/0/0059142/HSC-G/HSC-0059142-0-0059142-HSC-G-5.fits
cp "${RAW_DIR}/HSCA05914337.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/11/2016-03-07T00:00:00.000002/0/0059142/HSC-G/HSC-2016-03-07T00:00:00.000002-0-0059142-HSC-G-11.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/11/0059142/0/0059142/HSC-G/HSC-0059142-0-0059142-HSC-G-11.fits

cp "${RAW_DIR}/HSCA05915112.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/50/2016-03-07T00:00:00.000003/0/0059150/HSC-G/HSC-2016-03-07T00:00:00.000003-0-0059150-HSC-G-50.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/50/0059150/0/0059150/HSC-G/HSC-0059150-0-0059150-HSC-G-50.fits
cp "${RAW_DIR}/HSCA05915116.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/58/2016-03-07T00:00:00.000003/0/0059150/HSC-G/HSC-2016-03-07T00:00:00.000003-0-0059150-HSC-G-58.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/58/0059150/0/0059150/HSC-G/HSC-0059150-0-0059150-HSC-G-58.fits

cp "${RAW_DIR}/HSCA05916109.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/43/2016-03-07T00:00:00.000004/0/0059160/HSC-G/HSC-2016-03-07T00:00:00.000004-0-0059160-HSC-G-43.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/43/0059160/0/0059160/HSC-G/HSC-0059160-0-0059160-HSC-G-43.fits
cp "${RAW_DIR}/HSCA05916113.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/51/2016-03-07T00:00:00.000004/0/0059160/HSC-G/HSC-2016-03-07T00:00:00.000004-0-0059160-HSC-G-51.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/51/0059160/0/0059160/HSC-G/HSC-0059160-0-0059160-HSC-G-51.fits

rm temp.fits
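
The key layout noted in the comment near the top of this script (defined in activator/raw.py) can be sketched as a small Python helper. The helper name and signature below are illustrative only, not part of the repository:

    def raw_key(instrument, detector, group, snap, exposure_id, physical_filter):
        # instrument/detector/group/snap/exposureId/filter/
        #   instrument-group-snap-exposureId-filter-detector
        filename = (f"{instrument}-{group}-{snap}-{exposure_id}-"
                    f"{physical_filter}-{detector}.fits")
        return (f"{instrument}/{detector}/{group}/{snap}/{exposure_id}/"
                f"{physical_filter}/{filename}")

    # Reproduces the first upload above, with the new numeric HSC group:
    raw_key("HSC", 0, "0059134", 0, "0059134", "HSC-G")
    # -> 'HSC/0/0059134/0/0059134/HSC-G/HSC-0059134-0-0059134-HSC-G-0.fits'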
57 changes: 29 additions & 28 deletions python/activator/middleware_interface.py
@@ -38,7 +38,7 @@
import lsst.afw.cameraGeom
import lsst.ctrl.mpexec
from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor
from lsst.daf.butler import Butler, CollectionType, Timespan
from lsst.daf.butler import Butler, CollectionType, DatasetType, Timespan
from lsst.daf.butler.registry import MissingDatasetTypeError
import lsst.dax.apdb
import lsst.geom
@@ -270,6 +270,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVi

self._init_local_butler(local_repo, [self.instrument.makeUmbrellaCollectionName()], None)
self._prep_collections()
self._define_dimensions()
self._init_ingester()
self._init_visit_definer()

@@ -359,6 +360,16 @@ def _init_visit_definer(self):
define_visits_config = lsst.obs.base.DefineVisitsConfig()
self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler)

def _define_dimensions(self):
"""Define any dimensions that must be computed from this object's visit.
``self._init_local_butler`` must have already been run.
"""
self.butler.registry.syncDimensionData("group",
{"name": self.visit.groupId,
"instrument": self.instrument.getName(),
})

def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector) -> lsst.afw.geom.SkyWcs:
"""Calculate the expected detector WCS for an incoming observation.
@@ -418,8 +429,6 @@ def prep_butler(self) -> None:
``visit`` dimensions, respectively. It may contain other data that would
not be loaded when processing the visit.
"""
# Timing metrics can't be saved to Butler (exposure/visit might not be
# defined), so manage them purely in-memory.
action_id = "prepButlerTimeMetric" # For consistency with analysis_tools outputs
bundle = lsst.analysis.tools.interfaces.MetricMeasurementBundle(
dataset_identifier=self.DATASET_IDENTIFIER,
@@ -501,24 +510,18 @@ def prep_butler(self) -> None:
"prep_butlerSearchTime",
"prep_butlerTransferTime",
]})
dispatcher = _get_sasquatch_dispatcher()
if dispatcher:
dispatcher.dispatch(
bundle,
run=self._get_preload_run(self._day_obs),
datasetType="promptPreload_metrics", # In case we have real Butler datasets in the future
identifierFields={"instrument": self.instrument.getName(),
"skymap": self.skymap_name,
"detector": self.visit.detector,
"physical_filter": self.visit.filters,
"band": self.butler.registry.expandDataId(
instrument=self.instrument.getName(),
physical_filter=self.visit.filters)["band"],
},
extraFields={"group": self.visit.groupId,
},
)
_log.debug(f"Uploaded preload metrics to {dispatcher.url}.")
self.butler.registry.registerDatasetType(DatasetType(
"promptPreload_metrics",
dimensions={"instrument", "group", "detector"},
storageClass="MetricMeasurementBundle",
universe=self.butler.dimensions,
))
self.butler.put(bundle,
"promptPreload_metrics",
run=self._get_preload_run(self._day_obs),
instrument=self.instrument.getName(),
detector=self.visit.detector,
group=self.visit.groupId)

def _export_refcats(self, center, radius):
"""Identify the refcats to export from the central butler.
@@ -866,6 +869,9 @@ def _prep_collections(self):
"""Pre-register output collections in advance of running the pipeline.
"""
self.butler.registry.refresh()
self.butler.registry.registerCollection(
self._get_preload_run(self._day_obs),
CollectionType.RUN)
for pipeline_file in self._get_pipeline_files():
self.butler.registry.registerCollection(
self._get_init_output_run(pipeline_file, self._day_obs),
@@ -1115,7 +1121,7 @@ def export_outputs(self, exposure_ids: set[int]) -> None:
Identifiers of the exposures that were processed.
"""
# Rather than determining which pipeline was run, just try to export all of them.
output_runs = []
output_runs = [self._get_preload_run(self._day_obs)]
for f in self._get_pipeline_files():
output_runs.extend([self._get_init_output_run(f, self._day_obs),
self._get_output_run(f, self._day_obs),
@@ -1148,7 +1154,7 @@ def export_outputs(self, exposure_ids: set[int]) -> None:
for bundle in bundles:
_log_trace.debug("Uploading %s...", bundle)
dispatcher.dispatchRef(self.butler.get(bundle), bundle)
_log.debug("Uploaded %d pipeline metrics to %s.", len(bundles), dispatcher.url)
_log.debug("Uploaded %d metrics to %s.", len(bundles), dispatcher.url)

@staticmethod
def _get_safe_dataset_types(butler):
@@ -1224,11 +1230,6 @@ def _export_subset(self, exposure_ids: set[int],
"day_obs",
"exposure",
"visit",
# TODO: visit_* are not needed from version 4; remove when we require v6
"visit_definition",
"visit_detector_region",
"visit_system",
"visit_system_membership",
]:
if dimension in self.butler.registry.dimensions:
records = self.butler.registry.queryDimensionRecords(
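
Taken together, the changes to this file stop pushing preload metrics straight to Sasquatch and instead store them as an ordinary Butler dataset keyed on the new ``group`` dimension, written to a dedicated preload run that export_outputs now transfers along with the pipeline outputs. A minimal sketch of reading such a bundle back, assuming the data IDs used in the upload script; the repository path and run name are placeholders rather than the service's real configuration:

    from lsst.daf.butler import Butler

    butler = Butler("/path/to/local/repo")        # placeholder path
    preload_run = "HSC/prompt/some-preload-run"   # placeholder; the real name comes from _get_preload_run()
    bundle = butler.get(
        "promptPreload_metrics",
        collections=preload_run,
        instrument="HSC",
        detector=0,
        group="0059134",
    )
    # `bundle` is the MetricMeasurementBundle written during prep_butler().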
70 changes: 54 additions & 16 deletions python/tester/utils.py
@@ -78,12 +78,24 @@ def get_last_group(bucket, instrument, date):
group : `str`
The largest existing group for ``instrument``, or a newly generated
group if none exist.
Notes
-----
For LSST instruments, the group is an ISO 8601 timestamp to seconds
precision, followed by . and a six-digit counter. For HSC, it is a 4-digit
YMD string, followed by a four-digit counter, with the combination
guaranteed to be a valid HSC exposure ID.
"""
if instrument in _LSST_CAMERA_LIST:
blobs = bucket.objects.filter(
Prefix=f"{instrument}/{date}/",
)
numbers = [int(blob.key.split("/")[2].split("_")[-1]) for blob in blobs]
if numbers:
last_number = max(numbers)
else:
last_number = 0
return make_group(date, last_number)
else:
preblobs = bucket.objects.filter(
Prefix=f"{instrument}/",
@@ -92,15 +104,43 @@
(int(preblob.key.split("/")[1]) for preblob in preblobs), default=0
)

group_prefix = "-".join([date[:4], date[4:6:], date[-2:]])
group_prefix = make_compressed_date(date)
blobs = preblobs.filter(Prefix=f"{instrument}/{detector}/{group_prefix}")
numbers = [int(blob.key.split("/")[2][-6:]) for blob in blobs]
numbers = [int(blob.key.split("/")[2][4:]) for blob in blobs]

if numbers:
last_number = max(numbers)
else:
last_number = 0
return make_group(date, last_number)
if numbers:
last_number = max(numbers)
else:
last_number = 0
return group_prefix + ("%04d" % last_number)


def make_compressed_date(date):
"""Generate a day-unique string suitable for making integer IDs.
Parameters
----------
date : `str`
The current date in YYYYMMDD format.
Returns
-------
compressed : `str`
A digit sequence guaranteed to be unique for ``date``.
Notes
-----
The current implementation gives 4-digit results until September 2024.
If this generator is still needed after that, it will need to be tweaked.
"""
year = int(date[:4]) - 2023 # Always 1 digit, 0-1
night_id = int(date[-4:]) # Always 4 digits up to 1231
compressed = year*1200 + night_id # Always 4 digits
limit = max_exposure["HSC"] // 10_000
if compressed > limit:
raise RuntimeError(f"{date} compressed to {compressed}, "
f"max allowed is {limit}.")
return "%04d" % compressed


def make_exposure_id(instrument, group_id, snap):
@@ -163,12 +203,7 @@ def make_hsc_id(group_id, snap):
The current implementation gives illegal exposure IDs after September 2024.
If this generator is still needed after that, it will need to be tweaked.
"""
# This is a bit too dependent on how group_id is generated, but I want the
# group number to be discernible even after compressing to 8 digits.
date, run_id = decode_group(group_id) # run_id has up to 5 digits, but usually 2-3
year = int(date[:4]) - 2023 # Always 1 digit, 0-1
night_id = int(date[-4:]) # Always 4 digits up to 1231
exposure_id = (year*1200 + night_id) * 10000 + (run_id % 10000) # Always 8 digits
exposure_id = int(group_id)
if exposure_id > max_exposure["HSC"]:
raise RuntimeError(f"{group_id} translated to expId {exposure_id}, "
f"max allowed is { max_exposure['HSC']}.")
@@ -323,6 +358,9 @@ def increment_group(instrument, group_base, amount):
The numerical amount depends on the implementation for the
``instrument``.
"""
day_obs, seq_num = decode_group(group_base)
seq_num += amount
return make_group(day_obs, seq_num)
if instrument in _LSST_CAMERA_LIST:
day_obs, seq_num = decode_group(group_base)
seq_num += amount
return make_group(day_obs, seq_num)
else:
return str(int(group_base) + amount)
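
Because HSC groups are now plain integer strings, incrementing one is simple integer arithmetic, while LSST-style groups still round-trip through decode_group and make_group. A quick illustration of the HSC branch, with hypothetical group values:

    group = "17070001"
    next_group = str(int(group) + 1)  # "17070002", still a valid HSC exposure ID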
Binary file modified tests/data/central_repo/gen3.sqlite3
29 changes: 21 additions & 8 deletions tests/test_middleware_interface.py
Expand Up @@ -92,11 +92,13 @@ def fake_file_data(filename, dimensions, instrument, visit):
universe=dimensions)

start_time = astropy.time.Time("2015-02-18T05:28:18.716517500", scale="tai")
day_obs = 20150217
obs_info = astro_metadata_translator.makeObservationInfo(
instrument=instrument.getName(),
datetime_begin=start_time,
datetime_end=start_time + 30*u.second,
exposure_id=exposure_id,
exposure_group=visit.groupId,
visit_id=exposure_id,
boresight_rotation_angle=astropy.coordinates.Angle(visit.cameraAngle*u.degree),
boresight_rotation_coord=visit.rotationSystem.name.lower(),
Expand All @@ -105,6 +107,7 @@ def fake_file_data(filename, dimensions, instrument, visit):
physical_filter=filter,
exposure_time=30.0*u.second,
observation_type="science",
observing_day=day_obs,
group_counter_start=exposure_id,
group_counter_end=exposure_id,
)
@@ -907,6 +910,7 @@ def setUp(self):
self.second_interface = MiddlewareInterface(central_butler, self.input_data, self.second_visit,
pipelines, skymap_name, second_local_repo.name,
prefix="file://")
self.second_interface.prep_butler()
date = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=-12)))
self.output_chain = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}"
self.output_run = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \
@@ -933,6 +937,9 @@ def _simulate_run(self):
for k, v in self.second_data_id.required.items()}
# Dataset types defined for local Butler on pipeline run, but code
# assumes output types already exist in central repo.
butler_tests.addDatasetType(self.interface.central_butler, "promptPreload_metrics",
{"instrument", "group", "detector"},
"MetricMeasurementBundle")
butler_tests.addDatasetType(self.interface.central_butler, "calexp",
{"instrument", "visit", "detector"},
"ExposureF")
@@ -959,14 +966,20 @@ def test_extra_collection(self):
central_butler.registry.registerCollection("emptyrun", CollectionType.RUN)
central_butler.collection_chains.prepend_chain("refcats", ["emptyrun"])

self.interface.prep_butler()

self.assertEqual(
self._count_datasets(self.interface.butler, "gaia_dr2_20200414", f"{instname}/defaults"),
3)
self.assertIn(
"emptyrun",
self.interface.butler.registry.queryCollections("refcats", flattenChains=True))
# Avoid collisions with other calls to prep_butler
with make_local_repo(tempfile.gettempdir(), central_butler, instname) as local_repo:
interface = MiddlewareInterface(central_butler, self.input_data,
dataclasses.replace(self.next_visit, groupId="42"),
pipelines, skymap_name, local_repo,
prefix="file://")
interface.prep_butler()

self.assertEqual(
self._count_datasets(interface.butler, "gaia_dr2_20200414", f"{instname}/defaults"),
3)
self.assertIn(
"emptyrun",
interface.butler.registry.queryCollections("refcats", flattenChains=True))

def test_export_outputs(self):
self.interface.export_outputs({self.raw_data_id["exposure"]})
