Skip to content

Commit

Permalink
Made execution time limit unsettable by reload or broadcast. (#5902)
Browse files Browse the repository at this point in the history
* Made execution time limit unsettable by reload or broadcast.
Formerly a suppress statement was supressing cases when we
wanted to set itask.summary['execution time limit'] = None.

Added tests.

* response to verbal review

* Apply suggestions from code review

Co-authored-by: Oliver Sanders <[email protected]>

* Prevent false positives in test for broadcast.
Test broadcast using _prep_submit_task_job
rather than _prep_submit_task_job_impl so that
the fake broadcast is applied.

* Update changes.d/5902.fix.md

Co-authored-by: Hilary James Oliver <[email protected]>

* made function less tolerant

---------

Co-authored-by: Oliver Sanders <[email protected]>
Co-authored-by: Hilary James Oliver <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2024
1 parent 9948532 commit 2ec98dd
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 5 deletions.
1 change: 1 addition & 0 deletions changes.d/5902.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug that prevented unsetting `execution time limit` by broadcast or reload.
35 changes: 30 additions & 5 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
)
from shutil import rmtree
from time import time
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Any, Union, Optional

from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
Expand Down Expand Up @@ -1262,10 +1262,11 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig):
itask.submit_num] = itask.platform['name']

itask.summary['job_runner_name'] = itask.platform['job runner']
with suppress(TypeError):
itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float(
rtconfig['execution time limit']
)

# None is an allowed non-float number for Execution time limit.
itask.summary[
self.KEY_EXECUTE_TIME_LIMIT
] = self.get_execution_time_limit(rtconfig['execution time limit'])

# Location of job file, etc
self._create_job_log_path(workflow, itask)
Expand All @@ -1281,6 +1282,30 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig):
job_d=job_d
)

@staticmethod
def get_execution_time_limit(
config_execution_time_limit: Any
) -> Union[None, float]:
"""Get execution time limit from config and process it.
If the etl from the config is a Falsy then return None.
Otherwise try and parse value as float.
Examples:
>>> from pytest import raises
>>> this = TaskJobManager.get_execution_time_limit
>>> this(None)
>>> this("54")
54.0
>>> this({})
>>> with raises(ValueError):
... this('🇳🇿')
"""
if config_execution_time_limit:
return float(config_execution_time_limit)
return None

def get_job_conf(
self,
workflow,
Expand Down
58 changes: 58 additions & 0 deletions tests/integration/test_task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from contextlib import suppress
import logging
from typing import Any as Fixture

Expand Down Expand Up @@ -128,3 +129,60 @@ async def test__run_job_cmd_logs_platform_lookup_fail(
warning = caplog.records[-1]
assert warning.levelname == 'ERROR'
assert 'Unable to run command jobs-poll' in warning.msg


async def test__prep_submit_task_job_impl_handles_execution_time_limit(
flow: Fixture,
scheduler: Fixture,
start: Fixture,
):
"""Ensure that emptying the execution time limit unsets it.
Previously unsetting the etl by either broadcast or reload
would not unset a previous etl.
See https://github.com/cylc/cylc-flow/issues/5891
"""
id_ = flow({
"scheduling": {
"cycling mode": "integer",
"graph": {"R1": "a"}
},
"runtime": {
"root": {},
"a": {
"script": "sleep 10",
"execution time limit": 'PT5S'
}
}
})

# Run in live mode - function not called in sim mode.
schd = scheduler(id_, run_mode='live')
async with start(schd):
task_a = schd.pool.get_tasks()[0]
# We're not interested in the job file stuff, just
# in the summary state.
with suppress(FileExistsError):
schd.task_job_mgr._prep_submit_task_job_impl(
schd.workflow, task_a, task_a.tdef.rtconfig)
assert task_a.summary['execution_time_limit'] == 5.0

# If we delete the etl it gets deleted in the summary:
task_a.tdef.rtconfig['execution time limit'] = None
with suppress(FileExistsError):
schd.task_job_mgr._prep_submit_task_job_impl(
schd.workflow, task_a, task_a.tdef.rtconfig)
assert not task_a.summary.get('execution_time_limit', '')

# put everything back and test broadcast too.
task_a.tdef.rtconfig['execution time limit'] = 5.0
task_a.summary['execution_time_limit'] = 5.0
schd.broadcast_mgr.broadcasts = {
'1': {'a': {'execution time limit': None}}}
with suppress(FileExistsError):
# We run a higher level function here to ensure
# that the broadcast is applied.
schd.task_job_mgr._prep_submit_task_job(
schd.workflow, task_a)
assert not task_a.summary.get('execution_time_limit', '')

0 comments on commit 2ec98dd

Please sign in to comment.