From 9b7188e061aafc90ebb028b68636b1fcdffbdaf9 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Fri, 19 Jul 2024 14:47:45 -0700 Subject: [PATCH] Updating the workflow to prep for Klone. Added new executors for different apps. --- .../resource_configs/klone_configuration.py | 43 ++++++++++++++++++- src/kbmod_wf/workflow.py | 25 +++++++---- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/src/kbmod_wf/resource_configs/klone_configuration.py b/src/kbmod_wf/resource_configs/klone_configuration.py index f87a6b2..9621a87 100644 --- a/src/kbmod_wf/resource_configs/klone_configuration.py +++ b/src/kbmod_wf/resource_configs/klone_configuration.py @@ -6,7 +6,9 @@ from parsl.utils import get_all_checkpoints walltimes = { - "compute-bigmem": "01:00:00", # change this to be appropriate + "compute_bigmem": "01:00:00", + "large_mem": "08:00:00", + "gpu_max": "04:00:00", } @@ -32,7 +34,44 @@ def klone_resource_config(): cores_per_node=1, # perhaps should be 8??? mem_per_node=64, # In GB exclusive=False, - walltime=walltimes["compute-bigmem"], + walltime=walltimes["compute_bigmem"], + # Command to run before starting worker - i.e. conda activate + worker_init="", + ), + ), + HighThroughputExecutor( + label="large_mem", + provider=SlurmProvider( + partition="compute-bigmem", + account="astro", + min_blocks=0, + max_blocks=2, + init_blocks=1, + parallelism=1, + nodes_per_block=1, + cores_per_node=8, + mem_per_node=512, + exclusive=False, + walltime=walltimes["large_mem"], + # Command to run before starting worker - i.e. conda activate + worker_init="", + ), + ), + HighThroughputExecutor( + label="gpu", + available_accelerators=2, + provider=SlurmProvider( + partition="gpu_a40", + account="escience", + min_blocks=0, + max_blocks=2, + init_blocks=1, + parallelism=1, + nodes_per_block=1, + cores_per_node=4, # perhaps should be 8??? + mem_per_node=128, # In GB + exclusive=False, + walltime=walltimes["gpu_max"], # Command to run before starting worker - i.e. conda activate worker_init="", ), diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py index f3b31d5..ee7f08b 100644 --- a/src/kbmod_wf/workflow.py +++ b/src/kbmod_wf/workflow.py @@ -15,7 +15,7 @@ executors=get_executors(["local_dev_testing", "local_thread"]), ignore_for_cache=["logging_file"], ) -def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None): +def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=None): """This app will go to a given directory, find all of the uri.lst files there, and copy the paths to the manifest file.""" import glob @@ -24,7 +24,7 @@ def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None): logger = configure_logger("task.create_uri_manifest", logging_file.filepath) - directory_path = config.get("staging_directory") + directory_path = runtime_config.get("staging_directory") if directory_path is None: raise ValueError("No staging_directory provided in the configuration.") @@ -94,7 +94,7 @@ def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): @python_app( - cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] + cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] ) def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): from kbmod_wf.utilities.logger_utilities import configure_logger @@ -105,7 +105,7 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): logger.info("Starting reproject_ic") reproject_wu( original_wu_filepath=inputs[0].filepath, - uri_filepath=None, # ! determine what, if any, value should be used. + uri_filepath=inputs[1].filepath, reprojected_wu_filepath=outputs[0].filepath, runtime_config=runtime_config, logger=logger, @@ -116,7 +116,7 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): @python_app( - cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] + cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] ) def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None): from kbmod_wf.utilities.logger_utilities import configure_logger @@ -156,18 +156,22 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: create_uri_manifest_future = create_uri_manifest( inputs=[], outputs=[manifest_file], - config=app_configs.get("create_uri_manifest", {}), + runtime_config=app_configs.get("create_uri_manifest", {}), logging_file=logging_file, ) with open(create_uri_manifest_future.result(), "r") as f: # process each .lst file in the manifest into a .ecvs file uri_to_ic_futures = [] + uri_files = [] for line in f: + uri_file = File(line.strip()) + uri_files.append(uri_file) uri_to_ic_futures.append( uri_to_ic( - inputs=[File(line.strip())], + inputs=[uri_file], outputs=[File(line + ".ecsv")], + runtime_config=app_configs.get("uri_to_ic", {}), logging_file=logging_file, ) ) @@ -179,18 +183,20 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: ic_to_wu( inputs=[f.result()], outputs=[File(f.result().filepath + ".wu")], + runtime_config=app_configs.get("ic_to_wu", {}), logging_file=logging_file, ) ) # reproject each WorkUnit for a range of distances reproject_futures = [] - for f in original_work_unit_futures: + for f, uri_file in zip(original_work_unit_futures, uri_files): for distance in range(40, 60, 10): reproject_futures.append( reproject_wu( - inputs=[f.result()], + inputs=[f.result(), uri_file], outputs=[File(f.result().filepath + f".{distance}.repro")], + runtime_config=app_configs.get("reproject_wu", {}), logging_file=logging_file, ) ) @@ -202,6 +208,7 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: kbmod_search( inputs=[f.result()], outputs=[File(f.result().filepath + ".search")], + runtime_config=app_configs.get("kbmod_search", {}), logging_file=logging_file, ) )