Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add incomplete flag #16

Merged
merged 3 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions datalad_slurm/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,41 @@ def check_finish_exists(dset, revision, rev_branch, allow_reschedule=True):

return

def extract_incomplete_jobs(dset):
"""
Get the number of incomplete jobs, (re)-scheduled jobs with no finish command.

Finds the most recent finish or (re)-scheduled job. If none is found, returns 0.
Used to make datalad finish more efficient.
"""
ds_repo = dset.repo
revrange = ds_repo.get_corresponding_branch() or ds_repo.get_active_branch() or "HEAD"
rev_lines = ds_repo.get_revisions(
revrange, fmt="%H %P", options=["--topo-order"]
)
if not rev_lines:
return 0

for rev_line in rev_lines:
# The strip() below is necessary because, with the format above, a
# commit without any parent has a trailing space. (We could also use a
# custom `rev-list --parents ...` call to avoid this.)
fields = rev_line.strip().split(" ")
rev, parents = fields[0], fields[1:]
full_msg = ds_repo.format_commit("%B", rev)
# see if we get a hit on a finish command
msg, info = get_finish_info(dset, full_msg)
if msg and info:
try:
return info["incomplete_job_number"]
except KeyError:
return 0
# see if we get a hit on a (re)schedule command
msg, info = get_schedule_info(dset, full_msg)
if msg and info:
try:
return info["incomplete_job_number"]
except KeyError:
return 0
return 0

17 changes: 14 additions & 3 deletions datalad_slurm/finish.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
quote_cmdlinearg,
)

from .common import check_finish_exists, get_schedule_info
from .common import check_finish_exists, get_schedule_info, extract_incomplete_jobs

from datalad.core.local.run import _create_record, get_command_pwds

Expand Down Expand Up @@ -222,7 +222,7 @@ def get_scheduled_commits(since, dset, branch):
revrange = "{}..{}".format(since, revision)

rev_lines = ds_repo.get_revisions(
revrange, fmt="%H %P", options=["--reverse", "--topo-order"]
revrange, fmt="%H %P", options=["--topo-order"]
)
if not rev_lines:
return
Expand All @@ -234,7 +234,11 @@ def get_scheduled_commits(since, dset, branch):
# custom `rev-list --parents ...` call to avoid this.)
fields = rev_line.strip().split(" ")
rev, parents = fields[0], fields[1:]
res = get_status_dict("run", ds=dset, commit=rev, parents=parents)
# get the incomplete job number
incomplete_job_number = extract_incomplete_jobs(dset)
# only go as far back as the last commit with no unfinished jobs
if incomplete_job_number == 0:
break
full_msg = ds_repo.format_commit("%B", rev)
try:
msg, info = get_schedule_info(dset, full_msg)
Expand All @@ -248,6 +252,9 @@ def get_scheduled_commits(since, dset, branch):
# Recast the error so the message includes the revision.
raise ValueError("Error on {}'s message".format(rev)) from exc

# reverse the order
commit_list.reverse()

return commit_list


Expand Down Expand Up @@ -365,6 +372,10 @@ def finish_cmd(
# slurm_submission_file = f"slurm-job-submission-{slurm_job_id}"
# os.remove(slurm_submission_file)

# get the number of incomplete jobs and subtract one
incomplete_job_number = extract_incomplete_jobs(ds)
run_info["incomplete_job_number"] = incomplete_job_number - 1

# expand the wildcards
# TODO do this in a better way with GlobbedPaths
globbed_outputs = []
Expand Down
6 changes: 5 additions & 1 deletion datalad_slurm/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
_get_substitutions,
)

from .common import get_schedule_info, check_finish_exists
from .common import get_schedule_info, check_finish_exists, extract_incomplete_jobs

lgr = logging.getLogger("datalad.slurm.schedule")

Expand Down Expand Up @@ -673,6 +673,10 @@ def run_command(
)
return

# extract the incomplete job number
incomplete_job_number = extract_incomplete_jobs(ds)
run_info["incomplete_job_number"] = incomplete_job_number + 1

# now check history of outputs in un-finished slurm commands
if check_outputs:
output_conflict = check_output_conflict(ds, run_info["outputs"])
Expand Down
Loading