diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py
index 5103c1ccdb3..dce1b0316a0 100644
--- a/cylc/flow/cfgspec/workflow.py
+++ b/cylc/flow/cfgspec/workflow.py
@@ -1338,8 +1338,8 @@ def get_script_common_text(this: str, example: Optional[str] = None):
)
Conf(
'run mode', VDR.V_STRING,
- options=list(RunMode.OVERRIDING_MODES.value),
- default=RunMode.LIVE.value,
+ options=list(RunMode.OVERRIDING_MODES.value) + [''],
+ default='',
desc=f'''
For a workflow run in live mode run this task in skip
mode.
diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py
index 1b700c8d838..60daf0f9ed7 100644
--- a/cylc/flow/run_modes/skip.py
+++ b/cylc/flow/run_modes/skip.py
@@ -46,6 +46,10 @@ def submit_task_job(
Returns:
True - indicating that TaskJobManager need take no further action.
"""
+ # Don't do anything if task is held:
+ if itask.state.is_held:
+ return True
+
task_job_mgr._set_retry_timers(itask, rtconfig)
itask.summary['started_time'] = now[0]
itask.waiting_on_job_prep = False
@@ -56,14 +60,12 @@ def submit_task_job(
'install target': 'localhost',
'hosts': ['localhost'],
'disable task event handlers':
- rtconfig['skip']['disable task event handlers']
+ rtconfig['skip']['disable task event handlers'],
+ 'execution polling intervals': []
}
itask.platform['name'] = RunMode.SKIP.value
itask.summary['job_runner_name'] = RunMode.SKIP.value
itask.run_mode = RunMode.SKIP.value
- task_job_mgr.task_events_mgr.process_message(
- itask, INFO, TASK_OUTPUT_SUBMITTED,
- )
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
itask, {
'time_submit': now[1],
@@ -95,34 +97,40 @@ def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]:
A list of outputs to emit.
"""
- # Always produce `submitted` output:
- # (No need to produce `started` as this is automatically handled by
- # task event manager for jobless modes)
- result: List[str] = [TASK_OUTPUT_SUBMITTED]
+ # Always produce `submitted` & `started` outputs first:
+ result: List[str] = [TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED]
conf_outputs = list(rtconfig['skip']['outputs'])
- # Remove started or submitted from our list of outputs:
- for out in {TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED}:
- if out in conf_outputs:
- conf_outputs.remove(out)
-
- # Send the rest of our outputs, unless they are succeed or failed,
+ # Send the rest of our outputs, unless they are succeeded or failed,
# which we hold back, to prevent warnings about pre-requisites being
# unmet being shown because a "finished" output happens to come first.
- for message in itask.state.outputs.iter_required_messages():
+ for message in itask.state.outputs.iter_required_messages(
+ exclude=(
+ 'succeeded' if TASK_OUTPUT_FAILED in conf_outputs else 'failed')
+ ):
trigger = itask.state.outputs._message_to_trigger[message]
# Send message unless it be succeeded/failed.
if (
- trigger not in {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED}
+ trigger not in {
+ TASK_OUTPUT_SUCCEEDED,
+ TASK_OUTPUT_FAILED,
+ TASK_OUTPUT_SUBMITTED,
+ TASK_OUTPUT_STARTED,
+ }
and (not conf_outputs or trigger in conf_outputs)
):
result.append(message)
+ # Add optional outputs specified in skip settings:
+ for message, trigger in itask.state.outputs._message_to_trigger.items():
+ if trigger in conf_outputs and trigger not in result:
+ result.append(message)
+
# Send succeeded/failed last.
if TASK_OUTPUT_FAILED in conf_outputs:
result.append(TASK_OUTPUT_FAILED)
- else:
+ elif TASK_OUTPUT_SUCCEEDED and TASK_OUTPUT_SUCCEEDED not in result:
result.append(TASK_OUTPUT_SUCCEEDED)
return result
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 830c8f38025..5bf6ccae66a 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -770,7 +770,7 @@ def process_message(
# ... but either way update the job ID in the job proxy (it only
# comes in via the submission message).
- if itask.run_mode not in RunMode.JOBLESS_MODES.value:
+ if itask.run_mode != RunMode.SIMULATION.value:
job_tokens = itask.tokens.duplicate(
job=str(itask.submit_num)
)
@@ -1383,8 +1383,12 @@ def _process_message_succeeded(self, itask, event_time, forced):
"run_status": 0,
"time_run_exit": event_time,
})
- # Update mean elapsed time only on task succeeded.
- if itask.summary['started_time'] is not None:
+ # Update mean elapsed time only on task succeeded,
+ # and only if task is running in live mode:
+ if (
+ itask.summary['started_time'] is not None
+ and itask.run_mode == RunMode.LIVE.value
+ ):
itask.tdef.elapsed_times.append(
itask.summary['finished_time'] -
itask.summary['started_time'])
@@ -1463,7 +1467,7 @@ def _process_message_submitted(
)
itask.set_summary_time('submitted', event_time)
- if itask.run_mode in RunMode.JOBLESS_MODES.value:
+ if itask.run_mode == RunMode.SIMULATION.value:
# Simulate job started as well.
itask.set_summary_time('started', event_time)
if itask.state_reset(TASK_STATUS_RUNNING, forced=forced):
@@ -1500,7 +1504,7 @@ def _process_message_submitted(
'submitted',
event_time,
)
- if itask.run_mode in RunMode.JOBLESS_MODES.value:
+ if itask.run_mode == RunMode.SIMULATION.value:
# Simulate job started as well.
self.data_store_mgr.delta_job_time(
job_tokens,
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index b37570371d8..192e57fc73e 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -1009,14 +1009,21 @@ def _nonlive_submit_task_jobs(
workflow: str,
workflow_run_mode: str,
) -> 'Tuple[List[TaskProxy], List[TaskProxy]]':
- """Simulation mode task jobs submission.
+ """Identify task mode and carry out alternative submission
+ paths if required:
+
+ * Simulation: Job submission.
+ * Skip: Entire job lifecycle happens here!
+ * Dummy: Pre-submission preparation (removing task scripts content)
+ before returning to live pathway.
+ * Live: return to main submission pathway without doing anything.
Returns:
lively_tasks:
A list of tasks which require subsequent
processing **as if** they were live mode tasks.
(This includes live and dummy mode tasks)
- ghostly_tasks:
+ nonlive_tasks:
A list of tasks which require no further processing
because their apparent execution is done entirely inside
the scheduler. (This includes skip and simulation mode tasks).
@@ -1027,33 +1034,38 @@ def _nonlive_submit_task_jobs(
now = (now, get_time_string_from_unix_time(now))
for itask in itasks:
- # Handle broadcasts:
+ # Get task config with broadcasts applied:
rtconfig = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig(
itask)
+
# Apply task run mode
if workflow_run_mode in RunMode.NON_OVERRIDABLE_MODES.value:
# Task run mode cannot override workflow run-mode sim or dummy:
run_mode = workflow_run_mode
else:
- run_mode = rtconfig['run mode'] or workflow_run_mode
+ # If workflow mode is skip or live and task mode is set,
+ # override workflow mode, else use workflow mode.
+ run_mode = rtconfig.get('run mode', None) or workflow_run_mode
+ # Store the run mode of the this submission:
itask.run_mode = run_mode
- # Submit nonlive tasks, or add live-like tasks to list
- # of tasks to put through live submission pipeline -
- # We decide based on the output of the submit method:
- is_done = False
+ # Submit nonlive tasks, or add live-like (live or dummy)
+ # tasks to list of tasks to put through live
+ # submission pipeline - We decide based on the output
+ # of the submit method:
+ is_nonlive = False
if run_mode == RunMode.DUMMY.value:
- is_done = dummy_submit_task_job(
+ is_nonlive = dummy_submit_task_job(
self, itask, rtconfig, workflow, now)
elif run_mode == RunMode.SIMULATION.value:
- is_done = simulation_submit_task_job(
+ is_nonlive = simulation_submit_task_job(
self, itask, rtconfig, workflow, now)
elif run_mode == RunMode.SKIP.value:
- is_done = skip_submit_task_job(
+ is_nonlive = skip_submit_task_job(
self, itask, rtconfig, now)
# Assign task to list:
- if is_done:
+ if is_nonlive:
nonlive_tasks.append(itask)
else:
lively_tasks.append(itask)
diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py
index 1af37e1554e..7ccabf79986 100644
--- a/cylc/flow/task_outputs.py
+++ b/cylc/flow/task_outputs.py
@@ -37,6 +37,7 @@
if TYPE_CHECKING:
from cylc.flow.taskdef import TaskDef
+ from typing_extensions import Literal
# Standard task output strings, used for triggering.
@@ -194,6 +195,7 @@ def get_completion_expression(tdef: 'TaskDef') -> str:
def get_optional_outputs(
expression: str,
outputs: Iterable[str],
+ exclude: "Optional[Literal['succeeded'], Literal['failed']]" = None
) -> Dict[str, Optional[bool]]:
"""Determine which outputs in an expression are optional.
@@ -202,6 +204,8 @@ def get_optional_outputs(
The completion expression.
outputs:
All outputs that apply to this task.
+ exclude:
+ Exclude this output.
Returns:
dict: compvar: is_optional
@@ -236,6 +240,9 @@ def get_optional_outputs(
# all completion variables which could appear in the expression
all_compvars = {trigger_to_completion_variable(out) for out in outputs}
+ # Allows exclusion of additional outcomes:
+ extra_excludes = {exclude: False} if exclude else {}
+
return { # output: is_optional
# the outputs that are used in the expression
**{
@@ -247,6 +254,7 @@ def get_optional_outputs(
# (pre-conditions are considered separately)
'expired': False,
'submit_failed': False,
+ **extra_excludes
},
)
for output in used_compvars
@@ -609,7 +617,10 @@ def _is_compvar_complete(self, compvar: str) -> Optional[bool]:
else:
raise KeyError(compvar)
- def iter_required_messages(self) -> Iterator[str]:
+ def iter_required_messages(
+ self,
+ exclude=None
+ ) -> Iterator[str]:
"""Yield task messages that are required for this task to be complete.
Note, in some cases tasks might not have any required messages,
@@ -618,7 +629,9 @@ def iter_required_messages(self) -> Iterator[str]:
for compvar, is_optional in get_optional_outputs(
self._completion_expression,
set(self._message_to_compvar.values()),
+ exclude=exclude
).items():
+ # breakpoint(header=f"=== {compvar=}, {is_optional=} ===")
if is_optional is False:
for message, _compvar in self._message_to_compvar.items():
if _compvar == compvar:
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 5e3ab605eb1..59c0a627299 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -1969,6 +1969,7 @@ def _set_outputs_itask(
rtconfig = bc_mgr.get_updated_rtconfig(itask)
outputs.remove(RunMode.SKIP.value)
skips = get_skip_mode_outputs(itask, rtconfig)
+ itask.run_mode = RunMode.SKIP.value
outputs = self._standardise_outputs(
itask.point, itask.tdef, outputs)
outputs = list(set(outputs + skips))
diff --git a/tests/functional/cylc-config/00-simple/section2.stdout b/tests/functional/cylc-config/00-simple/section2.stdout
index 559d1c2556c..049db739435 100644
--- a/tests/functional/cylc-config/00-simple/section2.stdout
+++ b/tests/functional/cylc-config/00-simple/section2.stdout
@@ -15,7 +15,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[meta]]]
title =
description =
@@ -94,7 +94,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[meta]]]
title =
description =
@@ -173,7 +173,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[meta]]]
title =
description =
@@ -252,7 +252,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -332,7 +332,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
@@ -412,7 +412,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -492,7 +492,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -572,7 +572,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
@@ -652,7 +652,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
@@ -732,7 +732,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -812,7 +812,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -892,7 +892,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
@@ -972,7 +972,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
- run mode = live
+ run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
diff --git a/tests/functional/run_modes/06-run-mode-overrides.t b/tests/functional/run_modes/06-run-mode-overrides.t
index 3dd12acb584..f6d4faafb30 100644
--- a/tests/functional/run_modes/06-run-mode-overrides.t
+++ b/tests/functional/run_modes/06-run-mode-overrides.t
@@ -52,14 +52,15 @@ workflow_run_ok "${TEST_NAME}:run" \
cylc play "${WORKFLOW_NAME}" \
--no-detach \
--mode skip \
- --set='changemode="live"'
+ --set='changemode="live"' \
+ --final-cycle-point=1000
JOB_LOGS="${WORKFLOW_RUN_DIR}/log/job/1000"
run_ok "${TEST_NAME}:run mode=live" ls "${JOB_LOGS}/live_"
-run_ok "${TEST_NAME}:run mode=default" ls "${JOB_LOGS}/default_"
+run_fail "${TEST_NAME}:run mode=default" ls "${JOB_LOGS}/default_"
run_fail "${TEST_NAME}:run mode=skip" ls "${JOB_LOGS}/skip_"
-JOB_LOGS="${WORKFLOW_RUN_DIR}/log/job/1001"
-named_grep_ok "${TEST_NAME}:run mode=live" "===.*===" "${JOB_LOGS}/default_/NN/job.out"
+JOB_LOGS="${WORKFLOW_RUN_DIR}/log/job/1000"
+named_grep_ok "${TEST_NAME}:run mode=live" "===.*===" "${JOB_LOGS}/live_/NN/job.out"
purge
exit 0
diff --git a/tests/functional/run_modes/06-run-mode-overrides/flow.cylc b/tests/functional/run_modes/06-run-mode-overrides/flow.cylc
index baa4a9f572e..6d1b1258833 100644
--- a/tests/functional/run_modes/06-run-mode-overrides/flow.cylc
+++ b/tests/functional/run_modes/06-run-mode-overrides/flow.cylc
@@ -5,7 +5,6 @@
[scheduling]
initial cycle point = 1000
final cycle point = 1001
- # stop after cycle point = 1000
[[graph]]
R1/1000 = default_ & live_ & skip_ => end
R1/1001 = end[-P1Y] => broadcaster => default_
diff --git a/tests/integration/run_modes/test_mode_overrides.py b/tests/integration/run_modes/test_mode_overrides.py
index 7d368789a2d..f9ab318e0e6 100644
--- a/tests/integration/run_modes/test_mode_overrides.py
+++ b/tests/integration/run_modes/test_mode_overrides.py
@@ -15,6 +15,14 @@
# along with this program. If not, see .
"""Test that using [runtime][TASK]run mode works in each mode.
+Point 3 of the Skip Mode proposal
+https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
+
+| The run mode should be controlled by a new task configuration
+| [runtime][]run mode with the default being live.
+| As a runtime configuration, this can be defined in the workflow
+| for development / testing purposes or set by cylc broadcast.
+
n.b: This is pretty much a functional test and
probably ought to be labelled as such, but uses the
integration test framework.
@@ -51,8 +59,16 @@ async def test_run_mode_override_from_config(
# Live task has been really submitted:
assert log_filter(log, contains=expect_template.format('live'))
- # Default is the same as live:
- assert log_filter(log, contains=expect_template.format('default'))
+
+ # Default is the same as workflow:
+ if workflow_run_mode == 'live':
+ assert log_filter(log, contains=expect_template.format('default'))
+ else:
+ assert log_filter(
+ log, contains='[1000/default_/01:running] => succeeded')
+ assert not log_filter(
+ log, contains=expect_template.format('default'))
+
# Skip task has run, but not actually been submitted:
assert log_filter(log, contains='[1000/skip_/01:running] => succeeded')
assert not log_filter(log, contains=expect_template.format('skip'))
diff --git a/tests/integration/run_modes/test_nonlive.py b/tests/integration/run_modes/test_nonlive.py
index 5f03d1717a5..42ddca128ce 100644
--- a/tests/integration/run_modes/test_nonlive.py
+++ b/tests/integration/run_modes/test_nonlive.py
@@ -14,10 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-# Define here to ensure test doesn't just mirror code:
from typing import Any, Dict
-
+# Define here to ensure test doesn't just mirror code:
KGO = {
'live': {
'flow_nums': '[1]',
@@ -88,3 +87,34 @@ async def test_task_jobs(flow, scheduler, start):
schd.workflow_db_mgr.pub_dao.select_task_job(1, mode))
assert taskdata == kgo, (
f'Mode {mode}: incorrect db entries.')
+
+
+async def test_mean_task_time(flow, scheduler, run, complete):
+ """Non-live tasks are not added to the list of task times,
+ so skipping tasks will not affect how long Cylc expects tasks to run.
+ """
+ schd = scheduler(flow({
+ 'scheduling': {
+ 'initial cycle point': '1000',
+ 'final cycle point': '1002',
+ 'graph': {'P1Y': 'foo'}}
+ }), run_mode='live')
+
+ async with run(schd):
+ tasks = schd.pool.get_tasks()
+ tdef = tasks[0].tdef
+ assert list(tdef.elapsed_times) == []
+
+ # Make the task run in skip mode at one cycle:
+ schd.broadcast_mgr.put_broadcast(
+ ['1000'], ['foo'], [{'run mode': 'skip'}])
+
+ # Submit two tasks:
+ schd.task_job_mgr.submit_task_jobs(
+ schd.workflow,
+ tasks[:2],
+ schd.server.curve_auth,
+ schd.server.client_pub_key_dir
+ )
+ await complete(schd, '10010101T0000Z/foo')
+ assert len(tdef.elapsed_times) == 1
diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py
index bc867e8a08a..bc9f29116f2 100644
--- a/tests/integration/run_modes/test_skip.py
+++ b/tests/integration/run_modes/test_skip.py
@@ -69,6 +69,13 @@ async def test_broadcast_changes_set_skip_outputs(
):
"""When cylc set --out skip is used, task outputs are updated with
broadcasts.
+
+ Skip mode proposal point 4
+ https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
+
+ | The cylc set --out option should accept the skip value which should
+ | set the outputs defined in [runtime][][skip]outputs.
+ | The skip keyword should not be allowed in custom outputs.
"""
wid = flow({
'scheduling': {'graph': {'R1': 'foo:expect_this'}},
@@ -89,3 +96,154 @@ async def test_broadcast_changes_set_skip_outputs(
assert 'expect_this' in foo_outputs
assert foo_outputs['expect_this'] == '(manually completed)'
+
+
+async def test_skip_mode_outputs(
+ flow, scheduler, reftest,
+):
+ """Nearly a functional test of the output emission of skip mode tasks
+
+ Skip mode proposal point 2
+ https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
+ """
+ graph = """
+ # By default, all required outputs will be generated
+ # plus succeeded if success is optional:
+ foo? & foo:required_out => success_if_optional & required_outs
+
+ # The outputs submitted and started are always produced
+ # and do not need to be defined in outputs:
+ foo:submitted => submitted_always
+ foo:started => started_always
+
+ # If outputs is specified and does not include either
+ # succeeded or failed then succeeded will be produced.
+ opt:optional_out? => optional_outs_produced
+
+ should_fail:fail => did_fail
+ """
+ wid = flow({
+ 'scheduling': {'graph': {'R1': graph}},
+ 'runtime': {
+ 'root': {
+ 'run mode': 'skip',
+ 'outputs': {
+ 'required_out': 'the plans have been on display...',
+ 'optional_out': 'its only four light years away...'
+ }
+ },
+ 'opt': {
+ 'skip': {
+ 'outputs': 'optional_out'
+ }
+ },
+ 'should_fail': {
+ 'skip': {
+ 'outputs': 'failed'
+ }
+ }
+ }
+ })
+ schd = scheduler(wid, run_mode='live', paused_start=False)
+ assert await reftest(schd) == {
+ ('1/did_fail', ('1/should_fail',),),
+ ('1/foo', None,),
+ ('1/opt', None,),
+ ('1/optional_outs_produced', ('1/opt',),),
+ ('1/required_outs', ('1/foo', '1/foo',),),
+ ('1/should_fail', None,),
+ ('1/started_always', ('1/foo',),),
+ ('1/submitted_always', ('1/foo',),),
+ ('1/success_if_optional', ('1/foo', '1/foo',),),
+ }
+
+
+async def test_doesnt_release_held_tasks(
+ one_conf, flow, scheduler, start, log_filter
+):
+ """Point 5 of the proposal
+ https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
+
+ | Tasks with run mode = skip will continue to abide by the is_held
+ | flag as normal.
+
+ """
+ schd = scheduler(flow(one_conf), run_mode='skip')
+ async with start(schd) as log:
+ itask = schd.pool.get_tasks()[0]
+ msg = 'held tasks shoudn\'t {}'
+
+ # Set task to held and check submission in skip mode doesn't happen:
+ itask.state.is_held = True
+ schd.task_job_mgr.submit_task_jobs(
+ schd.workflow,
+ [itask],
+ schd.server.curve_auth,
+ schd.server.client_pub_key_dir,
+ run_mode=schd.get_run_mode()
+ )
+ assert not log_filter(log, contains='=> running'), msg.format('run')
+ assert not log_filter(log, contains='=> succeeded'), msg.format(
+ 'succeed')
+
+ # Release held task and assert that it now skips successfully:
+ schd.pool.release_held_tasks(['1/one'])
+ schd.task_job_mgr.submit_task_jobs(
+ schd.workflow,
+ [itask],
+ schd.server.curve_auth,
+ schd.server.client_pub_key_dir,
+ run_mode=schd.get_run_mode()
+ )
+ assert log_filter(log, contains='=> running'), msg.format('run')
+ assert log_filter(log, contains='=> succeeded'), msg.format('succeed')
+
+
+async def test_force_trigger_doesnt_change_mode(
+ flow, scheduler, run, complete
+):
+ """Point 6 from the skip mode proposal
+ https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
+
+ | Force-triggering a task will not override the run mode.
+ """
+ wid = flow({
+ 'scheduling': {'graph': {'R1': 'slow => skip'}},
+ 'runtime': {
+ 'slow': {'script': 'sleep 6'},
+ 'skip': {'script': 'exit 1', 'run mode': 'skip'}
+ }
+ })
+ schd = scheduler(wid, run_mode='live', paused_start=False)
+ async with run(schd):
+ schd.pool.force_trigger_tasks(['1/skip'], [1])
+ # This will timeout if the skip task has become live on triggering:
+ await complete(schd, '1/skip', timeout=6)
+
+
+async def test_prereqs_marked_satisfied_by_skip_mode(
+ flow, scheduler, start, log_filter, complete
+):
+ """Point 8 from the skip mode proposal
+ https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
+
+ | When tasks are run in skip mode, the prerequisites which correspond
+ | to the outputs they generate should be marked as "satisfied by skip mode"
+ | rather than "satisfied naturally" for provenance reasons.
+ """
+ schd = scheduler(flow({
+ 'scheduling': {'graph': {'R1': 'foo => bar'}}
+ }), run_mode='skip')
+
+ async with start(schd) as log:
+ foo, = schd.pool.get_tasks()
+ schd.task_job_mgr.submit_task_jobs(
+ schd.workflow,
+ [foo],
+ schd.server.curve_auth,
+ schd.server.client_pub_key_dir,
+ run_mode=schd.get_run_mode()
+ )
+ bar, = schd.pool.get_tasks()
+ satisfied_message, = bar.state.prerequisites[0]._satisfied.values()
+ assert satisfied_message == 'satisfied by skip mode'
diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py
index fef15e3e3dc..3da32733ffc 100644
--- a/tests/integration/utils/flow_tools.py
+++ b/tests/integration/utils/flow_tools.py
@@ -115,6 +115,10 @@ def __make_scheduler(id_: str, **opts: Any) -> Scheduler:
schd.workflow_db_mgr.on_workflow_shutdown()
+def caplogprinter(caplog):
+ _ = [print(i) for i in caplog.messages]
+
+
@asynccontextmanager
async def _start_flow(
caplog: Optional[pytest.LogCaptureFixture],
@@ -124,6 +128,8 @@ async def _start_flow(
"""Start a scheduler but don't set the main loop running."""
if caplog:
caplog.set_level(level, CYLC_LOG)
+ # Debug functionality
+ caplog.print = lambda: caplogprinter(caplog)
await schd.install()
@@ -154,6 +160,8 @@ async def _run_flow(
"""Start a scheduler and set the main loop running."""
if caplog:
caplog.set_level(level, CYLC_LOG)
+ # Debug functionality
+ caplog.print = lambda: caplogprinter(caplog)
await schd.install()
diff --git a/tests/unit/run_modes/test_nonlive.py b/tests/unit/run_modes/test_nonlive.py
index 6056da1fd83..71695f2c96b 100644
--- a/tests/unit/run_modes/test_nonlive.py
+++ b/tests/unit/run_modes/test_nonlive.py
@@ -25,6 +25,13 @@ def test_run_mode_validate_checks(monkeypatch, caplog):
"""It warns us if we've set a task config to nonlive mode.
(And not otherwise)
+
+ Point 3 from the skip mode proposal
+ https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
+
+ | If the run mode is set to simulation or skip in the workflow
+ | configuration, then cylc validate and cylc lint should produce
+ | warning (similar to development features in other languages / systems).
"""
taskdefs = {
f'{run_mode}_task': SimpleNamespace(
diff --git a/tests/unit/run_modes/test_skip.py b/tests/unit/run_modes/test_skip.py
index f83eaf9d476..f5ad89381d7 100644
--- a/tests/unit/run_modes/test_skip.py
+++ b/tests/unit/run_modes/test_skip.py
@@ -86,12 +86,7 @@ def test_process_outputs(outputs, required, expect):
2. Sends every required output.
3. If failed is set send failed
4. If failed in not set send succeeded.
-
- n.b: The real process message function sends the TASK_OUTPUT_STARTED
- message for free, so there is no reference to that here.
"""
-
-
# Create a mocked up task-proxy:
rtconf = {'skip': {'outputs': outputs}}
itask = SimpleNamespace(
@@ -99,8 +94,8 @@ def test_process_outputs(outputs, required, expect):
rtconfig=rtconf),
state=SimpleNamespace(
outputs=SimpleNamespace(
- iter_required_messages=lambda: iter(required),
+ iter_required_messages=lambda exclude: iter(required),
_message_to_trigger={v: v for v in required}
)))
- assert process_outputs(itask, rtconf) == ['submitted'] + expect
+ assert process_outputs(itask, rtconf) == ['submitted', 'started'] + expect