diff --git a/README.md b/README.md index 5dea792..c0379e8 100644 --- a/README.md +++ b/README.md @@ -74,3 +74,12 @@ python my_script.py 9 Jobs that did not terminate properly, for example, it exceeded the walltime, can be resumed using the {batch_id} given to you upon launch. Of course, all this assuming your script is resumable. *Note: Jobs are always in a batch, even if it's a batch of one.* + +### SLURM clusters + +Smartdispatch can also run on SLURM clusters. +All features such as `--gpusPerNode` or `--coresPerNode` are supported. +However, SLURM-specific options must be passed through `--sbatchFlags`. For simplicity, `--sbatchFlags` only accepts short and long options written with the following syntax: +`-Cgpu6gb` or `--constraint=gpu6gb` +By contrast, the following forms are invalid: +`-C gpu6gb` or `--constraint gpu6gb`. diff --git a/scripts/sd-launch-pbs b/scripts/sd-launch-pbs index 0b38733..f0190bc 100644 --- a/scripts/sd-launch-pbs +++ b/scripts/sd-launch-pbs @@ -7,6 +7,10 @@ import logging from smartdispatch import launch_jobs from smartdispatch import utils + +logger = logging.getLogger() + + LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS" CLUSTER_NAME = utils.detect_cluster() LAUNCHER = utils.get_launcher(CLUSTER_NAME) @@ -23,12 +27,25 @@ def main(): def parse_arguments(): parser = argparse.ArgumentParser() - parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub') + parser.add_argument('-L', '--launcher', choices=['qsub', 'msub', 'sbatch'], required=False, help='Which launcher to use. Default: qsub') parser.add_argument('pbs', type=str, help='PBS filename to launch.') parser.add_argument('path_job', type=str, help='Path to the job folder.') + parser.add_argument( + '-v', '--verbose', action='count', default=0, + help="Print information about the process.\n" + " -v: INFO\n" + " -vv: DEBUG") + args = parser.parse_args() + if args.verbose == 0: + logging.basicConfig(level=logging.WARNING) + elif args.verbose == 1: + logging.basicConfig(level=logging.INFO) + elif args.verbose >= 2: + logging.basicConfig(level=logging.DEBUG) + return args diff --git a/scripts/smart-dispatch b/scripts/smart-dispatch index 86904fa..79768a0 100755 --- a/scripts/smart-dispatch +++ b/scripts/smart-dispatch @@ -1,9 +1,10 @@ #!/usr/bin/env python2 # -*- coding: utf-8 -*- +import argparse +import logging import os import sys -import argparse import time as t from os.path import join as pjoin from textwrap import dedent @@ -16,9 +17,12 @@ from smartdispatch import get_available_queues from smartdispatch import launch_jobs from smartdispatch import utils -import logging import smartdispatch + +logger = logging.getLogger() + + LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS" CLUSTER_NAME = utils.detect_cluster() AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME) @@ -29,25 +33,52 @@ TIMEOUT_EXIT_CODE = 124 AUTORESUME_TRIGGER_AFTER = '$(($PBS_WALLTIME - 60))' # By default, 60s before the maximum walltime. AUTORESUME_WORKER_CALL_PREFIX = 'timeout -s TERM {trigger_after} '.format(trigger_after=AUTORESUME_TRIGGER_AFTER) AUTORESUME_WORKER_CALL_SUFFIX = ' WORKER_PIDS+=" $!"' -AUTORESUME_PROLOG = 'WORKER_PIDS=""' +AUTORESUME_PROLOG = """ +WORKER_PIDS="" +VERBOSE={verbose} +""" AUTORESUME_EPILOG = """\ NEED_TO_RESUME=false +if [ $VERBOSE = true ]; then + echo NEED_TO_RESUME=$NEED_TO_RESUME + echo WORKER_PIDS=$WORKER_PIDS +fi for WORKER_PID in $WORKER_PIDS; do + if [ $VERBOSE = true ]; then + echo WORKER_PID=$WORKER_PID + fi wait "$WORKER_PID" RETURN_CODE=$?
+ if [ $VERBOSE = true ]; then + echo "RETURN_CODE is $RETURN_CODE while " \ + "timeout_exit_code is {timeout_exit_code}" + fi if [ $RETURN_CODE -eq {timeout_exit_code} ]; then NEED_TO_RESUME=true fi + if [ $VERBOSE = true ]; then + echo NEED_TO_RESUME=$NEED_TO_RESUME + fi done +if [ $VERBOSE = true ]; then + echo NEED_TO_RESUME=$NEED_TO_RESUME +fi if [ "$NEED_TO_RESUME" = true ]; then echo "Autoresuming using: {{launcher}} $PBS_FILENAME" - sd-launch-pbs --launcher {{launcher}} $PBS_FILENAME {{path_job}} + if [ $VERBOSE = true ]; then + VERBOSE_OPTION="-vv" + else + VERBOSE_OPTION="" + fi + + sd-launch-pbs $VERBOSE_OPTION --launcher {{launcher}} $PBS_FILENAME {{path_job}} fi """.format(timeout_exit_code=TIMEOUT_EXIT_CODE) def main(): # Necessary if we want 'logging.info' to appear in stderr. + # TODO: Could we avoid this? Would -v (verbose) be sufficient? logging.root.setLevel(logging.INFO) args = parse_arguments() @@ -163,18 +194,25 @@ def main(): prolog = [] epilog = ['wait'] if args.autoresume: - prolog = [AUTORESUME_PROLOG] - epilog = [AUTORESUME_EPILOG.format(launcher=LAUNCHER if args.launcher is None else args.launcher, path_job=path_job)] + prolog = [ + AUTORESUME_PROLOG.format(verbose=str(args.verbose >= 2).lower())] + epilog = [ + AUTORESUME_EPILOG.format( + launcher=LAUNCHER if args.launcher is None else args.launcher, + path_job=path_job)] job_generator = job_generator_factory(queue, commands, prolog, epilog, command_params, CLUSTER_NAME, path_job) - + # generating default names per each jobs in each batch for pbs_id, pbs in enumerate(job_generator.pbs_list): proper_size_name = utils.jobname_generator(jobname, pbs_id) pbs.add_options(N=proper_size_name) - + if args.pbsFlags is not None: job_generator.add_pbs_flags(args.pbsFlags.split(' ')) + + if args.sbatchFlags is not None: + job_generator.add_sbatch_flags(args.sbatchFlags.split(' ')) pbs_filenames = job_generator.write_pbs_files(path_job_commands) # Launch the jobs @@ -187,10 +225,17 @@ def main(): def parse_arguments(): parser = argparse.ArgumentParser() + + parser.add_argument( + '-v', '--verbose', action='count', default=0, + help="Print information about the process.\n" + " -v: INFO\n" + " -vv: DEBUG") + parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, gpu_1)') parser.add_argument('-n', '--batchName', required=False, help='The name of the batch. Default: The commands launched.') parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. Note that they will be killed when this time limit is reached.') - parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub') + parser.add_argument('-L', '--launcher', choices=['qsub', 'msub', 'sbatch'], required=False, help='Which launcher to use. Default: qsub') parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.') parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.') # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).') @@ -206,6 +251,7 @@ def parse_arguments(): parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands") parser.add_argument('--pbsFlags', type=str, help='ADVANCED USAGE: Allow to pass a space seperated list of PBS flags.
Ex:--pbsFlags="-lfeature=k80 -t0-4"') + parser.add_argument('--sbatchFlags', type=str, help='ADVANCED USAGE: Allow to pass a space seperated list of SBATCH flags. Ex:--sbatchFlags="--qos=high --ofile.out"') subparsers = parser.add_subparsers(dest="mode") launch_parser = subparsers.add_parser('launch', help="Launch jobs.") @@ -226,6 +272,13 @@ def parse_arguments(): if args.coresPerCommand < 1: parser.error("coresPerNode must be at least 1") + if args.verbose == 0: + logging.basicConfig(level=logging.WARNING) + elif args.verbose == 1: + logging.basicConfig(level=logging.INFO) + elif args.verbose >= 2: + logging.basicConfig(level=logging.DEBUG) + return args diff --git a/smartdispatch/config/graham.json b/smartdispatch/config/graham.json new file mode 100644 index 0000000..edf86f4 --- /dev/null +++ b/smartdispatch/config/graham.json @@ -0,0 +1,46 @@ +{ + "cpu": { + "ram": 126, + "cores": 32, + "max_walltime": "648:00:00", + "gpus": 0, + "nodes": 800 + }, + "large_mem": { + "ram": 252, + "cores": 32, + "max_walltime": "648:00:00", + "gpus": 0, + "nodes": 56 + }, + "large_mem_500": { + "ram": 504, + "cores": 32, + "max_walltime": "648:00:00", + "gpus": 0, + "nodes": 24 + }, + "large_mem_3000": { + "ram": 3024, + "cores": 64, + "max_walltime": "?", + "gpus": 0, + "nodes": 3 + }, + "gpu_1": { + "ram": 63, + "modules": ["cuda/8.0.44"], + "cores": 16, + "max_walltime": "648:00:00", + "gpus": 1, + "nodes": 320 + }, + "gpu_2": { + "ram": 126, + "modules": ["cuda/8.0.44"], + "cores": 32, + "max_walltime": "648:00:00", + "gpus": 2, + "nodes": 160 + } +} diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index d2db23c..9465c8b 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -15,6 +15,8 @@ def job_generator_factory(queue, commands, prolog=[], epilog=[], command_params= return HeliosJobGenerator(queue, commands, prolog, epilog, command_params, base_path) elif cluster_name == "hades": return HadesJobGenerator(queue, commands, prolog, epilog, command_params, base_path) + elif utils.get_launcher(cluster_name) == "sbatch": + return SlurmJobGenerator(queue, commands, prolog, epilog, command_params, base_path) return JobGenerator(queue, commands, prolog, epilog, command_params, base_path) @@ -73,6 +75,23 @@ def add_pbs_flags(self, flags): pbs.add_resources(**resources) pbs.add_options(**options) + def add_sbatch_flags(self, flags): + options = {} + + for flag in flags: + split = flag.find('=') + if flag.startswith('--'): + if split == -1: + raise ValueError("Invalid SBATCH flag ({}), no '=' character found' ".format(flag)) + options[flag[:split].lstrip("-")] = flag[split+1:] + elif flag.startswith('-') and split == -1: + options[flag[1:2]] = flag[2:] + else: + raise ValueError("Invalid SBATCH flag ({}, is it a PBS flag?)".format(flag)) + + for pbs in self.pbs_list: + pbs.add_sbatch_options(**options) + def _generate_base_pbs(self): """ Generates PBS files allowing the execution of every commands on the given queue. """ nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command @@ -171,3 +190,72 @@ def _add_cluster_specific_rules(self): for pbs in self.pbs_list: # Remove forbidden ppn option. Default is 2 cores per gpu. 
pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes']) + + +class SlurmJobGenerator(JobGenerator): + + def __init__(self, *args, **kwargs): + super(SlurmJobGenerator, self).__init__(*args, **kwargs) + + def _adapt_options(self, pbs): + # Remove queue, there is no queue in Slurm + if "-q" in pbs.options: + del pbs.options["-q"] + + # SBATCH does not interpret options, they can only contain %A if we + # want to include job's name and %a to include job array's index + for option in ['-o', '-e']: + pbs.options[option] = re.sub('"\$PBS_JOBID"', '%A', + pbs.options[option]) + + # Convert to Slurm's --export + # + # Warning: Slurm does **not** export variables defined locally such as + # variables defined along the command line. For ex: + # PBS_FILENAME=something sbatch --export=ALL somefile.sh + # would *not* export PBS_FILENAME to the script. + if pbs.options.pop('-V', None) is not None: + pbs.add_sbatch_options(export='ALL') + + def _adapt_commands(self, pbs): + pass + + def _adapt_resources(self, pbs): + # Set proper option for gpus + match = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']) + if match: + gpus = match.group(1) + pbs.add_resources(naccelerators=gpus) + pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", + pbs.resources['nodes']) + + # Set proper option for cpus + match = re.match(".*ppn=([0-9]+)", pbs.resources['nodes']) + if match: + ppn = match.group(1) + pbs.add_resources(ncpus=ppn) + pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "", pbs.resources['nodes']) + + def _adapt_variable_names(self, pbs): + for command_id, command in enumerate(pbs.commands): + pbs.commands[command_id] = command = re.sub( + "\$PBS_JOBID", "$SLURM_JOB_ID", command) + # NOTE: SBATCH_TIMELIMIT is **not** an official slurm environment + # variable, it needs to be set in the script. + pbs.commands[command_id] = command = re.sub( + "\$PBS_WALLTIME", "$SBATCH_TIMELIMIT", command) + + def _adapt_prolog(self, pbs): + # Set SBATCH_TIMELIMIT in the prolog, hence, before any code from + # commands and epilog. + pbs.add_to_prolog( + "SBATCH_TIMELIMIT=%s" % + utils.walltime_to_seconds(pbs.resources["walltime"])) + + def _add_cluster_specific_rules(self): + for pbs in self.pbs_list: + self._adapt_options(pbs) + self._adapt_resources(pbs) + self._adapt_variable_names(pbs) + self._adapt_prolog(pbs) + self._adapt_commands(pbs) diff --git a/smartdispatch/pbs.py b/smartdispatch/pbs.py index f8d7982..a5a2f58 100644 --- a/smartdispatch/pbs.py +++ b/smartdispatch/pbs.py @@ -35,6 +35,8 @@ def __init__(self, queue_name, walltime): self.options = OrderedDict() self.add_options(q=queue_name) + self.sbatch_options = OrderedDict() + # Declares that all environment variables in the qsub command's environment are to be exported to the batch job. self.add_options(V="") @@ -62,6 +64,22 @@ def add_options(self, **options): self.options["-" + option_name] = option_value + def add_sbatch_options(self, **options): + """ Adds sbatch options to this PBS file. + + Parameters + ---------- + **options : dict + each key is the name of a SBATCH option + """ + + for option_name, option_value in options.items(): + if len(option_name) == 1: + dash = "-" + else: + dash = "--" + self.sbatch_options[dash + option_name] = option_value + def add_resources(self, **resources): """ Adds resources to this PBS file. 
@@ -144,7 +162,9 @@ def save(self, filename): specified where to save this PBS file """ with open(filename, 'w') as pbs_file: + self.prolog.insert(0, "PBS_FILENAME=%s" % filename) pbs_file.write(str(self)) + self.prolog.pop(0) def __str__(self): pbs = [] @@ -159,6 +179,12 @@ def __str__(self): for resource_name, resource_value in self.resources.items(): pbs += ["#PBS -l {0}={1}".format(resource_name, resource_value)] + for option_name, option_value in self.sbatch_options.items(): + if option_name.startswith('--'): + pbs += ["#SBATCH {0}={1}".format(option_name, option_value)] + else: + pbs += ["#SBATCH {0} {1}".format(option_name, option_value)] + pbs += ["\n# Modules #"] for module in self.modules: pbs += ["module load " + module] diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index 7214d9d..d735c23 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -1,16 +1,24 @@ from nose.tools import assert_true, assert_false, assert_equal, assert_raises - import os -import tempfile import shutil +import tempfile +import unittest + +try: + from mock import patch +except ImportError: + from unittest.mock import patch + from smartdispatch.queue import Queue from smartdispatch.job_generator import JobGenerator, job_generator_factory from smartdispatch.job_generator import HeliosJobGenerator, HadesJobGenerator from smartdispatch.job_generator import GuilliminJobGenerator, MammouthJobGenerator +from smartdispatch.job_generator import SlurmJobGenerator class TestJobGenerator(object): pbs_flags = ['-lfeature=k80', '-lwalltime=42:42', '-lnodes=6:gpus=66', '-m', '-A123-asd-11', '-t10,20,30'] + sbatch_flags = ['--qos=high', '--output=file.out', '-Cminmemory'] def setUp(self): self.testing_dir = tempfile.mkdtemp() @@ -127,6 +135,32 @@ def test_add_pbs_flags_invalid(self): def test_add_pbs_flags_invalid_resource(self): assert_raises(ValueError, self._test_add_pbs_flags, '-l weeee') + def _test_add_sbatch_flags(self, flags): + job_generator = JobGenerator(self.queue, self.commands) + job_generator.add_sbatch_flags(flags) + options = [] + + for flag in flags: + if flag.startswith('--'): + options += [flag] + elif flag.startswith('-'): + options += [(flag[:2] + ' ' + flag[2:]).strip()] + + for pbs in job_generator.pbs_list: + pbs_str = pbs.__str__() + for flag in options: + assert_equal(pbs_str.count(flag), 1) + + def test_add_sbatch_flags(self): + for flag in self.sbatch_flags: + yield self._test_add_sbatch_flags, [flag] + + yield self._test_add_sbatch_flags, [flag] + + def test_add_sbatch_flag_invalid(self): + invalid_flags = ["--qos high", "gpu", "-lfeature=k80"] + for flag in invalid_flags: + assert_raises(ValueError, self._test_add_sbatch_flags, flag) class TestGuilliminQueue(object): @@ -243,6 +277,87 @@ def test_pbs_split_2_job_nb_commands(self): assert_true("ppn=2" in str(self.pbs8[1])) +class TestSlurmQueue(unittest.TestCase): + + def setUp(self): + self.walltime = "10:00" + self.cores = 42 + self.mem_per_node = 32 + self.nb_cores_per_node = 1 + self.nb_gpus_per_node = 2 + self.queue = Queue("slurm", "mila", self.walltime, self.nb_cores_per_node, self.nb_gpus_per_node, self.mem_per_node) + + self.nb_of_commands = 4 + self.commands = ["echo %d; echo $PBS_JOBID; echo $PBS_WALLTIME" % i + for i in range(self.nb_of_commands)] + + self.prolog = ["echo prolog"] + self.epilog = ["echo $PBS_FILENAME"] + job_generator = SlurmJobGenerator( + self.queue, self.commands, prolog=self.prolog, epilog=self.epilog) + 
self.pbs = job_generator.pbs_list + + with patch.object(SlurmJobGenerator,'_add_cluster_specific_rules', side_effect=lambda: None): + dummy_generator = SlurmJobGenerator( + self.queue, self.commands, prolog=self.prolog, epilog=self.epilog) + self.dummy_pbs = dummy_generator.pbs_list + + def test_ppn_ncpus(self): + assert_true("ppn" in str(self.dummy_pbs[0])) + assert_true("ncpus" not in str(self.dummy_pbs[0])) + assert_true("ppn" not in str(self.pbs[0])) + assert_true("ncpus" in str(self.pbs[0])) + + def test_gpus_naccelerators(self): + assert_true("gpus" in str(self.dummy_pbs[0])) + assert_true("naccelerators" not in str(self.dummy_pbs[0])) + assert_true("gpus" not in str(self.pbs[0])) + assert_true("naccelerators" in str(self.pbs[0])) + + def test_queue(self): + assert_true("PBS -q" in str(self.dummy_pbs[0])) + assert_true("PBS -q" not in str(self.pbs[0])) + + def test_export(self): + assert_true("#PBS -V" in str(self.dummy_pbs[0])) + assert_true("#PBS -V" not in str(self.pbs[0])) + assert_true("#SBATCH --export=ALL" in str(self.pbs[0])) + + def test_outputs(self): + for std in ['-e', '-o']: + value = self.dummy_pbs[0].options[std] + assert_true("$PBS_JOBID" in value, + "$PBS_JOBID should be present in option %s: %s" % + (std, value)) + + value = self.pbs[0].options[std] + assert_true("$PBS_JOBID" not in value, + "$PBS_JOBID not should be present in option %s: %s" % + (std, value)) + assert_true("%A" in value, + "%%A should be present in option %s: %s" % + (std, value)) + + def test_job_id_env_var(self): + self.assertIn("$PBS_JOBID", str(self.dummy_pbs[0])) + self.assertNotIn("$SLURM_JOB_ID", str(self.dummy_pbs[0])) + + self.assertNotIn("$PBS_JOBID", str(self.pbs[0])) + self.assertIn("$SLURM_JOB_ID", str(self.pbs[0])) + + def test_walltime_env_var(self): + self.assertIn("$PBS_WALLTIME", str(self.dummy_pbs[0])) + self.assertNotIn("$SBATCH_TIMELIMIT", str(self.dummy_pbs[0])) + + self.assertNotIn("$PBS_WALLTIME", str(self.pbs[0])) + self.assertIn("$SBATCH_TIMELIMIT", str(self.pbs[0])) + + self.assertNotIn("SBATCH_TIMELIMIT=", + "\n".join(self.dummy_pbs[0].prolog)) + self.assertIn("SBATCH_TIMELIMIT=", + "\n".join(self.pbs[0].prolog)) + + class TestJobGeneratorFactory(object): def setUp(self): diff --git a/smartdispatch/tests/test_pbs.py b/smartdispatch/tests/test_pbs.py index 6088052..fcf8b59 100644 --- a/smartdispatch/tests/test_pbs.py +++ b/smartdispatch/tests/test_pbs.py @@ -1,7 +1,6 @@ from nose.tools import assert_true, assert_equal, assert_raises from numpy.testing import assert_array_equal - from smartdispatch.pbs import PBS import unittest import tempfile @@ -38,6 +37,16 @@ def test_add_options(self): assert_equal(self.pbs.options["-A"], "option2") assert_equal(self.pbs.options["-B"], "option3") + def test_add_sbatch_options(self): + self.pbs.add_sbatch_options(a="value1") + assert_equal(self.pbs.sbatch_options["-a"], "value1") + assert_equal(len(self.pbs.sbatch_options), 1) + self.pbs.sbatch_options.pop("-a") + self.pbs.add_sbatch_options(option1="value2", option2="value3") + assert_equal(self.pbs.sbatch_options["--option1"], "value2") + assert_equal(self.pbs.sbatch_options["--option2"], "value3") + assert_equal(len(self.pbs.sbatch_options), 2) + def test_add_resources(self): assert_equal(len(self.pbs.resources), 1) assert_equal(self.pbs.resources["walltime"], self.walltime) @@ -135,4 +144,5 @@ def test_str(self): def test_save(self): pbs_filename = os.path.join(self.testing_dir, "pbs.sh") self.pbs.save(pbs_filename) + self.pbs.prolog.insert(0, "PBS_FILENAME=%s" % pbs_filename) 
assert_equal(str(self.pbs), open(pbs_filename).read()) diff --git a/smartdispatch/tests/test_utils.py b/smartdispatch/tests/test_utils.py index 4eaef4e..9306faa 100644 --- a/smartdispatch/tests/test_utils.py +++ b/smartdispatch/tests/test_utils.py @@ -1,11 +1,89 @@ # -*- coding: utf-8 -*- -import unittest +import random +import string +import subprocess -from smartdispatch import utils +import unittest +try: + from mock import patch +except ImportError: + from unittest.mock import patch from nose.tools import assert_equal, assert_true + from numpy.testing import assert_array_equal +from smartdispatch import utils + + +class TestWalltimeToSeconds(unittest.TestCase): + def setUp(self): + self.format = dict( + days=random.randint(0, 10), + hours=random.randint(0, 23), + minutes=random.randint(0, 59), + seconds=random.randint(0, 59)) + + def _compute_seconds(self, days=0, hours=0, minutes=0, seconds=0): + return (((((days * 24) + hours) * 60) + minutes) * 60) + seconds + + def test_compute_seconds(self): + + date_format = dict( + days=2, + hours=3, + minutes=5, + seconds=7) + + total_seconds = 183907 + + self.assertEqual(self._compute_seconds(**date_format), total_seconds) + self.assertEqual(utils.walltime_to_seconds( + "{days}:{hours}:{minutes}:{seconds}".format(**date_format)), + total_seconds) + + def test_ddhhmmss(self): + seconds = utils.walltime_to_seconds( + "{days}:{hours}:{minutes}:{seconds}".format(**self.format)) + self.assertEqual(seconds, self._compute_seconds(**self.format)) + + def test_hhmmss(self): + truncated_format = self.format.copy() + truncated_format.pop("days") + + seconds = utils.walltime_to_seconds( + "{hours}:{minutes}:{seconds}".format(**truncated_format)) + self.assertEqual(seconds, self._compute_seconds(**truncated_format)) + + def test_mmss(self): + truncated_format = self.format.copy() + truncated_format.pop("days") + truncated_format.pop("hours") + + seconds = utils.walltime_to_seconds( + "{minutes}:{seconds}".format(**truncated_format)) + self.assertEqual(seconds, self._compute_seconds(**truncated_format)) + + def test_ss(self): + truncated_format = self.format.copy() + truncated_format.pop("days") + truncated_format.pop("hours") + truncated_format.pop("minutes") + + seconds = utils.walltime_to_seconds( + "{seconds}".format(**truncated_format)) + self.assertEqual(seconds, self._compute_seconds(**truncated_format)) + + def test_too_much_columns(self): + with self.assertRaises(ValueError): + seconds = utils.walltime_to_seconds( + "1:{days}:{hours}:{minutes}:{seconds}".format(**self.format)) + + def test_with_text(self): + with self.assertRaises(ValueError): + seconds = utils.walltime_to_seconds( + "{days}hoho:{hours}:{minutes}:{seconds}".format(**self.format)) + class PrintBoxedTests(unittest.TestCase): @@ -49,3 +127,102 @@ def test_slugify(): for arg, expected in testing_arguments: assert_equal(utils.slugify(arg), expected) + + +command_output = """\ +Server Max Tot Que Run Hld Wat Trn Ext Com Status +---------------- --- --- --- --- --- --- --- --- --- ---------- +gpu-srv1.{} 0 1674 524 121 47 0 0 22 960 Idle +""" + +slurm_command = """\ + Cluster ControlHost ControlPort RPC Share GrpJobs GrpTRES GrpSubmit MaxJobs MaxTRES MaxSubmit MaxWall QOS Def QOS +---------- --------------- ------------ ----- --------- ------- ------------- --------- ------- ------------- --------- ----------- -------------------- --------- + {} 132.204.24.224 6817 7680 1 normal +""" + + +class ClusterIdentificationTest(unittest.TestCase): + server_names = ["hades", "m", "guil", "helios", 
"hades"] + clusters = ["hades", "mammouth", "guillimin", "helios"] + command_output = command_output + + def __init__(self, *args, **kwargs): + super(ClusterIdentificationTest, self).__init__(*args, **kwargs) + self.detect_cluster = utils.detect_cluster + + def test_detect_cluster(self): + + with patch('smartdispatch.utils.Popen') as MockPopen: + mock_process = MockPopen.return_value + for name, cluster in zip(self.server_names, self.clusters): + mock_process.communicate.return_value = ( + self.command_output.format(name), "") + self.assertEquals(self.detect_cluster(), cluster) + + +class SlurmClusterIdentificationTest(ClusterIdentificationTest): + server_names = clusters = ["graham", "cedar", "mila"] + command_output = slurm_command + + def __init__(self, *args, **kwargs): + super(SlurmClusterIdentificationTest, self).__init__(*args, **kwargs) + self.detect_cluster = utils.get_slurm_cluster_name + + +class TestGetLauncher(unittest.TestCase): + longMessage = True + + N_RANDOM = 10 + RANDOM_SIZE = (2, 10) + + CLUSTER_NAMES = ["hades", "mammouth", "guillimin", "helios"] + + def _get_random_string(self): + return ''.join([random.choice(string.lowercase) + for i in xrange(random.randint(*self.RANDOM_SIZE))]) + + def _assert_launcher(self, desired, cluster_name): + if cluster_name in utils.MSUB_CLUSTERS: + desired = "msub" + + self.assertEqual( + desired, utils.get_launcher(cluster_name), + msg="for cluster %s" % cluster_name) + + def test_get_launcher(self): + self.assertEqual("msub", utils.get_launcher("helios")) + + # For supported launcher and random ones... + with patch('smartdispatch.utils.distutils') as mock_distutils: + + for launcher in utils.SUPPORTED_LAUNCHERS: + + mock_distutils.spawn.find_executable.side_effect = ( + lambda command: launcher if launcher == command else None) + + for cluster_name in self.CLUSTER_NAMES: + self._assert_launcher(launcher, cluster_name) + + for idx in range(self.N_RANDOM): + self._assert_launcher(launcher, self._get_random_string()) + + # Test if there was no *supported* launcher on the system + launcher = self._get_random_string() + mock_distutils.spawn.find_executable.side_effect = ( + lambda command: launcher if launcher == command else None) + + for cluster_name in self.CLUSTER_NAMES: + self._assert_launcher(None, cluster_name) + + for idx in range(self.N_RANDOM): + self._assert_launcher(None, self._get_random_string()) + + # Test if command_is_available only returns None + mock_distutils.spawn.find_executable.return_value = None + + for cluster_name in self.CLUSTER_NAMES: + self._assert_launcher(None, cluster_name) + + for idx in range(self.N_RANDOM): + self._assert_launcher(None, self._get_random_string()) diff --git a/smartdispatch/tests/verify_cedar.py b/smartdispatch/tests/verify_cedar.py new file mode 100644 index 0000000..94e4f7f --- /dev/null +++ b/smartdispatch/tests/verify_cedar.py @@ -0,0 +1,34 @@ +import os +import sys + +from verify_slurm_cluster import VerifySlurmCluster, set_defaults + + +class VerifyCedarCluster(VerifySlurmCluster): + + WALLTIME = 60 + CORES_PER_NODE = 24 + GPUS_PER_NODE = 4 + + def get_arguments(self, **kwargs): + + kwargs = super(VerifyCedarCluster, self).get_arguments(**kwargs) + + if kwargs["gpusPerCommand"] == 0: + account = os.environ.get("CPU_SLURM_ACCOUNT") + else: + account = os.environ.get("GPU_SLURM_ACCOUNT") + + if "sbatchFlags" not in kwargs or len(kwargs["sbatchFlags"]) == 0: + kwargs["sbatchFlags"] = "--account=" + account + else: + kwargs["sbatchFlags"] += " --account=" + account + + return kwargs + + 
+if __name__ == "__main__": + verifications = filter(lambda o: not o.startswith("--"), sys.argv[1:]) + VerifyCedarCluster(debug="--debug" in sys.argv[1:], + no_fork="--no-fork" in sys.argv[1:]).run_verifications( + filtered_by=verifications) diff --git a/smartdispatch/tests/verify_graham.py b/smartdispatch/tests/verify_graham.py new file mode 100644 index 0000000..c95e7c0 --- /dev/null +++ b/smartdispatch/tests/verify_graham.py @@ -0,0 +1,33 @@ +import sys + +from verify_slurm_cluster import VerifySlurmCluster, set_defaults + + +class VerifyGrahamCluster(VerifySlurmCluster): + + WALLTIME = 60 + CORES_PER_NODE = 32 + GPUS_PER_NODE = 2 + + def get_arguments(self, **kwargs): + + kwargs = super(VerifyGrahamCluster, self).get_arguments(**kwargs) + + if kwargs["gpusPerCommand"] == 0: + account = os.environ.get("CPU_SLURM_ACCOUNT") + else: + account = os.environ.get("GPU_SLURM_ACCOUNT") + + if "sbatchFlags" not in kwargs or len(kwargs["sbatchFlags"]) == 0: + kwargs["sbatchFlags"] = "--account=" + account + else: + kwargs["sbatchFlags"] += " --account=" + account + + return kwargs + + +if __name__ == "__main__": + verifications = filter(lambda o: not o.startswith("--"), sys.argv[1:]) + VerifyGrahamCluster(debug="--debug" in sys.argv[1:], + no_fork="--no-fork" in sys.argv[1:]).run_verifications( + filtered_by=verifications) diff --git a/smartdispatch/tests/verify_mila.py b/smartdispatch/tests/verify_mila.py new file mode 100644 index 0000000..3d14815 --- /dev/null +++ b/smartdispatch/tests/verify_mila.py @@ -0,0 +1,38 @@ +import sys + +from verify_slurm_cluster import VerifySlurmCluster, set_defaults + + +class VerifyMILACluster(VerifySlurmCluster): + + WALLTIME = 60 + CORES_PER_NODE = 8 + GPUS_PER_NODE = 2 + + def get_arguments(self, **kwargs): + + set_defaults( + kwargs, + coresPerCommand=1, + gpusPerCommand=0, + walltime=self.WALLTIME, + coresPerNode=self.CORES_PER_NODE, + gpusPerNode=self.GPUS_PER_NODE) + + return kwargs + + def verify_simple_task_with_constraints(self, **kwargs): + + set_defaults( + kwargs, + gpusPerCommand=1, + sbatchFlags='"-C\"gpu12gb\""') + + self.base_verification(**kwargs) + + +if __name__ == "__main__": + verifications = filter(lambda o: not o.startswith("--"), sys.argv[1:]) + VerifyMILACluster(debug="--debug" in sys.argv[1:], + no_fork="--no-fork" in sys.argv[1:]).run_verifications( + filtered_by=verifications) diff --git a/smartdispatch/tests/verify_slurm_cluster.py b/smartdispatch/tests/verify_slurm_cluster.py new file mode 100644 index 0000000..a3c1aa6 --- /dev/null +++ b/smartdispatch/tests/verify_slurm_cluster.py @@ -0,0 +1,477 @@ +import datetime +import inspect +import functools +import getpass +import glob +import os +import pdb +import subprocess +import sys +import time +import traceback + +WALLTIME = 60 # seconds + +command_string = """\ +#!/usr/bin/env /bin/bash + +###################### +# Begin work section # +###################### + +echo "My SLURM_JOB_ID:" $SLURM_JOB_ID +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID + +echo called with option "$1" + +export HOME=`getent passwd $USER | cut -d':' -f6` +source ~/.bashrc +export THEANO_FLAGS=... +export PYTHONUNBUFFERED=1 +echo Running on $HOSTNAME + +if [ -e "paused$1.log" ] +then + echo "resuming $1" + touch resumed$1.log +else + echo "running $1 from scratch" + touch running$1.log +fi + + +# Test GPUs +echo "echo CUDA_VISIBLE_DEVICES" +echo $CUDA_VISIBLE_DEVICES +echo + +nvidia-smi + +# Test CPUs +# How? + +# Test resume +if [ ! 
-e "paused$1.log" ] +then + touch paused$1.log + echo "sleeping $1 %(sleep)s seconds" + sleep %(sleep)ss +fi + +echo completed $1 +mv paused$1.log completed$1.log +""" + + +def set_defaults(dictionary, **kwargs): + + for item, value in kwargs.iteritems(): + dictionary.setdefault(item, value) + + +def strfdelta(tdelta, fmt): + """ + From https://stackoverflow.com/a/8907269 + """ + + d = {} + d["hours"], rem = divmod(tdelta.seconds, 3600) + d["hours"] += tdelta.days * 24 + d["minutes"], d["seconds"] = divmod(rem, 60) + return fmt % d + + +def infer_verification_name(): + + for stack in inspect.stack(): + if stack[3].startswith("verify_"): + return stack[3] + + raise RuntimeError("Cannot infer verification name:\n %s" % + "\n".join(str(t) for t in traceback.format_stack())) + + +def build_argv(coresPerCommand, gpusPerCommand, walltime, coresPerNode, + gpusPerNode, batchName=None, commandsFile=None, + doNotLaunch=False, autoresume=False, pool=None, + sbatchFlags=None): + + if batchName is None: + batchName = infer_verification_name() + + argv = """ +-vv +--queueName dummy +--batchName %(batchName)s --walltime %(walltime)s +--coresPerCommand %(coresPerCommand)s +--gpusPerCommand %(gpusPerCommand)s +--coresPerNode %(coresPerNode)s +--gpusPerNode %(gpusPerNode)s + """ % dict(batchName=batchName, + walltime=strfdelta( + datetime.timedelta(seconds=walltime), + "%(hours)02d:%(minutes)02d:%(seconds)02d"), + coresPerCommand=coresPerCommand, + gpusPerCommand=gpusPerCommand, + coresPerNode=coresPerNode, + gpusPerNode=gpusPerNode) + + # File containing commands to launch. Each command must + # be on a seperate line. (Replaces commandAndOptions) + if commandsFile: + argv += " --commandsFile " + commandsFile + + # Generate all the files without launching the job. + if doNotLaunch: + argv += " --doNotLaunch" + + # Requeue the job when the running time hits the maximum + # walltime allowed on the cluster. Assumes that commands + # are resumable. + if autoresume: + argv += " --autoresume" + + # Number of workers that will be consuming commands. + # Default: Nb commands + if pool: + argv += " --pool " + pool + + # ADVANCED USAGE: Allow to pass a space seperated list of SBATCH flags. 
+ # Ex:--sbatchFlags="--qos=high --ofile.out" + if sbatchFlags: + argv += " --sbatchFlags=" + sbatchFlags + + return argv.replace("\n", " ") + + +def get_squeue(): + command = ("squeue -u %(username)s" % + dict(username=getpass.getuser())) + process = subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + stdout, stderr = process.communicate() + return stdout + + +def try_to_remove_file(filename_template, expected_number): + file_names = glob.glob(filename_template) + try: + i = 0 + for file_name in file_names: + i += 1 + os.remove(file_name) + except OSError as e: + print str(e) + + if i != expected_number: + print "Error: Expected %d files, found %d" % (expected_number, i) + else: + print "OK: All %d files %s were found:\n%s" % ( + expected_number, filename_template, + "\n".join(sorted(file_names))) + + +def minimum_requirement(attribute_name, minimum_value): + + def decorator(method): + + @functools.wraps(method) + def call(self, *args, **kwargs): + + # Method was called from another verification + try: + verification_name = infer_verification_name() + # Method was called directly + except RuntimeError: + verification_name = method.__name__ + + if not hasattr(self, attribute_name): + raise ValueError("Invalid requirement, object %s does not " + "have attribute %s" % + (self.__class__.__name__, attribute_name)) + + if getattr(self, attribute_name) >= minimum_value: + return method(self, *args, **kwargs) + else: + print ("%s does not have enough %s: %d." + "Skipping %s." % + (self.__class__.__name__, attribute_name, minimum_value, + verification_name)) + return None + + return call + + return decorator + + +class VerifySlurmCluster(object): + + WALLTIME = 60 + CORES_PER_NODE = 8 + GPUS_PER_NODE = 0 + + def __init__(self, debug=False, no_fork=False): + self.debug = debug + self.no_fork = no_fork + + def get_verification_methods(self, filtered_by=None): + methods = inspect.getmembers(self, predicate=inspect.ismethod) + + def filtering(item): + key = item[0] + + if not key.startswith("verify_"): + return False + elif filtered_by is not None and key not in filtered_by: + return False + + return True + + return dict(filter(filtering, methods)) + + def run_verifications(self, filtered_by=None): + if filtered_by is not None and len(filtered_by) == 0: + filtered_by = None + + verification_methods = self.get_verification_methods(filtered_by) + processes = [] + for verification_name, verification_fct in \ + verification_methods.iteritems(): + print "========%s" % ("=" * len(verification_name)) + print "Running %s" % verification_name + print "========%s\n\n" % ("=" * len(verification_name)) + + if self.debug or self.no_fork: + verification_fct() + else: + # fork the process in a new dir and new stdout, stderr + verification_dir = os.path.join( + os.getcwd(), self.__class__.__name__, verification_name) + + if not os.path.isdir(verification_dir): + os.makedirs(verification_dir) + + stdout = open(os.path.join(verification_dir, + "validation.out"), 'w') + stderr = open(os.path.join(verification_dir, + "validation.err"), 'w') + + popen = subprocess.Popen( + "/bin/bash", + shell=True, + stdin=subprocess.PIPE, + stdout=stdout, + stderr=stderr) + + popen.stdin.write("cd %s;" % verification_dir) + + script_path = os.path.join( + os.getcwd(), inspect.getfile(self.__class__)) + popen.stdin.write( + "python %s --no-fork %s;" % ( + script_path, verification_name)) + print "python %s --no-fork %s;" % ( + script_path, verification_name) + + processes.append(popen) + + for popen 
in processes: + # popen.communicate() + popen.terminate() + + def run_test(self, argv, command_string, command_arguments=""): + FILE_NAME = "test.sh" + + with open("test.sh", "w") as text_file: + text_file.write(command_string) + + command = ("smart-dispatch %s launch bash %s %s" % + (argv, FILE_NAME, command_arguments)) + print "running test with command: " + print command + + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + stdout, stderr = process.communicate() + + print "\nstdout:" + print stdout.decode() + + print "\nstderr:" + print stderr.decode() + return stdout.split("\n")[-2].strip() + + def validate(self, root_dir, argv, squeue_wait, nb_of_commands=1, + resume=False): + + print "\nValidating arguments:" + print argv + + stdout = get_squeue() + number_of_process = stdout.count("\n") - 1 + + while number_of_process > 0: + root = os.path.join(root_dir, "commands") + for file_path in os.listdir(root): + if file_path.endswith(".sh"): + print file_path + print open(os.path.join(root, file_path), 'r').read() + + print stdout + print "Waiting %d seconds" % squeue_wait + time.sleep(squeue_wait) + stdout = get_squeue() + number_of_process = stdout.count("\n") - 1 + print stdout + print number_of_process + + try_to_remove_file("running*.log", expected_number=nb_of_commands) + try_to_remove_file("resumed*.log", + expected_number=nb_of_commands * int(resume)) + try_to_remove_file("completed*.log", expected_number=nb_of_commands) + + root = os.path.join(root_dir, "logs") + for file_path in reversed(sorted(os.listdir(root))): + if file_path.endswith(".err") or file_path.endswith(".out"): + print file_path + print open(os.path.join(root, file_path), 'r').read() + if self.debug: + pdb.set_trace() + + def get_arguments(self, **kwargs): + + set_defaults( + kwargs, + coresPerCommand=1, + gpusPerCommand=0, + walltime=self.WALLTIME, + coresPerNode=self.CORES_PER_NODE, + gpusPerNode=self.GPUS_PER_NODE) + + return kwargs + + def base_verification(self, sleep_time=0, command_arguments="", + resume=False, squeue_wait=None, nb_of_commands=1, + **kwargs): + + if squeue_wait is None and self.debug: + squeue_wait = sleep_time + 5 + elif squeue_wait is None: + squeue_wait = self.WALLTIME * 2 + + arguments = self.get_arguments(**kwargs) + argv = build_argv(**arguments) + + root_dir = self.run_test(argv, command_string % dict(sleep=sleep_time), + command_arguments=command_arguments) + self.validate(root_dir, argv, squeue_wait, nb_of_commands, + resume=resume) + + def verify_simple_task(self, **kwargs): + self.base_verification(**kwargs) + + def verify_simple_task_with_one_gpu(self, **kwargs): + set_defaults( + kwargs, + gpusPerCommand=1, + gpusPerNode=1) + + self.verify_simple_task(**kwargs) + + @minimum_requirement("GPUS_PER_NODE", 2) + def verify_simple_task_with_many_gpus(self, **kwargs): + + for gpus_per_command in xrange(2, self.GPUS_PER_NODE + 1): + arguments = kwargs.copy() + arguments["gpusPerCommand"] = gpus_per_command + + self.verify_simple_task(**arguments) + + @minimum_requirement("CORES_PER_NODE", 2) + def verify_many_task(self, **kwargs): + set_defaults( + kwargs, + nb_of_commands=self.CORES_PER_NODE) + + command_arguments = ( + "[%s]" % " ".join(str(i) for i in range(kwargs["nb_of_commands"]))) + + set_defaults( + kwargs, + command_arguments=command_arguments) + + self.verify_simple_task(**kwargs) + + @minimum_requirement("CORES_PER_NODE", 4) + def verify_many_task_with_many_cores(self, **kwargs): + for cores_per_command in xrange(2, 
self.CORES_PER_NODE): + if cores_per_command // self.CORES_PER_NODE <= 1: + break + + arguments = kwargs.copy() + arguments["cores_per_command"] = cores_per_command + arguments["nb_of_commands"] = ( + cores_per_command // + self.CORES_PER_NODE) + + self.many_task(**arguments) + + @minimum_requirement("GPUS_PER_NODE", 2) + def verify_many_task_with_one_gpu(self, **kwargs): + set_defaults( + kwargs, + nb_of_commands=self.GPUS_PER_NODE, + gpusPerCommand=1) + + self.verify_many_task(**kwargs) + + @minimum_requirement("GPUS_PER_NODE", 4) + def verify_many_task_with_many_gpus(self, **kwargs): + for gpus_per_command in xrange(2, self.GPUS_PER_NODE + 1): + if gpus_per_command // self.GPUS_PER_NODE <= 1: + break + + arguments = kwargs.copy() + arguments["gpusPerCommand"] = gpus_per_command + arguments["nb_of_commands"] = ( + gpus_per_command // + self.GPUS_PER_NODE) + + self.verify_many_task_with_one_gpu(**arguments) + + def verify_simple_task_with_autoresume_unneeded(self, **kwargs): + walltime = 2 * 60 + set_defaults( + kwargs, + walltime=walltime, + resume=False, + autoresume=True) + + self.verify_simple_task(**kwargs) + + def verify_simple_task_with_autoresume_needed(self, **kwargs): + walltime = 2 * 60 + set_defaults( + kwargs, + sleep_time=walltime, + walltime=walltime, + resume=True, + autoresume=True) + + self.verify_simple_task(**kwargs) + + def verify_many_task_with_autoresume_needed(self, **kwargs): + walltime = 2 * 60 + set_defaults( + kwargs, + sleep_time=walltime, + walltime=walltime, + resume=True, + autoresume=True) + + self.verify_many_task(**kwargs) + + # def verify_pool(self, **kwargs): + # pass diff --git a/smartdispatch/tests/verify_slurms_pbs_wrapper.py b/smartdispatch/tests/verify_slurms_pbs_wrapper.py new file mode 100644 index 0000000..b3c3c50 --- /dev/null +++ b/smartdispatch/tests/verify_slurms_pbs_wrapper.py @@ -0,0 +1,101 @@ +from glob import glob +import os +import time +import unittest +from subprocess import Popen, PIPE + +from smartdispatch.utils import get_slurm_cluster_name + +pbs_string = """\ +#!/usr/bin/env /bin/bash + +#PBS -N arrayJob +#PBS -o arrayJob_%A_%a.out +#PBS -l walltime=01:00:00 +{} + +###################### +# Begin work section # +###################### + +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +nvidia-smi +""" + +# Checking which cluster is running the tests first +cluster = get_slurm_cluster_name() +to_skip = cluster in ['graham', 'cedar'] +message = "Test does not run on cluster {}".format(cluster) + +class TestSlurm(unittest.TestCase): + + def tearDown(self): + for file_name in (glob('*.out') + ["test.pbs"]): + os.remove(file_name) + + def _test_param(self, param_array, command_template, flag, string=pbs_string, output_array=None): + output_array = output_array or param_array + for param, output in zip(param_array, output_array): + param_command = pbs_string.format( + string.format(command_template.format(param)) + ) + with open("test.pbs", "w") as text_file: + text_file.write(param_command) + process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) + stdout, _ = process.communicate() + stdout = stdout.decode() + self.assertIn("Submitted batch job", stdout) + job_id = stdout.split(" ")[-1].strip() + + time.sleep(0.25) + process = Popen("squeue -u $USER -j {} -O {}".format(job_id, flag), stdout=PIPE, stderr=PIPE, shell=True) + stdout, _ = process.communicate() + job_params = [c.strip() for c in stdout.decode().split("\n")[1:] if c != ''] + 
self.assertSequenceEqual(job_params, [output for _ in range(len(job_params))]) + + @unittest.skipIf(to_skip, message) + def test_priority(self): + self._test_param( + ['high', 'low'], + "#SBATCH --qos={}", + "qos", + pbs_string + ) + + def test_gres(self): + self._test_param( + ["1", "2"], + "#PBS -l naccelerators={}", + "gres", + pbs_string, + ["gpu:1", "gpu:2"] + ) + + def test_memory(self): + self._test_param( + ["2G", "4G"], + "#PBS -l mem={}", + "minmemory", + pbs_string + ) + + def test_nb_cpus(self): + self._test_param( + ["2", "3"], + "#PBS -l ncpus={}", + "mincpus", + pbs_string + ) + + @unittest.skipIf(to_skip, message) + def test_constraint(self): + self._test_param( + ["gpu6gb", "gpu8gb"], + "#PBS -l proc={}", + "feature", + pbs_string + ) + +if __name__ == '__main__': + unittest.main() diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index 9135780..a0598d3 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -1,11 +1,38 @@ -import re +import distutils +import distutils.spawn import hashlib -import unicodedata import json +import logging +import re +import unicodedata -from distutils.util import strtobool from subprocess import Popen, PIPE + +logger = logging.getLogger(__name__) + + +TIME_REGEX = re.compile( + "^(?:(?:(?:(\d*):)?(\d*):)?(\d*):)?(\d*)$") + + +def walltime_to_seconds(walltime): + if not TIME_REGEX.match(walltime): + raise ValueError( + "Invalid walltime format: %s\n" + "It must be either DD:HH:MM:SS, HH:MM:SS, MM:SS or S" % + walltime) + + split = walltime.split(":") + + while len(split) < 4: + split = [0] + split + + days, hours, minutes, seconds = map(int, split) + + return (((((days * 24) + hours) * 60) + minutes) * 60) + seconds + + def jobname_generator(jobname, job_id): '''Crop the jobname to a maximum of 64 characters. Parameters @@ -17,7 +44,7 @@ def jobname_generator(jobname, job_id): Returns ------- str - The cropped version of the string. + The cropped version of the string. ''' # 64 - 1 since the total length including -1 should be less than 64 job_id = str(job_id) @@ -48,10 +75,10 @@ def yes_no_prompt(query, default=None): while True: try: answer = raw_input("{0}{1}".format(query, available_prompts[default])) - return strtobool(answer) + return distutils.strtobool(answer) except ValueError: if answer == '' and default is not None: - return strtobool(default) + return distutils.strtobool(default) def chunks(sequence, n): @@ -114,8 +141,10 @@ def detect_cluster(): try: output = Popen(["qstat", "-B"], stdout=PIPE).communicate()[0] except OSError: - # If qstat is not available we assume that the cluster is unknown. - return None + # If qstat is not available we assume that the cluster uses slurm. 
+ # (Otherwise return None) + cluster_name = get_slurm_cluster_name() + return cluster_name # Get server name from status server_name = output.split('\n')[2].split(' ')[0] # Cleanup the name and return it @@ -130,9 +159,38 @@ def detect_cluster(): cluster_name = "hades" return cluster_name +def get_slurm_cluster_name(): + try: + popen = Popen("sacctmgr list cluster", stdout=PIPE, shell=True) + stdout, stderr = popen.communicate() + except OSError: + return None + + try: + stdout = stdout.decode() + cluster_name = stdout.splitlines()[2].strip().split(' ')[0] + except IndexError: + logger.debug(stderr) + return None + + return cluster_name + + +MSUB_CLUSTERS = ["helios"] +SUPPORTED_LAUNCHERS = ["qsub", "msub", "sbatch"] + + +def command_is_available(command): + return distutils.spawn.find_executable(command) is not None + def get_launcher(cluster_name): - if cluster_name == "helios": + # Gives priority to msub if qsub is also present + if cluster_name in MSUB_CLUSTERS: return "msub" - else: - return "qsub" + + for launcher in SUPPORTED_LAUNCHERS: + if command_is_available(launcher): + return launcher + + return None diff --git a/smartdispatch/workers/base_worker.py b/smartdispatch/workers/base_worker.py index b44f5d5..0fec970 100755 --- a/smartdispatch/workers/base_worker.py +++ b/smartdispatch/workers/base_worker.py @@ -76,6 +76,11 @@ def sigterm_handler(signal, frame): # Get job and node ID job_id = os.environ.get('PBS_JOBID', 'undefined') + + # It might be a slurm scheduler + if job_id == 'undefined': + job_id = os.environ.get('SLURM_JOB_ID', 'undefined') + node_name = os.environ.get('HOSTNAME', 'undefined') with open(stdout_filename, 'a') as stdout_file:
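As a quick reference for the new `walltime_to_seconds` helper introduced in `smartdispatch/utils.py`, here is a short usage sketch; the accepted inputs follow the DD:HH:MM:SS family of formats named in its error message, and 183907 matches the value used in the unit tests above.

```python
from smartdispatch.utils import walltime_to_seconds

walltime_to_seconds("2:3:5:7")    # 2 days, 3 h, 5 min, 7 s -> 183907
walltime_to_seconds("10:00")      # MM:SS -> 600
walltime_to_seconds("1:2:3:4:5")  # too many fields -> raises ValueError
```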