
Commit 10cedb3

Merge pull request #21 from dirac-institute/awo/updates-while-running-on-klone

Updates to get things working on Klone
drewoldag authored Jul 30, 2024
2 parents b08ddff + b7ae010 commit 10cedb3
Showing 5 changed files with 42 additions and 40 deletions.
7 changes: 7 additions & 0 deletions example_runtime_config.toml
@@ -13,7 +13,14 @@ checkpoint_mode = 'task_exit'
# e.g. "/gscratch/dirac/kbmod/workflow/staging"
staging_directory = "/home/drew/code/kbmod-wf/dev_staging"

+[apps.ic_to_wu]
+# The path to the KBMOD search config file
+# e.g. "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml"
+search_config_filepath = "/home/drew/code/kbmod-wf/dev_staging/search_config.yaml"
+
[apps.reproject_wu]
+# Number of processors to use for parallelizing the reprojection
+n_workers = 32
# The name of the observation site to use for reflex correction
observation_site = "ctio"

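Note: a minimal sketch of how these new tables might be consumed, assuming the file is parsed with Python's standard tomllib (the actual loader in kbmod_wf, and how it hands each app its own table, may differ). The key names come from the diff above.

    import tomllib  # stdlib in Python 3.11+; the tomli package backports it

    with open("example_runtime_config.toml", "rb") as f:  # tomllib requires binary mode
        config = tomllib.load(f)

    # [apps.ic_to_wu] and [apps.reproject_wu] parse into nested dicts.
    search_config_filepath = config["apps"]["ic_to_wu"]["search_config_filepath"]
    n_workers = config["apps"]["reproject_wu"].get("n_workers", 8)
    observation_site = config["apps"]["reproject_wu"].get("observation_site", "ctio")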
28 changes: 16 additions & 12 deletions src/kbmod_wf/resource_configs/klone_configuration.py
@@ -7,8 +7,8 @@

walltimes = {
"compute_bigmem": "01:00:00",
"large_mem": "08:00:00",
"gpu_max": "04:00:00",
"large_mem": "04:00:00",
"gpu_max": "08:00:00",
}


@@ -20,19 +20,21 @@ def klone_resource_config():
os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat())
),
run_dir=os.path.join("/gscratch/dirac/kbmod/workflow/run_logs", datetime.date.today().isoformat()),
+        retries=1,
executors=[
HighThroughputExecutor(
label="small_cpu",
+                max_workers_per_node=1,
provider=SlurmProvider(
partition="compute-bigmem",
account="astro",
min_blocks=0,
max_blocks=4,
-                    init_blocks=1,
+                    init_blocks=0,
parallelism=1,
nodes_per_block=1,
cores_per_node=1, # perhaps should be 8???
-                    mem_per_node=64,  # In GB
+                    mem_per_node=256,  # In GB
exclusive=False,
walltime=walltimes["compute_bigmem"],
# Command to run before starting worker - i.e. conda activate <special_env>
@@ -41,12 +43,13 @@
),
HighThroughputExecutor(
label="large_mem",
+                max_workers_per_node=1,
provider=SlurmProvider(
partition="compute-bigmem",
partition="ckpt-g2",
account="astro",
min_blocks=0,
max_blocks=2,
-                    init_blocks=1,
+                    init_blocks=0,
parallelism=1,
nodes_per_block=1,
cores_per_node=8,
@@ -59,27 +62,28 @@
),
HighThroughputExecutor(
label="gpu",
-                available_accelerators=2,
+                max_workers_per_node=1,
provider=SlurmProvider(
partition="gpu_a40",
partition="ckpt-g2",
account="escience",
min_blocks=0,
max_blocks=2,
-                    init_blocks=1,
+                    init_blocks=0,
parallelism=1,
nodes_per_block=1,
-                    cores_per_node=4,  # perhaps should be 8???
-                    mem_per_node=128,  # In GB
+                    cores_per_node=2,  # perhaps should be 8???
+                    mem_per_node=512,  # In GB
exclusive=False,
walltime=walltimes["gpu_max"],
# Command to run before starting worker - i.e. conda activate <special_env>
worker_init="",
+                    scheduler_options="#SBATCH --gpus=1",
),
),
HighThroughputExecutor(
label="local_thread",
provider=LocalProvider(
-                    init_blocks=1,
+                    init_blocks=0,
max_blocks=1,
),
),
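Note: with init_blocks=0 on every provider, no Slurm blocks are requested at startup; the first submitted task triggers scaling out, and retries=1 resubmits each failed task once. A minimal, hypothetical driver showing how this config would be activated (only klone_resource_config and the executor labels come from this repo; the task is invented):

    import parsl
    from parsl import python_app

    from kbmod_wf.resource_configs.klone_configuration import klone_resource_config

    # Activate the config; with init_blocks=0, no Slurm blocks are requested yet.
    parsl.load(klone_resource_config())

    @python_app(executors=["small_cpu"])  # pin to the compute-bigmem-backed executor
    def echo(msg):
        return msg

    # Submission triggers the provider to request its first block.
    print(echo("hello from klone").result())

    parsl.dfk().cleanup()  # shut down executors and release outstanding blocks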
4 changes: 2 additions & 2 deletions src/kbmod_wf/task_impls/ic_to_wu.py
@@ -62,7 +62,7 @@ def __init__(
self.logger = logger

self.overwrite = self.runtime_config.get("overwrite", False)
-        self.search_config = self.runtime_config.get("search_config", None)
+        self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)

def create_work_unit(self):
make_wu = True
@@ -78,7 +78,7 @@ def create_work_unit(self):
self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")

last_time = time.time()
-        orig_wu = ic.toWorkUnit(config=SearchConfiguration.from_file(self.search_config))
+        orig_wu = ic.toWorkUnit(config=SearchConfiguration.from_file(self.search_config_filepath))
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")

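Note: the rename makes the attribute line up with the search_config_filepath key that example_runtime_config.toml now defines; the old lookup key, search_config, was never set there, so .get() presumably fell back to None. A hypothetical dict stands in for the parsed config:

    # Mirrors the new [apps.ic_to_wu] table in example_runtime_config.toml.
    runtime_config = {
        "search_config_filepath": "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml",
    }

    assert runtime_config.get("search_config", None) is None   # old key: silently None
    assert runtime_config.get("search_config_filepath", None)  # new key: finds the path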
41 changes: 16 additions & 25 deletions src/kbmod_wf/task_impls/reproject_wu.py
@@ -85,26 +85,24 @@ def __init__(
self.search_config = self.runtime_config.get("search_config", None)

# Default to 8 workers if not in the config. Value must be 0<num workers<65.
-        self.n_workers = np.max(1, np.min(self.runtime_config.get("n_workers", 8), 64))
+        self.n_workers = max(1, min(self.runtime_config.get("n_workers", 8), 64))

-        self.uri_params = self._get_params_from_uri_file(uri_file=self.uri_file)
+        self.uri_params = self._get_params_from_uri_file()
self.patch_size = self.uri_params["patch_size"]
self.pixel_scale = self.uri_params["pixel_scale"]
self.guess_dist = self.uri_params["dist_au"] # ! Let's update the terminology here to be consistent.
self.patch_corners = self.uri_params[
"patch_box"
] # ! Let's update the terminology here to be consistent.

-        # handle image dimensions
-        if self.image_width == None or self.image_height == None:
-            if "patch_size" not in self.uri_params:
-                raise KeyError(
-                    f"Must supply image dimensions (image_width, image_height) or #patch_size= must be in a specified URI file."
-                )
-            if self.pixel_scale == None:
-                raise KeyError(
-                    f"When patch pixel dimensions are not specifified, the user must supply a pixel scale via the command line or the uri file."
-                )
+        if "patch_size" not in self.uri_params:
+            raise KeyError(
+                f"Must supply image dimensions (image_width, image_height) or #patch_size= must be in a specified URI file."
+            )
+        if self.pixel_scale == None:
+            raise KeyError(
+                f"When patch pixel dimensions are not specifified, the user must supply a pixel scale via the command line or the uri file."
+            )

self.image_width, self.image_height = self._patch_arcmin_to_pixels(
patch_size_arcmin=self.patch_size,
@@ -123,6 +121,7 @@ def reproject_workunit(self):
pixel_scale=self.pixel_scale,
)

+        last_time = time.time()
self.logger.info(f"Reading existing WorkUnit from disk: {self.original_wu_filepath}")
orig_wu = WorkUnit.from_fits(self.original_wu_filepath)
elapsed = round(time.time() - last_time, 1)
@@ -217,28 +216,20 @@ def _get_params_from_uri_file(self):
results[lhs] = rhs
return results

-    def _patch_arcmin_to_pixels(self, patch_size_arcmin, pixel_scale_arcsec_per_pix):
-        """Take an array of two dimensions, in arcminutes, and convert this to
-        pixels using the supplied pixel scale (in arcseconds per pixel).
-        Parameters
-        ----------
-        patch_size_arcmin : nd.array
-            A 2d array with shape (2,1) containing the width and height of the
-            patch in arcminutes.
-        pixel_scale_arcsec_per_pix : float
-            The pixel scale in arcseconds per pixel.
+    def _patch_arcmin_to_pixels(self):
+        """Operate on the self.patch_size array (with size (2,1)) to convert to
+        pixels. Uses self.pixel_scale to do the conversion.
Returns
-------
nd.array
A 2d array with shape (2,1) containing the width and height of the
patch in pixels.
"""
-        patch_pixels = int(np.ceil(self.patch_size * 60 / self.pixel_scale))
+        patch_pixels = np.ceil(np.array(self.patch_size) * 60 / self.pixel_scale).astype(int)

self.logger.debug(
f"Derived patch_pixels (w, h) = {patch_pixels} from patch_size_arcmin={patch_size_arcmin} and pixel_scale_arcsec_per_pix={pixel_scale_arcsec_per_pix}."
f"Derived patch_pixels (w, h) = {patch_pixels} from patch_size={self.patch_size}[arcmin] and pixel_scale={self.pixel_scale}[arcsec/pixel]."
)

return patch_pixels
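Note: two of the changes above are behavior fixes rather than renames. np.max and np.min take (array, axis) as positional arguments, so the old np.min(value, 64) treats 64 as an axis of a 0-d array and raises an AxisError; the builtins are the right tool for clamping a scalar. Likewise int() accepts only size-1 arrays, so the old int(np.ceil(...)) breaks once patch_size holds a width and a height, while np.ceil(...).astype(int) converts elementwise. A short demonstration with made-up numbers (the 30-arcmin patch and 0.263 arcsec/pixel scale are illustrative only):

    import numpy as np

    # Clamp the worker count into [1, 64], as the new code does.
    assert max(1, min(8, 64)) == 8
    assert max(1, min(0, 64)) == 1      # floor at 1
    assert max(1, min(200, 64)) == 64   # ceiling at 64
    # The old form, np.max(1, np.min(8, 64)), raises an AxisError instead.

    # Arcminutes -> pixels: 1 arcmin = 60 arcsec, then divide by arcsec/pixel.
    patch_size = [30, 30]   # (width, height) in arcminutes
    pixel_scale = 0.263     # arcsec per pixel
    patch_pixels = np.ceil(np.array(patch_size) * 60 / pixel_scale).astype(int)
    print(patch_pixels)     # [6845 6845], since 30 * 60 / 0.263 ≈ 6844.1
    # int(np.ceil(...)) on this length-2 array raises a TypeError.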
2 changes: 1 addition & 1 deletion src/kbmod_wf/workflow.py
@@ -73,7 +73,7 @@ def uri_to_ic(inputs=[], outputs=[], runtime_config={}, logging_file=None):


@python_app(
cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"]
)
def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
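Note: this re-pins ic_to_wu from the small_cpu executor to large_mem, as defined in klone_configuration.py; in Parsl, every entry in executors= must exactly match the label of an executor in the loaded Config. A self-contained illustration using local thread executors in place of the Slurm-backed ones (the labels come from this repo; the stub body is invented):

    import parsl
    from parsl import python_app
    from parsl.config import Config
    from parsl.executors.threads import ThreadPoolExecutor

    # Local stand-ins for the Slurm-backed executors on Klone.
    parsl.load(Config(executors=[
        ThreadPoolExecutor(label="small_cpu", max_threads=1),
        ThreadPoolExecutor(label="large_mem", max_threads=1),
    ]))

    @python_app(cache=True, executors=["large_mem"], ignore_for_cache=["logging_file"])
    def ic_to_wu_stub(logging_file=None):
        return "would build the WorkUnit here"

    print(ic_to_wu_stub().result())
    parsl.dfk().cleanup()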
