Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Sim mode at runtime #5721

Merged
merged 52 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
7c4c1b9
wip on simulation broadcastable
wxtim Nov 16, 2023
c5f3a25
Allow Broadcasting modification of sim mode tasks
wxtim Nov 17, 2023
6f5320f
return to get_simulation. Cache call to time()
wxtim Jan 18, 2024
00c29a5
Cached timeout time for sim tasks on the sim_modes object.
wxtim Jan 19, 2024
f6b13b1
de-flake tests
wxtim Jan 22, 2024
ff6037c
Ensure that mode_settings are deleted from the task proxy when
wxtim Jan 29, 2024
0cd47a9
rationalize tests
wxtim Jan 30, 2024
c356bdf
r2r
wxtim Jan 31, 2024
d6e5aef
Save correct datetime format to the DB.
wxtim Feb 1, 2024
695e5a8
Response to review
wxtim Feb 6, 2024
a6f7606
small test fix
wxtim Feb 6, 2024
69b03c9
Update cylc/flow/task_job_mgr.py
wxtim Feb 8, 2024
2665d42
Apply suggestions from code review
wxtim Feb 9, 2024
89e5041
response to review
wxtim Feb 9, 2024
b097d33
Pass `task_events_manager` wholesale to sim_time_check.
wxtim Feb 12, 2024
7217c00
Rationalize existing tests and add a test for a bug discovered by Ronnie
wxtim Feb 13, 2024
970fa1d
Update tests/functional/modes/05-sim-trigger.t
wxtim Feb 13, 2024
312cf28
add a warning that fail try 1 only does not change on resubmit, only …
wxtim Feb 13, 2024
e2392af
Update cylc/flow/simulation.py
wxtim Feb 13, 2024
08f4345
Update cylc/flow/cfgspec/workflow.py
wxtim Feb 20, 2024
62d5897
fix flake8 issues
wxtim Feb 20, 2024
3832e7b
refactor test broken by sim-mode change
wxtim Feb 20, 2024
8c3bf61
Update tests/flakyfunctional/cylc-get-config/04-dummy-mode-output.t
wxtim Feb 20, 2024
c8d5aea
Apply suggestions from code review
wxtim Feb 26, 2024
42aa4b2
Fallback created for lack of start time in database.
wxtim Feb 26, 2024
42ad8cc
Ensure that broadcasts to fail cycle points triggers a re-parse
wxtim Feb 27, 2024
38db5dc
Use itask summary started time as sole arbiter of simulation
wxtim Feb 27, 2024
95d3dc4
ensure that broadcast checks and rejects unparsable fail cycle points
wxtim Feb 27, 2024
1058023
Fix a broken test, and some linting and typing issues
wxtim Feb 28, 2024
96db826
Update cylc/flow/network/resolvers.py
wxtim Feb 28, 2024
0b41424
Apply suggestions from code review
wxtim Mar 4, 2024
d93de69
Update changes.d/5721.feat.md
wxtim Mar 4, 2024
6268809
Update cylc/flow/simulation.py
wxtim Mar 4, 2024
5447530
Update changes.d/5721.feat.md
wxtim Mar 4, 2024
ab53227
fix broken test
wxtim Mar 4, 2024
d2b372e
make sim mode messages _look_ external
wxtim Mar 5, 2024
ceb3063
removed bc manager changes - one test is failing...
wxtim Mar 5, 2024
89db69c
Prevent totally invalid fail cycle points being accepted for simulation.
wxtim Mar 5, 2024
cd2fe55
Update cylc/flow/simulation.py
wxtim Mar 6, 2024
6773246
ensure than changing `fail try 1 only` doesn't cause failure
wxtim Mar 6, 2024
d147b0f
Prevent repeated use of sim_task_failed giving different answers (bas…
wxtim Mar 6, 2024
269078a
fix
wxtim Mar 6, 2024
b4f4bb1
Broadcast changes to simulated tasks in task_job_manager
wxtim Mar 11, 2024
370d2b6
test that clearing broadcasts works for sim tasks
wxtim Mar 11, 2024
b9e1d15
Update cylc/flow/simulation.py
wxtim Mar 11, 2024
b355ae2
Update cylc/flow/simulation.py
wxtim Mar 11, 2024
44abd5a
Update tests/integration/test_simulation.py
wxtim Mar 11, 2024
cb064a0
fix test
wxtim Mar 11, 2024
ef2fc58
Merge branch 'master' into feature.sim_mode_at_runtime
hjoliver Mar 14, 2024
6be7372
Update changes.d/5721.feat.md
wxtim Mar 14, 2024
389a0be
Update cylc/flow/simulation.py
wxtim Mar 14, 2024
9186ad1
Fix func test, after update on master.
hjoliver Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5721.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow task simulation mode settings to be changed dynamically using `cylc broadcast`.
15 changes: 14 additions & 1 deletion cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
from cylc.flow.cfgspec.workflow import SPEC
from cylc.flow.cycling.loader import get_point, standardise_point_string
from cylc.flow.exceptions import PointParsingError
from cylc.flow.parsec.util import listjoin
from cylc.flow.parsec.util import listjoin, pdeepcopy, poverride
from cylc.flow.parsec.validate import BroadcastConfigValidator

if TYPE_CHECKING:
from cylc.flow.id import Tokens
from cylc.flow.task_proxy import TaskProxy


ALL_CYCLE_POINTS_STRS = ["*", "all-cycle-points", "all-cycles"]
Expand Down Expand Up @@ -179,6 +180,18 @@ def get_broadcast(self, tokens: 'Optional[Tokens]' = None) -> dict:
addict(ret, self.broadcasts[cycle][namespace])
return ret

def get_updated_rtconfig(self, itask: 'TaskProxy') -> dict:
"""Retrieve updated rtconfig for a single task proxy"""
overrides = self.get_broadcast(
itask.tokens
)
if overrides:
rtconfig = pdeepcopy(itask.tdef.rtconfig)
poverride(rtconfig, overrides, prepend=True)
else:
rtconfig = itask.tdef.rtconfig
return rtconfig

def load_db_broadcast_states(self, row_idx, row):
"""Load broadcast variables from runtime DB broadcast states row."""
if row_idx == 0:
Expand Down
6 changes: 6 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,12 @@ def get_script_common_text(this: str, example: Optional[str] = None):

Task instances must be set to fail by
:cylc:conf:`[..]fail cycle points`.

.. note::

This setting is designed for use with automatic
retries. Subsequent manual submissions will not
change the outcome of the task.
''')
Conf('disable task event handlers', VDR.V_BOOLEAN, True,
desc='''
Expand Down
4 changes: 3 additions & 1 deletion cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,9 @@ def broadcast(
cycle_points, namespaces, settings)
if mode == 'clear_broadcast':
return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast(
cycle_points, namespaces, settings)
point_strings=cycle_points,
namespaces=namespaces,
cancel_settings=settings)
if mode == 'expire_broadcast':
return self.schd.task_events_mgr.broadcast_mgr.expire_broadcast(
cutoff)
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class CylcWorkflowDAO:
["flow_nums"],
["is_manual_submit", {"datatype": "INTEGER"}],
["try_num", {"datatype": "INTEGER"}],
# This is used to store simulation task start time across restarts.
["time_submit"],
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
["time_submit_exit"],
["submit_status", {"datatype": "INTEGER"}],
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,10 @@ async def main_loop(self) -> None:
if (
self.pool.config.run_mode('simulation')
and sim_time_check(
self.message_queue, self.pool.get_tasks())
self.task_events_mgr,
self.pool.get_tasks(),
self.workflow_db_mgr,
)
):
# A simulated task state change occurred.
self.reset_inactivity_timer()
Expand Down
195 changes: 147 additions & 48 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,118 @@
"""Utilities supporting simulation and skip modes
"""

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from time import time

from cylc.flow import LOG
from cylc.flow.cycling.loader import get_point
from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.exceptions import PointParsingError
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.task_state import (
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
)
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.wallclock import get_unix_time_from_time_string

from metomi.isodatetime.parsers import DurationParser

if TYPE_CHECKING:
from queue import Queue
from cylc.flow.cycling import PointBase
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.cycling import PointBase


@dataclass
class ModeSettings:
wxtim marked this conversation as resolved.
Show resolved Hide resolved
"""A store of state for simulation modes.

Used instead of modifying the runtime config.
wxtim marked this conversation as resolved.
Show resolved Hide resolved

Args:
itask:
The task proxy this submission relates to.
broadcast_mgr:
The broadcast manager is used to apply any runtime alterations
pre simulated submission.
db_mgr:
The database manager must be provided for simulated jobs
that are being resumed after workflow restart. It is used to
extract the original scheduled finish time for the job.

Attrs:
simulated_run_length:
The length of time this simulated job will take to run in seconds.
timeout:
The wall-clock time at which this simulated job will finish as
a Unix epoch time.
sim_task_fails:
True, if this job is intended to fail when it finishes, else False.

"""
simulated_run_length: float = 0.0
sim_task_fails: bool = False
timeout: float = 0.0

def __init__(
self,
itask: 'TaskProxy',
db_mgr: 'WorkflowDatabaseManager',
rtconfig: Dict[str, Any]
):

# itask.summary['started_time'] and mode_settings.timeout need
# repopulating from the DB on workflow restart:
started_time = itask.summary['started_time']
try_num = None
if started_time is None:
# Get DB info
db_info = db_mgr.pri_dao.select_task_job(
itask.tokens['cycle'],
itask.tokens['task'],
itask.tokens['job'],
)

# Get the started time:
if db_info['time_submit']:
started_time = get_unix_time_from_time_string(
db_info["time_submit"])
itask.summary['started_time'] = started_time
else:
started_time = time()

# Get the try number:
try_num = db_info["try_num"]

# Parse fail cycle points:
if rtconfig != itask.tdef.rtconfig:
try:
rtconfig["simulation"][
"fail cycle points"
] = parse_fail_cycle_points(
rtconfig["simulation"]["fail cycle points"]
)
except PointParsingError as exc:
# Broadcast Fail CP didn't parse
LOG.warning(
'Broadcast fail cycle point was invalid:\n'
f' {exc.args[0]}'
)
rtconfig['simulation'][
'fail cycle points'
] = itask.tdef.rtconfig['simulation']['fail cycle points']

# Calculate simulation info:
self.simulated_run_length = (
get_simulated_run_len(rtconfig))
self.sim_task_fails = sim_task_failed(
rtconfig['simulation'],
itask.point,
try_num or itask.get_try_num()
)
self.timeout = started_time + self.simulated_run_length


def configure_sim_modes(taskdefs, sim_mode):
Expand All @@ -46,23 +139,17 @@ def configure_sim_modes(taskdefs, sim_mode):
for tdef in taskdefs:
# Compute simulated run time by scaling the execution limit.
rtc = tdef.rtconfig
sleep_sec = get_simulated_run_len(rtc)

rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)

rtc['simulation']['simulated run length'] = sleep_sec
rtc['submission retry delays'] = [1]
Copy link
Member Author

@wxtim wxtim Mar 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MetRonnie Here is the cause of the value in the GUI's submission retry delays box. It also happens on master.

I think that it's probably as it's meant to be: In simulation mode, simulate retries after 1 second whatever the retry delay in live mode. However, I can't see that documented anywhere.


# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
rtc['script'] = build_dummy_script(
rtc, sleep_sec) if dummy_mode else ""
if dummy_mode:
wxtim marked this conversation as resolved.
Show resolved Hide resolved
# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
rtc['script'] = build_dummy_script(
rtc, get_simulated_run_len(rtc))

disable_platforms(rtc)

Expand All @@ -77,12 +164,13 @@ def configure_sim_modes(taskdefs, sim_mode):


def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
"""Get simulated run time.
"""Calculate simulation run time from a task's config.

rtc = run time config
"""
limit = rtc['execution time limit']
speedup = rtc['simulation']['speedup factor']

if limit and speedup:
sleep_sec = (DurationParser().parse(
str(limit)).get_seconds() / speedup)
Expand Down Expand Up @@ -145,19 +233,26 @@ def parse_fail_cycle_points(
True
>>> this([])
[]
>>> this(None) is None
True
"""
f_pts: 'Optional[List[PointBase]]'
if 'all' in f_pts_orig:
f_pts: 'Optional[List[PointBase]]' = []
if (
f_pts_orig is None
or f_pts_orig and 'all' in f_pts_orig
):
f_pts = None
else:
elif f_pts_orig:
f_pts = []
for point_str in f_pts_orig:
f_pts.append(get_point(point_str).standardise())
return f_pts


def sim_time_check(
message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]'
task_events_manager: 'TaskEventsManager',
itasks: 'List[TaskProxy]',
db_mgr: 'WorkflowDatabaseManager',
) -> bool:
"""Check if sim tasks have been "running" for as long as required.

Expand All @@ -166,38 +261,42 @@ def sim_time_check(
Returns:
True if _any_ simulated task state has changed.
"""
sim_task_state_changed = False
now = time()
sim_task_state_changed: bool = False
for itask in itasks:
if itask.state.status != TASK_STATUS_RUNNING:
continue
# Started time is not set on restart
if itask.summary['started_time'] is None:
itask.summary['started_time'] = now
timeout = (
itask.summary['started_time'] +
itask.tdef.rtconfig['simulation']['simulated run length']
)
if now > timeout:
job_d = itask.tokens.duplicate(job=str(itask.submit_num))
now_str = get_current_time_string()
if sim_task_failed(
itask.tdef.rtconfig['simulation'],
itask.point,
itask.get_try_num()
):
message_queue.put(
TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)

# This occurs if the workflow has been restarted.
if itask.mode_settings is None:
wxtim marked this conversation as resolved.
Show resolved Hide resolved
rtconfig = task_events_manager.broadcast_mgr.get_updated_rtconfig(
itask)
itask.mode_settings = ModeSettings(
itask,
db_mgr,
rtconfig
)

if now > itask.mode_settings.timeout:
if itask.mode_settings.sim_task_fails:
task_events_manager.process_message(
itask, 'CRITICAL', TASK_STATUS_FAILED,
flag=task_events_manager.FLAG_RECEIVED
)
else:
# Simulate message outputs.
for msg in itask.tdef.rtconfig['outputs'].values():
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', msg)
)
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', TASK_STATUS_SUCCEEDED)
task_events_manager.process_message(
itask, 'DEBUG', TASK_STATUS_SUCCEEDED,
flag=task_events_manager.FLAG_RECEIVED
)
# Simulate message outputs.
for msg in itask.tdef.rtconfig['outputs'].values():
task_events_manager.process_message(
itask, 'DEBUG', msg,
flag=task_events_manager.FLAG_RECEIVED
)

# We've finished this pseudo job, so delete all the mode settings.
itask.mode_settings = None
sim_task_state_changed = True
return sim_task_state_changed

Expand Down
Loading
Loading