
Update workflow to start with ImageCollection files and use a Butler #30

Merged
merged 7 commits on Aug 18, 2024
example_runtime_config.toml (13 additions, 1 deletion)
@@ -8,22 +8,34 @@ checkpoint_mode = 'task_exit'

# Values in the apps.XXX section will be passed as a dictionary to the corresponding
# app. e.g. apps.create_uri_manifest will be passed to the create_uri_manifest app.
-[apps.create_uri_manifest]
+[apps.create_manifest]
# The path to the staging directory
# e.g. "/gscratch/dirac/kbmod/workflow/staging"
staging_directory = "/home/drew/code/kbmod-wf/dev_staging"
output_directory = "/home/drew/code/kbmod-wf/dev_staging/single_chip_workflow"
file_pattern = "*.collection"


[apps.ic_to_wu]
# The path to the KBMOD search config file
# e.g. "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml"
search_config_filepath = "/home/drew/code/kbmod-wf/dev_staging/search_config.yaml"

# The path to the butler config file used to instantiate a butler that retrieves images
butler_config_filepath = "/gscratch/dirac/DEEP/repo/butler.yaml"

# Remove a previously created WU file if it exists
overwrite = false


[apps.reproject_wu]
# Number of processors to use for parallelizing the reprojection
n_workers = 32

# The name of the observation site to use for reflex correction
observation_site = "ctio"


[apps.kbmod_search]
# The path to the KBMOD search config file
# e.g. "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml"
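The apps.* tables above map one-to-one onto the workflow's apps. As a minimal sketch (the TOML path is this repo's example file; variable names are illustrative), this is how one app's sub-table is sliced out of the loaded configuration and handed to that app as its runtime_config, mirroring what workflow_runner does below with runtime_config.get("apps", {}):

import toml

# Load the full runtime configuration, then pull out a single app's sub-table.
runtime_config = toml.load("example_runtime_config.toml")
app_configs = runtime_config.get("apps", {})

ic_to_wu_config = app_configs.get("ic_to_wu", {})
search_config_filepath = ic_to_wu_config.get("search_config_filepath", None)
butler_config_filepath = ic_to_wu_config.get("butler_config_filepath", None)
overwrite = ic_to_wu_config.get("overwrite", False)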
src/kbmod_wf/resource_configs/klone_configuration.py (4 additions, 4 deletions)
@@ -25,7 +25,7 @@ def klone_resource_config():
        executors=[
            HighThroughputExecutor(
                label="small_cpu",
-                max_workers_per_node=1,
+                max_workers=1,
                provider=SlurmProvider(
                    partition="ckpt-g2",
                    account="astro",
@@ -44,7 +44,7 @@
            ),
            HighThroughputExecutor(
                label="large_mem",
-                max_workers_per_node=1,
+                max_workers=1,
                provider=SlurmProvider(
                    partition="ckpt-g2",
                    account="astro",
@@ -63,7 +63,7 @@
            ),
            HighThroughputExecutor(
                label="sharded_reproject",
-                max_workers_per_node=1,
+                max_workers=1,
                provider=SlurmProvider(
                    partition="ckpt-g2",
                    account="astro",
@@ -82,7 +82,7 @@
            ),
            HighThroughputExecutor(
                label="gpu",
-                max_workers_per_node=1,
+                max_workers=1,
                provider=SlurmProvider(
                    partition="ckpt-g2",
                    account="escience",
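For context, a minimal runnable sketch (local provider instead of Slurm; function name is illustrative) of how these executor labels connect a Parsl resource configuration to an app. The label string is the only contract between the two:

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

# One worker per executor, as in klone_resource_config(); the default
# LocalProvider stands in for SlurmProvider in this sketch.
config = Config(executors=[HighThroughputExecutor(label="small_cpu", max_workers=1)])

@python_app(executors=["small_cpu"])
def where_am_i():
    return "ran on the small_cpu executor"

if __name__ == "__main__":
    parsl.load(config)
    print(where_am_i().result())
    parsl.clear()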
src/kbmod_wf/single_chip_workflow.py (157 additions, 0 deletions, new file)
@@ -0,0 +1,157 @@
import argparse
import os
import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config
from kbmod_wf.utilities.executor_utilities import get_executors
from kbmod_wf.utilities.logger_utilities import configure_logger

from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search


# There's still a ton of duplicated code here and in kbmod_wf.workflow_tasks.reproject_wu
# that should be refactored.
# The only difference is the import of reproject_single_chip_single_night_wu here.
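# `cache=True` memoizes this app on its arguments so a re-run can skip completed
# work; `ignore_for_cache=["logging_file"]` keeps the log path out of the
# memoization key, so pointing at a new log file does not invalidate the cache.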
@python_app(
    cache=True,
    executors=get_executors(["local_dev_testing", "sharded_reproject"]),
    ignore_for_cache=["logging_file"],
)
def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
    import traceback
    from kbmod_wf.utilities.logger_utilities import configure_logger
    from kbmod_wf.task_impls.reproject_single_chip_single_night_wu import reproject_wu

    logger = configure_logger("task.reproject_wu", logging_file.filepath)

    logger.info("Starting reproject_wu")
    try:
        reproject_wu(
            original_wu_filepath=inputs[0].filepath,
            reprojected_wu_filepath=outputs[0].filepath,
            runtime_config=runtime_config,
            logger=logger,
        )
    except Exception as e:
        logger.error(f"Error running reproject_wu: {e}")
        logger.error(traceback.format_exc())
        raise e
    logger.warning("Completed reproject_wu")

    return outputs[0]


def workflow_runner(env=None, runtime_config={}):
    """This function will load and configure Parsl, and run the workflow.

    Parameters
    ----------
    env : str, optional
        Environment string used to define which resource configuration to use,
        by default None
    runtime_config : dict, optional
        Dictionary of assorted runtime configuration parameters, by default {}
    """
    resource_config = get_resource_config(env=env)
    resource_config = apply_runtime_updates(resource_config, runtime_config)

    app_configs = runtime_config.get("apps", {})

    dfk = parsl.load(resource_config)
    if dfk:
        logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))
        logger = configure_logger("workflow.workflow_runner", logging_file.filepath)

        if runtime_config is not None:
            logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")

        logger.info("Starting workflow")

        # gather all the *.collection files that are staged for processing
        create_manifest_config = app_configs.get("create_manifest", {})
        manifest_file = File(
            os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
        )
        create_manifest_future = create_manifest(
            inputs=[],
            outputs=[manifest_file],
            runtime_config=create_manifest_config,
            logging_file=logging_file,
        )

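        # .result() blocks here until the manifest file has been written;
        # everything downstream fans out from the file list it contains.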
        with open(create_manifest_future.result(), "r") as f:
            # process each .collection file in the manifest into a .wu file
            original_work_unit_futures = []
            collection_files = []
            for line in f:
                collection_file = File(line.strip())
                collection_files.append(collection_file)
                original_work_unit_futures.append(
                    ic_to_wu(
                        inputs=[collection_file],
                        outputs=[File(line.strip() + ".wu")],
                        runtime_config=app_configs.get("ic_to_wu", {}),
                        logging_file=logging_file,
                    )
                )

        # reproject each WorkUnit
        # For chip-by-chip processing, a reflex-correction guess distance isn't
        # really necessary, so it is hardcoded to 0.
        reproject_futures = []
        for f, collection_file in zip(original_work_unit_futures, collection_files):
            distance = 0
            reproject_futures.append(
                reproject_wu(
                    inputs=[f.result(), collection_file],
                    outputs=[File(f.result().filepath + f".{distance}.repro")],
                    runtime_config=app_configs.get("reproject_wu", {}),
                    logging_file=logging_file,
                )
            )

        # run kbmod search on each reprojected WorkUnit
        search_futures = []
        for f in reproject_futures:
            search_futures.append(
                kbmod_search(
                    inputs=[f.result()],
                    outputs=[File(f.result().filepath + ".search.ecsv")],
                    runtime_config=app_configs.get("kbmod_search", {}),
                    logging_file=logging_file,
                )
            )

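        # block until every kbmod_search task has finished before tearing down Parsl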
        [f.result() for f in search_futures]

        logger.info("Workflow complete")

    parsl.clear()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--env",
        type=str,
        choices=["dev", "klone"],
        help="The environment to run the workflow in.",
    )

    parser.add_argument(
        "--runtime-config",
        type=str,
        help="The complete runtime configuration filepath to use for the workflow.",
    )

    args = parser.parse_args()

    # if a runtime_config file was provided and exists, load the toml as a dict.
    runtime_config = {}
    if args.runtime_config is not None and os.path.exists(args.runtime_config):
        with open(args.runtime_config, "r") as toml_runtime_config:
            runtime_config = toml.load(toml_runtime_config)

    workflow_runner(env=args.env, runtime_config=runtime_config)
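For reference, a hedged sketch of driving this module from Python rather than the CLI; the TOML filename is hypothetical, and "dev" is one of the two env choices accepted by the argument parser above:

import toml
from kbmod_wf.single_chip_workflow import workflow_runner

# Load a runtime configuration (hypothetical path) and hand it to the runner.
with open("example_runtime_config.toml", "r") as f:
    runtime_config = toml.load(f)

workflow_runner(env="dev", runtime_config=runtime_config)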
src/kbmod_wf/task_impls/__init__.py (0 additions, 1 deletion)
@@ -1,6 +1,5 @@
from .ic_to_wu import ic_to_wu
from .kbmod_search import kbmod_search
from .reproject_wu import reproject_wu
from .uri_to_ic import uri_to_ic

__all__ = [
src/kbmod_wf/task_impls/ic_to_wu.py (21 additions, 15 deletions)
@@ -1,5 +1,6 @@
from kbmod import ImageCollection
from kbmod.configuration import SearchConfiguration
from lsst.daf.butler import Butler

import os
import glob
@@ -52,29 +53,34 @@ def __init__(
        self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)

    def create_work_unit(self):
        make_wu = True
        if len(glob.glob(self.wu_filepath)):
            if self.overwrite:
                self.logger.info(f"Overwrite was {self.overwrite}. Deleting existing {self.wu_filepath}.")
                os.remove(self.wu_filepath)
            else:
                make_wu = False

-        if make_wu:
-            ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
-            self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")
+        ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
+        self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")

-            last_time = time.time()
-            #! This needs the butler.
-            orig_wu = ic.toWorkUnit(search_config=SearchConfiguration.from_file(self.search_config_filepath))
-            elapsed = round(time.time() - last_time, 1)
-            self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")
+        last_time = time.time()
+        self.logger.info("Creating butler instance")
+        this_butler = Butler(self.runtime_config.get("butler_config_filepath", None))
+        elapsed = round(time.time() - last_time, 1)
+        self.logger.debug(f"Required {elapsed}[s] to instantiate butler.")

self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}")
last_time = time.time()
directory_containing_shards, wu_filename = os.path.split(self.wu_filepath)
orig_wu.to_sharded_fits(wu_filename, directory_containing_shards, overwrite=True)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}")
last_time = time.time()
orig_wu = ic.toWorkUnit(
search_config=SearchConfiguration.from_file(self.search_config_filepath), butler=this_butler
)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")

self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}")
last_time = time.time()
directory_containing_shards, wu_filename = os.path.split(self.wu_filepath)
orig_wu.to_sharded_fits(wu_filename, directory_containing_shards, overwrite=True)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}")

return self.wu_filepath
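For reference, a hedged sketch of the new Butler usage in create_work_unit: a Butler is constructed from the configured repo path and passed to ImageCollection.toWorkUnit so the underlying images can be fetched from the data repository. The repo path below is the example value from the runtime config, not a path you are expected to have:

from lsst.daf.butler import Butler

# Instantiate a Butler against the configured repository; this is the same
# path that butler_config_filepath points at in example_runtime_config.toml.
butler = Butler("/gscratch/dirac/DEEP/repo/butler.yaml")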
@@ -74,13 +74,13 @@ def __init__(
        # Default to 8 workers if not in the config. The value must be between 1 and 64, inclusive.
        self.n_workers = max(1, min(self.runtime_config.get("n_workers", 8), 64))

        #! In the long run, we likely won't have the URI files to start from,
        #! so we'll need to rethink how we get these parameters.
        self.uri_params = self._get_params_from_uri_file()
        self.patch_size = self.uri_params["patch_size"]
        self.pixel_scale = self.uri_params["pixel_scale"]
-        self.guess_dist = self.uri_params["dist_au"]  # ! Let's update the terminology here to be consistent.
-        self.patch_corners = self.uri_params[
-            "patch_box"
-        ]  # ! Let's update the terminology here to be consistent.
+        self.guess_dist = self.uri_params.get("dist_au", 40)
+        self.patch_corners = self.uri_params["patch_box"]

        if "patch_size" not in self.uri_params:
            raise KeyError(
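The n_workers clamp above is easy to misread, so here is a small sketch of its behavior (the helper name is hypothetical; the expression is copied from the diff):

def clamp_n_workers(runtime_config):
    # Default to 8 when unset, then clamp into the inclusive range [1, 64].
    return max(1, min(runtime_config.get("n_workers", 8), 64))

assert clamp_n_workers({}) == 8
assert clamp_n_workers({"n_workers": 0}) == 1
assert clamp_n_workers({"n_workers": 128}) == 64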