From 94ce5b8106e71d36c6f88c0031241046cc90b86a Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Wed, 10 Jul 2024 16:02:20 -0700 Subject: [PATCH] Lots of tweaks, pushing toward a final POC. --- .../configurations/dev_configuration.py | 3 +- .../configurations/klone_configuration.py | 13 ++++- .../{tasks => task_impls}/__init__.py | 0 .../{tasks => task_impls}/reproject_ic.py | 0 src/kbmod_wf/task_impls/uri_to_ic.py | 44 +++++++++++++++ src/kbmod_wf/tasks/uri_to_ic.py | 3 - .../utilities/configuration_utilities.py | 41 ++++++++++++++ src/kbmod_wf/utilities/executor_utilities.py | 21 +++++++ src/kbmod_wf/workflow.py | 55 +++++++++++++------ 9 files changed, 157 insertions(+), 23 deletions(-) rename src/kbmod_wf/{tasks => task_impls}/__init__.py (100%) rename src/kbmod_wf/{tasks => task_impls}/reproject_ic.py (100%) create mode 100644 src/kbmod_wf/task_impls/uri_to_ic.py delete mode 100644 src/kbmod_wf/tasks/uri_to_ic.py create mode 100644 src/kbmod_wf/utilities/configuration_utilities.py create mode 100644 src/kbmod_wf/utilities/executor_utilities.py diff --git a/src/kbmod_wf/configurations/dev_configuration.py b/src/kbmod_wf/configurations/dev_configuration.py index 287ba15..8e50ba7 100644 --- a/src/kbmod_wf/configurations/dev_configuration.py +++ b/src/kbmod_wf/configurations/dev_configuration.py @@ -1,4 +1,5 @@ import os +import datetime from parsl import Config from parsl.executors import ThreadPoolExecutor @@ -9,7 +10,7 @@ def dev_config(): return Config( # put the log files in in the top level folder, "run_logs". 
- run_dir=os.path.join(project_dir, "run_logs"), + run_dir=os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()), executors=[ ThreadPoolExecutor( label="local_dev_testing", diff --git a/src/kbmod_wf/configurations/klone_configuration.py b/src/kbmod_wf/configurations/klone_configuration.py index bd0c646..5822ca0 100644 --- a/src/kbmod_wf/configurations/klone_configuration.py +++ b/src/kbmod_wf/configurations/klone_configuration.py @@ -1,6 +1,8 @@ +import os +import datetime from parsl import Config from parsl.executors import HighThroughputExecutor -from parsl.providers import SlurmProvider +from parsl.providers import LocalProvider, SlurmProvider walltimes = { "compute-bigmem": "01:00:00", # change this to be appropriate @@ -9,7 +11,7 @@ def klone_config(): return Config( - run_dir="/gscratch/dirac/kbmod/workflow/", + run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()), executors=[ HighThroughputExecutor( label="small_cpu", @@ -27,5 +29,12 @@ def klone_config(): walltime=walltimes["compute-bigmem"], ), ), + HighThroughputExecutor( + label="local_thread", + provider=LocalProvider( + init_blocks=1, + max_blocks=1, + ), + ), ], ) diff --git a/src/kbmod_wf/tasks/__init__.py b/src/kbmod_wf/task_impls/__init__.py similarity index 100% rename from src/kbmod_wf/tasks/__init__.py rename to src/kbmod_wf/task_impls/__init__.py diff --git a/src/kbmod_wf/tasks/reproject_ic.py b/src/kbmod_wf/task_impls/reproject_ic.py similarity index 100% rename from src/kbmod_wf/tasks/reproject_ic.py rename to src/kbmod_wf/task_impls/reproject_ic.py diff --git a/src/kbmod_wf/task_impls/uri_to_ic.py b/src/kbmod_wf/task_impls/uri_to_ic.py new file mode 100644 index 0000000..2a6d775 --- /dev/null +++ b/src/kbmod_wf/task_impls/uri_to_ic.py @@ -0,0 +1,44 @@ +import os +import glob +from kbmod import ImageCollection + + +def uri_to_ic(target_uris_file=None, uris_base_dir=None, ic_output_file=None, logger=None): + # Load the list of 
images from our saved file "sample_uris.txt" + uris = [] + with open(target_uris_file) as f: + for l in f.readlines(): + l = l.strip() # seeing invisible trailing characters 6/12/2024 COC + if l == "": + continue # skip blank lines 6/12/2024 COC + if not l.startswith("#"): + # Ignore commented metadata + uris.append(l) + + if uris_base_dir is not None: + logger.debug(f"Using URIs base dir: {uris_base_dir}") + if not os.path.isdir(uris_base_dir): + logger.error(f"Invalid URIS base directory provided: {uris_base_dir}") + raise ValueError(f"Invalid URIS base directory provided: {uris_base_dir}") + + # Clean up the URI strings + for i in range(len(uris)): + file_prefix = "file://" + curr = uris[i].replace("%23", "#").strip() + if curr.startswith(file_prefix): + curr = curr[len(file_prefix) :] + if uris_base_dir is not None: + curr = os.path.join(uris_base_dir, curr.lstrip(os.path.sep)) + uris[i] = curr + + # Make sure the files can all be found 6/12/2024 COC + for uri in uris: + if len(glob.glob(uri)) == 0: + raise FileNotFoundError(f"Could not find {uri}.") + + logger.info("Creating ImageCollection") + # Create an ImageCollection object from the list of URIs + ic = ImageCollection.fromTargets(uris) + logger.info("ImageCollection created") + + ic.write(ic_output_file, format="ascii.ecsv") diff --git a/src/kbmod_wf/tasks/uri_to_ic.py b/src/kbmod_wf/tasks/uri_to_ic.py deleted file mode 100644 index f981035..0000000 --- a/src/kbmod_wf/tasks/uri_to_ic.py +++ /dev/null @@ -1,3 +0,0 @@ -def uri_to_ic(logger=None): - logger.info("In the uri_to_ic task_impl") - return 10 diff --git a/src/kbmod_wf/utilities/configuration_utilities.py b/src/kbmod_wf/utilities/configuration_utilities.py new file mode 100644 index 0000000..47de7ff --- /dev/null +++ b/src/kbmod_wf/utilities/configuration_utilities.py @@ -0,0 +1,41 @@ +import platform +from typing import Literal + +from kbmod_wf.configurations import * + + +def get_config(env: Literal["dev", "klone"] | None = None): + """A naive 
attempt to return a reasonable configuration using platform.system. + This will likely be insufficient in a very short amount of time, but this + is meant to be a first step. + + Parameters + ---------- + env : Literal["dev", "klone"] | None, optional + The common name used to retrieve the given configuration, by default None. + If None, the configuration will be determined by the platform.system(). + + Returns + ------- + parsl.config.Config + The configuration object to be used by the parsl.load() function. + + Raises + ------ + ValueError + If an unknown environment is provided, raise a ValueError. + """ + + if env is None: + if platform.system().lower() == "darwin": + config = dev_config() + else: + config = klone_config() + elif env == "dev": + config = dev_config() + elif env == "klone": + config = klone_config() + else: + raise ValueError(f"Unknown environment: {env}") + + return config diff --git a/src/kbmod_wf/utilities/executor_utilities.py b/src/kbmod_wf/utilities/executor_utilities.py new file mode 100644 index 0000000..f23e15f --- /dev/null +++ b/src/kbmod_wf/utilities/executor_utilities.py @@ -0,0 +1,21 @@ +from kbmod_wf.utilities.configuration_utilities import get_config + + +def get_executors(possible_executors=[]): + """Get the list of executors that are available on the system. + + Parameters + ---------- + possible_executors : List[str] + A list of possible executors that can be used. + + Returns + ------- + List[str] + A list of executors that are available on the system. 
+ """ + + config = get_config() + available_executors = [e.label for e in config.executors] + + return [executor for executor in possible_executors if executor in available_executors] diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py index 6d34d57..d329b12 100644 --- a/src/kbmod_wf/workflow.py +++ b/src/kbmod_wf/workflow.py @@ -3,37 +3,61 @@ from parsl import python_app, File import parsl.executors -from kbmod_wf.configurations import * +from kbmod_wf.utilities.configuration_utilities import get_config +from kbmod_wf.utilities.executor_utilities import get_executors -@python_app(executors=["local_dev_testing"]) +@python_app(executors=get_executors(["local_dev_testing", "local_thread"])) +def create_uri_manifest(inputs=[], outputs=[], directory_path=None, logging_file=None): + """This app will go to a given directory, find all of the uri.lst files there, + and copy the paths to the manifest file.""" + from kbmod_wf.utilities.logger_utilities import configure_logger + + logger = configure_logger("task.create_uri_manifest", logging_file.filepath) + + # List all entries in the directory + entries = os.listdir(directory_path) + + # Filter out directories, keep only files + files = [] + for f in entries: + if os.path.isfile(os.path.join(directory_path, f)): + files.append(os.path.join(os.path.abspath(directory_path), f)) + + logger.info(f"Found {len(files)} files in {directory_path}") + + # Write the filenames to the manifest file + with open(outputs[0].filename, "w") as manifest_file: + for file in files: + manifest_file.write(file + "\n") + + +@python_app(executors=get_executors(["local_dev_testing", "small_cpu"])) def uri_to_ic(inputs=[], outputs=[], logging_file=None): from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.tasks.uri_to_ic import uri_to_ic + from kbmod_wf.task_impls.uri_to_ic import uri_to_ic logger = configure_logger("task.uri_to_ic", logging_file.filepath) logger.info("Starting uri_to_ic") - output = 
uri_to_ic(logger=logger) - logger.warning("You're the cool one.") - return output + uri_to_ic(ic_output_file=outputs[0].filepath, logger=logger) + logger.warning("Completed uri_to_ic") -@python_app(executors=["local_dev_testing"]) +@python_app(executors=get_executors(["local_dev_testing", "small_cpu"])) def reproject_ic(inputs=[], outputs=[], logging_file=None): from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.tasks.reproject_ic import reproject_ic + from kbmod_wf.task_impls.reproject_ic import reproject_ic logger = configure_logger("task.reproject_ic", logging_file.filepath) logger.info("Starting reproject_ic") - output = reproject_ic(logger=logger) - logger.warning("This is a slow step.") - return output + reproject_ic(logger=logger) + logger.warning("Completed reproject_ic") -def workflow_runner(): - with parsl.load(dev_config()) as dfk: +def workflow_runner(env=None): + with parsl.load(get_config(env=env)) as dfk: logging_file = File(os.path.join(dfk.run_dir, "parsl.log")) uri_list = File(os.path.join(os.getcwd(), "uri_list.txt")) @@ -44,14 +68,11 @@ def workflow_runner(): ) reproject_ic_future = reproject_ic( - inputs=[], + inputs=[uri_to_ic_future.outputs[0]], outputs=[], logging_file=logging_file, ) - print(uri_to_ic_future.result()) - print(reproject_ic_future.result()) - parsl.clear()