Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scaffolding version of the workflow structure that reproduces the manual steps #6

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/kbmod_wf/configurations/__init__.py

This file was deleted.

4 changes: 4 additions & 0 deletions src/kbmod_wf/resource_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .dev_configuration import dev_resource_config
from .klone_configuration import klone_resource_config

__all__ = ["dev_resource_config", "klone_resource_config"]
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
project_dir = os.path.abspath(os.path.join(this_dir, "../../../"))


def dev_config():
def dev_resource_config():
return Config(
# put the log files in the top level folder, "run_logs".
run_dir=os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
}


def klone_config():
def klone_resource_config():
return Config(
run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()),
executors=[
Expand All @@ -27,6 +27,8 @@ def klone_config():
mem_per_node=64, # In GB
exclusive=False,
walltime=walltimes["compute-bigmem"],
# Command to run before starting worker - i.e. conda activate <special_env>
worker_init="",
),
),
HighThroughputExecutor(
Expand Down
8 changes: 6 additions & 2 deletions src/kbmod_wf/task_impls/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from .reproject_ic import reproject_ic
from .ic_to_wu import ic_to_wu
from .kbmod_search import kbmod_search
from .reproject_wu import reproject_wu
from .uri_to_ic import uri_to_ic

__all__ = [
"reproject_ic",
"ic_to_wu",
"kbmod_search",
"reproject_wu",
"uri_to_ic",
]
11 changes: 11 additions & 0 deletions src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
def ic_to_wu(ic_file=None, wu_file=None, logger=None):
    """Placeholder task: log every line of ``ic_file`` and record the last one.

    Each line of ``ic_file`` is stripped of surrounding whitespace and sent to
    ``logger`` at info level. The stripped text of the last line read is then
    written to ``wu_file`` as ``"Logged: <text>"``.

    Parameters
    ----------
    ic_file : str
        Path of the text file to read.
    wu_file : str
        Path of the file to write. It is created or overwritten.
    logger : logging.Logger, optional
        Logger that receives the messages. When omitted, the logger named
        after this module is used.

    Returns
    -------
    str
        ``wu_file``, exactly as it was passed in.
    """
    if logger is None:
        import logging

        logger = logging.getLogger(__name__)

    logger.info("In the ic_to_wu task_impl")

    # Start from an empty string so that an empty input file still produces
    # an output file instead of failing on an unbound name.
    value = ""
    with open(ic_file, "r") as f:
        for line in f:
            value = line.strip()
            logger.info(value)

    with open(wu_file, "w") as f:
        f.write(f"Logged: {value}")

    return wu_file
11 changes: 11 additions & 0 deletions src/kbmod_wf/task_impls/kbmod_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
def kbmod_search(input_wu=None, result_file=None, logger=None):
    """Placeholder task: log every line of ``input_wu`` and record the last one.

    Each line of ``input_wu`` is stripped of surrounding whitespace and sent to
    ``logger`` at info level. The stripped text of the last line read is then
    written to ``result_file`` as ``"Logged: <text>"``.

    Parameters
    ----------
    input_wu : str
        Path of the text file to read.
    result_file : str
        Path of the file to write. It is created or overwritten.
    logger : logging.Logger, optional
        Logger that receives the messages. When omitted, the logger named
        after this module is used.

    Returns
    -------
    str
        ``result_file``, exactly as it was passed in.
    """
    if logger is None:
        import logging

        logger = logging.getLogger(__name__)

    logger.info("In the kbmod_search task_impl")

    # Start from an empty string so that an empty input file still produces
    # an output file instead of failing on an unbound name.
    value = ""
    with open(input_wu, "r") as f:
        for line in f:
            value = line.strip()
            logger.info(value)

    with open(result_file, "w") as f:
        f.write(f"Logged: {value}")

    return result_file
3 changes: 0 additions & 3 deletions src/kbmod_wf/task_impls/reproject_ic.py

This file was deleted.

11 changes: 11 additions & 0 deletions src/kbmod_wf/task_impls/reproject_wu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
def reproject_wu(input_wu=None, reprojected_wu=None, logger=None):
    """Placeholder task: log every line of ``input_wu`` and record the last one.

    Each line of ``input_wu`` is stripped of surrounding whitespace and sent to
    ``logger`` at info level. The stripped text of the last line read is then
    written to ``reprojected_wu`` as ``"Logged: <text>"``.

    Parameters
    ----------
    input_wu : str
        Path of the text file to read.
    reprojected_wu : str
        Path of the file to write. It is created or overwritten.
    logger : logging.Logger, optional
        Logger that receives the messages. When omitted, the logger named
        after this module is used.

    Returns
    -------
    str
        ``reprojected_wu``, exactly as it was passed in.
    """
    if logger is None:
        import logging

        logger = logging.getLogger(__name__)

    logger.info("In the reproject_wu task_impl")

    # Start from an empty string so that an empty input file still produces
    # an output file instead of failing on an unbound name.
    value = ""
    with open(input_wu, "r") as f:
        for line in f:
            value = line.strip()
            logger.info(value)

    with open(reprojected_wu, "w") as f:
        f.write(f"Logged: {value}")

    return reprojected_wu
91 changes: 52 additions & 39 deletions src/kbmod_wf/task_impls/uri_to_ic.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,57 @@
import os
import glob
from kbmod import ImageCollection

# from kbmod import ImageCollection


def uri_to_ic(target_uris_file=None, uris_base_dir=None, ic_output_file=None, logger=None):
# NOTE(review): this span appears to contain two bodies for the same function:
# an ImageCollection-based implementation followed by a placeholder that only
# logs the input lines. It looks like a diff view with the +/- markers
# stripped -- confirm which lines belong to the final file before editing.
# Load the list of images from our saved file "sample_uris.txt"
uris = []
with open(target_uris_file) as f:
for l in f.readlines():
l = l.strip() # seeing invisible trailing characters 6/12/2024 COC
if l == "":
continue # skip blank lines 6/12/2024 COC
if not l.startswith("#"):
# Ignore commented metadata
uris.append(l)

# Validate the optional base directory before it is joined onto any URI.
if uris_base_dir is not None:
logger.debug(f"Using URIs base dir: {uris_base_dir}")
if not os.path.isdir(uris_base_dir):
logger.error(f"Invalid URIS base directory provided: {uris_base_dir}")
raise ValueError(f"Invalid URIS base directory provided: {uris_base_dir}")

# Clean up the URI strings
for i in range(len(uris)):
file_prefix = "file://"
# "%23" is the percent-encoded form of "#"; restore the literal character.
curr = uris[i].replace("%23", "#").strip()
if curr.startswith(file_prefix):
curr = curr[len(file_prefix) :]
if uris_base_dir is not None:
# Leading separators are stripped so os.path.join keeps the base directory.
curr = os.path.join(uris_base_dir, curr.lstrip(os.path.sep))
uris[i] = curr

# Make sure the files can all be found 6/12/2024 COC
for uri in uris:
if len(glob.glob(uri)) == 0:
raise FileNotFoundError(f"Could not find {uri}.")

logger.info("Creating ImageCollection")
# Create an ImageCollection object from the list of URIs
ic = ImageCollection.fromTargets(uris)
logger.info("ImageCollection created")

ic.write(ic_output_file, format="ascii.ecsv")
# NOTE(review): the lines below look like the placeholder body -- each stripped
# input line is logged and the last one is written to ic_output_file.
# `value` is only assigned inside the loop, so an empty target_uris_file would
# leave it unbound at the write -- confirm whether that case can occur.
with open(target_uris_file, "r") as f:
for line in f:
value = line.strip()
logger.info(line.strip())

with open(ic_output_file, "w") as f:
f.write(f"Logged: {value}")

return ic_output_file


# def uri_to_ic(target_uris_file=None, uris_base_dir=None, ic_output_file=None, logger=None):
# # Load the list of images from our saved file "sample_uris.txt"
# uris = []
# with open(target_uris_file) as f:
# for l in f.readlines():
# l = l.strip() # seeing invisible trailing characters 6/12/2024 COC
# if l == "":
# continue # skip blank lines 6/12/2024 COC
# if not l.startswith("#"):
# # Ignore commented metadata
# uris.append(l)

# if uris_base_dir is not None:
# logger.debug(f"Using URIs base dir: {uris_base_dir}")
# if not os.path.isdir(uris_base_dir):
# logger.error(f"Invalid URIS base directory provided: {uris_base_dir}")
# raise ValueError(f"Invalid URIS base directory provided: {uris_base_dir}")

# # Clean up the URI strings
# for i in range(len(uris)):
# file_prefix = "file://"
# curr = uris[i].replace("%23", "#").strip()
# if curr.startswith(file_prefix):
# curr = curr[len(file_prefix) :]
# if uris_base_dir is not None:
# curr = os.path.join(uris_base_dir, curr.lstrip(os.path.sep))
# uris[i] = curr

# # Make sure the files can all be found 6/12/2024 COC
# for uri in uris:
# if len(glob.glob(uri)) == 0:
# raise FileNotFoundError(f"Could not find {uri}.")

# logger.info("Creating ImageCollection")
# # Create an ImageCollection object from the list of URIs
# ic = ImageCollection.fromTargets(uris)
# logger.info("ImageCollection created")

# ic.write(ic_output_file, format="ascii.ecsv")
4 changes: 3 additions & 1 deletion src/kbmod_wf/utilities/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .configuration_utilities import get_resource_config
from .executor_utilities import get_executors
from .logger_utilities import configure_logger

__all__ = ["configure_logger"]
__all__ = ["get_resource_config", "get_executors", "configure_logger"]
26 changes: 20 additions & 6 deletions src/kbmod_wf/utilities/configuration_utilities.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import platform
from typing import Literal

from kbmod_wf.configurations import *
from kbmod_wf.resource_configs import *


def get_config(env: Literal["dev", "klone"] | None = None):
def get_resource_config(env: Literal["dev", "klone"] | None = None):
"""A naive attempt to return a reasonable configuration using platform.system.
This will likely be insufficient in a very short amount of time, but this
is meant to be a first step.
Expand All @@ -28,14 +28,28 @@ def get_config(env: Literal["dev", "klone"] | None = None):

if env is None:
if platform.system().lower() == "darwin":
config = dev_config()
config = dev_resource_config()
elif is_running_on_wsl():
config = dev_resource_config()
else:
config = klone_config()
config = klone_resource_config()
elif env == "dev":
config = dev_config()
config = dev_resource_config()
elif env == "klone":
config = klone_config()
config = klone_resource_config()
else:
raise ValueError(f"Unknown environment: {env}")

return config


def is_running_on_wsl() -> bool:
    """Check if the script is running on Windows Subsystem for Linux (WSL).

    Returns
    -------
    bool
        True when ``platform.system()`` reports Linux and the contents of
        ``/proc/version`` mention "microsoft" or "wsl" (case-insensitive).
        False otherwise, including when ``/proc/version`` cannot be read.
    """
    if platform.system().lower() != "linux":
        return False

    try:
        with open("/proc/version") as version_file:
            content = version_file.read().lower()
    except OSError:
        # OSError covers the missing-file case as well as permission or other
        # I/O failures; none of them should abort configuration selection.
        return False

    return "microsoft" in content or "wsl" in content
4 changes: 2 additions & 2 deletions src/kbmod_wf/utilities/executor_utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kbmod_wf.utilities.configuration_utilities import get_config
from kbmod_wf.utilities.configuration_utilities import get_resource_config


def get_executors(possible_executors=[]):
Expand All @@ -15,7 +15,7 @@ def get_executors(possible_executors=[]):
A list of executors that are available on the system.
"""

config = get_config()
config = get_resource_config()
available_executors = [e.label for e in config.executors]

return [executor for executor in possible_executors if executor in available_executors]
3 changes: 3 additions & 0 deletions src/kbmod_wf/utilities/logger_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ def configure_logger(name, file_path):

import logging

if name in logging.Logger.manager.loggerDict:
return logging.Logger.manager.loggerDict[name]

logger = logging.getLogger(name)
handler = logging.FileHandler(file_path)
formatter = logging.Formatter(DEFAULT_FORMAT, datefmt="%Y-%m-%d %H:%M:%S")
Expand Down
Loading
Loading