diff --git a/example_runtime_config.toml b/example_runtime_config.toml index d30f55d..6347cc0 100644 --- a/example_runtime_config.toml +++ b/example_runtime_config.toml @@ -3,7 +3,7 @@ # calling parsl.load(config). Even if the key does't exist in the resource # config, it will be added with the value defined here. [resource_config_modifiers] -checkpoint_mode = "task_exit" +checkpoint_mode = 'task_exit' # Values in the apps.XXX section will be passed as a dictionary to the corresponding diff --git a/src/kbmod_wf/resource_configs/dev_configuration.py b/src/kbmod_wf/resource_configs/dev_configuration.py index d0631b5..b003b62 100644 --- a/src/kbmod_wf/resource_configs/dev_configuration.py +++ b/src/kbmod_wf/resource_configs/dev_configuration.py @@ -2,6 +2,8 @@ import datetime from parsl import Config from parsl.executors import ThreadPoolExecutor +from parsl.utils import get_all_checkpoints + this_dir = os.path.dirname(os.path.abspath(__file__)) project_dir = os.path.abspath(os.path.join(this_dir, "../../../")) @@ -11,6 +13,11 @@ def dev_resource_config(): return Config( # put the log files in in the top level folder, "run_logs". run_dir=os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()), + app_cache=True, + checkpoint_mode="task_exit", + checkpoint_files=get_all_checkpoints( + os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()) + ), executors=[ ThreadPoolExecutor( label="local_dev_testing", diff --git a/src/kbmod_wf/resource_configs/klone_configuration.py b/src/kbmod_wf/resource_configs/klone_configuration.py index 78a045f..f87a6b2 100644 --- a/src/kbmod_wf/resource_configs/klone_configuration.py +++ b/src/kbmod_wf/resource_configs/klone_configuration.py @@ -14,7 +14,9 @@ def klone_resource_config(): return Config( app_cache=True, checkpoint_mode="task_exit", - checkpoint_files=get_all_checkpoints(), + checkpoint_files=get_all_checkpoints( + os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()) + ), run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()), executors=[ HighThroughputExecutor( diff --git a/src/kbmod_wf/task_impls/ic_to_wu.py b/src/kbmod_wf/task_impls/ic_to_wu.py index ba9f974..a8e02e7 100644 --- a/src/kbmod_wf/task_impls/ic_to_wu.py +++ b/src/kbmod_wf/task_impls/ic_to_wu.py @@ -1,3 +1,6 @@ +import time + + def ic_to_wu(ic_file=None, wu_file=None, logger=None): logger.info("In the ic_to_wu task_impl") with open(ic_file, "r") as f: @@ -6,6 +9,6 @@ def ic_to_wu(ic_file=None, wu_file=None, logger=None): logger.info(line.strip()) with open(wu_file, "w") as f: - f.write(f"Logged: {value}") + f.write(f"Logged: {value} - {time.time()}\n") return wu_file diff --git a/src/kbmod_wf/task_impls/kbmod_search.py b/src/kbmod_wf/task_impls/kbmod_search.py index f51019c..bace169 100644 --- a/src/kbmod_wf/task_impls/kbmod_search.py +++ b/src/kbmod_wf/task_impls/kbmod_search.py @@ -1,3 +1,6 @@ +import time + + def kbmod_search(input_wu=None, result_file=None, logger=None): logger.info("In the kbmod_search task_impl") with open(input_wu, "r") as f: @@ -5,7 +8,9 @@ def kbmod_search(input_wu=None, result_file=None, logger=None): value = line.strip() logger.info(line.strip()) + time.sleep(5) + with open(result_file, "w") as f: - f.write(f"Logged: {value}") + f.write(f"Logged: {value} - {time.time()}\n") return result_file diff --git a/src/kbmod_wf/task_impls/reproject_wu.py b/src/kbmod_wf/task_impls/reproject_wu.py index 52329bf..716fe27 100644 --- a/src/kbmod_wf/task_impls/reproject_wu.py +++ b/src/kbmod_wf/task_impls/reproject_wu.py @@ -1,3 +1,6 @@ +import time + + def reproject_wu(input_wu=None, reprojected_wu=None, logger=None): logger.info("In the reproject_wu task_impl") with open(input_wu, "r") as f: @@ -6,6 +9,6 @@ def reproject_wu(input_wu=None, reprojected_wu=None, logger=None): logger.info(line.strip()) with open(reprojected_wu, "w") as f: - f.write(f"Logged: {value}") + f.write(f"Logged: {value} - {time.time()}\n") return reprojected_wu diff --git a/src/kbmod_wf/task_impls/uri_to_ic.py b/src/kbmod_wf/task_impls/uri_to_ic.py index 921ac85..95122dc 100644 --- a/src/kbmod_wf/task_impls/uri_to_ic.py +++ b/src/kbmod_wf/task_impls/uri_to_ic.py @@ -1,24 +1,25 @@ import os import glob +import time -from kbmod import ImageCollection +# from kbmod import ImageCollection -# def uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_path=None, logger=None): -# with open(target_uris_file_path, "r") as f: -# for line in f: -# value = line.strip() -# logger.info(line.strip()) +def uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_path=None, logger=None): + with open(target_uris_file_path, "r") as f: + for line in f: + value = line.strip() + logger.info(line.strip()) -# with open(ic_output_file_path, "w") as f: -# f.write(f"Logged: {value}") + with open(ic_output_file_path, "w") as f: + f.write(f"Logged: {value} - {time.time()}\n") -# return ic_output_file_path + return ic_output_file_path #! I believe that we can remove the `uris_base_dir` parameter from the function #! signature. It doesn't seem to be used in practice. -def uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_path=None, logger=None): +def no_uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_path=None, logger=None): """For each URI in the target_uris_file, perform string cleaning and build a file path. Then create an ImageCollection object with the final file paths and write the ImageCollection to a file. @@ -85,7 +86,7 @@ def uri_to_ic(target_uris_file_path=None, uris_base_dir=None, ic_output_file_pat logger.info("Creating ImageCollection") # Create an ImageCollection object from the list of URIs - ic = ImageCollection.fromTargets(uris) + # ic = ImageCollection.fromTargets(uris) logger.info(f"Writing ImageCollection to file {ic_output_file_path}") - ic.write(ic_output_file_path, format="ascii.ecsv") + # ic.write(ic_output_file_path, format="ascii.ecsv") diff --git a/src/kbmod_wf/utilities/memoization_utilities.py b/src/kbmod_wf/utilities/memoization_utilities.py index 5d6a5e7..922c03a 100644 --- a/src/kbmod_wf/utilities/memoization_utilities.py +++ b/src/kbmod_wf/utilities/memoization_utilities.py @@ -1,9 +1,9 @@ import os +import pickle from parsl.dataflow.memoization import id_for_memo from parsl import File @id_for_memo.register(File) def id_for_memo_file(parsl_file_object: File, output_ref: bool = False) -> bytes: - if output_ref and os.path.exists(parsl_file_object.filepath): - return pickle.dumps(parsl_file_object.filepath) + return pickle.dumps(parsl_file_object.filepath) diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py index ea99c8b..fc9c4cb 100644 --- a/src/kbmod_wf/workflow.py +++ b/src/kbmod_wf/workflow.py @@ -10,7 +10,11 @@ from kbmod_wf.utilities.logger_utilities import configure_logger -@python_app(executors=get_executors(["local_dev_testing", "local_thread"])) +@python_app( + cache=True, + executors=get_executors(["local_dev_testing", "local_thread"]), + ignore_for_cache=["logging_file"], +) def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None): """This app will go to a given directory, find all of the uri.lst files there, and copy the paths to the manifest file.""" @@ -46,7 +50,9 @@ def create_uri_manifest(inputs=[], outputs=[], config={}, logging_file=None): return outputs[0] -@python_app(executors=get_executors(["local_dev_testing", "small_cpu"])) +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] +) def read_and_log(inputs=[], outputs=[], logging_file=None): """THIS IS A PLACEHOLDER FUNCTION THAT WILL BE REMOVED SOON""" from kbmod_wf.utilities.logger_utilities import configure_logger @@ -64,7 +70,9 @@ def read_and_log(inputs=[], outputs=[], logging_file=None): return outputs[0] -@python_app(executors=get_executors(["local_dev_testing", "small_cpu"])) +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] +) def uri_to_ic(inputs=[], outputs=[], logging_file=None): from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.uri_to_ic import uri_to_ic @@ -83,7 +91,9 @@ def uri_to_ic(inputs=[], outputs=[], logging_file=None): return outputs[0] -@python_app(executors=get_executors(["local_dev_testing", "small_cpu"])) +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] +) def ic_to_wu(inputs=[], outputs=[], logging_file=None): from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.ic_to_wu import ic_to_wu @@ -97,7 +107,9 @@ def ic_to_wu(inputs=[], outputs=[], logging_file=None): return outputs[0] -@python_app(executors=get_executors(["local_dev_testing", "small_cpu"])) +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] +) def reproject_wu(inputs=[], outputs=[], logging_file=None): from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.reproject_wu import reproject_wu @@ -111,7 +123,9 @@ def reproject_wu(inputs=[], outputs=[], logging_file=None): return outputs[0] -@python_app(executors=get_executors(["local_dev_testing", "small_cpu"])) +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] +) def kbmod_search(inputs=[], outputs=[], logging_file=None): from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.kbmod_search import kbmod_search @@ -125,19 +139,18 @@ def kbmod_search(inputs=[], outputs=[], logging_file=None): return outputs[0] -def workflow_runner(env: str = None, runtime_config: dict = None) -> None: +def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: resource_config = get_resource_config(env=env) + resource_config = apply_runtime_updates(resource_config, runtime_config) - if runtime_config is not None: - resource_config = apply_runtime_updates(resource_config, runtime_config) - app_configs = runtime_config.get("apps", {}) + app_configs = runtime_config.get("apps", {}) with parsl.load(resource_config) as dfk: logging_file = File(os.path.join(dfk.run_dir, "parsl.log")) logger = configure_logger("workflow.workflow_runner", logging_file.filepath) if runtime_config is not None: - logger.info(f"Using runtime configuration defintion:\n{toml.dumps(runtime_config)}") + logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}") logger.info("Starting workflow") @@ -162,39 +175,39 @@ def workflow_runner(env: str = None, runtime_config: dict = None) -> None: ) ) - # create an original WorkUnit for each .ecsv file - original_work_unit_futures = [] - for f in uri_to_ic_futures: - original_work_unit_futures.append( - ic_to_wu( + # create an original WorkUnit for each .ecsv file + original_work_unit_futures = [] + for f in uri_to_ic_futures: + original_work_unit_futures.append( + ic_to_wu( + inputs=[f.result()], + outputs=[File(f.result().filepath + ".wu")], + logging_file=logging_file, + ) + ) + + # reproject each WorkUnit for a range of distances + reproject_futures = [] + for f in original_work_unit_futures: + for distance in range(40, 60, 10): + reproject_futures.append( + reproject_wu( inputs=[f.result()], - outputs=[File(f.result().filepath + ".wu")], + outputs=[File(f.result().filepath + f".{distance}.repro")], logging_file=logging_file, ) ) - # reproject each WorkUnit for a range of distances - reproject_futures = [] - for f in original_work_unit_futures: - for distance in range(40, 60, 10): - reproject_futures.append( - reproject_wu( - inputs=[f.result()], - outputs=[File(f.result().filepath + f".{distance}.repro")], - logging_file=logging_file, - ) - ) - - # run kbmod search on each reprojected WorkUnit - search_futures = [] - for f in reproject_futures: - search_futures.append( - kbmod_search( - inputs=[f.result()], - outputs=[File(f.result().filepath + ".search")], - logging_file=logging_file, - ) + # run kbmod search on each reprojected WorkUnit + search_futures = [] + for f in reproject_futures: + search_futures.append( + kbmod_search( + inputs=[f.result()], + outputs=[File(f.result().filepath + ".search")], + logging_file=logging_file, ) + ) [f.result() for f in search_futures] @@ -221,6 +234,7 @@ def workflow_runner(env: str = None, runtime_config: dict = None) -> None: args = parser.parse_args() # if a runtime_config file was provided and exists, load the toml as a dict. + runtime_config = {} if args.runtime_config is not None and os.path.exists(args.runtime_config): with open(args.runtime_config, "r") as toml_runtime_config: runtime_config = toml.load(toml_runtime_config)