Skip to content

Commit

Permalink
Implement Skip Mode
Browse files Browse the repository at this point in the history
* Add `[runtime][<namespace>]run mode` and `[runtime][<namespace>][skip]`.
* Spin run mode functionality into separate modules.
* Run sim mode check with every main loop - we don't know if any tasks are
  in sim mode from the scheduler, but it doesn't cost much to check
  if none are.
* Implemented separate job "submission" pathway switching.
* Implemented skip mode, including output control logic.
* Add a linter and a validation check for tasks in nonlive modes,
  and for combinations of outputs
* Enabled setting outputs as if task ran in skip mode using
  `cylc set --out skip`.
* Testing for the above.

Schema: use `Enum` for task run mode instead of `String` (#61)

* Schema: use `Enum` for task run mode instead of `String`

* Tidy

fixup merge

fix broken functional test

Improve cylc set --out skip
* Improve documentation of feature in cylc set --help
* Allow cylc set --out=skip,optional_output
* Test the above

Remove test: We don't want users opting out of validating
[runtime][ns][simulation/skip] because we can now
changes these in a running workflow.

stop users opting out of validating workflows without validating ski/simulation taskdef sections

added tests for db entries in nonlive mode

ensure db entries for all four modes are correct.

move the change file toi the correct name

get localhost of platforms 'simulation' 'skip' or 'dummy' not defined. (They probably shouldn't be, but that's a site specific choice...)

fix tests with extra messages surfaces by using log_filter

make cylc clean and remote tidy not try to clean or tidy platforms

stop dummy mode appearing to submit twice

Prevent cleanup from attempting to remote clean platforms skip and simulation

Update cylc/flow/run_modes/skip.py

Co-authored-by: Oliver Sanders <[email protected]>

fix small issues from OS review

response to review: Make satisfaction method correct according the proposal

Response to review
* Allow only run modes skip and live to be selectable in the config.
* Disallow workflow run modes sim and dummy from being overridden.
* Attach Run mode to task_proxy rather than the task def.

Response to review
* Allow only run modes skip and live to be selectable in the config.
* Disallow workflow run modes sim and dummy from being overridden.
* Attach Run mode to task_proxy rather than the task def.

don't run sim time check unless workflow in sim mode

test that skip mode is only applied to a single task.

remove now illegal items from test

Response to review:
- Remove Workflow Mode for tasks and make them default to live.
- Ensure that we are checking (assert) log_filters.
- Remove need for polling in functional test. :)

usin enums

Apply suggestions from code review

Co-authored-by: Oliver Sanders <[email protected]>
  • Loading branch information
wxtim and oliver-sanders committed Sep 5, 2024
1 parent 293c3cc commit d354fad
Show file tree
Hide file tree
Showing 70 changed files with 2,586 additions and 816 deletions.
1 change: 1 addition & 0 deletions changes.d/6039.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a new mode task run mode "skip" which overrides workflow live mode task submission.
59 changes: 59 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
fail_if_platform_and_host_conflict, get_platform_deprecated_settings,
is_platform_definition_subshell)
from cylc.flow.task_events_mgr import EventData
from cylc.flow.task_state import RunMode


# Regex to check whether a string is a command
REC_COMMAND = re.compile(r'(`|\$\()\s*(.*)\s*([`)])$')
Expand Down Expand Up @@ -1334,6 +1336,27 @@ def get_script_common_text(this: str, example: Optional[str] = None):
"[platforms][<platform name>]submission retry delays"
)
)
Conf(
'run mode', VDR.V_STRING,
options=list(RunMode.OVERRIDING_MODES.value) + [''],
default='',
desc=f'''
For a workflow run in live mode run this task in skip
mode.
{RunMode.LIVE.value}:
{RunMode.LIVE.describe()}
{RunMode.SKIP.value}:
{RunMode.SKIP.describe()}
.. seealso::
:ref:`task-run-modes`
.. versionadded:: 8.4.0
''')
with Conf('meta', desc=r'''
Metadata for the task or task family.
Expand Down Expand Up @@ -1406,7 +1429,43 @@ def get_script_common_text(this: str, example: Optional[str] = None):
determine how an event handler responds to task failure
events.
''')
with Conf('skip', desc='''
Task configuration for task :ref:`task-run-modes.skip`.
For a full description of skip run mode see
:ref:`task-run-modes.skip`.
.. versionadded:: 8.4.0
'''):
Conf(
'outputs',
VDR.V_STRING_LIST,
desc='''
Outputs to be emitted by a task in skip mode.
* By default, all required outputs will be generated
plus succeeded if success is optional.
* If skip-mode outputs is specified and does not
include either succeeded or failed then succeeded
will be produced.
* The outputs submitted and started are always
produced and do not need to be defined in outputs.
.. versionadded:: 8.4.0
'''
)
Conf(
'disable task event handlers',
VDR.V_BOOLEAN,
default=True,
desc='''
Task event handlers are turned off by default for
skip mode tasks. Changing this setting to ``False``
will re-enable task event handlers.
.. versionadded:: 8.4.0
'''
)
with Conf('simulation', desc='''
Task configuration for workflow *simulation* and *dummy* run
modes.
Expand Down
9 changes: 5 additions & 4 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@
from cylc.flow.network.schema import WorkflowStopMode
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.task_id import TaskID
from cylc.flow.task_state import TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED
from cylc.flow.workflow_status import RunMode, StopMode
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED, RunMode)
from cylc.flow.workflow_status import StopMode

from metomi.isodatetime.parsers import TimePointParser

Expand Down Expand Up @@ -247,7 +248,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
"""Poll pollable tasks or a task or family if options are provided."""
validate.is_tasks(tasks)
yield
if schd.get_run_mode() == RunMode.SIMULATION:
if schd.get_run_mode() == RunMode.SIMULATION.value:
yield 0
itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
schd.task_job_mgr.poll_task_jobs(schd.workflow, itasks)
Expand All @@ -260,7 +261,7 @@ async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]):
validate.is_tasks(tasks)
yield
itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
if schd.get_run_mode() == RunMode.SIMULATION:
if schd.get_run_mode() == RunMode.SIMULATION.value:
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(TASK_STATUS_FAILED)
Expand Down
10 changes: 4 additions & 6 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
)
from cylc.flow.print_tree import print_tree
from cylc.flow.task_qualifiers import ALT_QUALIFIERS
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.run_modes.nonlive import run_mode_validate_checks
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_events_mgr import (
EventData,
Expand All @@ -99,6 +99,7 @@
get_trigger_completion_variable_maps,
trigger_to_completion_variable,
)
from cylc.flow.task_state import RunMode
from cylc.flow.task_trigger import TaskTrigger, Dependency
from cylc.flow.taskdef import TaskDef
from cylc.flow.unicode_rules import (
Expand All @@ -114,7 +115,6 @@
WorkflowFiles,
check_deprecation,
)
from cylc.flow.workflow_status import RunMode
from cylc.flow.xtrigger_mgr import XtriggerCollator

if TYPE_CHECKING:
Expand Down Expand Up @@ -513,10 +513,6 @@ def __init__(

self.process_runahead_limit()

run_mode = self.run_mode()
if run_mode in {RunMode.SIMULATION, RunMode.DUMMY}:
configure_sim_modes(self.taskdefs.values(), run_mode)

self.configure_workflow_state_polling_tasks()

self._check_task_event_handlers()
Expand Down Expand Up @@ -567,6 +563,8 @@ def __init__(

self.mem_log("config.py: end init config")

run_mode_validate_checks(self.taskdefs)

@staticmethod
def _warn_if_queues_have_implicit_tasks(
config, taskdefs, max_warning_lines
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ message PbRuntime {
optional string environment = 16;
optional string outputs = 17;
optional string completion = 18;
optional string run_mode = 19;
}


Expand Down
100 changes: 50 additions & 50 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

Loading

0 comments on commit d354fad

Please sign in to comment.