Skip to content

Commit

Permalink
Merge pull request #18 from dirac-institute/issue/17/port-tno-scripts
Browse files Browse the repository at this point in the history
Pulling kbmod-tno-scripts over to the corresponding tasks.
  • Loading branch information
drewoldag authored Jul 19, 2024
2 parents df39af1 + 6cd3f39 commit 0f16133
Show file tree
Hide file tree
Showing 7 changed files with 530 additions and 44 deletions.
11 changes: 10 additions & 1 deletion example_runtime_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,13 @@ checkpoint_mode = 'task_exit'
[apps.create_uri_manifest]
# The path to the staging directory
# e.g. "/gscratch/dirac/kbmod/workflow/staging"
staging_directory = "/home/drew/code/kbmod-wf/dev_staging"
staging_directory = "/home/drew/code/kbmod-wf/dev_staging"

[apps.reproject_wu]
# The name of the observation site to use for reflex correction
observation_site = "ctio"

[apps.kbmod_search]
# The path to the KBMOD search config file
# e.g. "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml"
search_config_filepath = "/home/drew/code/kbmod-wf/dev_staging/search_config.yaml"
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ requires-python = ">=3.9"
dependencies = [
"parsl", # The primary workflow orchestration tool
"toml", # Used to read runtime configuration files
"astropy", # Used for various file, date, and location manipulations
]

[project.urls]
Expand Down
79 changes: 78 additions & 1 deletion src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from kbmod import ImageCollection
from kbmod.configuration import SearchConfiguration

import os
import glob
import time
from logging import Logger


def ic_to_wu(ic_file=None, wu_file=None, logger=None):
def placeholder_ic_to_wu(ic_file=None, wu_file=None, logger=None):
logger.info("In the ic_to_wu task_impl")
with open(ic_file, "r") as f:
for line in f:
Expand All @@ -12,3 +18,74 @@ def ic_to_wu(ic_file=None, wu_file=None, logger=None):
f.write(f"Logged: {value} - {time.time()}\n")

return wu_file


def ic_to_wu(
ic_filepath: str = None, wu_filepath: str = None, runtime_config: dict = {}, logger: Logger = None
):
"""This task will convert an ImageCollection to a WorkUnit.
Parameters
----------
ic_filepath : str, optional
The fully resolved filepath to the input ImageCollection file, by default None
wu_filepath : str, optional
The fully resolved filepath for the output WorkUnit file, by default None
runtime_config : dict, optional
Additional configuration parameters to be used at runtime, by default {}
logger : Logger, optional
Primary logger for the workflow, by default None
Returns
-------
str
The fully resolved filepath of the output WorkUnit file.
"""
ic_to_wu_converter = ICtoWUConverter(
ic_filepath=ic_filepath, wu_filepath=wu_filepath, runtime_config=runtime_config, logger=logger
)

return ic_to_wu_converter.create_work_unit()


class ICtoWUConverter:
def __init__(
self,
ic_filepath: str = None,
wu_filepath: str = None,
runtime_config: dict = {},
logger: Logger = None,
):
self.ic_filepath = ic_filepath
self.wu_filepath = wu_filepath
self.runtime_config = runtime_config
self.logger = logger

self.overwrite = self.runtime_config.get("overwrite", False)
self.search_config = self.runtime_config.get("search_config", None)

def create_work_unit(self):
make_wu = True
if len(glob.glob(self.wu_filepath)):
if self.overwrite:
self.logger.info(f"Overwrite was {self.overwrite}. Deleting existing {self.wu_filepath}.")
os.remove(self.wu_filepath)
else:
make_wu = False

if make_wu:
ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")

last_time = time.time()
orig_wu = ic.toWorkUnit(config=SearchConfiguration.from_file(self.search_config))
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")

self.logger.info(f"Saving original work unit to: {self.wu_filepath}")
last_time = time.time()
orig_wu.to_fits(self.wu_filepath, overwrite=True)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}")

return self.wu_filepath
89 changes: 88 additions & 1 deletion src/kbmod_wf/task_impls/kbmod_search.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import kbmod
from kbmod.work_unit import WorkUnit

import os
import time
from logging import Logger


def kbmod_search(input_wu=None, result_file=None, logger=None):
def placeholder_kbmod_search(input_wu=None, result_file=None, logger=None):
logger.info("In the kbmod_search task_impl")
with open(input_wu, "r") as f:
for line in f:
Expand All @@ -14,3 +19,85 @@ def kbmod_search(input_wu=None, result_file=None, logger=None):
f.write(f"Logged: {value} - {time.time()}\n")

return result_file


def kbmod_search(
wu_filepath: str = None,
result_filepath: str = None,
runtime_config: dict = {},
logger: Logger = None,
):
"""This task will run the KBMOD search algorithm on a WorkUnit.
Parameters
----------
wu_filepath : str, optional
The fully resolved filepath to the input WorkUnit file, by default None
runtime_config : dict, optional
Additional configuration parameters to be used at runtime, by default {}
logger : Logger, optional
Primary logger for the workflow, by default None
Returns
-------
str
The fully resolved filepath of the results file.
"""
kbmod_searcher = KBMODSearcher(
wu_filepath=wu_filepath,
result_filepath=result_filepath,
runtime_config=runtime_config,
logger=logger,
)

return kbmod_searcher.run_search()


class KBMODSearcher:
def __init__(
self,
wu_filepath: str = None,
result_filepath: str = None,
runtime_config: dict = {},
logger: Logger = None,
):
self.input_wu_filepath = wu_filepath
self.runtime_config = runtime_config
self.result_filepath = result_filepath
self.logger = logger

self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)
self.results_directory = os.path.dirname(self.result_filepath)

def run_search(self):
self.logger.info("Loading workunit from file")
wu = WorkUnit.from_fits(self.input_wu_filepath)

self.logger.debug("Loaded work unit")
if self.search_config_filepath is not None:
# Load a search configuration, otherwise use the one loaded with the work unit
wu.config = kbmod.configuration.SearchConfiguration.from_file(self.search_config_filepath)

config = wu.config

# Modify the work unit results to be what is specified in command line args
input_parameters = {
"res_filepath": self.results_directory,
"result_filename": self.result_filepath,
}
config.set_multiple(input_parameters)

# Save the search config in the results directory for record keeping
config.to_file(os.path.join(self.results_directory, "search_config.yaml"))
wu.config = config

self.logger.info("Running KBMOD search")
res = kbmod.run_search.SearchRunner().run_search_from_work_unit(wu)

self.logger.info("Search complete")
self.logger.info(f"Number of results found: {len(res)}")

self.logger.info(f"Writing results to output file: {self.result_filepath}")
res.write_table(self.result_filepath)

return self.result_filepath
Loading

0 comments on commit 0f16133

Please sign in to comment.