From afd62ab89ace580f5966af6b1cdc7a4868fe52ce Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:56:25 +0100 Subject: [PATCH] wip --- cylc/flow/run_modes/skip.py | 38 +++++++------- cylc/flow/task_events_mgr.py | 6 +-- cylc/flow/task_job_mgr.py | 1 - cylc/flow/task_outputs.py | 15 +++++- cylc/flow/task_pool.py | 1 + tests/integration/run_modes/test_skip.py | 63 ++++++++++++++++++++++++ tests/unit/run_modes/test_skip.py | 6 +-- 7 files changed, 104 insertions(+), 26 deletions(-) diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py index 1b700c8d838..262f0c4c034 100644 --- a/cylc/flow/run_modes/skip.py +++ b/cylc/flow/run_modes/skip.py @@ -56,14 +56,12 @@ def submit_task_job( 'install target': 'localhost', 'hosts': ['localhost'], 'disable task event handlers': - rtconfig['skip']['disable task event handlers'] + rtconfig['skip']['disable task event handlers'], + 'execution polling intervals': [] } itask.platform['name'] = RunMode.SKIP.value itask.summary['job_runner_name'] = RunMode.SKIP.value itask.run_mode = RunMode.SKIP.value - task_job_mgr.task_events_mgr.process_message( - itask, INFO, TASK_OUTPUT_SUBMITTED, - ) task_job_mgr.workflow_db_mgr.put_insert_task_jobs( itask, { 'time_submit': now[1], @@ -95,34 +93,40 @@ def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]: A list of outputs to emit. """ - # Always produce `submitted` output: - # (No need to produce `started` as this is automatically handled by - # task event manager for jobless modes) - result: List[str] = [TASK_OUTPUT_SUBMITTED] + # Always produce `submitted` & `started` outputs first: + result: List[str] = [TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED] conf_outputs = list(rtconfig['skip']['outputs']) - # Remove started or submitted from our list of outputs: - for out in {TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED}: - if out in conf_outputs: - conf_outputs.remove(out) - - # Send the rest of our outputs, unless they are succeed or failed, + # Send the rest of our outputs, unless they are succeeded or failed, # which we hold back, to prevent warnings about pre-requisites being # unmet being shown because a "finished" output happens to come first. - for message in itask.state.outputs.iter_required_messages(): + for message in itask.state.outputs.iter_required_messages( + exclude=( + 'succeeded' if TASK_OUTPUT_FAILED in conf_outputs else 'failed') + ): trigger = itask.state.outputs._message_to_trigger[message] # Send message unless it be succeeded/failed. if ( - trigger not in {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED} + trigger not in { + TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUT_FAILED, + TASK_OUTPUT_SUBMITTED, + TASK_OUTPUT_STARTED, + } and (not conf_outputs or trigger in conf_outputs) ): result.append(message) + # Add optional outputs specified in skip settings: + for message, trigger in itask.state.outputs._message_to_trigger.items(): + if trigger in conf_outputs and trigger not in result: + result.append(message) + # Send succeeded/failed last. if TASK_OUTPUT_FAILED in conf_outputs: result.append(TASK_OUTPUT_FAILED) - else: + elif TASK_OUTPUT_SUCCEEDED and TASK_OUTPUT_SUCCEEDED not in result: result.append(TASK_OUTPUT_SUCCEEDED) return result diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index a1473e33e8f..6b5ce5b9b79 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -770,7 +770,7 @@ def process_message( # ... but either way update the job ID in the job proxy (it only # comes in via the submission message). - if itask.run_mode not in RunMode.JOBLESS_MODES.value: + if itask.run_mode not in RunMode.SIMULATION.value: job_tokens = itask.tokens.duplicate( job=str(itask.submit_num) ) @@ -1464,7 +1464,7 @@ def _process_message_submitted( ) itask.set_summary_time('submitted', event_time) - if itask.run_mode in RunMode.JOBLESS_MODES.value: + if itask.run_mode in RunMode.SIMULATION.value: # Simulate job started as well. itask.set_summary_time('started', event_time) if itask.state_reset(TASK_STATUS_RUNNING, forced=forced): @@ -1501,7 +1501,7 @@ def _process_message_submitted( 'submitted', event_time, ) - if itask.run_mode in RunMode.JOBLESS_MODES.value: + if itask.run_mode in RunMode.SIMULATION.value: # Simulate job started as well. self.data_store_mgr.delta_job_time( job_tokens, diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index b37570371d8..3d399563bb2 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -1037,7 +1037,6 @@ def _nonlive_submit_task_jobs( else: run_mode = rtconfig['run mode'] or workflow_run_mode itask.run_mode = run_mode - # Submit nonlive tasks, or add live-like tasks to list # of tasks to put through live submission pipeline - # We decide based on the output of the submit method: diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index 1af37e1554e..7ccabf79986 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -37,6 +37,7 @@ if TYPE_CHECKING: from cylc.flow.taskdef import TaskDef + from typing_extensions import Literal # Standard task output strings, used for triggering. @@ -194,6 +195,7 @@ def get_completion_expression(tdef: 'TaskDef') -> str: def get_optional_outputs( expression: str, outputs: Iterable[str], + exclude: "Optional[Literal['succeeded'], Literal['failed']]" = None ) -> Dict[str, Optional[bool]]: """Determine which outputs in an expression are optional. @@ -202,6 +204,8 @@ def get_optional_outputs( The completion expression. outputs: All outputs that apply to this task. + exclude: + Exclude this output. Returns: dict: compvar: is_optional @@ -236,6 +240,9 @@ def get_optional_outputs( # all completion variables which could appear in the expression all_compvars = {trigger_to_completion_variable(out) for out in outputs} + # Allows exclusion of additional outcomes: + extra_excludes = {exclude: False} if exclude else {} + return { # output: is_optional # the outputs that are used in the expression **{ @@ -247,6 +254,7 @@ def get_optional_outputs( # (pre-conditions are considered separately) 'expired': False, 'submit_failed': False, + **extra_excludes }, ) for output in used_compvars @@ -609,7 +617,10 @@ def _is_compvar_complete(self, compvar: str) -> Optional[bool]: else: raise KeyError(compvar) - def iter_required_messages(self) -> Iterator[str]: + def iter_required_messages( + self, + exclude=None + ) -> Iterator[str]: """Yield task messages that are required for this task to be complete. Note, in some cases tasks might not have any required messages, @@ -618,7 +629,9 @@ def iter_required_messages(self) -> Iterator[str]: for compvar, is_optional in get_optional_outputs( self._completion_expression, set(self._message_to_compvar.values()), + exclude=exclude ).items(): + # breakpoint(header=f"=== {compvar=}, {is_optional=} ===") if is_optional is False: for message, _compvar in self._message_to_compvar.items(): if _compvar == compvar: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 5e3ab605eb1..59c0a627299 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1969,6 +1969,7 @@ def _set_outputs_itask( rtconfig = bc_mgr.get_updated_rtconfig(itask) outputs.remove(RunMode.SKIP.value) skips = get_skip_mode_outputs(itask, rtconfig) + itask.run_mode = RunMode.SKIP.value outputs = self._standardise_outputs( itask.point, itask.tdef, outputs) outputs = list(set(outputs + skips)) diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py index bc867e8a08a..6b18c0d467b 100644 --- a/tests/integration/run_modes/test_skip.py +++ b/tests/integration/run_modes/test_skip.py @@ -69,6 +69,9 @@ async def test_broadcast_changes_set_skip_outputs( ): """When cylc set --out skip is used, task outputs are updated with broadcasts. + + Skip mode proposal point 4 + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md """ wid = flow({ 'scheduling': {'graph': {'R1': 'foo:expect_this'}}, @@ -89,3 +92,63 @@ async def test_broadcast_changes_set_skip_outputs( assert 'expect_this' in foo_outputs assert foo_outputs['expect_this'] == '(manually completed)' + + +async def test_skip_mode_outputs( + flow, scheduler, reftest, +): + """Nearly a functional test of the output emission of skip mode tasks + + Skip mode proposal point 2 + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md + """ + graph = """ + # By default, all required outputs will be generated + # plus succeeded if success is optional: + foo? & foo:required_out => success_if_optional & required_outs + + # The outputs submitted and started are always produced + # and do not need to be defined in outputs: + foo:submitted => submitted_always + foo:started => started_always + + # If outputs is specified and does not include either + # succeeded or failed then succeeded will be produced. + opt:optional_out? => optional_outs_produced + + should_fail:fail => did_fail + """ + wid = flow({ + 'scheduling': {'graph': {'R1': graph}}, + 'runtime': { + 'root': { + 'run mode': 'skip', + 'outputs': { + 'required_out': 'the plans have been on display...', + 'optional_out': 'its only four light years away...' + } + }, + 'opt': { + 'skip': { + 'outputs': 'optional_out' + } + }, + 'should_fail': { + 'skip': { + 'outputs': 'failed' + } + } + } + }) + schd = scheduler(wid, run_mode='live', paused_start=False) + assert await reftest(schd) == { + ('1/did_fail', ('1/should_fail',),), + ('1/foo', None,), + ('1/opt', None,), + ('1/optional_outs_produced', ('1/opt',),), + ('1/required_outs', ('1/foo', '1/foo',),), + ('1/should_fail', None,), + ('1/started_always', ('1/foo',),), + ('1/submitted_always', ('1/foo',),), + ('1/success_if_optional', ('1/foo', '1/foo',),), + } diff --git a/tests/unit/run_modes/test_skip.py b/tests/unit/run_modes/test_skip.py index f83eaf9d476..58d70925ba9 100644 --- a/tests/unit/run_modes/test_skip.py +++ b/tests/unit/run_modes/test_skip.py @@ -90,8 +90,6 @@ def test_process_outputs(outputs, required, expect): n.b: The real process message function sends the TASK_OUTPUT_STARTED message for free, so there is no reference to that here. """ - - # Create a mocked up task-proxy: rtconf = {'skip': {'outputs': outputs}} itask = SimpleNamespace( @@ -99,8 +97,8 @@ def test_process_outputs(outputs, required, expect): rtconfig=rtconf), state=SimpleNamespace( outputs=SimpleNamespace( - iter_required_messages=lambda: iter(required), + iter_required_messages=lambda exclude: iter(required), _message_to_trigger={v: v for v in required} ))) - assert process_outputs(itask, rtconf) == ['submitted'] + expect + assert process_outputs(itask, rtconf) == ['submitted', 'started'] + expect