Skip to content

Commit

Permalink
Merge pull request #20 from dirac-institute/awo/tweaks-for-klone
Browse files Browse the repository at this point in the history
Updating the workflow for real use
  • Loading branch information
drewoldag authored Jul 19, 2024
2 parents 0f16133 + 9b7188e commit b08ddff
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
43 changes: 41 additions & 2 deletions src/kbmod_wf/resource_configs/klone_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from parsl.utils import get_all_checkpoints

# Maximum Slurm walltime per executor label, formatted "HH:MM:SS".
# Keys must match the walltimes[...] lookups used when building each
# SlurmProvider below (compute_bigmem, large_mem, gpu).
walltimes = {
    "compute_bigmem": "01:00:00",
    "large_mem": "08:00:00",
    "gpu_max": "04:00:00",
}


Expand All @@ -32,7 +34,44 @@ def klone_resource_config():
cores_per_node=1, # perhaps should be 8???
mem_per_node=64, # In GB
exclusive=False,
walltime=walltimes["compute-bigmem"],
walltime=walltimes["compute_bigmem"],
# Command to run before starting worker - i.e. conda activate <special_env>
worker_init="",
),
),
HighThroughputExecutor(
label="large_mem",
provider=SlurmProvider(
partition="compute-bigmem",
account="astro",
min_blocks=0,
max_blocks=2,
init_blocks=1,
parallelism=1,
nodes_per_block=1,
cores_per_node=8,
mem_per_node=512,
exclusive=False,
walltime=walltimes["large_mem"],
# Command to run before starting worker - i.e. conda activate <special_env>
worker_init="",
),
),
HighThroughputExecutor(
label="gpu",
available_accelerators=2,
provider=SlurmProvider(
partition="gpu_a40",
account="escience",
min_blocks=0,
max_blocks=2,
init_blocks=1,
parallelism=1,
nodes_per_block=1,
cores_per_node=4, # perhaps should be 8???
mem_per_node=128, # In GB
exclusive=False,
walltime=walltimes["gpu_max"],
# Command to run before starting worker - i.e. conda activate <special_env>
worker_init="",
),
Expand Down
25 changes: 16 additions & 9 deletions src/kbmod_wf/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
executors=get_executors(["local_dev_testing", "local_thread"]),
ignore_for_cache=["logging_file"],
)
def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None):
def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=None):
"""This app will go to a given directory, find all of the uri.lst files there,
and copy the paths to the manifest file."""
import glob
Expand All @@ -24,7 +24,7 @@ def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None):

logger = configure_logger("task.create_uri_manifest", logging_file.filepath)

directory_path = config.get("staging_directory")
directory_path = runtime_config.get("staging_directory")
if directory_path is None:
raise ValueError("No staging_directory provided in the configuration.")

Expand Down Expand Up @@ -94,7 +94,7 @@ def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None):


@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"]
)
def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
Expand All @@ -105,7 +105,7 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None):
logger.info("Starting reproject_ic")
reproject_wu(
original_wu_filepath=inputs[0].filepath,
uri_filepath=None, # ! determine what, if any, value should be used.
uri_filepath=inputs[1].filepath,
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
Expand All @@ -116,7 +116,7 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None):


@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"]
)
def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
Expand Down Expand Up @@ -156,18 +156,22 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None:
create_uri_manifest_future = create_uri_manifest(
inputs=[],
outputs=[manifest_file],
config=app_configs.get("create_uri_manifest", {}),
runtime_config=app_configs.get("create_uri_manifest", {}),
logging_file=logging_file,
)

with open(create_uri_manifest_future.result(), "r") as f:
# process each .lst file in the manifest into a .ecvs file
uri_to_ic_futures = []
uri_files = []
for line in f:
uri_file = File(line.strip())
uri_files.append(uri_file)
uri_to_ic_futures.append(
uri_to_ic(
inputs=[File(line.strip())],
inputs=[uri_file],
outputs=[File(line + ".ecsv")],
runtime_config=app_configs.get("uri_to_ic", {}),
logging_file=logging_file,
)
)
Expand All @@ -179,18 +183,20 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None:
ic_to_wu(
inputs=[f.result()],
outputs=[File(f.result().filepath + ".wu")],
runtime_config=app_configs.get("ic_to_wu", {}),
logging_file=logging_file,
)
)

# reproject each WorkUnit for a range of distances
reproject_futures = []
for f in original_work_unit_futures:
for f, uri_file in zip(original_work_unit_futures, uri_files):
for distance in range(40, 60, 10):
reproject_futures.append(
reproject_wu(
inputs=[f.result()],
inputs=[f.result(), uri_file],
outputs=[File(f.result().filepath + f".{distance}.repro")],
runtime_config=app_configs.get("reproject_wu", {}),
logging_file=logging_file,
)
)
Expand All @@ -202,6 +208,7 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None:
kbmod_search(
inputs=[f.result()],
outputs=[File(f.result().filepath + ".search")],
runtime_config=app_configs.get("kbmod_search", {}),
logging_file=logging_file,
)
)
Expand Down

0 comments on commit b08ddff

Please sign in to comment.