diff --git a/datalad_slurm/common.py b/datalad_slurm/common.py index 7961a54..5d82736 100644 --- a/datalad_slurm/common.py +++ b/datalad_slurm/common.py @@ -232,3 +232,41 @@ def check_finish_exists(dset, revision, rev_branch, allow_reschedule=True): return +def extract_incomplete_jobs(dset): + """ + Get the number of incomplete jobs, (re)-scheduled jobs with no finish command. + + Finds the most recent finish or (re)-scheduled job. If none is found, returns 0. + Used to make datalad finish more efficient. + """ + ds_repo = dset.repo + revrange = ds_repo.get_corresponding_branch() or ds_repo.get_active_branch() or "HEAD" + rev_lines = ds_repo.get_revisions( + revrange, fmt="%H %P", options=["--topo-order"] + ) + if not rev_lines: + return 0 + + for rev_line in rev_lines: + # The strip() below is necessary because, with the format above, a + # commit without any parent has a trailing space. (We could also use a + # custom `rev-list --parents ...` call to avoid this.) + fields = rev_line.strip().split(" ") + rev, parents = fields[0], fields[1:] + full_msg = ds_repo.format_commit("%B", rev) + # see if we get a hit on a finish command + msg, info = get_finish_info(dset, full_msg) + if msg and info: + try: + return info["incomplete_job_number"] + except KeyError: + return 0 + # see if we get a hit on a (re)schedule command + msg, info = get_schedule_info(dset, full_msg) + if msg and info: + try: + return info["incomplete_job_number"] + except KeyError: + return 0 + return 0 + diff --git a/datalad_slurm/finish.py b/datalad_slurm/finish.py index cdbf4b1..1ecc517 100644 --- a/datalad_slurm/finish.py +++ b/datalad_slurm/finish.py @@ -64,7 +64,7 @@ quote_cmdlinearg, ) -from .common import check_finish_exists, get_schedule_info +from .common import check_finish_exists, get_schedule_info, extract_incomplete_jobs from datalad.core.local.run import _create_record, get_command_pwds @@ -222,7 +222,7 @@ def get_scheduled_commits(since, dset, branch): revrange = "{}..{}".format(since, revision) rev_lines = ds_repo.get_revisions( - revrange, fmt="%H %P", options=["--reverse", "--topo-order"] + revrange, fmt="%H %P", options=["--topo-order"] ) if not rev_lines: return @@ -234,7 +234,11 @@ def get_scheduled_commits(since, dset, branch): # custom `rev-list --parents ...` call to avoid this.) fields = rev_line.strip().split(" ") rev, parents = fields[0], fields[1:] - res = get_status_dict("run", ds=dset, commit=rev, parents=parents) + # get the incomplete job number + incomplete_job_number = extract_incomplete_jobs(dset) + # only go as far back as the last commit with no unfinished jobs + if incomplete_job_number == 0: + break full_msg = ds_repo.format_commit("%B", rev) try: msg, info = get_schedule_info(dset, full_msg) @@ -248,6 +252,9 @@ def get_scheduled_commits(since, dset, branch): # Recast the error so the message includes the revision. raise ValueError("Error on {}'s message".format(rev)) from exc + # reverse the order + commit_list.reverse() + return commit_list @@ -365,6 +372,10 @@ def finish_cmd( # slurm_submission_file = f"slurm-job-submission-{slurm_job_id}" # os.remove(slurm_submission_file) + # get the number of incomplete jobs and subtract one + incomplete_job_number = extract_incomplete_jobs(ds) + run_info["incomplete_job_number"] = incomplete_job_number - 1 + # expand the wildcards # TODO do this in a better way with GlobbedPaths globbed_outputs = [] diff --git a/datalad_slurm/schedule.py b/datalad_slurm/schedule.py index 845fbe0..da2f8b9 100644 --- a/datalad_slurm/schedule.py +++ b/datalad_slurm/schedule.py @@ -85,7 +85,7 @@ _get_substitutions, ) -from .common import get_schedule_info, check_finish_exists +from .common import get_schedule_info, check_finish_exists, extract_incomplete_jobs lgr = logging.getLogger("datalad.slurm.schedule") @@ -673,6 +673,10 @@ def run_command( ) return + # extract the incomplete job number + incomplete_job_number = extract_incomplete_jobs(ds) + run_info["incomplete_job_number"] = incomplete_job_number + 1 + # now check history of outputs in un-finished slurm commands if check_outputs: output_conflict = check_output_conflict(ds, run_info["outputs"])