experimental wf #43

Closed · wants to merge 3 commits
24 changes: 12 additions & 12 deletions src/kbmod_wf/resource_configs/klone_configuration.py
@@ -18,16 +18,16 @@ def klone_resource_config():
         app_cache=True,
         checkpoint_mode="task_exit",
         checkpoint_files=get_all_checkpoints(
-            os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat())
+            os.path.join(os.path.abspath(os.curdir), datetime.date.today().isoformat())
         ),
-        run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()),
+        run_dir=os.path.join(os.path.abspath(os.curdir), datetime.date.today().isoformat()),
         retries=1,
         executors=[
             HighThroughputExecutor(
                 label="small_cpu",
                 max_workers=1,
                 provider=SlurmProvider(
-                    partition="ckpt-g2",
+                    partition="ckpt-all",
                     account="astro",
                     min_blocks=0,
                     max_blocks=4,
@@ -46,15 +46,15 @@ def klone_resource_config():
                 label="large_mem",
                 max_workers=1,
                 provider=SlurmProvider(
-                    partition="ckpt-g2",
+                    partition="ckpt-all",
                     account="astro",
                     min_blocks=0,
                     max_blocks=2,
                     init_blocks=0,
                     parallelism=1,
                     nodes_per_block=1,
                     cores_per_node=32,
-                    mem_per_node=512,
+                    mem_per_node=256,
                     exclusive=False,
                     walltime=walltimes["large_mem"],
                     # Command to run before starting worker - i.e. conda activate <special_env>
@@ -63,17 +63,17 @@ def klone_resource_config():
             ),
             HighThroughputExecutor(
                 label="sharded_reproject",
-                max_workers=1,
+                max_workers=1,  # Do we mean max_workers_per_node here?
                 provider=SlurmProvider(
-                    partition="ckpt-g2",
+                    partition="ckpt-all",
                     account="astro",
                     min_blocks=0,
                     max_blocks=2,
                     init_blocks=0,
                     parallelism=1,
                     nodes_per_block=1,
-                    cores_per_node=32,
-                    mem_per_node=128,  # ~2-4 GB per core
+                    cores_per_node=8,
+                    mem_per_node=100,
                     exclusive=False,
                     walltime=walltimes["sharded_reproject"],
                     # Command to run before starting worker - i.e. conda activate <special_env>
@@ -84,15 +84,15 @@ def klone_resource_config():
                 label="gpu",
                 max_workers=1,
                 provider=SlurmProvider(
-                    partition="ckpt-g2",
+                    partition="gpu-a40",
                     account="escience",
                     min_blocks=0,
                     max_blocks=2,
                     init_blocks=0,
                     parallelism=1,
                     nodes_per_block=1,
-                    cores_per_node=2,  # perhaps should be 8???
-                    mem_per_node=512,  # In GB
+                    cores_per_node=1,  # perhaps should be 8???
+                    mem_per_node=64,  # In GB
                     exclusive=False,
                     walltime=walltimes["gpu_max"],
                     # Command to run before starting worker - i.e. conda activate <special_env>
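For context on the relocated paths: os.path.abspath(os.curdir) is evaluated when klone_resource_config() is called, so checkpoints and run logs now land under the directory the workflow is launched from rather than the shared /gscratch tree. A minimal sketch of what the new expressions resolve to (the launch directory is hypothetical):

# Minimal sketch: where the updated config writes run logs and checkpoints.
# Resolved at config-construction time from the launching process's cwd.
import datetime
import os

launch_root = os.path.abspath(os.curdir)  # e.g. /home/user/kbmod_runs (hypothetical)
run_dir = os.path.join(launch_root, datetime.date.today().isoformat())
print(run_dir)  # e.g. /home/user/kbmod_runs/2024-08-15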
197 changes: 197 additions & 0 deletions src/kbmod_wf/single_chip_workflow2.py
@@ -0,0 +1,197 @@
import argparse
import os
import glob

import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities import (
    apply_runtime_updates,
    get_resource_config,
    get_executors,
    get_configured_logger,
)

from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search


@python_app(
    cache=True,
    executors=get_executors(["local_dev_testing", "sharded_reproject"]),
    ignore_for_cache=["logging_file"],
)
def step1(inputs=(), outputs=(), runtime_config={}, logging_file=None):
    from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

    logger = get_configured_logger("kbmod.ic_to_wu", logging_file.filepath)

    import numpy as np
    from reproject.mosaicking import find_optimal_celestial_wcs

    from kbmod import ImageCollection
    from kbmod.configuration import SearchConfiguration
    import kbmod.reprojection as reprojection

    from lsst.daf.butler import Butler

    logger.info(f"Running with config {runtime_config}")

    with ErrorLogger(logger):
        # Unravel inputs
        repo_root = runtime_config["butler_config_filepath"]
        search_conf_path = runtime_config.get("search_config_filepath", None)
        ic_file = inputs[0].filepath

        # Run core tasks
        logger.info("Reading ImageCollection and adjusting search limits.")
        ic = ImageCollection.read(ic_file)
        ic.data.sort("mjd_mid")
        butler = Butler(repo_root)
        search_conf = SearchConfiguration.from_file(search_conf_path)

        # fix up the config to match the specifics of the image collection in question
        search_conf._params["n_obs"] = len(ic) // 2
        logger.info(f"Setting search config n_obs to {search_conf._params['n_obs']}")

        # Fit the optimal WCS
        # TODO: triple check this doesn't flip the array, I'm pretty sure it does
        opt_wcs, shape = find_optimal_celestial_wcs(list(ic.wcs))
        opt_wcs.array_shape = shape

        wu = ic.toWorkUnit(search_config=search_conf, butler=butler, overwrite=True)
logger.info("Created a WorkUnit")

# we've got everything we wanted out of IC, clean it up.
del ic

# Resample the work unit so all pixels point to the same (ra, dec)
logger.info(f"Writing resampled wu to {outputs[0]}")
resampled_wu = reprojection.reproject_work_unit(
wu,
opt_wcs,
max_parallel_processes=runtime_config.get("n_workers", 8),
)
resampled_wu.to_fits(outputs[0])

return outputs
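The TODO about a possible array flip can be checked outside the workflow. Below is a hedged sketch, not part of this PR, that exercises find_optimal_celestial_wcs with a synthetic WCS using the documented (shape, wcs) input form; shapes here are numpy-style (ny, nx):

# Standalone check of the (wcs, shape) contract used in step1 above.
from astropy.wcs import WCS
from reproject.mosaicking import find_optimal_celestial_wcs

w = WCS(naxis=2)
w.wcs.ctype = ["RA---TAN", "DEC--TAN"]
w.wcs.crval = [150.0, 2.0]            # (ra, dec) of the reference pixel, degrees
w.wcs.crpix = [500.0, 250.0]
w.wcs.cdelt = [-0.000278, 0.000278]   # ~1 arcsec pixels, RA increasing to the left

opt_wcs, shape = find_optimal_celestial_wcs([((500, 1000), w)])  # input (ny, nx) = (500, 1000)
opt_wcs.array_shape = shape  # returned shape is also (ny, nx)
print(shape)                 # compare against the input shape to spot a transpose or flip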


@python_app(
    cache=True,
    executors=get_executors(["local_dev_testing", "gpu"]),
    ignore_for_cache=["logging_file"],
)
def step2(inputs=(), outputs=(), runtime_config={}, logging_file=None):
    from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

    logger = get_configured_logger("kbmod.search_task", logging_file.filepath)

    import json

    from kbmod.work_unit import WorkUnit
    from kbmod.run_search import SearchRunner

    with ErrorLogger(logger):
        wu = WorkUnit.from_fits(inputs[0].filename)
        res = SearchRunner().run_search_from_work_unit(wu)

        # store the search WCS (with its pixel dimensions) in the table metadata
        header = wu.wcs.to_header(relax=True)
        nx, ny = wu.wcs.pixel_shape
        header["NAXIS1"], header["NAXIS2"] = nx, ny
        res.table.meta["wcs"] = json.dumps(dict(header))
        res.write_table(outputs[0].filename)
[Review comment, Collaborator] I think we can build the output filename from the input: inputs[0].filename + ".results.ecsv".
    return outputs
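Since step2 serializes the search WCS into the table metadata, downstream consumers can rebuild it from the ecsv. A hedged sketch (the results filename is hypothetical; ECSV preserves table.meta on round-trip):

# Rebuild the WCS that step2 stored as JSON in the results table metadata.
import json

from astropy.io.fits import Header
from astropy.table import Table
from astropy.wcs import WCS

tbl = Table.read("chip_00_resampled.wu.results.ecsv")  # hypothetical output name
hdr = Header()
for key, value in json.loads(tbl.meta["wcs"]).items():
    hdr[key] = value  # restore header cards, including NAXIS1/NAXIS2
wcs = WCS(hdr)
print(wcs)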


def workflow_runner(env=None, runtime_config={}):
    """This function will load and configure Parsl, and run the workflow.

    Parameters
    ----------
    env : str, optional
        Environment string used to define which resource configuration to use,
        by default None
    runtime_config : dict, optional
        Dictionary of assorted runtime configuration parameters, by default {}
    """
    resource_config = get_resource_config(env=env)
    resource_config = apply_runtime_updates(resource_config, runtime_config)

    app_configs = runtime_config.get("apps", {})

    dfk = parsl.load(resource_config)
    if dfk:
        logging_file = File(os.path.join(dfk.run_dir, "kbmod.log"))
        logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath)

        if runtime_config is not None:
            logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")

        logger.info("Starting workflow")

        # find the input ImageCollection files in the staging directory
        directory_path = runtime_config.get("staging_directory", ".")
        file_pattern = runtime_config.get("file_pattern", "*.collection")
        pattern = os.path.join(directory_path, file_pattern)
        entries = glob.glob(pattern)
        logger.info(f"Found {len(entries)} files in {directory_path}")

        # resample each ImageCollection into a reprojected WorkUnit
        step1_futures = []
        for collection in entries:
            collection_file = File(collection)
            collname = os.path.basename(collection)
            collname = collname.split(".")[0]
            step1_futures.append(
                step1(
                    inputs=[collection_file],
                    outputs=[File(f"{collname}_resampled.wu")],
                    runtime_config=app_configs.get("ic_to_wu", {}),
                    logging_file=logging_file,
                )
            )

        # run kbmod search on each reprojected WorkUnit
        search_futures = []
        for resampled_future in step1_futures:
            search_futures.append(
                step2(
                    inputs=resampled_future.result(),
                    outputs=[File(resampled_future.result()[0].filepath + ".results.ecsv")],
[Review comment, Collaborator] I believe that 161 and 162 should change to:

                    inputs=resampled_future,
                    outputs=[],
runtime_config=app_configs.get("kbmod_search", {}),
logging_file=logging_file,
)
)

        [f.result() for f in search_futures]
        logger.info("Workflow complete")

    parsl.clear()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--env",
        type=str,
        choices=["dev", "klone"],
        help="The environment to run the workflow in.",
    )
    parser.add_argument(
        "--runtime-config",
        type=str,
        help="The complete runtime configuration filepath to use for the workflow.",
    )
    args = parser.parse_args()

    # if a runtime_config file was provided and exists, load the toml as a dict.
    runtime_config = {}
    if args.runtime_config is not None and os.path.exists(args.runtime_config):
        with open(args.runtime_config, "r") as toml_runtime_config:
            runtime_config = toml.load(toml_runtime_config)

    workflow_runner(env=args.env, runtime_config=runtime_config)
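For reference, a hedged sketch of a runtime configuration file this runner would accept. The key names mirror the lookups made in workflow_runner, step1, and step2; every path is hypothetical:

# Example runtime configuration, expressed as TOML and parsed the same way
# the __main__ block above does it.
import toml

runtime_config = toml.loads("""
staging_directory = "/path/to/collections"   # searched for input files
file_pattern = "*.collection"

[apps.ic_to_wu]
butler_config_filepath = "/path/to/butler_repo"
search_config_filepath = "/path/to/search_config.yaml"
n_workers = 8   # parallel processes for reproject_work_unit

[apps.kbmod_search]
""")
print(runtime_config["apps"]["ic_to_wu"]["n_workers"])  # 8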
14 changes: 7 additions & 7 deletions src/kbmod_wf/utilities/logger_utilities.py
@@ -17,31 +17,31 @@
     },
     "handlers": {
         "stdout": {
-            "level": "INFO",
+            "level": "DEBUG",
             "formatter": "standard",
             "class": "logging.StreamHandler",
             "stream": "ext://sys.stdout",
         },
         "stderr": {
-            "level": "INFO",
+            "level": "DEBUG",
             "formatter": "standard",
             "class": "logging.StreamHandler",
             "stream": "ext://sys.stderr",
         },
         "file": {
-            "level": "INFO",
+            "level": "DEBUG",
             "formatter": "standard",
             "class": "logging.FileHandler",
-            "filename": "parsl.log",
+            "filename": "kbmod.log",
         },
     },
     "loggers": {
-        "task": {"level": "INFO", "handlers": ["file", "stdout"], "propagate": False},
+        "task": {"level": "DEBUG", "handlers": ["file", "stdout"], "propagate": True},
         "task.create_manifest": {},
         "task.ic_to_wu": {},
         "task.reproject_wu": {},
-        "task.kbmod_search": {},
-        "kbmod": {"level": "INFO", "handlers": ["file", "stdout"], "propagate": False},
+        "task.kbmod_search": {"level": "DEBUG", "handlers": ["file", "stdout"], "propagate": True},
+        "kbmod": {"level": "DEBUG", "handlers": ["file", "stdout"], "propagate": False},
     },
 }
"""Default logging configuration for Parsl."""