From 17f88d43e9fca31f15594d5708d942ee4e988e99 Mon Sep 17 00:00:00 2001
From: Tim Callow
Date: Tue, 17 Dec 2024 14:52:32 +0100
Subject: [PATCH 1/2] Reset schedule to include check_outputs functionality

---
 datalad_slurm/schedule.py | 456 ++++++++++++++++++++++++--------------
 1 file changed, 290 insertions(+), 166 deletions(-)

diff --git a/datalad_slurm/schedule.py b/datalad_slurm/schedule.py
index b868c3e..3640970 100644
--- a/datalad_slurm/schedule.py
+++ b/datalad_slurm/schedule.py
@@ -8,7 +8,7 @@
 # ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
 """Schedule a slurm command"""

-__docformat__ = 'restructuredtext'
+__docformat__ = "restructuredtext"


 import json
@@ -84,7 +84,9 @@
     _get_substitutions,
 )

-lgr = logging.getLogger('datalad.slurm.schedule')
+from .common import get_schedule_info, check_finish_exists
+
+lgr = logging.getLogger("datalad.slurm.schedule")

 assume_ready_opt = Parameter(
     args=("--assume-ready",),
@@ -92,7 +94,8 @@
     doc="""Assume that inputs do not need to be retrieved and/or outputs do not
     need to unlocked or removed before running the command. This option allows
     you to avoid the expense of these preparation steps if you know that they
-    are unnecessary.""")
+    are unnecessary.""",
+)


 @build_doc
@@ -165,57 +168,68 @@ class Schedule(Interface):
       % datalad run "echo my name is {name} >me"
     """
+    result_renderer = "tailored"

     # make run stop immediately on non-success results.
     # this prevents command execution after failure to obtain inputs of prepare
     # outputs. but it can be overriding via the common 'on_failure' parameter
     # if needed.
-    on_failure = 'stop'
+    on_failure = "stop"

     _params_ = dict(
         cmd=Parameter(
             args=("cmd",),
             nargs=REMAINDER,
-            metavar='COMMAND',
+            metavar="COMMAND",
             doc="""command for execution. A leading '--' can be used to
             disambiguate this command from the preceding options to
-            DataLad."""),
+            DataLad.""",
+        ),
         dataset=Parameter(
             args=("-d", "--dataset"),
             doc="""specify the dataset to record the command results in.
             An attempt is made to identify the dataset based on the current
             working directory. If a dataset is given, the command will be
             executed in the root directory of this dataset.""",
-            constraints=EnsureDataset() | EnsureNone()),
+            constraints=EnsureDataset() | EnsureNone(),
+        ),
         inputs=Parameter(
             args=("-i", "--input"),
             dest="inputs",
             metavar=("PATH"),
-            action='append',
+            action="append",
             doc="""A dependency for the run. Before running the command, the
             content for this relative path will be retrieved. A value of "." means
             "run :command:`datalad get .`". The value can also be a glob. [CMD: This
-            option can be given more than once. CMD]"""),
+            option can be given more than once. CMD]""",
+        ),
         outputs=Parameter(
             args=("-o", "--output"),
             dest="outputs",
             metavar=("PATH"),
-            action='append',
            doc="""Prepare this relative path to be an output file of the
            command. A value of "." means "run :command:`datalad unlock .`"
            (and will fail if some content isn't present). For any other
            value, if the content of this file is present, unlock the file.
            Otherwise, remove it. The value can also be a glob. [CMD: This
            option can be given more than
-            once. CMD]"""),
+            action="append",
+            once. CMD]""",
+        ),
         expand=Parameter(
             args=("--expand",),
             doc="""Expand globs when storing inputs and/or outputs in the
             commit message.""",
-            constraints=EnsureChoice(None, "inputs", "outputs", "both")),
+            constraints=EnsureChoice(None, "inputs", "outputs", "both"),
+        ),
         assume_ready=assume_ready_opt,
         message=save_message_opt,
+        check_outputs=Parameter(
+            args=("--check-outputs",),
+            doc="""Check previous scheduled commits to see if there is any overlap in the outputs.""",
+            constraints=EnsureNone() | EnsureBool(),
+        ),
         sidecar=Parameter(
-            args=('--sidecar',),
+            args=("--sidecar",),
             metavar="{yes|no}",
             doc="""By default, the configuration variable
             'datalad.run.record-sidecar' determines whether a record with
@@ -225,7 +239,8 @@ class Schedule(Interface):
             case-by-case basis. Sidecar files are placed into the dataset's
             '.datalad/runinfo' directory (customizable via the
             'datalad.run.record-directory' configuration variable).""",
-            constraints=EnsureNone() | EnsureBool()),
+            constraints=EnsureNone() | EnsureBool(),
+        ),
         dry_run=Parameter(
             # Leave out common -n short flag to avoid confusion with
             # `containers-run [-n|--container-name]`.
@@ -237,35 +252,45 @@ class Schedule(Interface):
             args=("--dry-run",),
             metavar="{basic|command}",
             doc="""Do not run the command; just display details about the command
             execution. A value of "basic" reports a few important details about
             the execution, including the expanded command and expanded inputs and
             outputs. "command" displays the expanded command only. Note that input
             and output globs underneath an uninstalled dataset will be left
             unexpanded because no subdatasets will be installed for a dry run.""",
-            constraints=EnsureChoice(None, "basic", "command")),
-        jobs=jobs_opt
+            constraints=EnsureChoice(None, "basic", "command"),
+        ),
+        jobs=jobs_opt,
     )
-    _params_['jobs']._doc += """\
+    _params_[
+        "jobs"
+    ]._doc += """\
         NOTE: This option can only parallelize input retrieval (get) and output
        recording (save). DataLad does NOT parallelize your scripts for you.
    """

     @staticmethod
-    @datasetmethod(name='run')
+    @datasetmethod(name="run")
     @eval_results
     def __call__(
-            cmd=None,
-            *,
-            dataset=None,
-            inputs=None,
-            outputs=None,
-            expand=None,
-            assume_ready=None,
-            message=None,
-            dry_run=None,
-            jobs=None):
-        for r in run_command(cmd, dataset=dataset,
-                             inputs=inputs, outputs=outputs,
-                             expand=expand,
-                             assume_ready=assume_ready,
-                             message=message,
-                             dry_run=dry_run,
-                             jobs=jobs):
+        cmd=None,
+        *,
+        dataset=None,
+        inputs=None,
+        outputs=None,
+        expand=None,
+        assume_ready=None,
+        message=None,
+        check_outputs=True,
+        dry_run=None,
+        jobs=None,
+    ):
+        for r in run_command(
+            cmd,
+            dataset=dataset,
+            inputs=inputs,
+            outputs=outputs,
+            expand=expand,
+            assume_ready=assume_ready,
+            message=message,
+            check_outputs=check_outputs,
+            dry_run=dry_run,
+            jobs=jobs,
+        ):
             yield r

     @staticmethod
@@ -279,14 +304,19 @@ def custom_result_renderer(res, **kwargs):
         else:
             raise ValueError(f"Unknown dry-run mode: {dry_run!r}")
     else:
-        if kwargs.get("on_failure") == "stop" and \
-                res.get("action") == "run" and res.get("status") == "error":
+        if (
+            kwargs.get("on_failure") == "stop"
+            and res.get("action") == "run"
+            and res.get("status") == "error"
+        ):
             msg_path = res.get("msg_path")
             if msg_path:
                 ds_path = res["path"]
-                if datalad.get_apimode() == 'python':
-                    help = f"\"Dataset('{ds_path}').save(path='.', " \
-                           "recursive=True, message_file='%s')\""
+                if datalad.get_apimode() == "python":
+                    help = (
+                        f"\"Dataset('{ds_path}').save(path='.', "
+                        "recursive=True, message_file='%s')\""
+                    )
                 else:
                     help = "'datalad save -d . -r -F %s'"
                 lgr.info(
@@ -295,19 +325,21 @@ def custom_result_renderer(res, **kwargs):
                     f"{help}",
                     # shorten to the relative path for a more concise
                     # message
-                    Path(msg_path).relative_to(ds_path))
+                    Path(msg_path).relative_to(ds_path),
+                )
         generic_result_renderer(res)

+
 def _execute_slurm_command(command, pwd):
     """Execute a Slurm submission command and create a job tracking file.
-    
+
     Parameters
     ----------
     command : str
         Command to execute (typically an sbatch command)
     pwd : str
         Working directory for command execution
-    
+
     Returns
     -------
     tuple
@@ -315,21 +347,17 @@ def _execute_slurm_command(command, pwd):
         exit_code is 0 on success, exception is None on success
     """
     from datalad.cmd import WitlessRunner
-    
+
     exc = None
     cmd_exitcode = None
-    
+
     try:
         lgr.info("== Slurm submission start (output follows) =====")
         # Run the command and capture output
         result = subprocess.run(
-            command,
-            shell=True,
-            capture_output=True,
-            text=True,
-            cwd=pwd
+            command, shell=True, capture_output=True, text=True, cwd=pwd
         )
-        
+
         # Extract job ID from Slurm output
         # Typical output: "Submitted batch job 123456"
         stdout = result.stdout
@@ -354,15 +382,16 @@ def _execute_slurm_command(command, pwd):
             job_id = match.group(1)
         else:
             lgr.warning("Could not extract job ID from Slurm output")
-    
+
     except subprocess.SubprocessError as e:
         exc = e
-        cmd_exitcode = e.returncode if hasattr(e, 'returncode') else 1
+        cmd_exitcode = e.returncode if hasattr(e, "returncode") else 1
         lgr.error(f"Command failed with exit code {cmd_exitcode}")
-    
+
     lgr.info("== Slurm submission complete =====")
     return cmd_exitcode or 0, exc, job_id

+
 def _create_record(run_info, sidecar_flag, ds):
@@ -375,8 +404,7 @@ def _create_record(run_info, sidecar_flag, ds):
     """
     record = json.dumps(run_info, indent=1, sort_keys=True, ensure_ascii=False)
     if sidecar_flag is None:
-        use_sidecar = ds.config.get(
-            'datalad.run.record-sidecar', default=False)
+        use_sidecar = ds.config.get("datalad.run.record-sidecar", default=False)
         use_sidecar = anything2bool(use_sidecar)
     else:
         use_sidecar = sidecar_flag
@@ -386,10 +414,11 @@ def _create_record(run_info, sidecar_flag, ds):
     if use_sidecar:
         # record ID is hash of record itself
         from hashlib import md5
-        record_id = md5(record.encode('utf-8')).hexdigest()  # nosec
+
+        record_id = md5(record.encode("utf-8")).hexdigest()  # nosec
         record_dir = ds.config.get(
-            'datalad.run.record-directory',
-            default=op.join('.datalad', 'runinfo'))
+            "datalad.run.record-directory", default=op.join(".datalad", "runinfo")
+        )
         record_path = ds.pathobj / record_dir / record_id
         if not op.lexists(record_path):
             # go for compression, even for minimal records not much difference,
@@ -399,19 +428,29 @@ def _create_record(run_info, sidecar_flag, ds):
     return record_id or record, record_path


-def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
-                assume_ready=None, message=None, sidecar=None,
-                dry_run=False, jobs=None,
-                explicit=True,
-                extra_info=None,
-                rerun_info=None,
-                extra_inputs=None,
-                rerun_outputs=None,
-                inject=False,
-                parametric_record=False,
-                remove_outputs=False,
-                skip_dirtycheck=False,
-                yield_expanded=None,):
+def run_command(
+    cmd,
+    dataset=None,
+    inputs=None,
+    outputs=None,
+    expand=None,
+    assume_ready=None,
+    message=None,
+    check_outputs=True,
+    sidecar=None,
+    dry_run=False,
+    jobs=None,
+    explicit=True,
+    extra_info=None,
+    rerun_info=None,
+    extra_inputs=None,
+    rerun_outputs=None,
+    inject=False,
+    parametric_record=False,
+    remove_outputs=False,
+    skip_dirtycheck=False,
+    yield_expanded=None,
+):
     """Run `cmd` in `dataset` and record the results.

     `Run.__call__` is a simple wrapper over this function. Aside from backward
@@ -467,12 +506,15 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
         lgr.warning("No command given")
         return
     specs = {
-        k: ensure_list(v) for k, v in (('inputs', inputs),
-                                       ('extra_inputs', extra_inputs),
-                                       ('outputs', outputs))
+        k: ensure_list(v)
+        for k, v in (
+            ("inputs", inputs),
+            ("extra_inputs", extra_inputs),
+            ("outputs", outputs),
+        )
     }

-    rel_pwd = rerun_info.get('pwd') if rerun_info else None
+    rel_pwd = rerun_info.get("pwd") if rerun_info else None
     if rel_pwd and dataset:
         # recording is relative to the dataset
         pwd = op.normpath(op.join(dataset.path, rel_pwd))
@@ -481,11 +523,11 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
         pwd, rel_pwd = get_command_pwds(dataset)

     ds = require_dataset(
-        dataset, check_installed=True,
-        purpose='track command outcomes')
+        dataset, check_installed=True, purpose="track command outcomes"
+    )
     ds_path = ds.path

-    lgr.debug('tracking command output underneath %s', ds)
+    lgr.debug("tracking command output underneath %s", ds)

     # skip for callers that already take care of this
     if not (skip_dirtycheck or rerun_info or inject):
@@ -495,12 +537,14 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
         # MIH: is_dirty() is gone, but status() can do all of the above!
         if not explicit and ds.repo.dirty:
             yield get_status_dict(
-                'run',
+                "run",
                 ds=ds,
-                status='impossible',
+                status="impossible",
                 message=(
-                    'clean dataset required to detect changes from command; '
-                    'use `datalad status` to inspect unsaved changes'))
+                    "clean dataset required to detect changes from command; "
+                    "use `datalad status` to inspect unsaved changes"
+                ),
+            )
             return

     # everything below expects the string-form of the command
@@ -514,9 +558,7 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
             cmd_fmt_kwargs[n] = val

     # apply the substitution to the IO specs
-    expanded_specs = {
-        k: _format_iospecs(v, **cmd_fmt_kwargs) for k, v in specs.items()
-    }
+    expanded_specs = {k: _format_iospecs(v, **cmd_fmt_kwargs) for k, v in specs.items()}
     # try-expect to catch expansion issues in _format_iospecs() which
     # expands placeholders in dependency/output specification before
     # globbing
@@ -525,29 +567,37 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
             k: GlobbedPaths(
                 v,
                 pwd=pwd,
-                expand=expand in (
+                expand=expand
+                in (
                     # extra_inputs follow same expansion rules as `inputs`.
-                    ["both"] + (['outputs'] if k == 'outputs' else ['inputs'])
-                ))
+                    ["both"]
+                    + (["outputs"] if k == "outputs" else ["inputs"])
+                ),
+            )
             for k, v in expanded_specs.items()
         }
     except KeyError as exc:
         yield get_status_dict(
-            'run',
+            "run",
             ds=ds,
-            status='impossible',
+            status="impossible",
             message=(
-                'input/output specification has an unrecognized '
-                'placeholder: %s', exc))
+                "input/output specification has an unrecognized " "placeholder: %s",
+                exc,
+            ),
+        )
         return

     if not (inject or dry_run):
         yield from _prep_worktree(
-            ds_path, pwd, globbed,
+            ds_path,
+            pwd,
+            globbed,
             assume_ready=assume_ready,
             remove_outputs=remove_outputs,
             rerun_outputs=rerun_outputs,
-            jobs=None)
+            jobs=None,
+        )
     else:
         # If an inject=True caller wants to override the exit code, they can do
         # so in extra_info.
@@ -564,18 +614,18 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
         tmpdir=mkdtemp(prefix="datalad-run-") if "{tmpdir}" in cmd else "",
         # the following override any matching non-glob substitution
         # values
-        inputs=globbed['inputs'],
-        outputs=globbed['outputs'],
+        inputs=globbed["inputs"],
+        outputs=globbed["outputs"],
     )
     try:
         cmd_expanded = format_command(ds, cmd, **cmd_fmt_kwargs)
     except KeyError as exc:
         yield get_status_dict(
-            'run',
+            "run",
             ds=ds,
-            status='impossible',
-            message=('command has an unrecognized placeholder: %s',
-                     exc))
+            status="impossible",
+            message=("command has an unrecognized placeholder: %s", exc),
+        )
         return

     # amend commit message with `run` info:
@@ -583,25 +633,25 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
     # - the command itself
     # - exit code of the command
     run_info = {
-        'cmd': cmd,
+        "cmd": cmd,
         # rerun does not handle any prop being None, hence all
         # the `or/else []`
-        'chain': rerun_info["chain"] if rerun_info else [],
+        "chain": rerun_info["chain"] if rerun_info else [],
     }
     # for all following we need to make sure that the raw
     # specifications, incl. any placeholders make it into
     # the run-record to enable "parametric" re-runs
     # ...except when expansion was requested
     for k, v in specs.items():
-        run_info[k] = globbed[k].paths \
-            if expand in ["both"] + (
-                ['outputs'] if k == 'outputs' else ['inputs']) \
-            else (v if parametric_record
-                  else expanded_specs[k]) or []
+        run_info[k] = (
+            globbed[k].paths
+            if expand in ["both"] + (["outputs"] if k == "outputs" else ["inputs"])
+            else (v if parametric_record else expanded_specs[k]) or []
+        )

     if rel_pwd is not None:
         # only when inside the dataset to not leak information
-        run_info['pwd'] = rel_pwd
+        run_info["pwd"] = rel_pwd
     if ds.id:
         run_info["dsid"] = ds.id
     if extra_info:
@@ -609,26 +659,44 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
     if dry_run:
         yield get_status_dict(
-            "run [dry-run]", ds=ds, status="ok", message="Dry run",
+            "run [dry-run]",
+            ds=ds,
+            status="ok",
+            message="Dry run",
             run_info=run_info,
             dry_run_info=dict(
                 cmd_expanded=cmd_expanded,
                 pwd_full=pwd,
-                **{k: globbed[k].expand() for k in ('inputs', 'outputs')},
-            )
+                **{k: globbed[k].expand() for k in ("inputs", "outputs")},
+            ),
         )
         return
-    
+
+    # now check history of outputs in un-finished slurm commands
+    if check_outputs:
+        output_conflict = check_output_conflict(ds, run_info["outputs"])
+        if output_conflict:
+            yield get_status_dict(
+                "schedule",
+                ds=ds,
+                status="error",
+                message=(
+                    "There are conflicting outputs with the previously scheduled jobs: {}. \n"
+                    "Finish those jobs or adjust output for the current job first."
+                ).format(output_conflict),
+            )
+            return
+
     # TODO what happens in case of inject??
     if not inject:
         cmd_exitcode, exc, slurm_job_id = _execute_slurm_command(cmd_expanded, pwd)
-        run_info['exit'] = cmd_exitcode
+        run_info["exit"] = cmd_exitcode
         slurm_outputs, slurm_env_file = get_slurm_output_files(slurm_job_id)
         run_info["outputs"].extend(slurm_outputs)
         run_info["outputs"].append(slurm_env_file)
-        run_info["slurm_run_outputs"]=slurm_outputs
+        run_info["slurm_run_outputs"] = slurm_outputs
         run_info["slurm_run_outputs"].append(slurm_env_file)
-    
+
     # add the slurm job id to the run info
     run_info["slurm_job_id"] = slurm_job_id
@@ -639,9 +707,9 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
     if explicit or expand in ["outputs", "both"]:
         # also for explicit mode we have to re-glob to be able to save all
         # matching outputs
-        globbed['outputs'].expand(refresh=True)
+        globbed["outputs"].expand(refresh=True)
         if expand in ["outputs", "both"]:
-            run_info["outputs"] = globbed['outputs'].paths
+            run_info["outputs"] = globbed["outputs"].paths

     # create the run record, either as a string, or written to a file
     # depending on the config/request
@@ -656,25 +724,29 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
     else:
         schedule_msg = "SCHEDULE"
     prefix = f"[DATALAD {schedule_msg}] "
-    msg = prefix + u"""\
+    msg = (
+        prefix
+        + """\
 {}

 === Do not change lines below ===
 {}
 ^^^ Do not change lines above ^^^
 """
+    )

     # append pending to the message
     if message is not None:
         message += f"\n Submitted batch job {slurm_job_id}: Pending"
     else:
         message = f"Submitted batch job {slurm_job_id}: Pending"
-    
+
     msg = msg.format(
         message if message is not None else cmd_shorty,
-        '"{}"'.format(record) if record_path else record)
-
-    #outputs_to_save = globbed['slurm_job_file'].expand_strict()
-    #outputs_to_save = [f"slurm-job-submission-{slurm_job_id}"]
+        '"{}"'.format(record) if record_path else record,
+    )
+
+    # outputs_to_save = globbed['slurm_job_file'].expand_strict()
+    # outputs_to_save = [f"slurm-job-submission-{slurm_job_id}"]
     outputs_to_save = [slurm_env_file]
     do_save = outputs_to_save is None or outputs_to_save
     msg_path = None
@@ -683,8 +755,9 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
         repo = ds.repo
         # must record path to be relative to ds.path to meet
         # result record semantics (think symlink resolution, etc)
-        msg_path = ds.pathobj / \
-            repo.dot_git.relative_to(repo.pathobj) / "COMMIT_EDITMSG"
+        msg_path = (
+            ds.pathobj / repo.dot_git.relative_to(repo.pathobj) / "COMMIT_EDITMSG"
+        )
         msg_path.write_text(msg)

     expected_exit = rerun_info.get("exit", 0) if rerun_info else None
@@ -694,7 +767,8 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
         status = "ok"

     run_result = get_status_dict(
-        "run", ds=ds,
+        "run",
+        ds=ds,
         status=status,
         # use the abbrev. command as the message to give immediate clarity what
         # completed/errors in the generic result rendering
@@ -711,8 +785,8 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
     )
     if record_path:
         # we the record is in a sidecar file, report its ID
-        run_result['record_id'] = record
-    for s in ('inputs', 'outputs'):
+        run_result["record_id"] = record
+    for s in ("inputs", "outputs"):
         # this enables callers to further inspect the outputs without
         # performing globbing again. Together with remove_outputs=True
         # these would be guaranteed to be the outcome of the executed
@@ -721,81 +795,131 @@ def run_command(cmd, dataset=None, inputs=None, outputs=None, expand=None,
         # calling .expand_strict() again is largely reporting cached
         # information
         # (format: relative paths)
-        if yield_expanded in (s, 'both'):
-            run_result[f'expanded_{s}'] = globbed[s].expand_strict()
+        if yield_expanded in (s, "both"):
+            run_result[f"expanded_{s}"] = globbed[s].expand_strict()
     yield run_result

     if do_save:
         with chpwd(pwd):
             for r in Save.__call__(
-                    dataset=ds_path,
-                    path=outputs_to_save,
-                    recursive=True,
-                    message=msg,
-                    jobs=jobs,
-                    return_type='generator',
-                    # we want this command and its parameterization to be in full
-                    # control about the rendering of results, hence we must turn
-                    # off internal rendering
-                    result_renderer='disabled',
-                    on_failure='ignore'):
+                dataset=ds_path,
+                path=outputs_to_save,
+                recursive=True,
+                message=msg,
+                jobs=jobs,
+                return_type="generator",
+                # we want this command and its parameterization to be in full
+                # control about the rendering of results, hence we must turn
+                # off internal rendering
+                result_renderer="disabled",
+                on_failure="ignore",
+            ):
                 yield r

+
+def check_output_conflict(dset, outputs):
+    """
+    Check if the outputs from the current scheduled job conflict with other unfinished jobs.
+    """
+    ds_repo = dset.repo
+    # get branch
+    rev_branch = (
+        ds_repo.get_corresponding_branch() or ds_repo.get_active_branch() or "HEAD"
+    )
+    revrange = rev_branch
+
+    rev_lines = ds_repo.get_revisions(
+        revrange, fmt="%H %P", options=["--reverse", "--topo-order"]
+    )
+    if not rev_lines:
+        return
+
+    conflict_commits = []
+    for rev_line in rev_lines:
+        # The strip() below is necessary because, with the format above, a
+        # commit without any parent has a trailing space. (We could also use a
+        # custom `rev-list --parents ...` call to avoid this.)
+        fields = rev_line.strip().split(" ")
+        rev, parents = fields[0], fields[1:]
+        res = get_status_dict("run", ds=dset, commit=rev, parents=parents)
+        full_msg = ds_repo.format_commit("%B", rev)
+        try:
+            msg, info = get_schedule_info(dset, full_msg)
+            if msg and info:
+                # then we have a hit on the schedule
+                # check if a corresponding finish command exists
+                job_finished = check_finish_exists(dset, rev, rev_branch)
+                if not job_finished:
+                    # check if there is any overlap between this job's outputs,
+                    # and the outputs from the other unfinished job
+                    commit_outputs = info["outputs"]
+                    output_conflict = any(
+                        output in outputs for output in commit_outputs
+                    )
+                    if output_conflict:
+                        conflict_commits.append(rev[:7])
+        except ValueError as exc:
+            # Recast the error so the message includes the revision.
+            raise ValueError("Error on {}'s message".format(rev)) from exc
+
+    return conflict_commits
+
+
 def get_slurm_output_files(job_id):
     """
     Gets the relative paths to StdOut and StdErr files for a Slurm job.
-    
+
     Args:
         job_id (str): The Slurm job ID
-    
+
     Returns:
         list: List containing relative path(s) to output files.
              If StdOut and StdErr are the same file, returns a single path.
-    
+
     Raises:
         subprocess.CalledProcessError: If scontrol command fails
         ValueError: If required file paths cannot be found in scontrol output
     """
     # Run scontrol command and get output
     try:
-        result = subprocess.run(['scontrol', 'show', 'job', str(job_id)],
-                                capture_output=True,
-                                text=True,
-                                check=True)
+        result = subprocess.run(
+            ["scontrol", "show", "job", str(job_id)],
+            capture_output=True,
+            text=True,
+            check=True,
+        )
     except subprocess.CalledProcessError as e:
         raise subprocess.CalledProcessError(
-            e.returncode,
-            e.cmd,
-            f"Failed to get job information: {e.output}"
+            e.returncode, e.cmd, f"Failed to get job information: {e.output}"
         )
-    
+
     # Parse output to find StdOut and StdErr
     parsed_data = parse_slurm_output(result.stdout)
-    stdout_path = parsed_data.get('StdOut')
-    stderr_path = parsed_data.get('StdErr')
+    stdout_path = parsed_data.get("StdOut")
+    stderr_path = parsed_data.get("StdErr")

     if not stdout_path or not stderr_path:
         raise ValueError("Could not find StdOut or StdErr paths in scontrol output")

     cwd = Path.cwd()

     stdout_path = Path(stdout_path)
-    stderr_path = Path(stderr_path) 
-    
+    stderr_path = Path(stderr_path)
+
     if not stdout_path or not stderr_path:
         raise ValueError("Could not find StdOut or StdErr paths in scontrol output")
-    
+
     # Get current working directory and convert to Path object
     cwd = Path.cwd()
-    
+
     # Convert output paths to Path objects
     stdout_path = Path(stdout_path)
     stderr_path = Path(stderr_path)

     # Write parsed data to JSON file
     slurm_env_file = stdout_path.parent / f"slurm-job-{job_id}.env.json"
-    with open(slurm_env_file, 'w') as f:
+    with open(slurm_env_file, "w") as f:
         json.dump(parsed_data, f, indent=2)
-    
+
     # Get relative paths
     try:
         rel_stdout = os.path.relpath(stdout_path, cwd)
@@ -803,7 +927,7 @@ def get_slurm_output_files(job_id):
         rel_stderr = os.path.relpath(stderr_path, cwd)
         rel_slurmenv = os.path.relpath(slurm_env_file, cwd)
     except ValueError as e:
         raise ValueError(f"Cannot compute relative path: {e}")
-    
+
     # If paths are the same, return just one
     if rel_stdout == rel_stderr:
         return [rel_stdout], rel_slurmenv
@@ -816,13 +940,13 @@ def parse_slurm_output(output):
     result = {}
     # TODO Is this necessary for privacy purposes?
     # What is useful to oneself vs for the community when pushing to git
-    excluded_keys = {'UserId', 'JobId'}
-    for line in output.split('\n'):
+    excluded_keys = {"UserId", "JobId"}
+    for line in output.split("\n"):
         # Split line into space-separated parts
         parts = line.strip().split()
         for part in parts:
-            if '=' in part:
-                key, value = part.split('=', 1)
+            if "=" in part:
+                key, value = part.split("=", 1)
                 if key not in excluded_keys:
                     result[key] = value
     return result

From 16e64e2ebc0769ef374f3b658a6954ca13d8b3f2 Mon Sep 17 00:00:00 2001
From: Tim Callow
Date: Wed, 18 Dec 2024 11:15:04 +0100
Subject: [PATCH 2/2] Use glob to expand wildcards in comparison of outputs with history

---
 datalad_slurm/schedule.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/datalad_slurm/schedule.py b/datalad_slurm/schedule.py
index 3640970..845fbe0 100644
--- a/datalad_slurm/schedule.py
+++ b/datalad_slurm/schedule.py
@@ -21,6 +21,7 @@
 from argparse import REMAINDER
 from pathlib import Path
 from tempfile import mkdtemp
+import glob

 import datalad
 import datalad.support.ansi_colors as ac
@@ -821,6 +822,11 @@ def check_output_conflict(dset, outputs):
     """
     Check if the outputs from the current scheduled job conflict with other unfinished jobs.
     """
+    # expand the outputs
+    # TODO figure out how to use GlobbedPaths for this
+    outputs = [glob.glob(k) for k in outputs]
+    outputs = [x for sublist in outputs for x in sublist]
+
     ds_repo = dset.repo
     # get branch
     rev_branch = (
@@ -852,7 +858,9 @@ def check_output_conflict(dset, outputs):
             if not job_finished:
                 # check if there is any overlap between this job's outputs,
                 # and the outputs from the other unfinished job
-                commit_outputs = info["outputs"]
+                # expand the outputs into a single list - why is this not default??
+                commit_outputs = [glob.glob(k) for k in info["outputs"]]
+                commit_outputs = [x for sublist in commit_outputs for x in sublist]
                 output_conflict = any(
                     output in outputs for output in commit_outputs
                 )