Skip to content

Commit

Permalink
Merge pull request #16 from dirac-institute/issue/11/investigate-resu…
Browse files Browse the repository at this point in the history
…ming-wf-klone

Get checkpointing working correctly
  • Loading branch information
drewoldag authored Jul 18, 2024
2 parents 2977f63 + bfae2a5 commit df39af1
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 57 deletions.
2 changes: 1 addition & 1 deletion example_runtime_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# calling parsl.load(config). Even if the key doesn't exist in the resource
# config, it will be added with the value defined here.
[resource_config_modifiers]
checkpoint_mode = "task_exit"
checkpoint_mode = 'task_exit'


# Values in the apps.XXX section will be passed as a dictionary to the corresponding
Expand Down
7 changes: 7 additions & 0 deletions src/kbmod_wf/resource_configs/dev_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import datetime
from parsl import Config
from parsl.executors import ThreadPoolExecutor
from parsl.utils import get_all_checkpoints


this_dir = os.path.dirname(os.path.abspath(__file__))
project_dir = os.path.abspath(os.path.join(this_dir, "../../../"))
Expand All @@ -11,6 +13,11 @@ def dev_resource_config():
return Config(
# put the log files in the top-level folder, "run_logs".
run_dir=os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()),
app_cache=True,
checkpoint_mode="task_exit",
checkpoint_files=get_all_checkpoints(
os.path.join(project_dir, "run_logs", datetime.date.today().isoformat())
),
executors=[
ThreadPoolExecutor(
label="local_dev_testing",
Expand Down
4 changes: 3 additions & 1 deletion src/kbmod_wf/resource_configs/klone_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ def klone_resource_config():
return Config(
app_cache=True,
checkpoint_mode="task_exit",
checkpoint_files=get_all_checkpoints(),
checkpoint_files=get_all_checkpoints(
os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat())
),
run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()),
executors=[
HighThroughputExecutor(
Expand Down
5 changes: 4 additions & 1 deletion src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time


def ic_to_wu(ic_file=None, wu_file=None, logger=None):
logger.info("In the ic_to_wu task_impl")
with open(ic_file, "r") as f:
Expand All @@ -6,6 +9,6 @@ def ic_to_wu(ic_file=None, wu_file=None, logger=None):
logger.info(line.strip())

with open(wu_file, "w") as f:
f.write(f"Logged: {value}")
f.write(f"Logged: {value} - {time.time()}\n")

return wu_file
7 changes: 6 additions & 1 deletion src/kbmod_wf/task_impls/kbmod_search.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import time


def kbmod_search(input_wu=None, result_file=None, logger=None, delay_seconds=5):
    """Placeholder search task: echo the input file to the log and write a result file.

    Every line of ``input_wu`` is stripped and logged. After pausing for
    ``delay_seconds``, a single line of the form
    ``"Logged: <last stripped line> - <time.time()>"`` is written to
    ``result_file``.

    Parameters
    ----------
    input_wu : str
        Path of the text file to read.
    result_file : str
        Path of the file to (over)write with the result line.
    logger : logging.Logger-like
        Object exposing ``info(msg)``; receives one message per input line.
    delay_seconds : float, optional
        How long to sleep between reading and writing. Defaults to 5, the
        value that was previously hard-coded.

    Returns
    -------
    str
        ``result_file``, unchanged.
    """
    logger.info("In the kbmod_search task_impl")

    # Bind a default so an empty input file yields "Logged:  - <time>" rather
    # than an UnboundLocalError when the result line is formatted below.
    value = ""
    with open(input_wu, "r") as f:
        for line in f:
            value = line.strip()
            logger.info(value)

    # NOTE(review): presumably this pause simulates a long-running search so
    # checkpoint/cache behaviour is observable -- confirm before removing.
    time.sleep(delay_seconds)

    with open(result_file, "w") as f:
        f.write(f"Logged: {value} - {time.time()}\n")

    return result_file
5 changes: 4 additions & 1 deletion src/kbmod_wf/task_impls/reproject_wu.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time


def reproject_wu(input_wu=None, reprojected_wu=None, logger=None):
logger.info("In the reproject_wu task_impl")
with open(input_wu, "r") as f:
Expand All @@ -6,6 +9,6 @@ def reproject_wu(input_wu=None, reprojected_wu=None, logger=None):
logger.info(line.strip())

with open(reprojected_wu, "w") as f:
f.write(f"Logged: {value}")
f.write(f"Logged: {value} - {time.time()}\n")

return reprojected_wu
25 changes: 13 additions & 12 deletions src/kbmod_wf/task_impls/uri_to_ic.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import os
import glob
import time

from kbmod import ImageCollection
# from kbmod import ImageCollection


# def uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_path=None, logger=None):
# with open(target_uris_file_path, "r") as f:
# for line in f:
# value = line.strip()
# logger.info(line.strip())
def uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_path=None, logger=None):
    """Placeholder task: echo the URI file to the log and write a stand-in output file.

    Every line of ``target_uris_file_path`` is stripped and logged. A single
    line of the form ``"Logged: <last stripped line> - <time.time()>"`` is then
    written to ``ic_output_file_path``.

    Parameters
    ----------
    target_uris_file_path : str
        Path of the text file to read.
    uris_base_dir : str, optional
        Accepted for signature compatibility; not used by this implementation.
    ic_output_file_path : str
        Path of the file to (over)write with the result line.
    logger : logging.Logger-like
        Object exposing ``info(msg)``; receives one message per input line.

    Returns
    -------
    str
        ``ic_output_file_path``, unchanged.
    """
    # Bind a default so an empty URI file yields "Logged:  - <time>" rather
    # than an UnboundLocalError when the result line is formatted below.
    value = ""
    with open(target_uris_file_path, "r") as f:
        for line in f:
            value = line.strip()
            logger.info(value)

    with open(ic_output_file_path, "w") as f:
        f.write(f"Logged: {value} - {time.time()}\n")

    return ic_output_file_path


#! I believe that we can remove the `uris_base_dir` parameter from the function
#! signature. It doesn't seem to be used in practice.
def uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_path=None, logger=None):
def no_uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_path=None, logger=None):
"""For each URI in the target_uris_file, perform string cleaning and build a
file path. Then create an ImageCollection object with the final file paths
and write the ImageCollection to a file.
Expand Down Expand Up @@ -85,7 +86,7 @@ def uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_pat

logger.info("Creating ImageCollection")
# Create an ImageCollection object from the list of URIs
ic = ImageCollection.fromTargets(uris)
# ic = ImageCollection.fromTargets(uris)

logger.info(f"Writing ImageCollection to file {ic_output_file_path}")
ic.write(ic_output_file_path, format="ascii.ecsv")
# ic.write(ic_output_file_path, format="ascii.ecsv")
4 changes: 2 additions & 2 deletions src/kbmod_wf/utilities/memoization_utilities.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import os
import pickle
from parsl.dataflow.memoization import id_for_memo
from parsl import File


@id_for_memo.register(File)
def id_for_memo_file(parsl_file_object: File, output_ref: bool = False) -> bytes:
if output_ref and os.path.exists(parsl_file_object.filepath):
return pickle.dumps(parsl_file_object.filepath)
return pickle.dumps(parsl_file_object.filepath)
90 changes: 52 additions & 38 deletions src/kbmod_wf/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from kbmod_wf.utilities.logger_utilities import configure_logger


@python_app(executors=get_executors(["local_dev_testing", "local_thread"]))
@python_app(
cache=True,
executors=get_executors(["local_dev_testing", "local_thread"]),
ignore_for_cache=["logging_file"],
)
def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None):
"""This app will go to a given directory, find all of the uri.lst files there,
and copy the paths to the manifest file."""
Expand Down Expand Up @@ -46,7 +50,9 @@ def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None):
return outputs[0]


@python_app(executors=get_executors(["local_dev_testing", "small_cpu"]))
@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
)
def read_and_log(inputs=[], outputs=[], logging_file=None):
"""THIS IS A PLACEHOLDER FUNCTION THAT WILL BE REMOVED SOON"""
from kbmod_wf.utilities.logger_utilities import configure_logger
Expand All @@ -64,7 +70,9 @@ def read_and_log(inputs=[], outputs=[], logging_file=None):
return outputs[0]


@python_app(executors=get_executors(["local_dev_testing", "small_cpu"]))
@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
)
def uri_to_ic(inputs=[], outputs=[], logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.uri_to_ic import uri_to_ic
Expand All @@ -83,7 +91,9 @@ def uri_to_ic(inputs=[], outputs=[], logging_file=None):
return outputs[0]


@python_app(executors=get_executors(["local_dev_testing", "small_cpu"]))
@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
)
def ic_to_wu(inputs=[], outputs=[], logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.ic_to_wu import ic_to_wu
Expand All @@ -97,7 +107,9 @@ def ic_to_wu(inputs=[], outputs=[], logging_file=None):
return outputs[0]


@python_app(executors=get_executors(["local_dev_testing", "small_cpu"]))
@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
)
def reproject_wu(inputs=[], outputs=[], logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.reproject_wu import reproject_wu
Expand All @@ -111,7 +123,9 @@ def reproject_wu(inputs=[], outputs=[], logging_file=None):
return outputs[0]


@python_app(executors=get_executors(["local_dev_testing", "small_cpu"]))
@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
)
def kbmod_search(inputs=[], outputs=[], logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.kbmod_search import kbmod_search
Expand All @@ -125,19 +139,18 @@ def kbmod_search(inputs=[], outputs=[], logging_file=None):
return outputs[0]


def workflow_runner(env: str = None, runtime_config: dict = None) -> None:
def workflow_runner(env: str = None, runtime_config: dict = {}) -> None:
resource_config = get_resource_config(env=env)
resource_config = apply_runtime_updates(resource_config, runtime_config)

if runtime_config is not None:
resource_config = apply_runtime_updates(resource_config, runtime_config)
app_configs = runtime_config.get("apps", {})
app_configs = runtime_config.get("apps", {})

with parsl.load(resource_config) as dfk:
logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))
logger = configure_logger("workflow.workflow_runner", logging_file.filepath)

if runtime_config is not None:
logger.info(f"Using runtime configuration defintion:\n{toml.dumps(runtime_config)}")
logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")

logger.info("Starting workflow")

Expand All @@ -162,39 +175,39 @@ def workflow_runner(env: str = None, runtime_config: dict = None) -> None:
)
)

# create an original WorkUnit for each .ecsv file
original_work_unit_futures = []
for f in uri_to_ic_futures:
original_work_unit_futures.append(
ic_to_wu(
# create an original WorkUnit for each .ecsv file
original_work_unit_futures = []
for f in uri_to_ic_futures:
original_work_unit_futures.append(
ic_to_wu(
inputs=[f.result()],
outputs=[File(f.result().filepath + ".wu")],
logging_file=logging_file,
)
)

# reproject each WorkUnit for a range of distances
reproject_futures = []
for f in original_work_unit_futures:
for distance in range(40, 60, 10):
reproject_futures.append(
reproject_wu(
inputs=[f.result()],
outputs=[File(f.result().filepath + ".wu")],
outputs=[File(f.result().filepath + f".{distance}.repro")],
logging_file=logging_file,
)
)

# reproject each WorkUnit for a range of distances
reproject_futures = []
for f in original_work_unit_futures:
for distance in range(40, 60, 10):
reproject_futures.append(
reproject_wu(
inputs=[f.result()],
outputs=[File(f.result().filepath + f".{distance}.repro")],
logging_file=logging_file,
)
)

# run kbmod search on each reprojected WorkUnit
search_futures = []
for f in reproject_futures:
search_futures.append(
kbmod_search(
inputs=[f.result()],
outputs=[File(f.result().filepath + ".search")],
logging_file=logging_file,
)
# run kbmod search on each reprojected WorkUnit
search_futures = []
for f in reproject_futures:
search_futures.append(
kbmod_search(
inputs=[f.result()],
outputs=[File(f.result().filepath + ".search")],
logging_file=logging_file,
)
)

[f.result() for f in search_futures]

Expand All @@ -221,6 +234,7 @@ def workflow_runner(env: str = None, runtime_config: dict = None) -> None:
args = parser.parse_args()

# if a runtime_config file was provided and exists, load the toml as a dict.
runtime_config = {}
if args.runtime_config is not None and os.path.exists(args.runtime_config):
with open(args.runtime_config, "r") as toml_runtime_config:
runtime_config = toml.load(toml_runtime_config)
Expand Down

0 comments on commit df39af1

Please sign in to comment.