Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update workflow to start with ImageCollection files and use a Butler #30

Merged
merged 7 commits into from
Aug 18, 2024
15 changes: 15 additions & 0 deletions example_runtime_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,32 @@ checkpoint_mode = 'task_exit'
# e.g. "/gscratch/dirac/kbmod/workflow/staging"
staging_directory = "/home/drew/code/kbmod-wf/dev_staging"


[apps.uri_to_ic]
# The path to the butler config file that instantiate a butler to retrieve images
butler_config_filepath = "/gscratch/dirac/DEEP/repo/butler.yaml"


[apps.ic_to_wu]
# The path to the KBMOD search config file
# e.g. "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml"
search_config_filepath = "/home/drew/code/kbmod-wf/dev_staging/search_config.yaml"

# The path to the butler config file that instantiate a butler to retrieve images
butler_config_filepath = "/gscratch/dirac/DEEP/repo/butler.yaml"
drewoldag marked this conversation as resolved.
Show resolved Hide resolved

# Remove a previously created WU file if it exists
overwrite = false


[apps.reproject_wu]
# Number of processors to use for parallelizing the reprojection
n_workers = 32

# The name of the observation site to use for reflex correction
observation_site = "ctio"


[apps.kbmod_search]
# The path to the KBMOD search config file
# e.g. "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml"
Expand Down
261 changes: 261 additions & 0 deletions src/kbmod_wf/ic_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import argparse
import os
import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config
from kbmod_wf.utilities.executor_utilities import get_executors
from kbmod_wf.utilities.logger_utilities import configure_logger

wilsonbb marked this conversation as resolved.
Show resolved Hide resolved

@python_app(
cache=True,
executors=get_executors(["local_dev_testing", "local_thread"]),
ignore_for_cache=["logging_file"],
)
def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=None):
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
"""This app will go to a given directory, find all of the *.collection files there,
and copy the paths to a manifest file."""
import glob
import os
from kbmod_wf.utilities.logger_utilities import configure_logger

logger = configure_logger("task.create_manifest", logging_file.filepath)

directory_path = runtime_config.get("staging_directory")
if directory_path is None:
raise ValueError("No staging_directory provided in the configuration.")

logger.info(f"Looking for staged files in {directory_path}")

# Gather all the *.lst entries in the directory
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
pattern = os.path.join(directory_path, "*.collection")
entries = glob.glob(pattern)

# Filter out directories, keep only files
files = []
for f in entries:
if os.path.isfile(os.path.join(directory_path, f)):
files.append(os.path.join(os.path.abspath(directory_path), f))

logger.info(f"Found {len(files)} files in {directory_path}")

# Write the filenames to the manifest file
with open(outputs[0].filename, "w") as manifest_file:
for file in files:
manifest_file.write(file + "\n")

return outputs[0]


@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
)
def uri_to_ic(inputs=[], outputs=[], runtime_config={}, logging_file=None):
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
import traceback
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.uri_to_ic import uri_to_ic

logger = configure_logger("task.uri_to_ic", logging_file.filepath)

logger.info("Starting uri_to_ic")
try:
uri_to_ic(
uris_filepath=inputs[0].filepath,
uris_base_dir=None, # determine what, if any, value should be used.
ic_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
except Exception as e:
logger.error(f"Error running uri_to_ic: {e}")
logger.error(traceback.format_exc())
raise e
logger.warning("Completed uri_to_ic")
drewoldag marked this conversation as resolved.
Show resolved Hide resolved

return outputs[0]


@python_app(
cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"]
)
def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None):
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
import traceback
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.ic_to_wu import ic_to_wu

logger = configure_logger("task.ic_to_wu", logging_file.filepath)

logger.info("Starting ic_to_wu")
try:
ic_to_wu(
ic_filepath=inputs[0].filepath,
wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
except Exception as e:
logger.error(f"Error running ic_to_wu: {e}")
logger.error(traceback.format_exc())
raise e
logger.warning("Completed ic_to_wu")

return outputs[0]


@python_app(
cache=True,
executors=get_executors(["local_dev_testing", "sharded_reproject"]),
ignore_for_cache=["logging_file"],
)
def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None):
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
import traceback
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.reproject_wu import reproject_wu

logger = configure_logger("task.reproject_wu", logging_file.filepath)

logger.info("Starting reproject_ic")
try:
reproject_wu(
original_wu_filepath=inputs[0].filepath,
uri_filepath=inputs[1].filepath,
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
except Exception as e:
logger.error(f"Error running reproject_ic: {e}")
logger.error(traceback.format_exc())
raise e
logger.warning("Completed reproject_ic")

return outputs[0]


@python_app(
cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"]
)
def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None):
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
import traceback
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.kbmod_search import kbmod_search

logger = configure_logger("task.kbmod_search", logging_file.filepath)

logger.info("Starting kbmod_search")
try:
kbmod_search(
wu_filepath=inputs[0].filepath,
result_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
except Exception as e:
logger.error(f"Error running kbmod_search: {e}")
logger.error(traceback.format_exc())
raise e
logger.warning("Completed kbmod_search")

return outputs[0]


def workflow_runner(env: str = None, runtime_config: dict = {}) -> None:
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
resource_config = get_resource_config(env=env)
resource_config = apply_runtime_updates(resource_config, runtime_config)

app_configs = runtime_config.get("apps", {})

with parsl.load(resource_config) as dfk:
logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))
logger = configure_logger("workflow.workflow_runner", logging_file.filepath)

if runtime_config is not None:
logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")

logger.info("Starting workflow")

# gather all the *.collection files that are staged for processing
manifest_file = File(os.path.join(os.getcwd(), "manifest.txt"))
create_manifest_future = create_uri_manifest(
inputs=[],
outputs=[manifest_file],
runtime_config=app_configs.get("create_uri_manifest", {}),
logging_file=logging_file,
)

with open(create_manifest_future.result(), "r") as f:
# process each .collection file in the manifest into a .wu file
original_work_unit_futures = []
collection_files = []
for line in f:
collection_file = File(line.strip())
collection_files.append(collection_file)
original_work_unit_futures.append(
ic_to_wu(
inputs=[collection_file],
outputs=[File(line + ".wu")],
runtime_config=app_configs.get("ic_to_wu", {}),
logging_file=logging_file,
)
)

# reproject each WorkUnit
# For chip-by-chip, this isn't really necessary, so hardcoding to 0.
reproject_futures = []
for f, collection_file in zip(original_work_unit_futures, collection_files):
distance = 0
reproject_futures.append(
reproject_wu(
inputs=[f.result(), collection_file],
outputs=[File(f.result().filepath + f".{distance}.repro")],
runtime_config=app_configs.get("reproject_wu", {}),
logging_file=logging_file,
)
)

# run kbmod search on each reprojected WorkUnit
search_futures = []
for f in reproject_futures:
search_futures.append(
kbmod_search(
inputs=[f.result()],
outputs=[File(f.result().filepath + ".search.ecsv")],
runtime_config=app_configs.get("kbmod_search", {}),
logging_file=logging_file,
)
)

[f.result() for f in search_futures]

logger.info("Workflow complete")

parsl.clear()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--env",
type=str,
choices=["dev", "klone"],
help="The environment to run the workflow in.",
)

parser.add_argument(
"--runtime-config",
type=str,
help="The complete runtime configuration filepath to use for the workflow.",
)

args = parser.parse_args()

# if a runtime_config file was provided and exists, load the toml as a dict.
runtime_config = {}
if args.runtime_config is not None and os.path.exists(args.runtime_config):
with open(args.runtime_config, "r") as toml_runtime_config:
runtime_config = toml.load(toml_runtime_config)

workflow_runner(env=args.env, runtime_config=runtime_config)
36 changes: 21 additions & 15 deletions src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from kbmod import ImageCollection
from kbmod.configuration import SearchConfiguration
from lsst.daf.butler import Butler

import os
import glob
Expand Down Expand Up @@ -52,29 +53,34 @@ def __init__(
self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)

def create_work_unit(self):
make_wu = True
if len(glob.glob(self.wu_filepath)):
if self.overwrite:
self.logger.info(f"Overwrite was {self.overwrite}. Deleting existing {self.wu_filepath}.")
os.remove(self.wu_filepath)
else:
make_wu = False

if make_wu:
ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")
ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")

last_time = time.time()
#! This needs the butler.
orig_wu = ic.toWorkUnit(search_config=SearchConfiguration.from_file(self.search_config_filepath))
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")
last_time = time.time()
self.logger.info("Creating butler instance")
this_butler = Butler(self.runtime_config.get("butler_config_filepath", None))
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to instantiate butler.")
drewoldag marked this conversation as resolved.
Show resolved Hide resolved

self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}")
last_time = time.time()
directory_containing_shards, wu_filename = os.path.split(self.wu_filepath)
orig_wu.to_sharded_fits(wu_filename, directory_containing_shards, overwrite=True)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}")
last_time = time.time()
orig_wu = ic.toWorkUnit(
search_config=SearchConfiguration.from_file(self.search_config_filepath), butler=this_butler
)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")

self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}")
last_time = time.time()
directory_containing_shards, wu_filename = os.path.split(self.wu_filepath)
orig_wu.to_sharded_fits(wu_filename, directory_containing_shards, overwrite=True)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}")

return self.wu_filepath
8 changes: 4 additions & 4 deletions src/kbmod_wf/task_impls/reproject_wu.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ def __init__(
# Default to 8 workers if not in the config. Value must be 0<num workers<65.
self.n_workers = max(1, min(self.runtime_config.get("n_workers", 8), 64))

#! In the long run, we likely won't have the URI files to start from
#! So we'll need to rethink how we get these parameters.
drewoldag marked this conversation as resolved.
Show resolved Hide resolved
self.uri_params = self._get_params_from_uri_file()
self.patch_size = self.uri_params["patch_size"]
self.pixel_scale = self.uri_params["pixel_scale"]
self.guess_dist = self.uri_params["dist_au"] # ! Let's update the terminology here to be consistent.
self.patch_corners = self.uri_params[
"patch_box"
] # ! Let's update the terminology here to be consistent.
self.guess_dist = self.uri_params.get("dist_au", 40)
self.patch_corners = self.uri_params["patch_box"]

if "patch_size" not in self.uri_params:
raise KeyError(
Expand Down
9 changes: 8 additions & 1 deletion src/kbmod_wf/task_impls/uri_to_ic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
from logging import Logger
from kbmod import ImageCollection
from lsst.daf.butler import Butler


#! I believe that we can remove the `uris_base_dir` parameter from the function
Expand Down Expand Up @@ -82,7 +83,13 @@ def uri_to_ic(

logger.info("Creating ImageCollection")
# Create an ImageCollection object from the list of URIs
ic = ImageCollection.fromTargets(uris)
last_time = time.time()
logger.info("Creating butler instance")
this_butler = Butler(runtime_config.get("butler_config_filepath", None))
elapsed = round(time.time() - last_time, 1)
logger.debug(f"Required {elapsed}[s] to instantiate butler.")

ic = ImageCollection.fromTargets(uris, butler=this_butler)
drewoldag marked this conversation as resolved.
Show resolved Hide resolved

logger.info(f"Writing ImageCollection to file {ic_filepath}")
ic.write(ic_filepath, format="ascii.ecsv")
Loading
Loading