From b6045749331d77b27a1f6d8a9e47ca00c7b9a475 Mon Sep 17 00:00:00 2001 From: Igor Gitman Date: Thu, 12 Dec 2024 14:03:37 -0800 Subject: [PATCH] Improve random seed handling in generate (#292) Signed-off-by: Igor Gitman --- nemo_skills/pipeline/generate.py | 17 ++++++++++++++--- nemo_skills/pipeline/run_cmd.py | 5 ++++- nemo_skills/pipeline/utils.py | 9 --------- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/nemo_skills/pipeline/generate.py b/nemo_skills/pipeline/generate.py index 177190b44..fee472bd9 100644 --- a/nemo_skills/pipeline/generate.py +++ b/nemo_skills/pipeline/generate.py @@ -66,10 +66,14 @@ def get_rm_cmd(output_dir, extra_arguments, random_seed=None, eval_args=None): return cmd -def wrap_cmd(cmd, preprocess_cmd, postprocess_cmd): +def wrap_cmd(cmd, preprocess_cmd, postprocess_cmd, random_seed=None): if preprocess_cmd: + if random_seed is not None: + preprocess_cmd = preprocess_cmd.format(random_seed=random_seed) cmd = f" {preprocess_cmd} && {cmd} " if postprocess_cmd: + if random_seed is not None: + postprocess_cmd = postprocess_cmd.format(random_seed=random_seed) cmd = f" {cmd} && {postprocess_cmd} " return cmd @@ -114,6 +118,7 @@ def generate( num_random_seeds: int = typer.Option( None, help="Specify if want to run many generations with high temperature for the same input" ), + random_seeds: List[int] = typer.Option(None, help="List of random seeds to use for generation"), starting_seed: int = typer.Option(0, help="Starting seed for random sampling"), preprocess_cmd: str = typer.Option(None, help="Command to run before generation"), postprocess_cmd: str = typer.Option(None, help="Command to run after generation"), @@ -143,6 +148,11 @@ def generate( except AttributeError: pass + if random_seeds and num_random_seeds: + raise ValueError("Cannot specify both random_seeds and num_random_seeds") + if num_random_seeds: + random_seeds = list(range(starting_seed, starting_seed + num_random_seeds)) + cluster_config = get_cluster_config(cluster, config_dir) check_if_mounted(cluster_config, output_dir) if log_dir: @@ -179,8 +189,8 @@ def generate( get_cmd = client_command_factories[generation_type] with run.Experiment(expname) as exp: - if num_random_seeds: - for seed in range(starting_seed, starting_seed + num_random_seeds): + if random_seeds: + for seed in random_seeds: cmd = get_cmd( random_seed=seed, output_dir=output_dir, @@ -195,6 +205,7 @@ def generate( get_generation_command(server_address=server_address, generation_commands=cmd), preprocess_cmd, postprocess_cmd, + random_seed=seed, ), task_name=f'{expname}-rs{seed}', log_dir=log_dir, diff --git a/nemo_skills/pipeline/run_cmd.py b/nemo_skills/pipeline/run_cmd.py index 9510ef8a1..a337c4b72 100644 --- a/nemo_skills/pipeline/run_cmd.py +++ b/nemo_skills/pipeline/run_cmd.py @@ -20,6 +20,7 @@ from nemo_skills.pipeline import add_task, check_if_mounted, get_cluster_config, run_exp from nemo_skills.pipeline.app import app, typer_unpacker +from nemo_skills.pipeline.generate import wrap_cmd from nemo_skills.utils import setup_logging LOG = logging.getLogger(__file__) @@ -50,6 +51,8 @@ def run_cmd( run_after: List[str] = typer.Option( None, help="Can specify a list of expnames that need to be completed before this one starts" ), + preprocess_cmd: str = typer.Option(None, help="Command to run before job"), + postprocess_cmd: str = typer.Option(None, help="Command to run after job"), config_dir: str = typer.Option(None, help="Can customize where we search for cluster configs"), log_dir: str = typer.Option( None, @@ -69,7 +72,7 @@ def run_cmd( with run.Experiment(expname) as exp: add_task( exp, - cmd=get_cmd(extra_arguments=extra_arguments), + cmd=wrap_cmd(get_cmd(extra_arguments=extra_arguments), preprocess_cmd, postprocess_cmd), task_name=expname, log_dir=log_dir, container=cluster_config["containers"][container], diff --git a/nemo_skills/pipeline/utils.py b/nemo_skills/pipeline/utils.py index a0de17d30..c5f4529d9 100644 --- a/nemo_skills/pipeline/utils.py +++ b/nemo_skills/pipeline/utils.py @@ -52,15 +52,6 @@ def get_unmounted_path(cluster_config, path): raise ValueError(f"The path '{path}' is not mounted. Check cluster config.") -def _get_latest_dir(path, expname, job_id) -> str: - if job_id is not None: - return os.path.join(path, f"{expname}_{job_id}") - - dirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))] - latest_dir = max(dirs, key=lambda d: os.path.getctime(os.path.join(path, d))) - return os.path.join(path, latest_dir) - - def get_exp_handles(expname: str, ignore_finished=True, ignore_exp_not_exists=True) -> list[str]: """Will return the handles of the tasks in the experiment.