From e25c888ecaa73f8d6b4513e4f49db63c940a6f3e Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 11:08:58 -0400 Subject: [PATCH 01/18] =?UTF-8?q?=F0=9F=94=A7=20add=20slurm=20updates=20fr?= =?UTF-8?q?om=20v7.0.0=20on=20the=20latest=20py2=20working=20version?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/abstractBatchSystem.py | 39 +++ src/toil/batchSystems/slurm.py | 316 ++++++++++++++----- src/toil/lib/misc.py | 59 ++++ 3 files changed, 334 insertions(+), 80 deletions(-) diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index 5d87676c8e..ce5ed377e7 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -16,6 +16,7 @@ standard_library.install_aliases() from future.utils import with_metaclass from builtins import object +import enum import os import shutil import logging @@ -37,6 +38,44 @@ logger = logging.getLogger(__name__) +# Value to use as exitStatus in UpdatedBatchJobInfo.exitStatus when status is not available. +EXIT_STATUS_UNAVAILABLE_VALUE = 255 + +class BatchJobExitReason(enum.IntEnum): + FINISHED = 1 + """Successfully finished.""" + FAILED = 2 + """Job finished, but failed.""" + LOST = 3 + """Preemptable failure (job's executing host went away).""" + KILLED = 4 + """Job killed before finishing.""" + ERROR = 5 + """Internal error.""" + MEMLIMIT = 6 + """Job hit batch system imposed memory limit.""" + MISSING = 7 + """Job disappeared from the scheduler without actually stopping, so Toil killed it.""" + MAXJOBDURATION = 8 + """Job ran longer than --maxJobDuration, so Toil killed it.""" + PARTITION = 9 + """Job was not able to talk to the leader via the job store, so Toil declared it failed.""" + + + @classmethod + def to_string(cls, value): + """ + Convert to human-readable string. + + Given an int that may be or may be equal to a value from the enum, + produce the string value of its matching enum entry, or a stringified + int. + """ + try: + return cls(value).name + except ValueError: + return str(value) + # A class containing the information required for worker cleanup on shutdown of the batch system. WorkerCleanupInfo = namedtuple('WorkerCleanupInfo', ( diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index b7eed8a9b6..0703e47e24 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -28,10 +28,43 @@ from six import iteritems from toil.batchSystems import MemoryString -from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem +from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem, BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE +from toil.lib.misc import CalledProcessErrorStderr, call_command logger = logging.getLogger(__name__) +TERMINAL_STATES = { + "BOOT_FAIL": BatchJobExitReason.LOST, + "CANCELLED": BatchJobExitReason.KILLED, + "COMPLETED": BatchJobExitReason.FINISHED, + "DEADLINE": BatchJobExitReason.KILLED, + "FAILED": BatchJobExitReason.FAILED, + "NODE_FAIL": BatchJobExitReason.LOST, + "OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT, + "PREEMPTED": BatchJobExitReason.KILLED, + "REVOKED": BatchJobExitReason.KILLED, + "SPECIAL_EXIT": BatchJobExitReason.FAILED, + "TIMEOUT": BatchJobExitReason.KILLED +} + +# If a job is in one of these states, it might eventually move to a different +# state. +NONTERMINAL_STATES = { + "CONFIGURING", + "COMPLETING", + "PENDING", + "RUNNING", + "RESV_DEL_HOLD", + "REQUEUE_FED", + "REQUEUE_HOLD", + "REQUEUED", + "RESIZING", + "SIGNALING", + "STAGE_OUT", + "STOPPED", + "SUSPENDED" +} + class SlurmBatchSystem(AbstractGridEngineBatchSystem): class Worker(AbstractGridEngineBatchSystem.Worker): @@ -46,7 +79,7 @@ def getRunningJobIDs(self): # -h for no header # --format to get jobid i, state %t and time days-hours:minutes:seconds - lines = subprocess.check_output(['squeue', '-h', '--format', '%i %t %M']).decode('utf-8').split('\n') + lines = call_command(['squeue', '-h', '--format', '%i %t %M'], quiet=True).split('\n') for line in lines: values = line.split() if len(values) < 3: @@ -59,14 +92,14 @@ def getRunningJobIDs(self): return times def killJob(self, jobID): - subprocess.check_call(['scancel', self.getBatchSystemID(jobID)]) + call_command(['scancel', self.getBatchSystemID(jobID)]) def prepareSubmission(self, cpu, memory, jobID, command, jobName): return self.prepareSbatch(cpu, memory, jobID, jobName) + ['--wrap={}'.format(command)] def submitJob(self, subLine): try: - output = subprocess.check_output(subLine, stderr=subprocess.STDOUT).decode('utf-8') + output = call_command(subLine) # sbatch prints a line like 'Submitted batch job 2954103' result = int(output.strip().split()[-1]) logger.debug("sbatch submitted job %d", result) @@ -75,94 +108,217 @@ def submitJob(self, subLine): logger.error("sbatch command failed") raise e - def getJobExitCode(self, slurmJobID): - logger.debug("Getting exit code for slurm job %d", int(slurmJobID)) - - state, rc = self._getJobDetailsFromSacct(slurmJobID) - - if rc == -999: - state, rc = self._getJobDetailsFromScontrol(slurmJobID) - - logger.debug("s job state is %s", state) - # If Job is in a running state, return None to indicate we don't have an update - if state in ('PENDING', 'RUNNING', 'CONFIGURING', 'COMPLETING', 'RESIZING', 'SUSPENDED'): + def getJobExitCode(self, batchJobID): + """ + Get job exit code for given batch job ID. + :param batchJobID: string of the form "[.]". + :return: integer job exit code. + """ + logger.debug("Getting exit code for slurm job %d", int(batchJobID)) + + job_id = int(batchJobID.split('.')[0]) + status_dict = self._get_job_details([job_id]) + status = status_dict[job_id] + return self._get_job_return_code(status) + + def _get_job_details(self, job_id_list): + """ + Helper function for `getJobExitCode` and `coalesce_job_exit_codes`. + Fetch job details from Slurm's accounting system or job control system. + :param job_id_list: list of integer Job IDs. + :return: dict of job statuses, where key is the integer job ID, and value is a tuple + containing the job's state and exit code. + """ + try: + status_dict = self._getJobDetailsFromSacct(job_id_list) + except CalledProcessErrorStderr: + status_dict = self._getJobDetailsFromScontrol(job_id_list) + return status_dict + + def _get_job_return_code(self, status): + """ + Given a Slurm return code, status pair, summarize them into a Toil return code, exit reason pair. + + The return code may have already been OR'd with the 128-offset + Slurm-reported signal. + + Slurm will report return codes of 0 even if jobs time out instead + of succeeding: + + 2093597|TIMEOUT|0:0 + 2093597.batch|CANCELLED|0:15 + + So we guarantee here that, if the Slurm status string is not a + successful one as defined in + , we + will not return a successful return code. + + Helper function for `getJobExitCode` and `coalesce_job_exit_codes`. + :param status: tuple containing the job's state and it's return code from Slurm. + :return: the job's return code for Toil if it's completed, otherwise None. + """ + state, rc = status + + if state not in TERMINAL_STATES: + # Don't treat the job as exited yet return None + + exit_reason = TERMINAL_STATES[state] + + if exit_reason == BatchJobExitReason.FINISHED: + # The only state that should produce a 0 ever is COMPLETED. So + # if the job is COMPLETED and the exit reason is thus FINISHED, + # pass along the code it has. + return (rc, exit_reason) + + if rc == 0: + # The job claims to be in a state other than COMPLETED, but + # also to have not encountered a problem. Say the exit status + # is unavailable. + return (EXIT_STATUS_UNAVAILABLE_VALUE, exit_reason) + + # If the code is nonzero, pass it along. + return (rc, exit_reason) + + def _canonicalize_state(self, state): + """ + Turn a state string form SLURM into just the state token like "CANCELED". + """ + + # Slurm will sometimes send something like "CANCELED by 30065" in + # the state column for some reason. - return rc + state_token = state + + if " " in state_token: + state_token = state.split(" ", 1)[0] + + if state_token not in TERMINAL_STATES and state_token not in NONTERMINAL_STATES: + raise RuntimeError("Toil job in unimplemented Slurm state " + state) - def _getJobDetailsFromSacct(self, slurmJobID): - # SLURM job exit codes are obtained by running sacct. + return state_token + + def _getJobDetailsFromSacct(self, job_id_list): + """ + Get SLURM job exit codes for the jobs in `job_id_list` by running `sacct`. + :param job_id_list: list of integer batch job IDs. + :return: dict of job statuses, where key is the job-id, and value is a tuple + containing the job's state and exit code. + """ + job_ids = ",".join(str(id) for id in job_id_list) args = ['sacct', - '-n', # no header - '-j', str(slurmJobID), # job - '--format', 'State,ExitCode', # specify output columns - '-P', # separate columns with pipes - '-S', '1970-01-01'] # override start time limit - - process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - rc = process.returncode - - if rc != 0: - # no accounting system or some other error - return (None, -999) - - for line in process.stdout: - values = line.decode('utf-8').strip().split('|') - if len(values) < 2: + '-n', # no header + '-j', job_ids, # job + '--format', 'JobIDRaw,State,ExitCode', # specify output columns + '-P', # separate columns with pipes + '-S', '1970-01-01'] # override start time limit + stdout = call_command(args, quiet=True) + + # Collect the job statuses in a dict; key is the job-id, value is a tuple containing + # job state and exit status. Initialize dict before processing output of `sacct`. + job_statuses = {} + for job_id in job_id_list: + job_statuses[job_id] = (None, None) + + for line in stdout.splitlines(): + values = line.strip().split('|') + if len(values) < 3: continue - state, exitcode = values - logger.debug("sacct job state is %s", state) - # If Job is in a running state, return None to indicate we don't have an update - status, signal = [int(n) for n in exitcode.split(':')] + job_id_raw, state, exitcode = values + state = self._canonicalize_state(state) + logger.debug("%s state of job %s is %s", args[0], job_id_raw, state) + # JobIDRaw is in the form JobID[.JobStep]; we're not interested in job steps. + job_id_parts = job_id_raw.split(".") + if len(job_id_parts) > 1: + continue + job_id = int(job_id_parts[0]) + status, signal = (int(n) for n in exitcode.split(':')) if signal > 0: # A non-zero signal may indicate e.g. an out-of-memory killed job status = 128 + signal - logger.debug("sacct exit code is %s, returning status %d", exitcode, status) - return (state, status) - logger.debug("Did not find exit code for job in sacct output") - return None + logger.debug("%s exit code of job %d is %s, return status %d", + args[0], job_id, exitcode, status) + job_statuses[job_id] = state, status + logger.debug("%s returning job statuses: %s", args[0], job_statuses) + return job_statuses - def _getJobDetailsFromScontrol(self, slurmJobID): + def _getJobDetailsFromScontrol(self, job_id_list): + """ + Get SLURM job exit codes for the jobs in `job_id_list` by running `scontrol`. + :param job_id_list: list of integer batch job IDs. + :return: dict of job statuses, where key is the job-id, and value is a tuple + containing the job's state and exit code. + """ args = ['scontrol', 'show', - 'job', - str(slurmJobID)] - - process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - - job = dict() - for line in process.stdout: - values = line.decode('utf-8').strip().split() - - # If job information is not available an error is issued: - # slurm_load_jobs error: Invalid job id specified - # There is no job information, so exit. - if len(values)>0 and values[0] == 'slurm_load_jobs': - return (None, None) - - # Output is in the form of many key=value pairs, multiple pairs on each line - # and multiple lines in the output. Each pair is pulled out of each line and - # added to a dictionary - for v in values: - bits = v.split('=') - job[bits[0]] = bits[1] - - state = job['JobState'] - try: - exitcode = job['ExitCode'] - if exitcode is not None: - status, signal = [int(n) for n in exitcode.split(':')] - if signal > 0: - # A non-zero signal may indicate e.g. an out-of-memory killed job - status = 128 + signal - logger.debug("scontrol exit code is %s, returning status %d", exitcode, status) - rc = status - else: + 'job'] + # `scontrol` can only return information about a single job, + # or all the jobs it knows about. + if len(job_id_list) == 1: + args.append(str(job_id_list[0])) + + stdout = call_command(args, quiet=True) + + # Job records are separated by a blank line. + if isinstance(stdout, str): + job_records = stdout.strip().split('\n\n') + elif isinstance(stdout, bytes): + job_records = stdout.decode('utf-8').strip().split('\n\n') + + # Collect the job statuses in a dict; key is the job-id, value is a tuple containing + # job state and exit status. Initialize dict before processing output of `scontrol`. + job_statuses = {} + for job_id in job_id_list: + job_statuses[job_id] = (None, None) + + # `scontrol` will report "No jobs in the system", if there are no jobs in the system, + # and if no job-id was passed as argument to `scontrol`. + if len(job_records) > 0 and job_records[0] == "No jobs in the system": + return job_statuses + + for record in job_records: + job = {} + for line in record.splitlines(): + for item in line.split(): + # Output is in the form of many key=value pairs, multiple pairs on each line + # and multiple lines in the output. Each pair is pulled out of each line and + # added to a dictionary. + # Note: In some cases, the value itself may contain white-space. So, if we find + # a key without a value, we consider that key part of the previous value. + bits = item.split('=', 1) + if len(bits) == 1: + job[key] += ' ' + bits[0] + else: + key = bits[0] + job[key] = bits[1] + # The first line of the record contains the JobId. Stop processing the remainder + # of this record, if we're not interested in this job. + job_id = int(job['JobId']) + if job_id not in job_id_list: + logger.debug("%s job %d is not in the list", args[0], job_id) + break + if job_id not in job_id_list: + continue + state = job['JobState'] + state = self._canonicalize_state(state) + logger.debug("%s state of job %s is %s", args[0], job_id, state) + try: + exitcode = job['ExitCode'] + if exitcode is not None: + status, signal = (int(n) for n in exitcode.split(':')) + if signal > 0: + # A non-zero signal may indicate e.g. an out-of-memory killed job + status = 128 + signal + logger.debug("%s exit code of job %d is %s, return status %d", + args[0], job_id, exitcode, status) + rc = status + else: + rc = None + except KeyError: rc = None - except KeyError: - rc = None - - return (state, rc) + job_statuses[job_id] = (state, rc) + logger.debug("%s returning job statuses: %s", args[0], job_statuses) + return job_statuses """ Implementation-specific helper methods diff --git a/src/toil/lib/misc.py b/src/toil/lib/misc.py index e682e3310f..805827cfed 100644 --- a/src/toil/lib/misc.py +++ b/src/toil/lib/misc.py @@ -1,19 +1,34 @@ import random from six.moves import xrange from math import sqrt +import datetime import errno +import logging import os import shutil +import subprocess import sys import time import socket from contextlib import contextmanager + if sys.version_info[0] < 3: # Define a usable FileNotFoundError as will be raised by os.remove on a # nonexistent file. FileNotFoundError = OSError +logger = logging.getLogger(__name__) +class CalledProcessErrorStderr(subprocess.CalledProcessError): + """Version of CalledProcessError that include stderr in the error message if it is set""" + + def __str__(self): + if (self.returncode < 0) or (self.stderr is None): + return str(super(CalledProcessErrorStderr, self)) + else: + err = self.stderr if isinstance(self.stderr, str) else self.stderr.decode("ascii", errors="replace") + return "Command '%s' exit status %d: %s" % (self.cmd, self.returncode, err) + def mkdir_p(path): """The equivalent of mkdir -p""" @@ -243,3 +258,47 @@ def atomic_copyobj(src_fh, dest_path, length=16384): with AtomicFileCreate(dest_path) as dest_path_tmp: with open(dest_path_tmp, 'wb') as dest_path_fh: shutil.copyfileobj(src_fh, dest_path_fh, length=length) + + +def call_command(cmd, *args, input=None, timeout=None, useCLocale=True, env=None, quiet=False): + """ + Simplified calling of external commands. + + If the process fails, CalledProcessErrorStderr is raised. + + The captured stderr is always printed, regardless of + if an exception occurs, so it can be logged. + + Always logs the command at debug log level. + + :param quiet: If True, do not log the command output. If False (the + default), do log the command output at debug log level. + + :param useCLocale: If True, C locale is forced, to prevent failures that + can occur in some batch systems when using UTF-8 locale. + + :returns: Command standard output, decoded as utf-8. + """ + + # NOTE: Interface MUST be kept in sync with call_sacct and call_scontrol in + # test_slurm.py, which monkey-patch this! + + # using non-C locales can cause GridEngine commands, maybe other to + # generate errors + if useCLocale: + env = dict(os.environ) if env is None else dict(env) # copy since modifying + env["LANGUAGE"] = env["LC_ALL"] = "C" + + logger.debug("run command: {}".format(" ".join(cmd))) + start_time = datetime.datetime.now() + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + encoding='utf-8', errors="replace", env=env) + stdout, stderr = proc.communicate(input=input, timeout=timeout) + end_time = datetime.datetime.now() + runtime = (end_time - start_time).total_seconds() + sys.stderr.write(stderr) + if proc.returncode != 0: + logger.debug("command failed in {}s: {}: {}".format(runtime, " ".join(cmd), stderr.rstrip())) + raise CalledProcessErrorStderr(proc.returncode, cmd, output=stdout, stderr=stderr) + logger.debug("command succeeded in {}s: {}{}".format(runtime, " ".join(cmd), (': ' + stdout.rstrip()) if not quiet else '')) + return stdout \ No newline at end of file From e1cb5b9310257f61c0db26422e037cca9f339ae4 Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 17:23:33 -0400 Subject: [PATCH 02/18] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20attempt=20to=20add?= =?UTF-8?q?=20retry=20with=20OOM=20error?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 56 ++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 0703e47e24..cb1ba0e7c3 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -15,6 +15,7 @@ from __future__ import absolute_import from __future__ import division from builtins import str +from collections import defaultdict from past.utils import old_div import logging import os @@ -33,6 +34,9 @@ logger = logging.getLogger(__name__) + +MAX_MEMORY = 60 * 1e9 + TERMINAL_STATES = { "BOOT_FAIL": BatchJobExitReason.LOST, "CANCELLED": BatchJobExitReason.KILLED, @@ -67,8 +71,27 @@ class SlurmBatchSystem(AbstractGridEngineBatchSystem): + def __init__(self, *args, **kwargs): + """Create a mapping table for JobIDs to JobNodes.""" + super().__init__(*args, **kwargs) + self.Id2Node = {} + self.resourceRetryCount = defaultdict(set) + + def issueBatchJob(self, jobDesc, job_environment=None): + """Load the jobDesc into the JobID mapping table.""" + jobID = super().issueBatchJob(jobDesc, job_environment) + self.Id2Node[jobID] = jobDesc + return jobID + + class Worker(AbstractGridEngineBatchSystem.Worker): + def forgetJob(self, jobID): + """Remove jobNode from the mapping table when forgetting.""" + self.boss.Id2Node.pop(jobID, None) + self.boss.resourceRetryCount.pop(jobID, None) + return super().forgetJob(jobID) + def getRunningJobIDs(self): # Should return a dictionary of Job IDs and number of seconds times = {} @@ -119,7 +142,36 @@ def getJobExitCode(self, batchJobID): job_id = int(batchJobID.split('.')[0]) status_dict = self._get_job_details([job_id]) status = status_dict[job_id] - return self._get_job_return_code(status) + exit_code = self._get_job_return_code(status) + + if exit_code[1] == BatchJobExitReason.MEMLIMIT: + # If job was killed because of memory, retry it with more memory. + status = self._get_job_return_code(self._customRetry(job_id)) + return exit_code + + def _customRetry(self, jobID): + """Retry job if killed by slurm due to memlimit problems.""" + try: + jobNode = self.boss.Id2Node[jobID] + except KeyError: + logger.error("Can't resource retry %s, jobNode not found", jobID) + return 1 + + retry_type = "memlimit" + jobNode.jobName = (jobNode.jobName or "") + "OOM resource retry" + memory = jobNode.memory * 2 if jobNode.memory < MAX_MEMORY else MAX_MEMORY + sbatch_line = self.prepareSubmission(jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName) + + if retry_type not in self.boss.resourceRetryCount[jobID]: + slurm_job_id = self.submitJob(sbatch_line) + self.batchJobIDs[jobID] = (slurm_job_id, None) + self.boss.resourceRetryCount[jobID].add(retry_type) + logger.info("Detected job killed by SLURM, attempting retry: %s", slurm_job_id) + else: + logger.error("Can't retry for %s twice: %s", retry_type, jobID) + return 1 + + return None def _get_job_details(self, job_id_list): """ @@ -393,7 +445,7 @@ def obtainSystemConstants(cls): # --format to get memory, cpu max_cpu = 0 max_mem = MemoryString('0') - lines = subprocess.check_output(['sinfo', '-Nhe', '--format', '%m %c']).decode('utf-8').split('\n') + lines = call_command(['sinfo', '-Nhe', '--format', '%m %c'], quiet=True).split('\n') for line in lines: values = line.split() if len(values) < 2: From 00ef8acf0dd431cdb4fb022b79afea5adc63649f Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 21:59:58 -0400 Subject: [PATCH 03/18] =?UTF-8?q?=F0=9F=90=9B=20fix=20enum,=20import=20and?= =?UTF-8?q?=20*args=20errors?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- setup.py | 4 +++- src/toil/batchSystems/slurm.py | 3 ++- src/toil/lib/misc.py | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index a5a62b9ab2..ccc5864873 100644 --- a/setup.py +++ b/setup.py @@ -59,6 +59,7 @@ def runSetup(): addict = 'addict<=2.2.0' sphinx = 'sphinx==1.7.5' pathlib2 = 'pathlib2==2.3.2' + enum34 = 'enum34==1.1.10' core_reqs = [ dill, @@ -71,7 +72,8 @@ def runSetup(): subprocess32, addict, sphinx, - pathlib2] + pathlib2, + enum34] aws_reqs = [ boto, diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index cb1ba0e7c3..41ef2c3cfd 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -29,7 +29,8 @@ from six import iteritems from toil.batchSystems import MemoryString -from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem, BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE +from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem +from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE from toil.lib.misc import CalledProcessErrorStderr, call_command logger = logging.getLogger(__name__) diff --git a/src/toil/lib/misc.py b/src/toil/lib/misc.py index 805827cfed..7bd6314096 100644 --- a/src/toil/lib/misc.py +++ b/src/toil/lib/misc.py @@ -260,7 +260,7 @@ def atomic_copyobj(src_fh, dest_path, length=16384): shutil.copyfileobj(src_fh, dest_path_fh, length=length) -def call_command(cmd, *args, input=None, timeout=None, useCLocale=True, env=None, quiet=False): +def call_command(cmd, input=None, timeout=None, useCLocale=True, env=None, quiet=False, *args): """ Simplified calling of external commands. From 4d6eba9a617e15a8e1f39bfb677a82bb2a16edc9 Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 22:02:28 -0400 Subject: [PATCH 04/18] =?UTF-8?q?=F0=9F=90=9B=20fix=20python2=20syntaxis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 41ef2c3cfd..7c823ee013 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -74,13 +74,13 @@ class SlurmBatchSystem(AbstractGridEngineBatchSystem): def __init__(self, *args, **kwargs): """Create a mapping table for JobIDs to JobNodes.""" - super().__init__(*args, **kwargs) + super(SlurmBatchSystem, self).__init__(*args, **kwargs) self.Id2Node = {} self.resourceRetryCount = defaultdict(set) def issueBatchJob(self, jobDesc, job_environment=None): """Load the jobDesc into the JobID mapping table.""" - jobID = super().issueBatchJob(jobDesc, job_environment) + jobID = super(SlurmBatchSystem, self).issueBatchJob(jobDesc, job_environment) self.Id2Node[jobID] = jobDesc return jobID @@ -91,7 +91,7 @@ def forgetJob(self, jobID): """Remove jobNode from the mapping table when forgetting.""" self.boss.Id2Node.pop(jobID, None) self.boss.resourceRetryCount.pop(jobID, None) - return super().forgetJob(jobID) + return super(SlurmBatchSystem.Worker, self).forgetJob(jobID) def getRunningJobIDs(self): # Should return a dictionary of Job IDs and number of seconds From bc0f1a0ee309f58287c553d033888764b1610dad Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 22:06:11 -0400 Subject: [PATCH 05/18] =?UTF-8?q?=F0=9F=90=9B=20fix=20python2=20syntaxis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/lib/misc.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/toil/lib/misc.py b/src/toil/lib/misc.py index 7bd6314096..0b6535a08a 100644 --- a/src/toil/lib/misc.py +++ b/src/toil/lib/misc.py @@ -291,8 +291,7 @@ def call_command(cmd, input=None, timeout=None, useCLocale=True, env=None, quiet logger.debug("run command: {}".format(" ".join(cmd))) start_time = datetime.datetime.now() - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - encoding='utf-8', errors="replace", env=env) + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = proc.communicate(input=input, timeout=timeout) end_time = datetime.datetime.now() runtime = (end_time - start_time).total_seconds() From f6cfb002920ac2c5d19fc8e40df7918fe5698010 Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 22:07:23 -0400 Subject: [PATCH 06/18] =?UTF-8?q?=F0=9F=90=9B=20fix=20python2=20syntaxis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/lib/misc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/lib/misc.py b/src/toil/lib/misc.py index 0b6535a08a..edde2e8244 100644 --- a/src/toil/lib/misc.py +++ b/src/toil/lib/misc.py @@ -292,7 +292,7 @@ def call_command(cmd, input=None, timeout=None, useCLocale=True, env=None, quiet logger.debug("run command: {}".format(" ".join(cmd))) start_time = datetime.datetime.now() proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = proc.communicate(input=input, timeout=timeout) + stdout, stderr = proc.communicate(input=input) end_time = datetime.datetime.now() runtime = (end_time - start_time).total_seconds() sys.stderr.write(stderr) From 9a4bfbd67c1de24f01203af403a3d2edd7291783 Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 22:09:18 -0400 Subject: [PATCH 07/18] =?UTF-8?q?=F0=9F=90=9B=20remove=20job=20env=20from?= =?UTF-8?q?=20issueBatchJob?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 7c823ee013..49f2ca0e46 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -78,9 +78,9 @@ def __init__(self, *args, **kwargs): self.Id2Node = {} self.resourceRetryCount = defaultdict(set) - def issueBatchJob(self, jobDesc, job_environment=None): + def issueBatchJob(self, jobDesc): """Load the jobDesc into the JobID mapping table.""" - jobID = super(SlurmBatchSystem, self).issueBatchJob(jobDesc, job_environment) + jobID = super(SlurmBatchSystem, self).issueBatchJob(jobDesc) self.Id2Node[jobID] = jobDesc return jobID From 4aea8eeeb85e408d3ae4550521971c0ee84f63c7 Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 22:25:34 -0400 Subject: [PATCH 08/18] =?UTF-8?q?=F0=9F=90=9B=20handle=20exit=5Fcode=20Non?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 49f2ca0e46..aea37c3aa8 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -145,7 +145,7 @@ def getJobExitCode(self, batchJobID): status = status_dict[job_id] exit_code = self._get_job_return_code(status) - if exit_code[1] == BatchJobExitReason.MEMLIMIT: + if exit_code is not None and exit_code[1] == BatchJobExitReason.MEMLIMIT: # If job was killed because of memory, retry it with more memory. status = self._get_job_return_code(self._customRetry(job_id)) return exit_code From fc6c1dadfc08b7b41675edae1a11d883d185db95 Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 30 May 2024 22:55:34 -0400 Subject: [PATCH 09/18] =?UTF-8?q?=F0=9F=90=9B=20fix=20exit=5Fcode=20return?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index aea37c3aa8..94acef0be7 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -147,7 +147,7 @@ def getJobExitCode(self, batchJobID): if exit_code is not None and exit_code[1] == BatchJobExitReason.MEMLIMIT: # If job was killed because of memory, retry it with more memory. - status = self._get_job_return_code(self._customRetry(job_id)) + exit_code = self._customRetry(job_id) return exit_code def _customRetry(self, jobID): From 32b6e660684f89ab2526139c9febfeef6d9f3558 Mon Sep 17 00:00:00 2001 From: Juan Esteban Arango Ossa Date: Thu, 11 Jul 2024 12:00:08 -0400 Subject: [PATCH 10/18] =?UTF-8?q?=F0=9F=94=A7=20catch=20additional=20errno?= =?UTF-8?q?=20codes=20when=20linking=20fileJobStore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/jobStores/fileJobStore.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index f71cf902f7..28d0f2c9bd 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -417,6 +417,7 @@ def readFile(self, jobStoreFileID, localFilePath, symlink=False): # It worked! return except OSError as e: + # For the list of the possible errno codes, see: https://linux.die.net/man/2/symlink if e.errno == errno.EEXIST: # Overwrite existing file, emulating shutil.copyfile(). os.unlink(localFilePath) @@ -426,7 +427,12 @@ def readFile(self, jobStoreFileID, localFilePath, symlink=False): # Now we succeeded and don't need to copy return + elif e.errno == errno.EPERM: + # On some filesystems, the creation of symbolic links is not possible. + # In this case, we try to make a hard link. + pass else: + logger.error(f"Unexpected OSError when reading file '{jobStoreFilePath}' from job store") raise # If we get here, symlinking isn't an option. @@ -440,23 +446,38 @@ def readFile(self, jobStoreFileID, localFilePath, symlink=False): # It worked! return except OSError as e: + # For the list of the possible errno codes, see: https://linux.die.net/man/2/symlink if e.errno == errno.EEXIST: # Overwrite existing file, emulating shutil.copyfile(). os.unlink(localFilePath) # It would be very unlikely to fail again for same reason but possible # nonetheless in which case we should just give up. os.link(jobStoreFilePath, localFilePath) - # Now we succeeded and don't need to copy return elif e.errno == errno.EXDEV: # It's a cross-device link even though it didn't appear to be. # Just keep going and hit the file copy case. pass + # See https://github.com/DataBiosphere/toil/pull/4284 + elif e.errno == errno.EPERM: + # It's a cross-device link even though it didn't appear to be. + # Just keep going and hit the file copy case. + pass + elif e.errno == errno.EPERM: + # On some filesystems, hardlinking could be disallowed by permissions. + # In this case, we also fall back to making a complete copy. + pass + elif e.errno == errno.ELOOP: + # Too many symbolic links were encountered. Just keep going and hit the + # file copy case. + pass + elif e.errno == errno.EMLINK: + # The maximum number of links to file is reached. Just keep going and + # hit the file copy case. + pass else: - logger.critical('Unexpected OSError when reading file from job store') - logger.critical('jobStoreFilePath: ' + jobStoreFilePath + ' ' + str(os.path.exists(jobStoreFilePath))) - logger.critical('localFilePath: ' + localFilePath + ' ' + str(os.path.exists(localFilePath))) + logger.error("Unexpected OSError when reading file " + jobStoreFilePath + " from job store") raise # If we get here, neither a symlink nor a hardlink will work. From 1c7a1094b6e509f9eff4ec0aed4776f02708da37 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Thu, 11 Jul 2024 17:06:57 -0400 Subject: [PATCH 11/18] =?UTF-8?q?=F0=9F=94=A7=20fix=20exit=20code=20return?= =?UTF-8?q?=20from=20slurm?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 10 ++++++---- src/toil/jobStores/fileJobStore.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 94acef0be7..59e56bdd63 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -143,9 +143,12 @@ def getJobExitCode(self, batchJobID): job_id = int(batchJobID.split('.')[0]) status_dict = self._get_job_details([job_id]) status = status_dict[job_id] - exit_code = self._get_job_return_code(status) - - if exit_code is not None and exit_code[1] == BatchJobExitReason.MEMLIMIT: + exit_status = self._get_job_return_code(status) + if exit_status is None: + return None + + exit_code, exit_reason = exit_status + if exit_reason == BatchJobExitReason.MEMLIMIT: # If job was killed because of memory, retry it with more memory. exit_code = self._customRetry(job_id) return exit_code @@ -171,7 +174,6 @@ def _customRetry(self, jobID): else: logger.error("Can't retry for %s twice: %s", retry_type, jobID) return 1 - return None def _get_job_details(self, job_id_list): diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index 28d0f2c9bd..e266a045c5 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -432,7 +432,7 @@ def readFile(self, jobStoreFileID, localFilePath, symlink=False): # In this case, we try to make a hard link. pass else: - logger.error(f"Unexpected OSError when reading file '{jobStoreFilePath}' from job store") + logger.error("Unexpected OSError when reading file " + jobStoreFilePath + " from job store") raise # If we get here, symlinking isn't an option. From 48ab174c37f203f01f118b648e0b6c570384b253 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Thu, 11 Jul 2024 17:20:50 -0400 Subject: [PATCH 12/18] =?UTF-8?q?=F0=9F=94=A7=20add=20env=20TOIL=5FSLURM?= =?UTF-8?q?=5FPER=5FCPU=20to=20control=20memory=20in=20sbatch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 59e56bdd63..3b4ff5b7b4 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -20,13 +20,9 @@ import logging import os from pipes import quote -from toil import subprocess -import time import math # Python 3 compatibility imports -from six.moves.queue import Empty, Queue -from six import iteritems from toil.batchSystems import MemoryString from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem @@ -394,7 +390,11 @@ def prepareSbatch(self, cpu, mem, jobID, jobName): if mem is not None: # memory passed in is in bytes, but slurm expects megabytes - sbatch_line.append('--mem={}'.format(old_div(int(mem), 2 ** 20))) + per_cpu = os.getenv("TOIL_SLURM_PER_CPU") + if per_cpu == "Y": + sbatch_line.append('--mem-per-cpu={}'.format(old_div(int(mem), 2 ** 20))) + else: + sbatch_line.append('--mem={}'.format(old_div(int(mem), 2 ** 20))) if cpu is not None: sbatch_line.append('--cpus-per-task={}'.format(int(math.ceil(cpu)))) From 90ecfd0b44bae2be844147e01fd23617b6d59d12 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Fri, 12 Jul 2024 15:04:17 -0400 Subject: [PATCH 13/18] =?UTF-8?q?=F0=9F=90=9B=20fix=20toil=20->=20slurm=20?= =?UTF-8?q?job=20dict=20to=20rrerun=20memory=20retry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 48 ++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 3b4ff5b7b4..8ec513003e 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -25,7 +25,7 @@ # Python 3 compatibility imports from toil.batchSystems import MemoryString -from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem +from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem, with_retries from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE from toil.lib.misc import CalledProcessErrorStderr, call_command @@ -136,19 +136,28 @@ def getJobExitCode(self, batchJobID): """ logger.debug("Getting exit code for slurm job %d", int(batchJobID)) - job_id = int(batchJobID.split('.')[0]) - status_dict = self._get_job_details([job_id]) - status = status_dict[job_id] + slurm_job_id = int(batchJobID.split('.')[0]) + status_dict = self._get_job_details([slurm_job_id]) + status = status_dict[slurm_job_id] + exit_status = self._get_job_return_code(status) if exit_status is None: return None exit_code, exit_reason = exit_status if exit_reason == BatchJobExitReason.MEMLIMIT: - # If job was killed because of memory, retry it with more memory. - exit_code = self._customRetry(job_id) + # Retry job with 2x memory if it was killed because of memory + jobID = self._getJobID(slurm_job_id) + exit_code = self._customRetry(jobID) return exit_code + def _getJobID(self, slurm_job_id): + """Get toil job ID from the slurm job ID.""" + job_ids_dict = {slurm_job[0]: toil_job for toil_job, slurm_job in self.batchJobIDs.items()} + if slurm_job_id not in job_ids_dict: + raise RuntimeError("Unknown slurmJobID, could not be converted") + return job_ids_dict[slurm_job_id] + def _customRetry(self, jobID): """Retry job if killed by slurm due to memlimit problems.""" try: @@ -158,32 +167,37 @@ def _customRetry(self, jobID): return 1 retry_type = "memlimit" - jobNode.jobName = (jobNode.jobName or "") + "OOM resource retry" - memory = jobNode.memory * 2 if jobNode.memory < MAX_MEMORY else MAX_MEMORY - sbatch_line = self.prepareSubmission(jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName) - if retry_type not in self.boss.resourceRetryCount[jobID]: - slurm_job_id = self.submitJob(sbatch_line) - self.batchJobIDs[jobID] = (slurm_job_id, None) + # Submit job with 2x memory + jobNode.jobName = (jobNode.jobName or "") + " OOM resource retry" + memory = jobNode.memory * 2 if jobNode.memory < MAX_MEMORY else MAX_MEMORY + + sbatch_line = self.prepareSubmission(jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName) + logger.debug("Running %r", sbatch_line) + new_slurm_id = with_retries(self.submitJob, sbatch_line) + self.batchJobIDs[jobID] = (new_slurm_id, None) self.boss.resourceRetryCount[jobID].add(retry_type) - logger.info("Detected job killed by SLURM, attempting retry: %s", slurm_job_id) + logger.info("Detected job killed by SLURM, attempting retry with 2x memory: %s", new_slurm_id) + + with self.runningJobsLock: + self.runningJobs.add(jobID) else: logger.error("Can't retry for %s twice: %s", retry_type, jobID) return 1 return None - def _get_job_details(self, job_id_list): + def _get_job_details(self, batch_job_ids): """ Helper function for `getJobExitCode` and `coalesce_job_exit_codes`. Fetch job details from Slurm's accounting system or job control system. - :param job_id_list: list of integer Job IDs. + :param batch_job_ids: list of integer Job IDs. :return: dict of job statuses, where key is the integer job ID, and value is a tuple containing the job's state and exit code. """ try: - status_dict = self._getJobDetailsFromSacct(job_id_list) + status_dict = self._getJobDetailsFromSacct(batch_job_ids) except CalledProcessErrorStderr: - status_dict = self._getJobDetailsFromScontrol(job_id_list) + status_dict = self._getJobDetailsFromScontrol(batch_job_ids) return status_dict def _get_job_return_code(self, status): From 991eda7768edbb3f48ba62b26536309f0ca5ce7b Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Fri, 12 Jul 2024 15:05:43 -0400 Subject: [PATCH 14/18] =?UTF-8?q?=F0=9F=94=A7=20skip=20error=20code=20EPER?= =?UTF-8?q?M=20when=20kill=20pid=20fails?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/fileStores/abstractFileStore.py | 3 +++ src/toil/lib/misc.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/toil/fileStores/abstractFileStore.py b/src/toil/fileStores/abstractFileStore.py index 8801c190b2..25f8530c1e 100644 --- a/src/toil/fileStores/abstractFileStore.py +++ b/src/toil/fileStores/abstractFileStore.py @@ -465,6 +465,9 @@ def _pidExists(pid): if err.errno == errno.ESRCH: # ESRCH == No such process return False + elif err.errno == errno.EPERM: + # EPERM == operation not permitted + return False else: raise else: diff --git a/src/toil/lib/misc.py b/src/toil/lib/misc.py index edde2e8244..847da8cfb5 100644 --- a/src/toil/lib/misc.py +++ b/src/toil/lib/misc.py @@ -300,4 +300,4 @@ def call_command(cmd, input=None, timeout=None, useCLocale=True, env=None, quiet logger.debug("command failed in {}s: {}: {}".format(runtime, " ".join(cmd), stderr.rstrip())) raise CalledProcessErrorStderr(proc.returncode, cmd, output=stdout, stderr=stderr) logger.debug("command succeeded in {}s: {}{}".format(runtime, " ".join(cmd), (': ' + stdout.rstrip()) if not quiet else '')) - return stdout \ No newline at end of file + return stdout From e1f915995a0ec99b8fddedb29720e0edc2a2e013 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Mon, 15 Jul 2024 15:28:49 -0400 Subject: [PATCH 15/18] =?UTF-8?q?=F0=9F=94=A7=20allow=20to=20retry=20twice?= =?UTF-8?q?=20for=20oom=20retry.=20Log=20retry=20slurm=20job=20id=20and=20?= =?UTF-8?q?resources?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 43 +++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 8ec513003e..6309fc642b 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -27,12 +27,14 @@ from toil.batchSystems import MemoryString from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem, with_retries from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE +from toil.lib.humanize import bytes2human from toil.lib.misc import CalledProcessErrorStderr, call_command logger = logging.getLogger(__name__) MAX_MEMORY = 60 * 1e9 +OUT_OF_MEM_RETRIES = 2 TERMINAL_STATES = { "BOOT_FAIL": BatchJobExitReason.LOST, @@ -72,7 +74,7 @@ def __init__(self, *args, **kwargs): """Create a mapping table for JobIDs to JobNodes.""" super(SlurmBatchSystem, self).__init__(*args, **kwargs) self.Id2Node = {} - self.resourceRetryCount = defaultdict(set) + self.resourceRetryCount = defaultdict(int) def issueBatchJob(self, jobDesc): """Load the jobDesc into the JobID mapping table.""" @@ -148,7 +150,7 @@ def getJobExitCode(self, batchJobID): if exit_reason == BatchJobExitReason.MEMLIMIT: # Retry job with 2x memory if it was killed because of memory jobID = self._getJobID(slurm_job_id) - exit_code = self._customRetry(jobID) + exit_code = self._customRetry(jobID, slurm_job_id) return exit_code def _getJobID(self, slurm_job_id): @@ -158,31 +160,40 @@ def _getJobID(self, slurm_job_id): raise RuntimeError("Unknown slurmJobID, could not be converted") return job_ids_dict[slurm_job_id] - def _customRetry(self, jobID): - """Retry job if killed by slurm due to memlimit problems.""" + def _customRetry(self, jobID, slurm_job_id): + """Increase the job memory 2x and retry, when it's killed by memlimit problems.""" try: jobNode = self.boss.Id2Node[jobID] except KeyError: logger.error("Can't resource retry %s, jobNode not found", jobID) return 1 - retry_type = "memlimit" - if retry_type not in self.boss.resourceRetryCount[jobID]: - # Submit job with 2x memory - jobNode.jobName = (jobNode.jobName or "") + " OOM resource retry" - memory = jobNode.memory * 2 if jobNode.memory < MAX_MEMORY else MAX_MEMORY + job_retries = self.boss.resourceRetryCount[jobID] + if job_retries < OUT_OF_MEM_RETRIES: + jobNode.jobName = (jobNode.jobName or "") + " OOM resource retry " + str(job_retries) + memory = jobNode.memory * (job_retries + 1) * 2 if jobNode.memory < MAX_MEMORY else MAX_MEMORY - sbatch_line = self.prepareSubmission(jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName) + sbatch_line = self.prepareSubmission( + jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName + ) logger.debug("Running %r", sbatch_line) - new_slurm_id = with_retries(self.submitJob, sbatch_line) - self.batchJobIDs[jobID] = (new_slurm_id, None) - self.boss.resourceRetryCount[jobID].add(retry_type) - logger.info("Detected job killed by SLURM, attempting retry with 2x memory: %s", new_slurm_id) - + new_slurm_job_id = with_retries(self.submitJob, sbatch_line) + self.batchJobIDs[jobID] = (new_slurm_job_id, None) + self.boss.resourceRetryCount[jobID] += 1 + logger.info( + "Detected job %s killed by SLURM, attempting retry with 2x memory: %s", + slurm_job_id, new_slurm_job_id + ) + logger.info( + "Issued job %s with job batch system ID: " + "%s and cores: %s, disk: %s, and memory: %s", + jobNode, str(new_slurm_job_id), int(jobNode.cores), + bytes2human(jobNode.disk), bytes2human(memory) + ) with self.runningJobsLock: self.runningJobs.add(jobID) else: - logger.error("Can't retry for %s twice: %s", retry_type, jobID) + logger.error("Can't retry job %s for memlimit more than twice") return 1 return None From c71ac1c1dee0e3cca29a38502f3b955ffffa1094 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Mon, 22 Jul 2024 14:20:49 -0400 Subject: [PATCH 16/18] =?UTF-8?q?=F0=9F=94=A7=20move=20instead=20of=20rena?= =?UTF-8?q?me=20in=20fileJobStore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/jobStores/fileJobStore.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index e266a045c5..3e30a2638b 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -221,8 +221,11 @@ def update(self, job): # function is atomic. with open(self._getJobFileName(job.jobStoreID) + ".new", 'wb') as f: pickle.dump(job, f) - # This should be atomic for the file system - os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) + try: + os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) + except OSError: + # Try move when renaming between different file systems fail. + shutil.move(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) def delete(self, jobStoreID): # The jobStoreID is the relative path to the directory containing the job, @@ -588,7 +591,12 @@ def readStatsAndLogging(self, callback, readAll=False): newName = tempFile.rsplit('.', 1)[0] + '.new' newAbsTempFile = os.path.join(tempDir, newName) # Mark this item as read - os.rename(absTempFile, newAbsTempFile) + try: + os.rename(absTempFile, newAbsTempFile) + except OSError: + # Try move as rename fail between different file systems + shutil.move(absTempFile, newAbsTempFile) + return numberOfFilesProcessed ########################################## From 5acca47e0c409ebe7db892a8ad8b9b3910253615 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Tue, 23 Jul 2024 11:41:07 -0400 Subject: [PATCH 17/18] =?UTF-8?q?=F0=9F=94=A7=20resubmit=20OOM=20falied=20?= =?UTF-8?q?if=20memory=20<=20MAX=5FMEMORY?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 6309fc642b..54d3dbdc2d 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -33,7 +33,7 @@ logger = logging.getLogger(__name__) -MAX_MEMORY = 60 * 1e9 +MAX_MEMORY = 256 * 1e9 # More than 256 GB is hard to get OUT_OF_MEM_RETRIES = 2 TERMINAL_STATES = { @@ -151,6 +151,8 @@ def getJobExitCode(self, batchJobID): # Retry job with 2x memory if it was killed because of memory jobID = self._getJobID(slurm_job_id) exit_code = self._customRetry(jobID, slurm_job_id) + elif exit_reason == BatchJobExitReason.FINISHED: + pass #self._collectMetrics(slurm_job_id) return exit_code def _getJobID(self, slurm_job_id): @@ -169,7 +171,7 @@ def _customRetry(self, jobID, slurm_job_id): return 1 job_retries = self.boss.resourceRetryCount[jobID] - if job_retries < OUT_OF_MEM_RETRIES: + if job_retries < OUT_OF_MEM_RETRIES and jobNode.memory < MAX_MEMORY: jobNode.jobName = (jobNode.jobName or "") + " OOM resource retry " + str(job_retries) memory = jobNode.memory * (job_retries + 1) * 2 if jobNode.memory < MAX_MEMORY else MAX_MEMORY @@ -274,6 +276,21 @@ def _canonicalize_state(self, state): return state_token + def _collectMetrics(self, job_id): + """Print Slurm Job Metrics to file.""" + slurm_jobs_details = os.path.join(self.boss.config.writeLogs, "slurm_metrics.txt") + args = [ + 'sacct', + '-n' if os.path.isfile(slurm_jobs_details) else '', + '-X', + '-j', + str(job_id), + '--format=JobID,JobName%20,AllocCPUS,State,ExitCode,Start,End,Elapsed,NodeList,ReqMem,MaxRSS,MaxVMSize' + '>>', + slurm_jobs_details + ] + call_command(args, quiet=True) + def _getJobDetailsFromSacct(self, job_id_list): """ Get SLURM job exit codes for the jobs in `job_id_list` by running `sacct`. @@ -284,6 +301,7 @@ def _getJobDetailsFromSacct(self, job_id_list): job_ids = ",".join(str(id) for id in job_id_list) args = ['sacct', '-n', # no header + '-X', # Only main job '-j', job_ids, # job '--format', 'JobIDRaw,State,ExitCode', # specify output columns '-P', # separate columns with pipes From 5f8373cf4d16030fbff4c58bf8541a58696eee64 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Fri, 2 Aug 2024 14:21:27 -0400 Subject: [PATCH 18/18] =?UTF-8?q?=F0=9F=90=9B=20catch=20slurm=20sacct=20er?= =?UTF-8?q?rors=20when=20not=20available?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 54d3dbdc2d..a4be9eede2 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -306,7 +306,10 @@ def _getJobDetailsFromSacct(self, job_id_list): '--format', 'JobIDRaw,State,ExitCode', # specify output columns '-P', # separate columns with pipes '-S', '1970-01-01'] # override start time limit - stdout = call_command(args, quiet=True) + try: + stdout = call_command(args, quiet=True) + except CalledProcessErrorStderr as error: + raise error # Collect the job statuses in a dict; key is the job-id, value is a tuple containing # job state and exit status. Initialize dict before processing output of `sacct`.