Skip to content

Commit

Permalink
Comibine ic_to_wu and reproject steps
Browse files Browse the repository at this point in the history
  • Loading branch information
wilsonbb committed Oct 22, 2024
1 parent f83ef49 commit e2cbe7c
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 67 deletions.
53 changes: 18 additions & 35 deletions src/kbmod_wf/multi_night_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
with ErrorLogger(logger):
reproject_wu(
guess_dist,
original_wu_filepath=inputs[0].filepath,
ic_filepath=inputs[0],
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
Expand Down Expand Up @@ -79,45 +79,28 @@ def workflow_runner(env=None, runtime_config={}):
logging_file=logging_file,
)

wu_filenames =[]
with open(create_manifest_future.result(), "r") as f:
# process each .collection file in the manifest into a .wu file
original_work_unit_futures = []
collection_files = []
for line in f:
collection_file = File(line.strip())
collection_files.append(collection_file)
wu_filename = line + ".wu"
wu_filenames.append(wu_filename)
original_work_unit_futures.append(
ic_to_wu(
inputs=[collection_file],
outputs=[File(wu_filename)],
runtime_config=app_configs.get("ic_to_wu", {}),
logging_file=logging_file,
)
)

# reproject each WorkUnit for a range of distances
reproject_futures = []
repro_wu_filenames = []
runtime_config=app_configs.get("reproject_wu", {})
for i in range(len(original_work_unit_futures)):
f = original_work_unit_futures[i]
# Get the requested heliocentric guess distances (in AU) for reflex correction.
# If none are provided, default to 42.0 AU.
distances = runtime_config["helio_guess_dists"] if "helio_guess_dists" in runtime_config else [42.0]
for dist in distances:
output_filename=wu_filenames[i]+ f".{dist}.repro"
repro_wu_filenames.append(output_filename)
reproject_futures.append(
reproject_wu(
inputs=[f, dist],
outputs=[File(output_filename)],
runtime_config=runtime_config,
logging_file=logging_file,
with open(create_manifest_future.result(), "r") as f:
for line in f:
collection_file = File(line.strip())
wu_filename = line + ".wu"
# Get the requested heliocentric guess distances (in AU) for reflex correction.
# If none are provided, default to 42.0 AU.
distances = runtime_config["helio_guess_dists"] if "helio_guess_dists" in runtime_config else [42.0]
for dist in distances:
output_filename=wu_filename + f".{dist}.repro"
repro_wu_filenames.append(output_filename)
reproject_futures.append(
reproject_wu(
inputs=[collection_file, dist],
outputs=[File(output_filename)],
runtime_config=runtime_config,
logging_file=logging_file,
)
)
)

# run kbmod search on each reprojected WorkUnit
search_futures = []
Expand Down
24 changes: 12 additions & 12 deletions src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def ic_to_wu(
ic_filepath: str = None, wu_filepath: str = None, runtime_config: dict = {}, logger: Logger = None
ic_filepath: str = None, wu_filepath: str = None, save: bool = True, runtime_config: dict = {}, logger: Logger = None
):
"""This task will convert an ImageCollection to a WorkUnit.
Expand All @@ -19,18 +19,20 @@ def ic_to_wu(
The fully resolved filepath to the input ImageCollection file, by default None
wu_filepath : str, optional
The fully resolved filepath for the output WorkUnit file, by default None
save : bool, optional
Flag to save the WorkUnit to disk, by default True. If False, the WorkUnit is returned.
runtime_config : dict, optional
Additional configuration parameters to be used at runtime, by default {}
logger : Logger, optional
Primary logger for the workflow, by default None
Returns
-------
str
The fully resolved filepath of the output WorkUnit file.
str | WorkUnit
The fully resolved filepath of the output WorkUnit file or the WorkUnit object itself if save=False.
"""
ic_to_wu_converter = ICtoWUConverter(
ic_filepath=ic_filepath, wu_filepath=wu_filepath, runtime_config=runtime_config, logger=logger
ic_filepath=ic_filepath, wu_filepath=wu_filepath, save=save, runtime_config=runtime_config, logger=logger
)

return ic_to_wu_converter.create_work_unit()
Expand All @@ -41,25 +43,19 @@ def __init__(
self,
ic_filepath: str = None,
wu_filepath: str = None,
save: bool = True,
runtime_config: dict = {},
logger: Logger = None,
):
self.ic_filepath = ic_filepath
self.wu_filepath = wu_filepath
self.save = save
self.runtime_config = runtime_config
self.logger = logger

self.overwrite = self.runtime_config.get("overwrite", False)
self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)

def create_work_unit(self):
if len(glob.glob(self.wu_filepath)):
if self.overwrite:
self.logger.info(f"Overwrite was {self.overwrite}. Deleting existing {self.wu_filepath}.")
os.remove(self.wu_filepath)
else:
make_wu = False

ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")

Expand All @@ -76,6 +72,10 @@ def create_work_unit(self):
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")

if not self.save:
self.logger.debug(f"Required {elapsed}[s] to create the WorkUnit.")
return orig_wu

self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}")
last_time = time.time()
directory_containing_shards, wu_filename = os.path.split(self.wu_filepath)
Expand Down
44 changes: 27 additions & 17 deletions src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import kbmod
from kbmod import ImageCollection
from kbmod.work_unit import WorkUnit
from kbmod.reprojection_utils import transform_wcses_to_ebd


import kbmod.reprojection as reprojection
from kbmod_wf.task_impls.ic_to_wu import ic_to_wu
from astropy.wcs import WCS
from astropy.io import fits
from astropy.coordinates import EarthLocation
Expand All @@ -15,7 +18,7 @@

def reproject_wu(
guess_dist: float,
original_wu_filepath: str = None,
ic_filepath: str,
reprojected_wu_filepath: str = None,
runtime_config: dict = {},
logger: Logger = None,
Expand All @@ -26,8 +29,8 @@ def reproject_wu(
----------
guess_dist: float
The heliocentric guess distance to reproject to in AU.
original_wu_filepath : str, optional
The fully resolved filepath to the input WorkUnit file, by default None
ic_filepath : str
The fully resolved filepath to the input ImageCollection file
reprojected_wu_filepath : str, optional
The fully resolved filepath to the resulting WorkUnit file after reflex
and reprojection, by default None
Expand All @@ -44,7 +47,7 @@ def reproject_wu(
"""
wu_reprojector = WUReprojector(
guess_dist=guess_dist,
original_wu_filepath=original_wu_filepath,
ic_filepath=ic_filepath,
reprojected_wu_filepath=reprojected_wu_filepath,
runtime_config=runtime_config,
logger=logger,
Expand All @@ -57,13 +60,13 @@ class WUReprojector:
def __init__(
self,
guess_dist: float,
original_wu_filepath: str = None,
ic_filepath: str = None,
reprojected_wu_filepath: str = None,
runtime_config: dict = {},
logger: Logger = None,
):
self.guess_dist = guess_dist
self.original_wu_filepath = original_wu_filepath
self.ic_filepath = ic_filepath
self.reprojected_wu_filepath = reprojected_wu_filepath
self.runtime_config = runtime_config
self.logger = logger
Expand All @@ -79,12 +82,17 @@ def __init__(

def reproject_workunit(self):
last_time = time.time()
self.logger.info(f"Lazy reading existing WorkUnit from disk: {self.original_wu_filepath}")
directory_containing_shards, wu_filename = os.path.split(self.original_wu_filepath)
wu = WorkUnit.from_sharded_fits(wu_filename, directory_containing_shards, lazy=True)
self.logger.info(f"Loading a WorkUnit from ImageCollection at {self.ic_filepath}")
wu = ic_to_wu(
ic_filepath=self.ic_filepath,
wu_filepath=None,
save=False,
runtime_config=self.runtime_config,
logger=self.logger,
)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(
f"Required {elapsed}[s] to lazy read original WorkUnit {self.original_wu_filepath}."
f"Required {elapsed}[s] to create original WorkUnit from ImageCollection at {self.ic_filepath}."
)

#! This method to get image dimensions won't hold if the images are different sizes.
Expand Down Expand Up @@ -118,17 +126,19 @@ def reproject_workunit(self):
self.logger.debug(f"Reprojecting WorkUnit with {self.n_workers} workers...")
last_time = time.time()

directory_containing_reprojected_shards, reprojected_wu_filename = os.path.split(
self.reprojected_wu_filepath
)
reprojection.reproject_lazy_work_unit(
# Use the global WCS that was specified from the ImageCollection.
ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
common_wcs = WCS(ic["global_wcs"][0])

resampled_wu = reprojection.reproject_work_unit(
wu,
wu.wcs, # Use the common WCS of the WorkUnit
directory_containing_reprojected_shards,
reprojected_wu_filename,
common_wcs,
parallelize=True,
frame="ebd",
max_parallel_processes=self.n_workers,
)
directory_containing_shards, wu_filename = os.path.split(self.reprojected_wu_filepath)
resampled_wu.to_sharded_fits(wu_filename, directory_containing_shards, overwrite=self.overwrite)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create the sharded reprojected WorkUnit.")

Expand Down
6 changes: 3 additions & 3 deletions src/kbmod_wf/workflow_tasks/reproject_wu_multi_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
Parameters
----------
inputs : tuple, optional
A tuple with a single parsl.File object that references the original WorkUnit
A tuple with a single parsl.File object that references the ImageCollection
file, by default ()
outputs : tuple, optional
A tuple with a single parsl.File object that references the reprojected
Expand All @@ -37,15 +37,15 @@ def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
"""
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = get_configured_logger("task.ic_to_wu", logging_file)
logger = get_configured_logger("task.reproject_wu", logging_file)

from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu


logger.info("Starting reproject_ic")
with ErrorLogger(logger):
reproject_wu(
original_wu_filepath=inputs[0].filepath,
ic_filepath=inputs[0].filepath,
guess_dist=inputs[1],
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
Expand Down

0 comments on commit e2cbe7c

Please sign in to comment.