diff --git a/src/aiida/engine/daemon/execmanager.py b/src/aiida/engine/daemon/execmanager.py index cdc4d9fcfd..744c6c66e4 100644 --- a/src/aiida/engine/daemon/execmanager.py +++ b/src/aiida/engine/daemon/execmanager.py @@ -372,7 +372,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | submit_script_filename = calculation.get_option('submit_script_filename') workdir = calculation.get_remote_workdir() - result = scheduler.submit_from_script(workdir, submit_script_filename) + result = scheduler.submit_job(workdir, submit_script_filename) if isinstance(result, str): calculation.set_job_id(result) @@ -531,7 +531,7 @@ def kill_calculation(calculation: CalcJobNode, transport: Transport) -> None: scheduler.set_transport(transport) # Call the proper kill method for the job ID of this calculation - result = scheduler.kill(job_id) + result = scheduler.kill_job(job_id) if result is not True: # Failed to kill because the job might have already been completed diff --git a/src/aiida/schedulers/__init__.py b/src/aiida/schedulers/__init__.py index b81d7f79c4..748e23b5d5 100644 --- a/src/aiida/schedulers/__init__.py +++ b/src/aiida/schedulers/__init__.py @@ -13,9 +13,11 @@ # fmt: off from .datastructures import * +from .plugins import * from .scheduler import * __all__ = ( + 'BashCliScheduler', 'JobInfo', 'JobResource', 'JobState', diff --git a/src/aiida/schedulers/plugins/bash.py b/src/aiida/schedulers/plugins/bash.py new file mode 100644 index 0000000000..875537f4d0 --- /dev/null +++ b/src/aiida/schedulers/plugins/bash.py @@ -0,0 +1,122 @@ +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. 
# +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Job scheduler that is interacted with through a CLI in bash.""" +from __future__ import annotations + +import abc + +from aiida.common.escaping import escape_for_bash +from aiida.engine.processes.exit_code import ExitCode +from aiida.schedulers.datastructures import JobInfo +from aiida.schedulers.scheduler import Scheduler, SchedulerError + +__all__ = ('BashCliScheduler',) + + +class BashCliScheduler(Scheduler, metaclass=abc.ABCMeta): + """Job scheduler that is interacted with through a CLI in bash.""" + + def submit_job(self, working_directory: str, filename: str) -> str | ExitCode: + """Submit a job. + + :param working_directory: The absolute filepath to the working directory where the job is to be executed. + :param filename: The filename of the submission script relative to the working directory. + """ + self.transport.chdir(working_directory) + result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(filename))) + return self._parse_submit_output(*result) + + def get_jobs( + self, + jobs: list[str] | None = None, + user: str | None = None, + as_dict: bool = False, + ) -> list[JobInfo] | dict[str, JobInfo]: + """Return the list of currently active jobs. + + :param jobs: A list of jobs to check; only these are checked. + :param user: A string with a user: only jobs of this user are checked. + :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is + returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects. + :returns: List of active jobs. 
+ """ + with self.transport: + retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user)) + + joblist = self._parse_joblist_output(retval, stdout, stderr) + if as_dict: + jobdict = {job.job_id: job for job in joblist} + if None in jobdict: + raise SchedulerError('Found at least one job without jobid') + return jobdict + + return joblist + + def kill_job(self, jobid: str) -> bool: + """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. + + .. note:: + + On some schedulers, even if the command is accepted, it may take some seconds for the job to actually + disappear from the queue. + + :param jobid: the job ID to be killed + :returns: True if everything seems ok, False otherwise. + """ + retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid)) + return self._parse_kill_output(retval, stdout, stderr) + + @abc.abstractmethod + def _get_submit_command(self, submit_script: str) -> str: + """Return the string to execute to submit a given script. + + .. warning:: the `submit_script` should already have been bash-escaped + + :param submit_script: the path of the submit script relative to the working directory. + :return: the string to execute to submit a given script. + """ + + @abc.abstractmethod + def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode: + """Parse the output of the submit command returned by calling the `_get_submit_command` command. + + :return: a string with the job ID or an exit code if the submission failed because the submission script is + invalid and the job should be terminated. + """ + + @abc.abstractmethod + def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str: + """Return the command to get the most complete description possible of currently active jobs. + + .. 
note:: + + Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done + according to the value returned by `self.get_feature('can_query_by_user')` + + :param jobs: either None to get a list of all jobs in the machine, or a list of jobs. + :param user: either None, or a string with the username (to show only jobs of the specific user). + """ + + @abc.abstractmethod + def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]: + """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method. + + :return: list of `JobInfo` objects, one of each job each with at least its default params implemented. + """ + + @abc.abstractmethod + def _get_kill_command(self, jobid: str) -> str: + """Return the command to kill the job with specified jobid.""" + + @abc.abstractmethod + def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool: + """Parse the output of the kill command. + + :return: True if everything seems ok, False otherwise. + """ diff --git a/src/aiida/schedulers/plugins/direct.py b/src/aiida/schedulers/plugins/direct.py index 3c822a1dfc..3bc7812b5a 100644 --- a/src/aiida/schedulers/plugins/direct.py +++ b/src/aiida/schedulers/plugins/direct.py @@ -14,6 +14,8 @@ from aiida.schedulers import SchedulerError from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource +from .bash import BashCliScheduler + ## From the ps man page on Mac OS X 10.12 # state The state is given by a sequence of characters, for example, # ``RWNA''. 
The first character indicates the run state of the @@ -75,7 +77,7 @@ def accepts_default_memory_per_machine(cls): return False -class DirectScheduler(aiida.schedulers.Scheduler): +class DirectScheduler(BashCliScheduler): """Support for the direct execution bypassing schedulers.""" _logger = aiida.schedulers.Scheduler._logger.getChild('direct') diff --git a/src/aiida/schedulers/plugins/lsf.py b/src/aiida/schedulers/plugins/lsf.py index aafeb2d167..33512ba944 100644 --- a/src/aiida/schedulers/plugins/lsf.py +++ b/src/aiida/schedulers/plugins/lsf.py @@ -16,6 +16,8 @@ from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobResource, JobState +from .bash import BashCliScheduler + # This maps LSF status codes to our own state list # # List of states from @@ -167,9 +169,10 @@ def accepts_default_mpiprocs_per_machine(cls): return False -class LsfScheduler(aiida.schedulers.Scheduler): +class LsfScheduler(BashCliScheduler): """Support for the IBM LSF scheduler - 'https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html' + + https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html """ _logger = aiida.schedulers.Scheduler._logger.getChild('lsf') diff --git a/src/aiida/schedulers/plugins/pbsbaseclasses.py b/src/aiida/schedulers/plugins/pbsbaseclasses.py index aa92f1369a..a525343607 100644 --- a/src/aiida/schedulers/plugins/pbsbaseclasses.py +++ b/src/aiida/schedulers/plugins/pbsbaseclasses.py @@ -11,9 +11,11 @@ import logging from aiida.common.escaping import escape_for_bash -from aiida.schedulers import Scheduler, SchedulerError, SchedulerParsingError +from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobState, MachineInfo, NodeNumberJobResource +from .bash import BashCliScheduler + _LOGGER = logging.getLogger(__name__) # This maps PbsPro status letters to our own status list @@ -95,8 +97,9 @@ def 
validate_resources(cls, **kwargs): return resources -class PbsBaseClass(Scheduler): +class PbsBaseClass(BashCliScheduler): """Base class with support for the PBSPro scheduler + (http://www.pbsworks.com/) and for PBS and Torque (http://www.adaptivecomputing.com/products/open-source/torque/). diff --git a/src/aiida/schedulers/plugins/sge.py b/src/aiida/schedulers/plugins/sge.py index 1b8d47e472..dc8c595b27 100644 --- a/src/aiida/schedulers/plugins/sge.py +++ b/src/aiida/schedulers/plugins/sge.py @@ -20,6 +20,8 @@ from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobState, ParEnvJobResource +from .bash import BashCliScheduler + # 'http://www.loni.ucla.edu/twiki/bin/view/Infrastructure/GridComputing?skin=plain': # Jobs Status: # 'qw' - Queued and waiting, @@ -87,7 +89,7 @@ class SgeJobResource(ParEnvJobResource): pass -class SgeScheduler(aiida.schedulers.Scheduler): +class SgeScheduler(BashCliScheduler): """Support for the Sun Grid Engine scheduler and its variants/forks (Son of Grid Engine, Oracle Grid Engine, ...)""" _logger = aiida.schedulers.Scheduler._logger.getChild('sge') diff --git a/src/aiida/schedulers/plugins/slurm.py b/src/aiida/schedulers/plugins/slurm.py index 836e728716..cbf5aec4af 100644 --- a/src/aiida/schedulers/plugins/slurm.py +++ b/src/aiida/schedulers/plugins/slurm.py @@ -15,6 +15,8 @@ from aiida.schedulers import Scheduler, SchedulerError from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource +from .bash import BashCliScheduler + # This maps SLURM state codes to our own status list ## List of states from the man page of squeue @@ -140,7 +142,7 @@ def validate_resources(cls, **kwargs): return resources -class SlurmScheduler(Scheduler): +class SlurmScheduler(BashCliScheduler): """Support for the SLURM scheduler (http://slurm.schedmd.com/).""" _logger = Scheduler._logger.getChild('slurm') diff --git a/src/aiida/schedulers/scheduler.py 
b/src/aiida/schedulers/scheduler.py index 78962d8d47..1bdba3dc44 100644 --- a/src/aiida/schedulers/scheduler.py +++ b/src/aiida/schedulers/scheduler.py @@ -124,6 +124,44 @@ def create_job_resource(cls, **kwargs): assert cls._job_resource_class is not None and issubclass(cls._job_resource_class, JobResource) return cls._job_resource_class(**kwargs) + @abc.abstractmethod + def submit_job(self, working_directory: str, filename: str) -> str | ExitCode: + """Submit a job. + + :param working_directory: The absolute filepath to the working directory where the job is to be executed. + :param filename: The filename of the submission script relative to the working directory. + :returns: A string with the job ID, or an exit code if the submission failed. + """ + + @abc.abstractmethod + def get_jobs( + self, + jobs: list[str] | None = None, + user: str | None = None, + as_dict: bool = False, + ) -> list[JobInfo] | dict[str, JobInfo]: + """Return the list of currently active jobs. + + :param jobs: A list of jobs to check; only these are checked. + :param user: A string with a user: only jobs of this user are checked. + :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is + returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects. + :returns: List of active jobs. + """ + + @abc.abstractmethod + def kill_job(self, jobid: str) -> bool: + """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. + + .. note:: + + On some schedulers, even if the command is accepted, it may take some seconds for the job to actually + disappear from the queue. + + :param jobid: the job ID to be killed + :returns: True if everything seems ok, False otherwise. + """ + def get_submit_script(self, job_tmpl: JobTemplate) -> str: """Return the submit script as a string. 
@@ -286,19 +324,6 @@ def _get_run_line(self, codes_info: list[JobTemplateCodeInfo], codes_run_mode: C raise NotImplementedError('Unrecognized code run mode') - @abc.abstractmethod - def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str: - """Return the command to get the most complete description possible of currently active jobs. - - .. note:: - - Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done - according to the value returned by `self.get_feature('can_query_by_user')` - - :param jobs: either None to get a list of all jobs in the machine, or a list of jobs. - :param user: either None, or a string with the username (to show only jobs of the specific user). - """ - def _get_detailed_job_info_command(self, job_id: str) -> dict[str, t.Any]: """Return the command to run to get detailed information for a given job. @@ -331,41 +356,6 @@ def get_detailed_job_info(self, job_id: str) -> dict[str, str | int]: return detailed_job_info - @abc.abstractmethod - def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]: - """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method. - - :return: list of `JobInfo` objects, one of each job each with at least its default params implemented. - """ - - def get_jobs( - self, - jobs: list[str] | None = None, - user: str | None = None, - as_dict: bool = False, - ) -> list[JobInfo] | dict[str, JobInfo]: - """Return the list of currently active jobs. - - .. note:: typically, only either jobs or user can be specified. See also comments in `_get_joblist_command`. - - :param list jobs: a list of jobs to check; only these are checked - :param str user: a string with a user: only jobs of this user are checked - :param list as_dict: if False (default), a list of JobInfo objects is returned. 
If True, a dictionary is - returned, having as key the job_id and as value the JobInfo object. - :return: list of active jobs - """ - with self.transport: - retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user)) - - joblist = self._parse_joblist_output(retval, stdout, stderr) - if as_dict: - jobdict = {job.job_id: job for job in joblist} - if None in jobdict: - raise SchedulerError('Found at least one job without jobid') - return jobdict - - return joblist - @property def transport(self): """Return the transport set for this scheduler.""" @@ -381,58 +371,6 @@ def set_transport(self, transport: Transport): """ self._transport = transport - @abc.abstractmethod - def _get_submit_command(self, submit_script: str) -> str: - """Return the string to execute to submit a given script. - - .. warning:: the `submit_script` should already have been bash-escaped - - :param submit_script: the path of the submit script relative to the working directory. - :return: the string to execute to submit a given script. - """ - - @abc.abstractmethod - def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode: - """Parse the output of the submit command returned by calling the `_get_submit_command` command. - - :return: a string with the job ID or an exit code if the submission failed because the submission script is - invalid and the job should be terminated. - """ - - def submit_from_script(self, working_directory: str, submit_script: str) -> str | ExitCode: - """Submit the submission script to the scheduler. - - :return: return a string with the job ID in a valid format to be used for querying. 
- """ - self.transport.chdir(working_directory) - result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(submit_script))) - return self._parse_submit_output(*result) - - def kill(self, jobid: str) -> bool: - """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. - - ..note:: - - On some schedulers, even if the command is accepted, it may take some seconds for the job to actually - disappear from the queue. - - :param jobid: the job ID to be killed - :return: True if everything seems ok, False otherwise. - """ - retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid)) - return self._parse_kill_output(retval, stdout, stderr) - - @abc.abstractmethod - def _get_kill_command(self, jobid: str) -> str: - """Return the command to kill the job with specified jobid.""" - - @abc.abstractmethod - def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool: - """Parse the output of the kill command. - - :return: True if everything seems ok, False otherwise. - """ - def parse_output( self, detailed_job_info: dict[str, str | int] | None = None, diff --git a/tests/transports/test_all_plugins.py b/tests/transports/test_all_plugins.py index 42f01dae10..20f219d852 100644 --- a/tests/transports/test_all_plugins.py +++ b/tests/transports/test_all_plugins.py @@ -1294,7 +1294,7 @@ def test_asynchronous_execution(self, custom_transport): transport.putfile(tmpf.name, script_fname) timestamp_before = time.time() - job_id_string = scheduler.submit_from_script('/tmp', script_fname) + job_id_string = scheduler.submit_job('/tmp', script_fname) elapsed_time = time.time() - timestamp_before # We want to get back control. If it takes < 5 seconds, it means that it is not blocking