Skip to content

Commit

Permalink
Merge pull request #5 from dirac-institute/awo/parsl-poc
Browse files Browse the repository at this point in the history
Lots of tweaks, pushing toward a final POC.
  • Loading branch information
drewoldag authored Jul 10, 2024
2 parents 3c667f1 + 94ce5b8 commit 04f8448
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 23 deletions.
3 changes: 2 additions & 1 deletion src/kbmod_wf/configurations/dev_configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import datetime
from parsl import Config
from parsl.executors import ThreadPoolExecutor

Expand All @@ -9,7 +10,7 @@
def dev_config():
return Config(
# put the log files in the top level folder, "run_logs".
run_dir=os.path.join(project_dir, "run_logs"),
run_dir=os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()),
executors=[
ThreadPoolExecutor(
label="local_dev_testing",
Expand Down
13 changes: 11 additions & 2 deletions src/kbmod_wf/configurations/klone_configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import datetime
from parsl import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
from parsl.providers import LocalProvider, SlurmProvider

walltimes = {
"compute-bigmem": "01:00:00", # change this to be appropriate
Expand All @@ -9,7 +11,7 @@

def klone_config():
return Config(
run_dir="/gscratch/dirac/kbmod/workflow/",
run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()),
executors=[
HighThroughputExecutor(
label="small_cpu",
Expand All @@ -27,5 +29,12 @@ def klone_config():
walltime=walltimes["compute-bigmem"],
),
),
HighThroughputExecutor(
label="local_thread",
provider=LocalProvider(
init_blocks=1,
max_blocks=1,
),
),
],
)
File renamed without changes.
File renamed without changes.
44 changes: 44 additions & 0 deletions src/kbmod_wf/task_impls/uri_to_ic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os
import glob
from kbmod import ImageCollection


def uri_to_ic(target_uris_file=None, uris_base_dir=None, ic_output_file=None, logger=None):
    """Build an ImageCollection from a file of URIs and write it to disk.

    Parameters
    ----------
    target_uris_file : str
        Path to a text file holding one URI per line. Blank lines and lines
        starting with "#" are ignored. Required.
    uris_base_dir : str, optional
        When given, every URI is re-rooted under this directory (its leading
        path separator is removed before joining). Must be an existing
        directory.
    ic_output_file : str
        Destination handed to ``ImageCollection.write`` (``ascii.ecsv`` format).
    logger : logging.Logger, optional
        Logger used for progress messages. Falls back to this module's logger
        when omitted, so the function can be called without one.

    Raises
    ------
    ValueError
        If ``target_uris_file`` is missing, or ``uris_base_dir`` is given but
        is not a directory.
    FileNotFoundError
        If any cleaned-up URI matches no path on disk.
    """
    import logging

    if logger is None:
        logger = logging.getLogger(__name__)

    if target_uris_file is None:
        # Fail with a clear message instead of the TypeError raised by open(None).
        raise ValueError("A target_uris_file must be provided.")

    # Load the list of images, skipping blank lines and "#" metadata lines.
    # Lines are stripped first because the files can carry invisible trailing characters.
    with open(target_uris_file) as f:
        stripped_lines = (line.strip() for line in f)
        uris = [line for line in stripped_lines if line and not line.startswith("#")]

    if uris_base_dir is not None:
        logger.debug(f"Using URIs base dir: {uris_base_dir}")
        if not os.path.isdir(uris_base_dir):
            logger.error(f"Invalid URIS base directory provided: {uris_base_dir}")
            raise ValueError(f"Invalid URIS base directory provided: {uris_base_dir}")

    # Clean up the URI strings: decode "%23" to "#", drop the "file://" scheme,
    # and optionally re-root the path under uris_base_dir.
    file_prefix = "file://"
    cleaned_uris = []
    for uri in uris:
        curr = uri.replace("%23", "#").strip()
        if curr.startswith(file_prefix):
            curr = curr[len(file_prefix) :]
        if uris_base_dir is not None:
            curr = os.path.join(uris_base_dir, curr.lstrip(os.path.sep))
        cleaned_uris.append(curr)
    uris = cleaned_uris

    # Make sure the files can all be found. glob.glob is used for the check, so
    # an entry containing glob wildcards passes if it matches at least one path.
    for uri in uris:
        if not glob.glob(uri):
            logger.error(f"Could not find {uri}.")
            raise FileNotFoundError(f"Could not find {uri}.")

    logger.info("Creating ImageCollection")
    # Create an ImageCollection object from the list of URIs
    ic = ImageCollection.fromTargets(uris)
    logger.info("ImageCollection created")

    ic.write(ic_output_file, format="ascii.ecsv")
3 changes: 0 additions & 3 deletions src/kbmod_wf/tasks/uri_to_ic.py

This file was deleted.

41 changes: 41 additions & 0 deletions src/kbmod_wf/utilities/configuration_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import platform
from typing import Literal

from kbmod_wf.configurations import *


def get_config(env: Literal["dev", "klone"] | None = None):
"""A naive attempt to return a reasonable configuration using platform.system.
This will likely be insufficient in a very short amount of time, but this
is meant to be a first step.
Parameters
----------
env : Literal["dev", "klone"] | None, optional
The common name used to retrieve the given configuration, by default None.
If none, the configuration will be determined by the platform.system().
Returns
-------
parls.config.Config
The configuration object to be used by the parsl.load() function.
Raises
------
ValueError
If an unknown environment is provided, raise a ValueError.
"""

if env is None:
if platform.system().lower() == "darwin":
config = dev_config()
else:
config = klone_config()
elif env == "dev":
config = dev_config()
elif env == "klone":
config = klone_config()
else:
raise ValueError(f"Unknown environment: {env}")

return config
21 changes: 21 additions & 0 deletions src/kbmod_wf/utilities/executor_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from kbmod_wf.utilities.configuration_utilities import get_config


def get_executors(possible_executors=None):
    """Filter executor labels down to those defined in the current configuration.

    The configuration is obtained from ``get_config()`` with no environment
    name, i.e. using its platform-based default selection.

    Parameters
    ----------
    possible_executors : list[str], optional
        Executor labels the caller is able to run on, in order of preference.
        ``None`` or an empty list yields an empty result.

    Returns
    -------
    list[str]
        The entries of ``possible_executors`` whose label is defined by the
        configuration, in their original order.
    """
    if not possible_executors:
        # Nothing can match, so skip building the configuration altogether.
        return []

    available_executors = {executor.label for executor in get_config().executors}

    return [label for label in possible_executors if label in available_executors]
55 changes: 38 additions & 17 deletions src/kbmod_wf/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,61 @@
from parsl import python_app, File
import parsl.executors

from kbmod_wf.configurations import *
from kbmod_wf.utilities.configuration_utilities import get_config
from kbmod_wf.utilities.executor_utilities import get_executors


@python_app(executors=get_executors(["local_dev_testing", "local_thread"]))
def create_uri_manifest(inputs=[], outputs=[], directory_path=None, logging_file=None):
    """Write the absolute path of every regular file in a directory to a manifest.

    Parameters
    ----------
    inputs : list
        Unused by this app.
    outputs : list
        ``outputs[0]`` is the parsl File the manifest is written to, one
        absolute path per line.
    directory_path : str
        Directory to scan. Only its direct entries are considered, and
        sub-directories are skipped.
    logging_file : parsl File
        File whose ``filepath`` is handed to ``configure_logger``.
    """
    # Imported here so the app body does not rely on module-level state when
    # it runs on an executor.
    import os

    from kbmod_wf.utilities.logger_utilities import configure_logger

    logger = configure_logger("task.create_uri_manifest", logging_file.filepath)

    # Keep only regular files; sort so the manifest content is deterministic
    # (os.listdir returns entries in arbitrary order).
    base_dir = os.path.abspath(directory_path)
    files = sorted(
        os.path.join(base_dir, entry)
        for entry in os.listdir(directory_path)
        if os.path.isfile(os.path.join(directory_path, entry))
    )

    logger.info(f"Found {len(files)} files in {directory_path}")

    # Use the File's full path (as the other apps in this module do) rather
    # than `filename`, which is only the basename and would be resolved
    # against the current working directory.
    with open(outputs[0].filepath, "w") as manifest_file:
        manifest_file.writelines(f"{path}\n" for path in files)


@python_app(executors=get_executors(["local_dev_testing", "small_cpu"]))
def uri_to_ic(inputs=[], outputs=[], logging_file=None):
    """Parsl app wrapping the ``uri_to_ic`` task implementation.

    Parameters
    ----------
    inputs : list
        Currently unused by this app.
    outputs : list
        ``outputs[0].filepath`` is passed to the implementation as
        ``ic_output_file``.
    logging_file : parsl File
        File whose ``filepath`` is handed to ``configure_logger``.
    """
    from kbmod_wf.utilities.logger_utilities import configure_logger

    # The implementation lives in task_impls; the kbmod_wf.tasks.uri_to_ic
    # module was deleted, so it must not be imported from there.
    from kbmod_wf.task_impls.uri_to_ic import uri_to_ic

    logger = configure_logger("task.uri_to_ic", logging_file.filepath)

    logger.info("Starting uri_to_ic")
    # NOTE(review): target_uris_file is not forwarded, yet the implementation
    # opens it. Presumably it should come from `inputs`; confirm against
    # workflow_runner before wiring it up.
    uri_to_ic(ic_output_file=outputs[0].filepath, logger=logger)
    logger.warning("Completed uri_to_ic")


@python_app(executors=get_executors(["local_dev_testing", "small_cpu"]))
def reproject_ic(inputs=[], outputs=[], logging_file=None):
    """Parsl app wrapping the ``reproject_ic`` task implementation.

    Parameters
    ----------
    inputs : list
        Currently unused inside the app body.
    outputs : list
        Currently unused inside the app body.
    logging_file : parsl File
        File whose ``filepath`` is handed to ``configure_logger``.
    """
    from kbmod_wf.utilities.logger_utilities import configure_logger

    # Task implementations are imported from task_impls, not the former
    # kbmod_wf.tasks package.
    from kbmod_wf.task_impls.reproject_ic import reproject_ic

    logger = configure_logger("task.reproject_ic", logging_file.filepath)

    logger.info("Starting reproject_ic")
    reproject_ic(logger=logger)
    logger.warning("Completed reproject_ic")


def workflow_runner():
with parsl.load(dev_config()) as dfk:
def workflow_runner(env=None):
with parsl.load(get_config(env=env)) as dfk:
logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))

uri_list = File(os.path.join(os.getcwd(), "uri_list.txt"))
Expand All @@ -44,14 +68,11 @@ def workflow_runner():
)

reproject_ic_future = reproject_ic(
inputs=[],
inputs=[uri_to_ic_future.outputs[0]],
outputs=[],
logging_file=logging_file,
)

print(uri_to_ic_future.result())
print(reproject_ic_future.result())

parsl.clear()


Expand Down

0 comments on commit 04f8448

Please sign in to comment.