Merge pull request #88 from lsst-dm/tickets/DM-37387
DM-37387: A tester script using LATISS data
hsinfang authored Oct 18, 2023
2 parents 998349b + 6f3b2bf commit 3ee2790
Showing 6 changed files with 375 additions and 102 deletions.
58 changes: 34 additions & 24 deletions bin/prompt_prototype_upload_raws.sh
@@ -30,31 +30,41 @@ shopt -s expand_aliases
alias aws="singularity exec /sdf/sw/s3/aws-cli_latest.sif aws \
--endpoint-url https://s3dfrgw.slac.stanford.edu"

RAW_DIR="/sdf/group/rubin/datasets/hsc/raw/ssp_pdr2/2016-03-07"
RAW_DIR="/sdf/data/rubin/ddn/ncsa-datasets/hsc/raw/ssp_pdr2/2016-03-07"
UPLOAD_BUCKET=rubin-pp-users/unobserved


# Need to copy each file to temp.fits first before uploading to the bucket
# because of the current permissions on these HSC files.
if [[ -f temp.fits ]]; then
echo "temp.fits exists in the folder. Exit."
exit 1
fi

# Filename format is defined in activator/raw.py:
# instrument/detector/group/snap/exposureId/filter/instrument-group-snap-exposureId-filter-detector
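# For example, the first upload in the new format below decomposes as
# (illustrative breakdown only):
#   HSC/0/2016-03-07T00:00:00.000001/0/0059134/HSC-G/HSC-2016-03-07T00:00:00.000001-0-0059134-HSC-G-0.fits
#   -> instrument HSC, detector 0, group 2016-03-07T00:00:00.000001, snap 0,
#      exposureId 0059134, filter HSC-G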
aws s3 cp "${RAW_DIR}/HSCA05913553.fits" \
s3://${UPLOAD_BUCKET}/HSC/0/2016030700001/0/0059134/HSC-G/HSC-2016030700001-0-0059134-HSC-G-0.fits
aws s3 cp "${RAW_DIR}/HSCA05913542.fits" \
s3://${UPLOAD_BUCKET}/HSC/4/2016030700001/0/0059134/HSC-G/HSC-2016030700001-0-0059134-HSC-G-4.fits
aws s3 cp "${RAW_DIR}/HSCA05913543.fits" \
s3://${UPLOAD_BUCKET}/HSC/5/2016030700001/0/0059134/HSC-G/HSC-2016030700001-0-0059134-HSC-G-5.fits

aws s3 cp "${RAW_DIR}/HSCA05914353.fits" \
s3://${UPLOAD_BUCKET}/HSC/0/2016030700002/0/0059142/HSC-G/HSC-2016030700002-0-0059142-HSC-G-0.fits
aws s3 cp "${RAW_DIR}/HSCA05914343.fits" \
s3://${UPLOAD_BUCKET}/HSC/5/2016030700002/0/0059142/HSC-G/HSC-2016030700002-0-0059142-HSC-G-5.fits
aws s3 cp "${RAW_DIR}/HSCA05914337.fits" \
s3://${UPLOAD_BUCKET}/HSC/11/2016030700002/0/0059142/HSC-G/HSC-2016030700002-0-0059142-HSC-G-11.fits

aws s3 cp "${RAW_DIR}/HSCA05915112.fits" \
s3://${UPLOAD_BUCKET}/HSC/50/2016030700003/0/0059150/HSC-G/HSC-2016030700003-0-0059150-HSC-G-50.fits
aws s3 cp "${RAW_DIR}/HSCA05915116.fits" \
s3://${UPLOAD_BUCKET}/HSC/58/2016030700003/0/0059150/HSC-G/HSC-2016030700003-0-0059150-HSC-G-58.fits

aws s3 cp "${RAW_DIR}/HSCA05916109.fits" \
s3://${UPLOAD_BUCKET}/HSC/43/2016030700004/0/0059160/HSC-G/HSC-2016030700004-0-0059160-HSC-G-43.fits
aws s3 cp "${RAW_DIR}/HSCA05916113.fits" \
s3://${UPLOAD_BUCKET}/HSC/51/2016030700004/0/0059160/HSC-G/HSC-2016030700004-0-0059160-HSC-G-51.fits
cp "${RAW_DIR}/HSCA05913553.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/0/2016-03-07T00:00:00.000001/0/0059134/HSC-G/HSC-2016-03-07T00:00:00.000001-0-0059134-HSC-G-0.fits
cp "${RAW_DIR}/HSCA05913542.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/4/2016-03-07T00:00:00.000001/0/0059134/HSC-G/HSC-2016-03-07T00:00:00.000001-0-0059134-HSC-G-4.fits
cp "${RAW_DIR}/HSCA05913543.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/5/2016-03-07T00:00:00.000001/0/0059134/HSC-G/HSC-2016-03-07T00:00:00.000001-0-0059134-HSC-G-5.fits

cp "${RAW_DIR}/HSCA05914353.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/0/2016-03-07T00:00:00.000002/0/0059142/HSC-G/HSC-2016-03-07T00:00:00.000002-0-0059142-HSC-G-0.fits
cp "${RAW_DIR}/HSCA05914343.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/5/2016-03-07T00:00:00.000002/0/0059142/HSC-G/HSC-2016-03-07T00:00:00.000002-0-0059142-HSC-G-5.fits
cp "${RAW_DIR}/HSCA05914337.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/11/2016-03-07T00:00:00.000002/0/0059142/HSC-G/HSC-2016-03-07T00:00:00.000002-0-0059142-HSC-G-11.fits

cp "${RAW_DIR}/HSCA05915112.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/50/2016-03-07T00:00:00.000003/0/0059150/HSC-G/HSC-2016-03-07T00:00:00.000003-0-0059150-HSC-G-50.fits
cp "${RAW_DIR}/HSCA05915116.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/58/2016-03-07T00:00:00.000003/0/0059150/HSC-G/HSC-2016-03-07T00:00:00.000003-0-0059150-HSC-G-58.fits

cp "${RAW_DIR}/HSCA05916109.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/43/2016-03-07T00:00:00.000004/0/0059160/HSC-G/HSC-2016-03-07T00:00:00.000004-0-0059160-HSC-G-43.fits
cp "${RAW_DIR}/HSCA05916113.fits" temp.fits
aws s3 cp temp.fits s3://${UPLOAD_BUCKET}/HSC/51/2016-03-07T00:00:00.000004/0/0059160/HSC-G/HSC-2016-03-07T00:00:00.000004-0-0059160-HSC-G-51.fits

rm temp.fits
7 changes: 5 additions & 2 deletions doc/playbook.rst
@@ -325,15 +325,18 @@ They then upload a batch of files representing the snaps of the visit to the ``r

Eventually a set of parallel processes running on multiple nodes will be needed to upload the images sufficiently rapidly.

``python/tester/upload.py``: Command line arguments are the instrument name (currently HSC only) and the number of groups of images to send.
``python/tester/upload.py``: Command line arguments are the instrument name (currently HSC or LATISS) and the number of groups of images to send.

Sample command lines:

.. code-block:: sh

   python upload.py HSC 3
   python upload.py LATISS 2

This draw images from 4 groups, in total 10 raw files, stored in the ``rubin:rubin-pp-users`` bucket.
This script draws images stored in the ``rubin:rubin-pp-users`` bucket.
For HSC, 4 groups (10 raw files in total) are curated.
For LATISS, 2 groups (2 raw FITS files plus their corresponding JSON metadata files) are curated.
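For LATISS, each raw FITS file is accompanied by a JSON sidecar whose metadata the tester reads (``GROUPID``, ``FILTBAND``, ``RA``, ``DEC``, ``ROTPA``, and ``DATE``).
A minimal sketch of such a sidecar, with hypothetical values:

.. code-block:: json

   {
       "GROUPID": "2023-10-18T00:00:00.000001",
       "FILTBAND": "g",
       "RA": 53.16,
       "DEC": -27.78,
       "ROTPA": 0.0,
       "DATE": "2023-10-18T03:14:15.926"
   }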

``python/tester/upload_hsc_rc2.py``: Command line argument is the number of groups of images to send.
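A sample invocation (the group count is illustrative):

.. code-block:: sh

   python upload_hsc_rc2.py 3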

129 changes: 114 additions & 15 deletions python/tester/upload.py
@@ -1,5 +1,7 @@
import dataclasses
import datetime
import itertools
import json
import logging
import random
import re
@@ -10,9 +12,23 @@
import boto3
from botocore.handlers import validate_bucket_name

from activator.raw import OTHER_REGEXP, get_raw_path
from lsst.resources import ResourcePath

from activator.raw import (
LSST_REGEXP,
OTHER_REGEXP,
get_raw_path,
_LSST_CAMERA_LIST,
_DETECTOR_FROM_RS,
)
from activator.visit import FannedOutVisit, SummitVisit
from tester.utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit
from tester.utils import (
get_last_group,
increment_group,
make_exposure_id,
replace_header_key,
send_next_visit,
)


@dataclasses.dataclass
@@ -96,14 +112,18 @@ def main():
dest_bucket = s3.Bucket("rubin:rubin-pp")
dest_bucket.meta.client.meta.events.unregister("before-parameter-build.s3", validate_bucket_name)

last_group = get_last_group(dest_bucket, instrument, date)
_log.info(f"Last group {last_group}")

src_bucket = s3.Bucket("rubin:rubin-pp-users")
src_bucket.meta.client.meta.events.unregister("before-parameter-build.s3", validate_bucket_name)
raw_pool = get_samples(src_bucket, instrument)

new_group_base = last_group + random.randrange(10, 19)
last_group = get_last_group(dest_bucket, instrument, date)
new_group_base = increment_group(instrument, last_group, random.randrange(10, 19))
_log.info(f"Last group {last_group}. New group base {new_group_base}")

if instrument in _LSST_CAMERA_LIST:
raw_pool = get_samples_lsst(src_bucket, instrument)
else:
raw_pool = get_samples_non_lsst(src_bucket, instrument)

if raw_pool:
_log.info(f"Observing real raw files from {instrument}.")
upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket,
@@ -112,8 +132,11 @@ def main():
_log.error(f"No raw files found for {instrument}, aborting.")


def get_samples(bucket, instrument):
"""Return any predefined raw exposures for a given instrument.
def get_samples_non_lsst(bucket, instrument):
"""Return any predefined raw exposures for a non-LSST instrument.
The raws follow the non-LSST filename format defined in activator/raw.py:
instrument/detector/group/snap/expid/filter/*.fz
Parameters
----------
@@ -195,6 +218,68 @@ def get_samples(bucket, instrument):
return result


def get_samples_lsst(bucket, instrument):
"""Return any predefined raw exposures for a LSST instrument.
The raws follows the LSST filename convention.
Parameters
----------
bucket : `S3.Bucket`
The bucket in which to search for predefined raws.
instrument : `str`
The short name of the instrument to sample.
Returns
-------
raws : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]]
A nested mapping keyed by group ID and then by snap ID. The innermost
mapping is keyed by the observation metadata (one `activator.FannedOutVisit`
per detector), and its value is a Blob representing the image taken in that
detector-snap.
"""
# The pre-made raw files are stored with the "unobserved" prefix
blobs = bucket.objects.filter(Prefix=f"unobserved/{instrument}/")
result = {}
for blob in blobs:
# Assume that the unobserved bucket uses the same filename scheme as
# the observed bucket.
m = re.match(LSST_REGEXP, blob.key)
if not m or m["extension"] == ".json":
continue

# Retrieve the corresponding sidecar json file
sidecar = ResourcePath("s3://" + blob.bucket_name).join(
blob.key.removesuffix(m["extension"]) + ".json"
)
if not sidecar.exists():
raise RuntimeError(f"Unable to retrieve JSON sidecar: {sidecar}")
with sidecar.open("r") as f:
md = json.load(f)

visit = FannedOutVisit(
instrument=instrument,
detector=_DETECTOR_FROM_RS[instrument][m["raft_sensor"]],
groupId=md["GROUPID"],
nimages=INSTRUMENTS[instrument].n_snaps,
filters=md["FILTBAND"],
coordinateSystem=FannedOutVisit.CoordSys.ICRS,
position=[md["RA"], md["DEC"]],
rotationSystem=FannedOutVisit.RotSys.SKY,
cameraAngle=md["ROTPA"],
survey="SURVEY",
salIndex=2, # 2 is LATISS
scriptSalIndex=2,
dome=FannedOutVisit.Dome.OPEN,
duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL),
totalCheckpoints=1,
private_sndStamp=datetime.datetime.fromisoformat(md["DATE"]).timestamp(),
)
_log.debug(f"File {blob.key} parsed as visit {visit} and registered as group {md['GROUPID']}.")
result[md["GROUPID"]] = {0: {visit: blob}}

return result
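# Illustrative only (not part of this module): the nested mapping returned by
# get_samples_lsst / get_samples_non_lsst can be traversed group ID -> snap ID
# -> {FannedOutVisit: blob}, for example:
#
#     for group_id, snaps in raw_pool.items():
#         for snap_id, visits in snaps.items():
#             for visit, blob in visits.items():
#                 print(group_id, snap_id, visit.detector,
#                       f"s3://{blob.bucket_name}/{blob.key}")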


def upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, n_groups, group_base):
"""Upload visits and files using real raws.
@@ -217,8 +302,8 @@ def upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, n
The number of observation groups to simulate. If more than the number
of groups in ``raw_pool``, files will be re-uploaded under new
group IDs.
group_base : `int`
The base number from which to offset new group numbers.
group_base : `str`
The base group ID from which to offset new group IDs.
Exceptions
----------
@@ -230,7 +315,7 @@ def upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, n
"unobserved raw groups are available.")

for i, true_group in enumerate(itertools.islice(raw_pool, n_groups)):
group = str(group_base + i)
group = increment_group(instrument, group_base, i)
_log.info(f"Processing group {group} from unobserved {true_group}...")
# snap_dict maps snap_id to {visit: blob}
snap_dict = {}
@@ -248,16 +333,30 @@ def upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, n
# closures for the buckets and data.
def upload_from_pool(visit, snap_id):
src_blob = snap_dict[snap_id][visit]
exposure_key, exposure_header, exposure_num = \
make_exposure_id(visit.instrument, int(visit.groupId), snap_id)
exposure_num, headers = \
make_exposure_id(visit.instrument, visit.groupId, snap_id)
filename = get_raw_path(visit.instrument, visit.detector, visit.groupId, snap_id,
exposure_num, visit.filters)
# r+b required by replace_header_key.
with tempfile.TemporaryFile(mode="r+b") as buffer:
src_bucket.download_fileobj(src_blob.key, buffer)
replace_header_key(buffer, exposure_key, exposure_header)
for header_key in headers:
replace_header_key(buffer, header_key, headers[header_key])
buffer.seek(0) # Assumed by upload_fileobj.
dest_bucket.upload_fileobj(buffer, filename)
_log.debug(f"{filename} is uploaded to {dest_bucket}")

if instrument in _LSST_CAMERA_LIST:
# Upload a corresponding sidecar json file
sidecar = ResourcePath("s3://" + src_blob.bucket_name).join(
src_blob.key.removesuffix("fits") + "json"
)
filename_sidecar = filename.removesuffix("fits") + "json"
with sidecar.open("r") as f:
md = json.load(f)
for header_key in headers:
md[header_key] = headers[header_key]
dest_bucket.put_object(Body=json.dumps(md), Key=filename_sidecar)

process_group(kafka_url, visit_infos, upload_from_pool)

31 changes: 19 additions & 12 deletions python/tester/upload_hsc_rc2.py
@@ -35,8 +35,14 @@

from activator.raw import get_raw_path
from activator.visit import SummitVisit
from tester.utils import get_last_group, make_exposure_id, replace_header_key, send_next_visit, \
day_obs_to_unix_utc
from tester.utils import (
get_last_group,
increment_group,
make_exposure_id,
replace_header_key,
send_next_visit,
day_obs_to_unix_utc,
)


EXPOSURE_INTERVAL = 18
@@ -81,8 +87,8 @@ def main():
_set_s3_bucket()

last_group = get_last_group(dest_bucket, "HSC", date)
group_num = last_group + random.randrange(10, 19)
_log.debug(f"Last group {last_group}; new group base {group_num}")
group = increment_group("HSC", last_group, random.randrange(10, 19))
_log.debug(f"Last group {last_group}; new group base {group}")

butler = Butler("/repo/main")
visit_list = get_hsc_visit_list(butler, n_groups)
@@ -98,14 +104,14 @@ def main():
with context.Pool(processes=max_processes, initializer=_set_s3_bucket) as pool, \
tempfile.TemporaryDirectory() as temp_dir:
for visit in visit_list:
group_num += 1
refs = prepare_one_visit(kafka_url, str(group_num), butler, visit)
_log.info(f"Slewing to group {group_num}, with HSC visit {visit}")
group = increment_group("HSC", group, 1)
refs = prepare_one_visit(kafka_url, group, butler, visit)
_log.info(f"Slewing to group {group}, with HSC visit {visit}")
time.sleep(SLEW_INTERVAL)
_log.info(f"Taking exposure for group {group_num}")
_log.info(f"Taking exposure for group {group}")
time.sleep(EXPOSURE_INTERVAL)
_log.info(f"Uploading detector images for group {group_num}")
upload_hsc_images(pool, temp_dir, str(group_num), butler, refs)
_log.info(f"Uploading detector images for group {group}")
upload_hsc_images(pool, temp_dir, group, butler, refs)
pool.close()
_log.info("Waiting for uploads to finish...")
pool.join()
@@ -251,7 +257,7 @@ def _upload_one_image(temp_dir, group_id, butler, ref):
The dataset to upload.
"""
with time_this(log=_log, msg="Single-image processing", prefix=None):
exposure_key, exposure_header, exposure_num = make_exposure_id("HSC", int(group_id), 0)
exposure_num, headers = make_exposure_id("HSC", group_id, 0)
dest_key = get_raw_path(
"HSC",
ref.dataId["detector"],
@@ -278,7 +284,8 @@ def _upload_one_image(temp_dir, group_id, butler, ref):
f"Raw file for {ref.dataId} was copied from Butler to {path}"
)
with open(path, "r+b") as temp_file:
replace_header_key(temp_file, exposure_key, exposure_header)
for header_key in headers:
replace_header_key(temp_file, header_key, headers[header_key])
dest_bucket.upload_file(path, dest_key)
_log.debug(f"{dest_key} was written at {dest_bucket}")

