Skip to content

Commit

Permalink
Merge pull request #6 from dirac-institute/awo/parsl-poc
Browse files Browse the repository at this point in the history
Scaffolding version of the workflow structure that reproduces the manual steps
  • Loading branch information
drewoldag authored Jul 12, 2024
2 parents 04f8448 + e01231b commit 36181bd
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 78 deletions.
4 changes: 0 additions & 4 deletions src/kbmod_wf/configurations/__init__.py

This file was deleted.

4 changes: 4 additions & 0 deletions src/kbmod_wf/resource_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .dev_configuration import dev_resource_config
from .klone_configuration import klone_resource_config

__all__ = ["dev_resource_config", "klone_resource_config"]
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
project_dir = os.path.abspath(os.path.join(this_dir, "../../../"))


def dev_config():
def dev_resource_config():
return Config(
# put the log files in the top level folder, "run_logs".
run_dir=os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
}


def klone_config():
def klone_resource_config():
return Config(
run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()),
executors=[
Expand All @@ -27,6 +27,8 @@ def klone_config():
mem_per_node=64, # In GB
exclusive=False,
walltime=walltimes["compute-bigmem"],
# Command to run before starting worker - i.e. conda activate <special_env>
worker_init="",
),
),
HighThroughputExecutor(
Expand Down
8 changes: 6 additions & 2 deletions src/kbmod_wf/task_impls/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from .reproject_ic import reproject_ic
from .ic_to_wu import ic_to_wu
from .kbmod_search import kbmod_search
from .reproject_wu import reproject_wu
from .uri_to_ic import uri_to_ic

__all__ = [
"reproject_ic",
"ic_to_wu",
"kbmod_search",
"reproject_wu",
"uri_to_ic",
]
11 changes: 11 additions & 0 deletions src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
def ic_to_wu(ic_file=None, wu_file=None, logger=None):
    """Scaffolding implementation of the ``ic_to_wu`` task.

    Logs every line of ``ic_file`` (with surrounding whitespace stripped) and
    then writes ``"Logged: <last stripped line>"`` to ``wu_file``.

    Parameters
    ----------
    ic_file : str
        Path of the text file to read.
    wu_file : str
        Path of the file to create or overwrite.
    logger : logging.Logger, optional
        Logger that receives the progress messages. When omitted, the logger
        returned by ``logging.getLogger(__name__)`` is used.

    Returns
    -------
    str
        ``wu_file``, exactly as passed in.
    """
    if logger is None:
        # The signature advertises ``logger=None``; fall back to a standard
        # logger instead of failing on ``None.info``.
        import logging

        logger = logging.getLogger(__name__)

    logger.info("In the ic_to_wu task_impl")

    # Bound up front so an empty input file still yields a well-defined
    # output ("Logged: ") rather than an UnboundLocalError at the write below.
    value = ""
    with open(ic_file, "r") as f:
        for line in f:
            value = line.strip()
            logger.info(value)

    with open(wu_file, "w") as f:
        f.write(f"Logged: {value}")

    return wu_file
11 changes: 11 additions & 0 deletions src/kbmod_wf/task_impls/kbmod_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
def kbmod_search(input_wu=None, result_file=None, logger=None):
    """Scaffolding implementation of the ``kbmod_search`` task.

    Logs every line of ``input_wu`` (with surrounding whitespace stripped) and
    then writes ``"Logged: <last stripped line>"`` to ``result_file``.

    Parameters
    ----------
    input_wu : str
        Path of the text file to read.
    result_file : str
        Path of the file to create or overwrite.
    logger : logging.Logger, optional
        Logger that receives the progress messages. When omitted, the logger
        returned by ``logging.getLogger(__name__)`` is used.

    Returns
    -------
    str
        ``result_file``, exactly as passed in.
    """
    if logger is None:
        # The signature advertises ``logger=None``; fall back to a standard
        # logger instead of failing on ``None.info``.
        import logging

        logger = logging.getLogger(__name__)

    logger.info("In the kbmod_search task_impl")

    # Bound up front so an empty input file still yields a well-defined
    # output ("Logged: ") rather than an UnboundLocalError at the write below.
    value = ""
    with open(input_wu, "r") as f:
        for line in f:
            value = line.strip()
            logger.info(value)

    with open(result_file, "w") as f:
        f.write(f"Logged: {value}")

    return result_file
3 changes: 0 additions & 3 deletions src/kbmod_wf/task_impls/reproject_ic.py

This file was deleted.

11 changes: 11 additions & 0 deletions src/kbmod_wf/task_impls/reproject_wu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
def reproject_wu(input_wu=None, reprojected_wu=None, logger=None):
    """Scaffolding implementation of the ``reproject_wu`` task.

    Logs every line of ``input_wu`` (with surrounding whitespace stripped) and
    then writes ``"Logged: <last stripped line>"`` to ``reprojected_wu``.

    Parameters
    ----------
    input_wu : str
        Path of the text file to read.
    reprojected_wu : str
        Path of the file to create or overwrite.
    logger : logging.Logger, optional
        Logger that receives the progress messages. When omitted, the logger
        returned by ``logging.getLogger(__name__)`` is used.

    Returns
    -------
    str
        ``reprojected_wu``, exactly as passed in.
    """
    if logger is None:
        # The signature advertises ``logger=None``; fall back to a standard
        # logger instead of failing on ``None.info``.
        import logging

        logger = logging.getLogger(__name__)

    logger.info("In the reproject_wu task_impl")

    # Bound up front so an empty input file still yields a well-defined
    # output ("Logged: ") rather than an UnboundLocalError at the write below.
    value = ""
    with open(input_wu, "r") as f:
        for line in f:
            value = line.strip()
            logger.info(value)

    with open(reprojected_wu, "w") as f:
        f.write(f"Logged: {value}")

    return reprojected_wu
91 changes: 52 additions & 39 deletions src/kbmod_wf/task_impls/uri_to_ic.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,57 @@
import os
import glob
from kbmod import ImageCollection

# from kbmod import ImageCollection


def uri_to_ic(target_uris_file=None, uris_base_dir=None, ic_output_file=None, logger=None):
# Purpose (as the lines below read, top to bottom): collect URIs from
# target_uris_file, optionally re-root them under uris_base_dir, verify each
# one resolves on disk, build an ImageCollection from them and write it to
# ic_output_file; then re-read target_uris_file, log every stripped line, and
# overwrite ic_output_file with "Logged: <last stripped line>".
# Returns ic_output_file.
# Raises ValueError when uris_base_dir is given but is not a directory, and
# FileNotFoundError when a URI matches nothing on disk.
# NOTE(review): this view carries no +/- markers and no indentation. The first
# part (through ic.write) looks like the removed implementation and the tail
# (the "Logged:" stub) like the added one -- confirm against the repository
# before relying on this description.
# Load the list of images from our saved file "sample_uris.txt"
uris = []
with open(target_uris_file) as f:
for l in f.readlines():
l = l.strip() # seeing invisible trailing characters 6/12/2024 COC
if l == "":
continue # skip blank lines 6/12/2024 COC
if not l.startswith("#"):
# Ignore commented metadata
uris.append(l)

# Validate the optional base directory up front so a bad path fails before any
# URI is rewritten.
if uris_base_dir is not None:
logger.debug(f"Using URIs base dir: {uris_base_dir}")
if not os.path.isdir(uris_base_dir):
logger.error(f"Invalid URIS base directory provided: {uris_base_dir}")
raise ValueError(f"Invalid URIS base directory provided: {uris_base_dir}")

# Clean up the URI strings
for i in range(len(uris)):
file_prefix = "file://"
# "%23" is the percent-encoded form of "#"; decode it back to a literal "#".
curr = uris[i].replace("%23", "#").strip()
if curr.startswith(file_prefix):
curr = curr[len(file_prefix) :]
if uris_base_dir is not None:
# Leading separators are stripped because os.path.join discards earlier
# components when a later one is absolute.
curr = os.path.join(uris_base_dir, curr.lstrip(os.path.sep))
uris[i] = curr

# Make sure the files can all be found 6/12/2024 COC
for uri in uris:
# glob.glob returns an empty list when nothing matches.
if len(glob.glob(uri)) == 0:
raise FileNotFoundError(f"Could not find {uri}.")

logger.info("Creating ImageCollection")
# Create an ImageCollection object from the list of URIs
ic = ImageCollection.fromTargets(uris)
logger.info("ImageCollection created")

ic.write(ic_output_file, format="ascii.ecsv")
# Log each stripped line of the input file; `value` ends up holding the last
# one and is never bound if the file has no lines.
with open(target_uris_file, "r") as f:
for line in f:
value = line.strip()
logger.info(line.strip())

# Opening with "w" truncates ic_output_file, replacing anything written above.
with open(ic_output_file, "w") as f:
f.write(f"Logged: {value}")

return ic_output_file


# def uri_to_ic(target_uris_file=None, uris_base_dir=None, ic_output_file=None, logger=None):
# # Load the list of images from our saved file "sample_uris.txt"
# uris = []
# with open(target_uris_file) as f:
# for l in f.readlines():
# l = l.strip() # seeing invisible trailing characters 6/12/2024 COC
# if l == "":
# continue # skip blank lines 6/12/2024 COC
# if not l.startswith("#"):
# # Ignore commented metadata
# uris.append(l)

# if uris_base_dir is not None:
# logger.debug(f"Using URIs base dir: {uris_base_dir}")
# if not os.path.isdir(uris_base_dir):
# logger.error(f"Invalid URIS base directory provided: {uris_base_dir}")
# raise ValueError(f"Invalid URIS base directory provided: {uris_base_dir}")

# # Clean up the URI strings
# for i in range(len(uris)):
# file_prefix = "file://"
# curr = uris[i].replace("%23", "#").strip()
# if curr.startswith(file_prefix):
# curr = curr[len(file_prefix) :]
# if uris_base_dir is not None:
# curr = os.path.join(uris_base_dir, curr.lstrip(os.path.sep))
# uris[i] = curr

# # Make sure the files can all be found 6/12/2024 COC
# for uri in uris:
# if len(glob.glob(uri)) == 0:
# raise FileNotFoundError(f"Could not find {uri}.")

# logger.info("Creating ImageCollection")
# # Create an ImageCollection object from the list of URIs
# ic = ImageCollection.fromTargets(uris)
# logger.info("ImageCollection created")

# ic.write(ic_output_file, format="ascii.ecsv")
4 changes: 3 additions & 1 deletion src/kbmod_wf/utilities/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .configuration_utilities import get_resource_config
from .executor_utilities import get_executors
from .logger_utilities import configure_logger

__all__ = ["configure_logger"]
__all__ = ["get_resource_config", "get_executors", "configure_logger"]
26 changes: 20 additions & 6 deletions src/kbmod_wf/utilities/configuration_utilities.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import platform
from typing import Literal

from kbmod_wf.configurations import *
from kbmod_wf.resource_configs import *


def get_config(env: Literal["dev", "klone"] | None = None):
def get_resource_config(env: Literal["dev", "klone"] | None = None):
"""A naive attempt to return a reasonable configuration using platform.system.
This will likely be insufficient in a very short amount of time, but this
is meant to be a first step.
Expand All @@ -28,14 +28,28 @@ def get_config(env: Literal["dev", "klone"] | None = None):

if env is None:
if platform.system().lower() == "darwin":
config = dev_config()
config = dev_resource_config()
elif is_running_on_wsl():
config = dev_resource_config()
else:
config = klone_config()
config = klone_resource_config()
elif env == "dev":
config = dev_config()
config = dev_resource_config()
elif env == "klone":
config = klone_config()
config = klone_resource_config()
else:
raise ValueError(f"Unknown environment: {env}")

return config


def is_running_on_wsl() -> bool:
    """Check if the script is running on Windows Subsystem for Linux (WSL).

    The check only applies when ``platform.system()`` reports Linux. In that
    case ``/proc/version`` is read and searched, case-insensitively, for
    ``"microsoft"`` or ``"wsl"``.

    Returns
    -------
    bool
        ``True`` when ``/proc/version`` contains one of the markers, ``False``
        on non-Linux systems or when the file cannot be read.
    """
    if platform.system().lower() != "linux":
        return False

    try:
        with open("/proc/version") as version_file:
            content = version_file.read().lower()
    except OSError:
        # Covers a missing file (FileNotFoundError) as well as an unreadable
        # one (e.g. PermissionError): detection is best-effort, so an
        # inaccessible /proc/version means "not WSL" rather than a crash.
        return False

    return "microsoft" in content or "wsl" in content
4 changes: 2 additions & 2 deletions src/kbmod_wf/utilities/executor_utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kbmod_wf.utilities.configuration_utilities import get_config
from kbmod_wf.utilities.configuration_utilities import get_resource_config


def get_executors(possible_executors=[]):
Expand All @@ -15,7 +15,7 @@ def get_executors(possible_executors=[]):
A list of executors that are available on the system.
"""

config = get_config()
config = get_resource_config()
available_executors = [e.label for e in config.executors]

return [executor for executor in possible_executors if executor in available_executors]
3 changes: 3 additions & 0 deletions src/kbmod_wf/utilities/logger_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ def configure_logger(name, file_path):

import logging

if name in logging.Logger.manager.loggerDict:
return logging.Logger.manager.loggerDict[name]

logger = logging.getLogger(name)
handler = logging.FileHandler(file_path)
formatter = logging.Formatter(DEFAULT_FORMAT, datefmt="%Y-%m-%d %H:%M:%S")
Expand Down
Loading

0 comments on commit 36181bd

Please sign in to comment.