From c320c64a8441f425f1b579ca1dfe9763a6f50ef6 Mon Sep 17 00:00:00 2001 From: Sebastiaan Huber Date: Sun, 2 Apr 2023 16:55:39 +0200 Subject: [PATCH] `Scheduler`: Refactor interface to make it more generic The original `Scheduler` interface made the assumption that all interfaces would interact with the scheduler through a command line interface that would be invoked through a bash shell. However, this is not always the case. Prime example is the new FirecREST service, being developed by CSCS, that will allow to interact with the scheduler through a REST API. Due to the assumptions of the `Scheduler` interface, it was difficult to implement it for this use case. The `Scheduler` interface is made more generic, by removing the following (abstract) methods: * `_get_joblist_command` * `_parse_joblist_output` * `_get_submit_command` * `_parse_submit_output` * `submit_from_script` * `kill` * `_get_kill_command` * `_parse_kill_output` They are replaced by three abstract methods: * `submit_job` * `get_jobs` * `kill_job` The new interface no longer makes an assumption about how a plugin implements these methods. The first one should simply submit the job, given the location of the submission script on the remote computer. The second should return the status of the list of active jobs. And the final should kill a job and return the result. Unfortunately, this change is backwards incompatible and will break existing scheduler plugins. To simplify the migration pathway, a subclass `BashCliScheduler` is added. This implements the new `Scheduler` interface while maintaining the old interface. This means that this new class is a drop-in replacement of the old `Scheduler` class for existing plugins. The plugins that ship with `aiida-core` are all updated to subclass from `BashCliScheduler`. Any existing plugins that subclassed from these plugins will therefore not be affected whatsoever by these changes. --- src/aiida/engine/daemon/execmanager.py | 4 +- src/aiida/schedulers/__init__.py | 2 + src/aiida/schedulers/plugins/bash.py | 122 ++++++++++++++++ src/aiida/schedulers/plugins/direct.py | 4 +- src/aiida/schedulers/plugins/lsf.py | 7 +- .../schedulers/plugins/pbsbaseclasses.py | 7 +- src/aiida/schedulers/plugins/sge.py | 4 +- src/aiida/schedulers/plugins/slurm.py | 4 +- src/aiida/schedulers/scheduler.py | 138 +++++------------- tests/transports/test_all_plugins.py | 2 +- 10 files changed, 184 insertions(+), 110 deletions(-) create mode 100644 src/aiida/schedulers/plugins/bash.py diff --git a/src/aiida/engine/daemon/execmanager.py b/src/aiida/engine/daemon/execmanager.py index cdc4d9fcfd..744c6c66e4 100644 --- a/src/aiida/engine/daemon/execmanager.py +++ b/src/aiida/engine/daemon/execmanager.py @@ -372,7 +372,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | submit_script_filename = calculation.get_option('submit_script_filename') workdir = calculation.get_remote_workdir() - result = scheduler.submit_from_script(workdir, submit_script_filename) + result = scheduler.submit_job(workdir, submit_script_filename) if isinstance(result, str): calculation.set_job_id(result) @@ -531,7 +531,7 @@ def kill_calculation(calculation: CalcJobNode, transport: Transport) -> None: scheduler.set_transport(transport) # Call the proper kill method for the job ID of this calculation - result = scheduler.kill(job_id) + result = scheduler.kill_job(job_id) if result is not True: # Failed to kill because the job might have already been completed diff --git a/src/aiida/schedulers/__init__.py b/src/aiida/schedulers/__init__.py index b81d7f79c4..748e23b5d5 100644 --- a/src/aiida/schedulers/__init__.py +++ b/src/aiida/schedulers/__init__.py @@ -13,9 +13,11 @@ # fmt: off from .datastructures import * +from .plugins import * from .scheduler import * __all__ = ( + 'BashCliScheduler', 'JobInfo', 'JobResource', 'JobState', diff --git a/src/aiida/schedulers/plugins/bash.py b/src/aiida/schedulers/plugins/bash.py new file mode 100644 index 0000000000..875537f4d0 --- /dev/null +++ b/src/aiida/schedulers/plugins/bash.py @@ -0,0 +1,122 @@ +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Job scheduler that is interacted with through a CLI in bash.""" +from __future__ import annotations + +import abc + +from aiida.common.escaping import escape_for_bash +from aiida.engine.processes.exit_code import ExitCode +from aiida.schedulers.datastructures import JobInfo +from aiida.schedulers.scheduler import Scheduler, SchedulerError + +__all__ = ('BashCliScheduler',) + + +class BashCliScheduler(Scheduler, metaclass=abc.ABCMeta): + """Job scheduler that is interacted with through a CLI in bash.""" + + def submit_job(self, working_directory: str, filename: str) -> str | ExitCode: + """Submit a job. + + :param working_directory: The absolute filepath to the working directory where the job is to be exectued. + :param filename: The filename of the submission script relative to the working directory. + """ + self.transport.chdir(working_directory) + result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(filename))) + return self._parse_submit_output(*result) + + def get_jobs( + self, + jobs: list[str] | None = None, + user: str | None = None, + as_dict: bool = False, + ) -> list[JobInfo] | dict[str, JobInfo]: + """Return the list of currently active jobs. + + :param jobs: A list of jobs to check; only these are checked. + :param user: A string with a user: only jobs of this user are checked. + :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is + returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects. + :returns: List of active jobs. + """ + with self.transport: + retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user)) + + joblist = self._parse_joblist_output(retval, stdout, stderr) + if as_dict: + jobdict = {job.job_id: job for job in joblist} + if None in jobdict: + raise SchedulerError('Found at least one job without jobid') + return jobdict + + return joblist + + def kill_job(self, jobid: str) -> bool: + """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. + + ..note:: + + On some schedulers, even if the command is accepted, it may take some seconds for the job to actually + disappear from the queue. + + :param jobid: the job ID to be killed + :returns: True if everything seems ok, False otherwise. + """ + retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid)) + return self._parse_kill_output(retval, stdout, stderr) + + @abc.abstractmethod + def _get_submit_command(self, submit_script: str) -> str: + """Return the string to execute to submit a given script. + + .. warning:: the `submit_script` should already have been bash-escaped + + :param submit_script: the path of the submit script relative to the working directory. + :return: the string to execute to submit a given script. + """ + + @abc.abstractmethod + def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode: + """Parse the output of the submit command returned by calling the `_get_submit_command` command. + + :return: a string with the job ID or an exit code if the submission failed because the submission script is + invalid and the job should be terminated. + """ + + @abc.abstractmethod + def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str: + """Return the command to get the most complete description possible of currently active jobs. + + .. note:: + + Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done + according to the value returned by `self.get_feature('can_query_by_user')` + + :param jobs: either None to get a list of all jobs in the machine, or a list of jobs. + :param user: either None, or a string with the username (to show only jobs of the specific user). + """ + + @abc.abstractmethod + def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]: + """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method. + + :return: list of `JobInfo` objects, one of each job each with at least its default params implemented. + """ + + @abc.abstractmethod + def _get_kill_command(self, jobid: str) -> str: + """Return the command to kill the job with specified jobid.""" + + @abc.abstractmethod + def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool: + """Parse the output of the kill command. + + :return: True if everything seems ok, False otherwise. + """ diff --git a/src/aiida/schedulers/plugins/direct.py b/src/aiida/schedulers/plugins/direct.py index 3c822a1dfc..3bc7812b5a 100644 --- a/src/aiida/schedulers/plugins/direct.py +++ b/src/aiida/schedulers/plugins/direct.py @@ -14,6 +14,8 @@ from aiida.schedulers import SchedulerError from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource +from .bash import BashCliScheduler + ## From the ps man page on Mac OS X 10.12 # state The state is given by a sequence of characters, for example, # ``RWNA''. The first character indicates the run state of the @@ -75,7 +77,7 @@ def accepts_default_memory_per_machine(cls): return False -class DirectScheduler(aiida.schedulers.Scheduler): +class DirectScheduler(BashCliScheduler): """Support for the direct execution bypassing schedulers.""" _logger = aiida.schedulers.Scheduler._logger.getChild('direct') diff --git a/src/aiida/schedulers/plugins/lsf.py b/src/aiida/schedulers/plugins/lsf.py index aafeb2d167..33512ba944 100644 --- a/src/aiida/schedulers/plugins/lsf.py +++ b/src/aiida/schedulers/plugins/lsf.py @@ -16,6 +16,8 @@ from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobResource, JobState +from .bash import BashCliScheduler + # This maps LSF status codes to our own state list # # List of states from @@ -167,9 +169,10 @@ def accepts_default_mpiprocs_per_machine(cls): return False -class LsfScheduler(aiida.schedulers.Scheduler): +class LsfScheduler(BashCliScheduler): """Support for the IBM LSF scheduler - 'https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html' + + https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html """ _logger = aiida.schedulers.Scheduler._logger.getChild('lsf') diff --git a/src/aiida/schedulers/plugins/pbsbaseclasses.py b/src/aiida/schedulers/plugins/pbsbaseclasses.py index aa92f1369a..a525343607 100644 --- a/src/aiida/schedulers/plugins/pbsbaseclasses.py +++ b/src/aiida/schedulers/plugins/pbsbaseclasses.py @@ -11,9 +11,11 @@ import logging from aiida.common.escaping import escape_for_bash -from aiida.schedulers import Scheduler, SchedulerError, SchedulerParsingError +from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobState, MachineInfo, NodeNumberJobResource +from .bash import BashCliScheduler + _LOGGER = logging.getLogger(__name__) # This maps PbsPro status letters to our own status list @@ -95,8 +97,9 @@ def validate_resources(cls, **kwargs): return resources -class PbsBaseClass(Scheduler): +class PbsBaseClass(BashCliScheduler): """Base class with support for the PBSPro scheduler + (http://www.pbsworks.com/) and for PBS and Torque (http://www.adaptivecomputing.com/products/open-source/torque/). diff --git a/src/aiida/schedulers/plugins/sge.py b/src/aiida/schedulers/plugins/sge.py index 1b8d47e472..dc8c595b27 100644 --- a/src/aiida/schedulers/plugins/sge.py +++ b/src/aiida/schedulers/plugins/sge.py @@ -20,6 +20,8 @@ from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobState, ParEnvJobResource +from .bash import BashCliScheduler + # 'http://www.loni.ucla.edu/twiki/bin/view/Infrastructure/GridComputing?skin=plain': # Jobs Status: # 'qw' - Queued and waiting, @@ -87,7 +89,7 @@ class SgeJobResource(ParEnvJobResource): pass -class SgeScheduler(aiida.schedulers.Scheduler): +class SgeScheduler(BashCliScheduler): """Support for the Sun Grid Engine scheduler and its variants/forks (Son of Grid Engine, Oracle Grid Engine, ...)""" _logger = aiida.schedulers.Scheduler._logger.getChild('sge') diff --git a/src/aiida/schedulers/plugins/slurm.py b/src/aiida/schedulers/plugins/slurm.py index 836e728716..cbf5aec4af 100644 --- a/src/aiida/schedulers/plugins/slurm.py +++ b/src/aiida/schedulers/plugins/slurm.py @@ -15,6 +15,8 @@ from aiida.schedulers import Scheduler, SchedulerError from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource +from .bash import BashCliScheduler + # This maps SLURM state codes to our own status list ## List of states from the man page of squeue @@ -140,7 +142,7 @@ def validate_resources(cls, **kwargs): return resources -class SlurmScheduler(Scheduler): +class SlurmScheduler(BashCliScheduler): """Support for the SLURM scheduler (http://slurm.schedmd.com/).""" _logger = Scheduler._logger.getChild('slurm') diff --git a/src/aiida/schedulers/scheduler.py b/src/aiida/schedulers/scheduler.py index 78962d8d47..1bdba3dc44 100644 --- a/src/aiida/schedulers/scheduler.py +++ b/src/aiida/schedulers/scheduler.py @@ -124,6 +124,44 @@ def create_job_resource(cls, **kwargs): assert cls._job_resource_class is not None and issubclass(cls._job_resource_class, JobResource) return cls._job_resource_class(**kwargs) + @abc.abstractmethod + def submit_job(self, working_directory: str, filename: str) -> str | ExitCode: + """Submit a job. + + :param working_directory: The absolute filepath to the working directory where the job is to be exectued. + :param filename: The filename of the submission script relative to the working directory. + :returns: + """ + + @abc.abstractmethod + def get_jobs( + self, + jobs: list[str] | None = None, + user: str | None = None, + as_dict: bool = False, + ) -> list[JobInfo] | dict[str, JobInfo]: + """Return the list of currently active jobs. + + :param jobs: A list of jobs to check; only these are checked. + :param user: A string with a user: only jobs of this user are checked. + :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is + returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects. + :returns: List of active jobs. + """ + + @abc.abstractmethod + def kill_job(self, jobid: str) -> bool: + """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. + + ..note:: + + On some schedulers, even if the command is accepted, it may take some seconds for the job to actually + disappear from the queue. + + :param jobid: the job ID to be killed + :returns: True if everything seems ok, False otherwise. + """ + def get_submit_script(self, job_tmpl: JobTemplate) -> str: """Return the submit script as a string. @@ -286,19 +324,6 @@ def _get_run_line(self, codes_info: list[JobTemplateCodeInfo], codes_run_mode: C raise NotImplementedError('Unrecognized code run mode') - @abc.abstractmethod - def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str: - """Return the command to get the most complete description possible of currently active jobs. - - .. note:: - - Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done - according to the value returned by `self.get_feature('can_query_by_user')` - - :param jobs: either None to get a list of all jobs in the machine, or a list of jobs. - :param user: either None, or a string with the username (to show only jobs of the specific user). - """ - def _get_detailed_job_info_command(self, job_id: str) -> dict[str, t.Any]: """Return the command to run to get detailed information for a given job. @@ -331,41 +356,6 @@ def get_detailed_job_info(self, job_id: str) -> dict[str, str | int]: return detailed_job_info - @abc.abstractmethod - def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]: - """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method. - - :return: list of `JobInfo` objects, one of each job each with at least its default params implemented. - """ - - def get_jobs( - self, - jobs: list[str] | None = None, - user: str | None = None, - as_dict: bool = False, - ) -> list[JobInfo] | dict[str, JobInfo]: - """Return the list of currently active jobs. - - .. note:: typically, only either jobs or user can be specified. See also comments in `_get_joblist_command`. - - :param list jobs: a list of jobs to check; only these are checked - :param str user: a string with a user: only jobs of this user are checked - :param list as_dict: if False (default), a list of JobInfo objects is returned. If True, a dictionary is - returned, having as key the job_id and as value the JobInfo object. - :return: list of active jobs - """ - with self.transport: - retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user)) - - joblist = self._parse_joblist_output(retval, stdout, stderr) - if as_dict: - jobdict = {job.job_id: job for job in joblist} - if None in jobdict: - raise SchedulerError('Found at least one job without jobid') - return jobdict - - return joblist - @property def transport(self): """Return the transport set for this scheduler.""" @@ -381,58 +371,6 @@ def set_transport(self, transport: Transport): """ self._transport = transport - @abc.abstractmethod - def _get_submit_command(self, submit_script: str) -> str: - """Return the string to execute to submit a given script. - - .. warning:: the `submit_script` should already have been bash-escaped - - :param submit_script: the path of the submit script relative to the working directory. - :return: the string to execute to submit a given script. - """ - - @abc.abstractmethod - def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode: - """Parse the output of the submit command returned by calling the `_get_submit_command` command. - - :return: a string with the job ID or an exit code if the submission failed because the submission script is - invalid and the job should be terminated. - """ - - def submit_from_script(self, working_directory: str, submit_script: str) -> str | ExitCode: - """Submit the submission script to the scheduler. - - :return: return a string with the job ID in a valid format to be used for querying. - """ - self.transport.chdir(working_directory) - result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(submit_script))) - return self._parse_submit_output(*result) - - def kill(self, jobid: str) -> bool: - """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. - - ..note:: - - On some schedulers, even if the command is accepted, it may take some seconds for the job to actually - disappear from the queue. - - :param jobid: the job ID to be killed - :return: True if everything seems ok, False otherwise. - """ - retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid)) - return self._parse_kill_output(retval, stdout, stderr) - - @abc.abstractmethod - def _get_kill_command(self, jobid: str) -> str: - """Return the command to kill the job with specified jobid.""" - - @abc.abstractmethod - def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool: - """Parse the output of the kill command. - - :return: True if everything seems ok, False otherwise. - """ - def parse_output( self, detailed_job_info: dict[str, str | int] | None = None, diff --git a/tests/transports/test_all_plugins.py b/tests/transports/test_all_plugins.py index 42f01dae10..20f219d852 100644 --- a/tests/transports/test_all_plugins.py +++ b/tests/transports/test_all_plugins.py @@ -1294,7 +1294,7 @@ def test_asynchronous_execution(self, custom_transport): transport.putfile(tmpf.name, script_fname) timestamp_before = time.time() - job_id_string = scheduler.submit_from_script('/tmp', script_fname) + job_id_string = scheduler.submit_job('/tmp', script_fname) elapsed_time = time.time() - timestamp_before # We want to get back control. If it takes < 5 seconds, it means that it is not blocking