Skip to content

Commit

Permalink
Merge 'upstream/master' into 8.3.x-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Jul 10, 2024
2 parents dd170d4 + e28e0c0 commit 55872e4
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 17 deletions.
1 change: 1 addition & 0 deletions changes.d/6137.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New Cylc lint rule: S014: Don't use job runner specific execution time limit directives, use execution time limit.
5 changes: 2 additions & 3 deletions cylc/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Set up the cylc environment."""

import os
import logging

import os

CYLC_LOG = 'cylc'

Expand Down Expand Up @@ -53,7 +52,7 @@ def environ_init():

environ_init()

__version__ = '8.3.3.dev'
__version__ = '8.4.0.dev'


def iter_entry_points(entry_point_name):
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/job_runner_handlers/loadleveler.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class LoadlevelerHandler():
re.compile("^llsubmit: Processed command file through Submit Filter:")]
SUBMIT_CMD_TMPL = "llsubmit '%(job)s'"
VACATION_SIGNAL = "USR1"
TIME_LIMIT_DIRECTIVE = "wall_clock_limit"

def format_directives(self, job_conf):
"""Format the job directives for a job file."""
Expand All @@ -96,8 +97,8 @@ def format_directives(self, job_conf):
directives["output"] = job_file_path + ".out"
directives["error"] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("wall_clock_limit") is None):
directives["wall_clock_limit"] = "%d,%d" % (
directives.get(self.TIME_LIMIT_DIRECTIVE) is None):
directives[self.TIME_LIMIT_DIRECTIVE] = "%d,%d" % (
job_conf["execution_time_limit"] + 60,
job_conf["execution_time_limit"])
for key, value in list(job_conf["directives"].items()):
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/job_runner_handlers/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class LSFHandler():
POLL_CMD = "bjobs"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"^Job <(?P<id>\d+)>")
SUBMIT_CMD_TMPL = "bsub"
TIME_LIMIT_DIRECTIVE = "-W"

@classmethod
def format_directives(cls, job_conf):
Expand All @@ -82,8 +83,11 @@ def format_directives(cls, job_conf):
)
directives["-o"] = job_file_path + ".out"
directives["-e"] = job_file_path + ".err"
if job_conf["execution_time_limit"] and directives.get("-W") is None:
directives["-W"] = str(math.ceil(
if (
job_conf["execution_time_limit"]
and directives.get(cls.TIME_LIMIT_DIRECTIVE) is None
):
directives[cls.TIME_LIMIT_DIRECTIVE] = str(math.ceil(
job_conf["execution_time_limit"] / 60))
for key, value in list(job_conf["directives"].items()):
directives[key] = value
Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/job_runner_handlers/moab.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class MoabHandler:
POLL_CMD = "checkjob"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"""\A\s*(?P<id>\S+)\s*\Z""")
SUBMIT_CMD_TMPL = "msub '%(job)s'"
TIME_LIMIT_DIRECTIVE = "-l walltime"

def format_directives(self, job_conf):
"""Format the job directives for a job file."""
Expand All @@ -91,8 +92,9 @@ def format_directives(self, job_conf):
directives["-o"] = job_file_path + ".out"
directives["-e"] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("-l walltime") is None):
directives["-l walltime"] = "%d" % job_conf["execution_time_limit"]
directives.get(self.TIME_LIMIT_DIRECTIVE) is None):
directives[self.TIME_LIMIT_DIRECTIVE] = "%d" % job_conf[
"execution_time_limit"]
# restartable?
directives.update(job_conf["directives"])
lines = []
Expand Down
10 changes: 7 additions & 3 deletions cylc/flow/job_runner_handlers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class PBSHandler:
POLL_CANT_CONNECT_ERR = "Connection refused"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"^\s*(?P<id>\d+)", re.M)
SUBMIT_CMD_TMPL = "qsub '%(job)s'"
TIME_LIMIT_DIRECTIVE = "-l walltime"

def format_directives(self, job_conf):
"""Format the job directives for a job file."""
Expand All @@ -105,9 +106,12 @@ def format_directives(self, job_conf):

directives["-o"] = job_file_path + ".out"
directives["-e"] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("-l walltime") is None):
directives["-l walltime"] = "%d" % job_conf["execution_time_limit"]
if (
job_conf["execution_time_limit"]
and directives.get(self.TIME_LIMIT_DIRECTIVE) is None
):
directives[self.TIME_LIMIT_DIRECTIVE] = "%d" % job_conf[
"execution_time_limit"]
for key, value in list(job_conf["directives"].items()):
directives[key] = value
lines = []
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/job_runner_handlers/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
-cwd =
-q = foo
-l h_data = 1024M
-l h_rt = 24:00:00
These are written to the top of the job script like this:
Expand Down Expand Up @@ -76,6 +75,7 @@ class SGEHandler:
POLL_CMD = "qstat"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"\D+(?P<id>\d+)\D+")
SUBMIT_CMD_TMPL = "qsub '%(job)s'"
TIME_LIMIT_DIRECTIVE = "-l h_rt"

def format_directives(self, job_conf):
"""Format the job directives for a job file."""
Expand All @@ -88,8 +88,8 @@ def format_directives(self, job_conf):
directives['-o'] = job_file_path + ".out"
directives['-e'] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("-l h_rt") is None):
directives["-l h_rt"] = "%d:%02d:%02d" % (
directives.get(self.TIME_LIMIT_DIRECTIVE) is None):
directives[self.TIME_LIMIT_DIRECTIVE] = "%d:%02d:%02d" % (
job_conf["execution_time_limit"] / 3600,
(job_conf["execution_time_limit"] / 60) % 60,
job_conf["execution_time_limit"] % 60)
Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/job_runner_handlers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class SLURMHandler():
# Separator between het job directive sections
SEP_HETJOB = "#SBATCH hetjob"

TIME_LIMIT_DIRECTIVE = "--time"

@classmethod
def filter_poll_many_output(cls, out):
"""Return list of job IDs extracted from job poll stdout.
Expand All @@ -161,8 +163,8 @@ def format_directives(cls, job_conf):
directives['--output'] = job_file_path.replace('%', '%%') + ".out"
directives['--error'] = job_file_path.replace('%', '%%') + ".err"
if (job_conf["execution_time_limit"] and
directives.get("--time") is None):
directives["--time"] = "%d:%02d" % (
directives.get(cls.TIME_LIMIT_DIRECTIVE) is None):
directives[cls.TIME_LIMIT_DIRECTIVE] = "%d:%02d" % (
job_conf["execution_time_limit"] / 60,
job_conf["execution_time_limit"] % 60)
for key, value in list(job_conf['directives'].items()):
Expand Down
55 changes: 55 additions & 0 deletions cylc/flow/scripts/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
max-line-length = 130 # Max line length for linting
"""
import functools
import pkgutil
from pathlib import Path
import re
import sys
Expand Down Expand Up @@ -78,6 +79,8 @@
from cylc.flow import LOG
from cylc.flow.exceptions import CylcError
import cylc.flow.flags
from cylc.flow import job_runner_handlers
from cylc.flow.job_runner_mgr import JobRunnerManager
from cylc.flow.loggingutil import set_timestamps
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
Expand Down Expand Up @@ -162,6 +165,34 @@
}


def get_wallclock_directives():
"""Get a set of directives equivalent to execution time limit"""
job_runner_manager = JobRunnerManager()
directives = {}
for module in pkgutil.iter_modules(job_runner_handlers.__path__):
directive = getattr(
job_runner_manager._get_sys(module.name),
'TIME_LIMIT_DIRECTIVE',
None
)
if directive:
directives[module.name] = directive
return directives


WALLCLOCK_DIRECTIVES = get_wallclock_directives()


def check_wallclock_directives(line: str) -> Union[Dict[str, str], bool]:
"""Check for job runner specific directives
equivelent to exection time limit.
"""
for directive in set(WALLCLOCK_DIRECTIVES.values()):
if line.strip().startswith(directive):
return {'directive': line.strip()}
return False


def check_jinja2_no_shebang(
line: str,
file: Path,
Expand Down Expand Up @@ -533,6 +564,30 @@ def list_wrapper(line: str, check: Callable) -> Optional[Dict[str, str]]:
'S013': {
'short': 'Items should be indented in 4 space blocks.',
FUNCTION: check_indentation
},
'S014': {
'short': (
'Use ``[runtime][TASK]execution time limit``'
' rather than job runner directive: ``{directive}``.'
),
'rst': (
'Using ``[runtime][TASK]execution time limit`` is'
' recommended in preference to using job runner'
' directives because it allows Cylc to retain awareness'
' of whether the job should have finished, even if contact'
' with the target job runner\'s platform has been lost.'
' \n\nThe following directives are considered equivelent to'
' execution time limit:\n * '
)
+ '\n * '.join((
f'``{directive}`` ({job_runner})'
for job_runner, directive in WALLCLOCK_DIRECTIVES.items()
)) + (
'\n\n.. note:: Using ``execution time limit`` which'
' is automatically translated to the job runner\'s timeout'
' directive can make your workflow more portable.'
),
FUNCTION: check_wallclock_directives,
}
}
# Subset of deprecations which are tricky (impossible?) to scrape from the
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/scripts/test_lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@
something\t
[[bar]]
platform = $(some-script foo)
[[[directives]]]
-l walltime = 666
[[baz]]
platform = `no backticks`
""" + (
Expand Down

0 comments on commit 55872e4

Please sign in to comment.