"""Parsl workflow for ingesting multi-night ImageCollections.

The workflow gathers the staged ``*.collection`` files into a manifest,
converts each one into a WorkUnit, reprojects every WorkUnit at a set of
heliocentric guess distances and runs a KBMOD search on each reprojected
WorkUnit.
"""

import argparse
import os

import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities import (
    apply_runtime_updates,
    get_resource_config,
    get_executors,
    get_configured_logger,
)

from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, reproject_wu_multi_night, kbmod_search

# Heliocentric guess distances (in AU) at which every WorkUnit is reprojected.
_GUESS_DISTANCES_AU = (46.7, 30.6)


@python_app(
    cache=True,
    executors=get_executors(["local_dev_testing", "sharded_reproject"]),
    ignore_for_cache=["logging_file"],
)
def reproject_wu(inputs=(), outputs=(), runtime_config=None, logging_file=None):
    """Reproject one WorkUnit at a single heliocentric guess distance.

    Parameters
    ----------
    inputs : sequence
        ``inputs[0]`` is the original WorkUnit file (its ``filepath`` is handed
        to the implementation) and ``inputs[1]`` is the heliocentric guess
        distance in AU.
    outputs : sequence
        ``outputs[0]`` is the file whose ``filepath`` is passed to the
        implementation as the destination of the reprojected WorkUnit.
    runtime_config : dict, optional
        Runtime parameters forwarded to the implementation. ``None`` (the
        default) is treated as an empty dictionary.
    logging_file : File
        File whose ``filepath`` is used to configure the task logger. It is
        excluded from the app cache key and is required in practice, because
        its ``filepath`` attribute is read unconditionally.

    Returns
    -------
    File
        ``outputs[0]``, so that downstream apps can depend on this task.
    """
    # Imports live inside the app body so they are resolved where the task runs.
    from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

    logger = get_configured_logger("task.reproject_wu", logging_file.filepath)

    # Aliased so the implementation does not shadow this app's own name.
    from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu as reproject_wu_impl

    runtime_config = {} if runtime_config is None else runtime_config
    guess_dist = inputs[1]  # heliocentric guess distance in AU

    logger.info(f"Starting reproject_wu for guess distance {guess_dist}")
    with ErrorLogger(logger):
        reproject_wu_impl(
            guess_dist,
            original_wu_filepath=inputs[0].filepath,
            reprojected_wu_filepath=outputs[0].filepath,
            runtime_config=runtime_config,
            logger=logger,
        )
    logger.info("Completed reproject_wu")
    return outputs[0]


def workflow_runner(env=None, runtime_config=None):
    """Load and configure Parsl, then run the multi-night workflow.

    Parameters
    ----------
    env : str, optional
        Environment string used to define which resource configuration to use,
        by default None
    runtime_config : dict, optional
        Dictionary of assorted runtime configuration parameters. ``None`` (the
        default) is treated as an empty configuration. Per-app settings are
        read from its ``"apps"`` entry.
    """
    # Normalise first: every lookup below assumes a dictionary.
    runtime_config = {} if runtime_config is None else runtime_config

    resource_config = get_resource_config(env=env)
    resource_config = apply_runtime_updates(resource_config, runtime_config)

    app_configs = runtime_config.get("apps", {})

    dfk = parsl.load(resource_config)
    if dfk:
        logging_file = File(os.path.join(dfk.run_dir, "kbmod.log"))
        logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath)

        logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")
        logger.info("Starting workflow")

        # gather all the *.collection files that are staged for processing
        create_manifest_config = app_configs.get("create_manifest", {})
        manifest_file = File(
            os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
        )
        create_manifest_future = create_manifest(
            inputs=[],
            outputs=[manifest_file],
            runtime_config=create_manifest_config,
            logging_file=logging_file,
        )

        # process each .collection file in the manifest into a .wu file
        wu_filenames = []
        original_work_unit_futures = []
        with open(create_manifest_future.result(), "r") as manifest:
            for line in manifest:
                # Lines keep their trailing newline; strip it before building
                # any filename, and ignore blank lines entirely.
                collection_path = line.strip()
                if not collection_path:
                    continue
                wu_filename = collection_path + ".wu"
                wu_filenames.append(wu_filename)
                original_work_unit_futures.append(
                    ic_to_wu(
                        inputs=[File(collection_path)],
                        outputs=[File(wu_filename)],
                        runtime_config=app_configs.get("ic_to_wu", {}),
                        logging_file=logging_file,
                    )
                )

        # reproject each WorkUnit for a range of distances
        reproject_futures = []
        repro_wu_filenames = []
        for wu_future, wu_filename in zip(original_work_unit_futures, wu_filenames):
            for distance in _GUESS_DISTANCES_AU:
                output_filename = f"{wu_filename}.{distance}.repro"
                repro_wu_filenames.append(output_filename)
                reproject_futures.append(
                    reproject_wu(
                        inputs=[wu_future, distance],
                        outputs=[File(output_filename)],
                        runtime_config=app_configs.get("reproject_wu", {}),
                        logging_file=logging_file,
                    )
                )

        # run kbmod search on each reprojected WorkUnit
        search_futures = [
            kbmod_search(
                inputs=[repro_future],
                outputs=[File(repro_filename + ".search.ecsv")],
                runtime_config=app_configs.get("kbmod_search", {}),
                logging_file=logging_file,
            )
            for repro_future, repro_filename in zip(reproject_futures, repro_wu_filenames)
        ]

        # Block until every search has finished.
        for search_future in search_futures:
            search_future.result()

        logger.info("Workflow complete")

    parsl.clear()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--env",
        type=str,
        choices=["dev", "klone"],
        help="The environment to run the workflow in.",
    )

    parser.add_argument(
        "--runtime-config",
        type=str,
        help="The complete runtime configuration filepath to use for the workflow.",
    )

    args = parser.parse_args()

    # If a runtime_config file was provided and exists, load the toml as a dict.
    # A path that does not exist is ignored and the workflow runs with an
    # empty configuration.
    runtime_config = {}
    if args.runtime_config is not None and os.path.exists(args.runtime_config):
        with open(args.runtime_config, "r") as toml_runtime_config:
            runtime_config = toml.load(toml_runtime_config)

    workflow_runner(env=args.env, runtime_config=runtime_config)
import kbmod
from kbmod.work_unit import WorkUnit
from kbmod.reprojection_utils import transform_wcses_to_ebd

import kbmod.reprojection as reprojection
from astropy.wcs import WCS
from astropy.io import fits
from astropy.coordinates import EarthLocation
from astropy.time import Time
import numpy as np
import os
import time
from logging import Logger


def reproject_wu(
    original_wu_filepath: str = None,
    uri_filepath: str = None,
    reprojected_wu_filepath: str = None,
    runtime_config: dict = None,
    logger: Logger = None,
):
    """This task will perform reflex correction and reproject a WorkUnit to a common WCS.

    The work is delegated to ``WUReprojector``; this function only builds the
    reprojector from its arguments and runs it.

    Parameters
    ----------
    original_wu_filepath : str, optional
        The fully resolved filepath to the input WorkUnit file, by default None
    uri_filepath : str, optional
        The fully resolved filepath to the original uri file. This is used
        exclusively for the header contents, by default None
    reprojected_wu_filepath : str, optional
        The fully resolved filepath to the resulting WorkUnit file after reflex
        and reprojection, by default None
    runtime_config : dict, optional
        Additional configuration parameters to be used at runtime. ``None``
        (the default) is treated as an empty dictionary.
    logger : Logger, optional
        Primary logger for the workflow, by default None

    Returns
    -------
    object
        Whatever ``WUReprojector.reproject_workunit()`` returns. The original
        documentation describes it as the fully resolved filepath of the
        resulting WorkUnit file after reflex and reprojection.
    """
    # WUReprojector calls ``.get`` on the configuration, so never hand it None.
    wu_reprojector = WUReprojector(
        original_wu_filepath=original_wu_filepath,
        uri_filepath=uri_filepath,
        reprojected_wu_filepath=reprojected_wu_filepath,
        runtime_config={} if runtime_config is None else runtime_config,
        logger=logger,
    )

    return wu_reprojector.reproject_workunit()


class WUReprojector:
    def __init__(
        self,
        original_wu_filepath: str = None,
        uri_filepath: str = None,
        reprojected_wu_filepath: str = None,
        runtime_config: dict = {},
        logger: Logger = None,
    ):
        self.original_wu_filepath = original_wu_filepath
        self.uri_filepath = uri_filepath
        self.reprojected_wu_filepath = reprojected_wu_filepath
        self.runtime_config = runtime_config
        self.logger = logger
        kbmod._logging.basicConfig(level=self.logger.level)

        self.overwrite = self.runtime_config.get("overwrite", False)
        self.search_config = self.runtime_config.get("search_config", None)

        # Default to 8 workers if not in the config. Value must be 0