Skip to content

Commit

Permalink
Merge pull request #8 from timcallow/fix_finish
Browse files (browse the repository at this point in the history)
Improve error reporting when using `finish` and `reschedule` commands
  • Loading branch information
timcallow authored Dec 17, 2024
2 parents 1bfaf7b + 2e10b4b commit 7e7e1ee
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
4 changes: 3 additions & 1 deletion datalad_slurm/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ def get_slurm_job_id(dset, revision, allow_reschedule=True):
full_msg = ds_repo.format_commit("%B", rev)
try:
msg, info = get_schedule_info(dset, full_msg, allow_reschedule=allow_reschedule)
if msg is None or info is None:
return
except ValueError as exc:
# Recast the error so the message includes the revision.
raise ValueError("Error on {}'s message".format(rev)) from exc
Expand All @@ -198,7 +200,7 @@ def check_finish_exists(dset, revision, rev_branch, allow_reschedule=True):
slurm_job_id = get_slurm_job_id(dset, revision, allow_reschedule=allow_reschedule)

if not slurm_job_id:
return
return 0 # return a special exit code to distinguish errors

# now check the finish exists
revrange = "{}..{}".format(revision, rev_branch)
Expand Down
28 changes: 8 additions & 20 deletions datalad_slurm/finish.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,9 @@ def finish_cmd(
# for now, we just assume this to be run on a single commit
revrange = "{rev}^..{rev}".format(rev=commit)

try:
results = _revrange_as_results(ds, revrange)
except ValueError as exc:
ce = CapturedException(exc)
yield get_status_dict("run", status="error", message=str(ce), exception=ce)
results = _revrange_as_results(ds, revrange)
if not results:
yield get_status_dict("finish", status="error", message="The commit message {} is not a DATALAD SCHEDULE commit".format(commit[:7]))
return

run_message = results["run_message"]
Expand All @@ -333,8 +331,8 @@ def finish_cmd(

job_status = get_job_status(slurm_job_id)
if job_status != "COMPLETED":
message = f"Slurm job is not complete. Status is {job_status}."
yield get_status_dict("run", status="error", message=message)
message = f"Slurm job for commit {commit[:7]} is not complete. Status is {job_status}."
yield get_status_dict("finish", status="error", message=message)
return

# delete the slurm_job_id file
Expand Down Expand Up @@ -411,19 +409,9 @@ def _revrange_as_results(dset, revrange):
rev, parents = fields[0], fields[1:]
res = get_status_dict("finish", ds=dset, commit=rev, parents=parents)
full_msg = ds_repo.format_commit("%B", rev)
try:
msg, info = get_schedule_info(dset, full_msg)
if msg is None or info is None:
yield get_status_dict(
"finish",
ds=dset,
status="error",
commit=rev,
message="Scheduled job cannot be processed as commit message has the wrong format",
)
except ValueError as exc:
# Recast the error so the message includes the revision.
raise ValueError("Error on {}'s message".format(rev)) from exc
msg, info = get_schedule_info(dset, full_msg)
if msg is None or info is None:
return
res["run_info"] = info
res["run_message"] = msg

Expand Down
21 changes: 15 additions & 6 deletions datalad_slurm/reschedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def __call__(
script=None,
report=False,
assume_ready=None,
jobs=None
jobs=None,
):

ds = require_dataset(
Expand Down Expand Up @@ -314,13 +314,22 @@ def __call__(

# get the revrange to check for the corresponding datalad finish command
# don't allow reschedule because we only check for the original job
job_finished = check_finish_exists(ds, revision, rev_branch, allow_reschedule=False)
job_finished = check_finish_exists(
ds, revision, rev_branch, allow_reschedule=False
)
if not job_finished:
if job_finished == 0:
err_msg = (
f"Commit {revision[:7]} is not a scheduled job. \n"
"N.B., already re-scheduled jobs cannot be re-re-scheduled."
)
else:
err_msg = f"No finish found for schedule commit {revision}"
yield get_status_dict(
"run",
ds=ds,
status="error",
message="No finish found for schedule command".format(branch),
message=err_msg,
)
return
results = _rerun_as_results(ds, revrange, since, branch, onto, message)
Expand All @@ -336,6 +345,7 @@ def __call__(
for res in handler(ds, results):
yield res


def _revrange_as_results(dset, revrange):
ds_repo = dset.repo
rev_lines = ds_repo.get_revisions(
Expand Down Expand Up @@ -581,7 +591,7 @@ def _rerun(dset, results, assume_ready=None, explicit=True, jobs=None):
# run records outputs relative to the "pwd" field.
if op.relpath(p, outputs_dir) not in outputs
]

# remove the slurm outputs from the previous run from the outputs
old_slurm_outputs = run_info.get("slurm_run_outputs", [])
outputs = [output for output in outputs if output not in old_slurm_outputs]
Expand Down Expand Up @@ -686,7 +696,7 @@ def fn(dset, results):
cmd,
**dict(
run_info, dspath=dset.path, pwd=op.join(dset.path, run_info["pwd"])
)
),
)

msg = res["run_message"]
Expand Down Expand Up @@ -719,7 +729,6 @@ def fn(dset, results):
return fn



def diff_revision(dataset, revision="HEAD"):
"""Yield files that have been added or modified in `revision`.
Expand Down

0 comments on commit 7e7e1ee

Please sign in to comment.