Skip to content

Commit

Permalink
Add multi_night_workflow for ingesting multi-night ImageCollections
Browse files Browse the repository at this point in the history
  • Loading branch information
wilsonbb committed Oct 15, 2024
1 parent b6f28c6 commit 3e92054
Show file tree
Hide file tree
Showing 7 changed files with 523 additions and 171 deletions.
161 changes: 161 additions & 0 deletions src/kbmod_wf/multi_night_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import argparse
import os

import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities import (
apply_runtime_updates,
get_resource_config,
get_executors,
get_configured_logger,
)

from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, reproject_wu_multi_night, kbmod_search


@python_app(
cache=True,
executors=get_executors(["local_dev_testing", "sharded_reproject"]),
ignore_for_cache=["logging_file"],
)
def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = get_configured_logger("task.reproject_wu", logging_file.filepath)

from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu
guess_dist = inputs[1] # heliocentric guess distance in AU
logger.info(f"Starting reproject_ic for guess distance {guess_dist}")
with ErrorLogger(logger):
reproject_wu(
guess_dist,
original_wu_filepath=inputs[0].filepath,
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
logger.info("Completed reproject_ic")
return outputs[0]


def workflow_runner(env=None, runtime_config={}):
"""This function will load and configure Parsl, and run the workflow.
Parameters
----------
env : str, optional
Environment string used to define which resource configuration to use,
by default None
runtime_config : dict, optional
Dictionary of assorted runtime configuration parameters, by default {}
"""
resource_config = get_resource_config(env=env)
resource_config = apply_runtime_updates(resource_config, runtime_config)

app_configs = runtime_config.get("apps", {})

dfk = parsl.load(resource_config)
if dfk:
logging_file = File(os.path.join(dfk.run_dir, "kbmod.log"))
logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath)

if runtime_config is not None:
logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")

logger.info("Starting workflow")

# gather all the *.collection files that are staged for processing
create_manifest_config = app_configs.get("create_manifest", {})
manifest_file = File(
os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
)
create_manifest_future = create_manifest(
inputs=[],
outputs=[manifest_file],
runtime_config=app_configs.get("create_manifest", {}),
logging_file=logging_file,
)

wu_filenames =[]
with open(create_manifest_future.result(), "r") as f:
# process each .collection file in the manifest into a .wu file
original_work_unit_futures = []
collection_files = []
for line in f:
collection_file = File(line.strip())
collection_files.append(collection_file)
wu_filename = line + ".wu"
wu_filenames.append(wu_filename)
original_work_unit_futures.append(
ic_to_wu(
inputs=[collection_file],
outputs=[File(wu_filename)],
runtime_config=app_configs.get("ic_to_wu", {}),
logging_file=logging_file,
)
)

# reproject each WorkUnit for a range of distances
reproject_futures = []
repro_wu_filenames = []
for i in range(len(original_work_unit_futures)):
f = original_work_unit_futures[i]
for distance in [46.7, 30.6]: # The reprojected distance in AU
output_filename=wu_filenames[i]+ f".{distance}.repro"
repro_wu_filenames.append(output_filename)
reproject_futures.append(
reproject_wu(
inputs=[f, distance],
outputs=[File(output_filename)],
runtime_config=app_configs.get("reproject_wu", {}),
logging_file=logging_file,
)
)

# run kbmod search on each reprojected WorkUnit
search_futures = []
for i in range(len(reproject_futures)):
f = reproject_futures[i]
search_futures.append(
kbmod_search(
inputs=[f],
outputs=[File(repro_wu_filenames[i] + ".search.ecsv")],
runtime_config=app_configs.get("kbmod_search", {}),
logging_file=logging_file,
)
)

[f.result() for f in search_futures]

logger.info("Workflow complete")

parsl.clear()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--env",
type=str,
choices=["dev", "klone"],
help="The environment to run the workflow in.",
)

parser.add_argument(
"--runtime-config",
type=str,
help="The complete runtime configuration filepath to use for the workflow.",
)

args = parser.parse_args()

# if a runtime_config file was provided and exists, load the toml as a dict.
runtime_config = {}
if args.runtime_config is not None and os.path.exists(args.runtime_config):
with open(args.runtime_config, "r") as toml_runtime_config:
runtime_config = toml.load(toml_runtime_config)

workflow_runner(env=args.env, runtime_config=runtime_config)
14 changes: 7 additions & 7 deletions src/kbmod_wf/resource_configs/klone_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ def klone_resource_config():
app_cache=True,
checkpoint_mode="task_exit",
checkpoint_files=get_all_checkpoints(
os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat())
os.path.join("/gscratch/dirac/kbmod/workflow/run_logs/wbeebe", datetime.date.today().isoformat())
),
run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()),
run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs/wbeebe", datetime.date.today().isoformat()),
retries=1,
executors=[
HighThroughputExecutor(
Expand All @@ -46,8 +46,8 @@ def klone_resource_config():
label="large_mem",
max_workers=1,
provider=SlurmProvider(
partition="ckpt-g2",
account="astro",
partition="gpu-a40",
account="escience",
min_blocks=0,
max_blocks=2,
init_blocks=0,
Expand All @@ -65,8 +65,8 @@ def klone_resource_config():
label="sharded_reproject",
max_workers=1,
provider=SlurmProvider(
partition="ckpt-g2",
account="astro",
partition="gpu-a40",
account="escience",
min_blocks=0,
max_blocks=2,
init_blocks=0,
Expand All @@ -84,7 +84,7 @@ def klone_resource_config():
label="gpu",
max_workers=1,
provider=SlurmProvider(
partition="ckpt-g2",
partition="gpu-a40",
account="escience",
min_blocks=0,
max_blocks=2,
Expand Down
Loading

0 comments on commit 3e92054

Please sign in to comment.