Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Sep 3, 2024
1 parent 6d659f7 commit afd62ab
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 26 deletions.
38 changes: 21 additions & 17 deletions cylc/flow/run_modes/skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ def submit_task_job(
'install target': 'localhost',
'hosts': ['localhost'],
'disable task event handlers':
rtconfig['skip']['disable task event handlers']
rtconfig['skip']['disable task event handlers'],
'execution polling intervals': []
}
itask.platform['name'] = RunMode.SKIP.value
itask.summary['job_runner_name'] = RunMode.SKIP.value
itask.run_mode = RunMode.SKIP.value
task_job_mgr.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUBMITTED,
)
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
itask, {
'time_submit': now[1],
Expand Down Expand Up @@ -95,34 +93,40 @@ def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]:
A list of outputs to emit.
"""
# Always produce `submitted` output:
# (No need to produce `started` as this is automatically handled by
# task event manager for jobless modes)
result: List[str] = [TASK_OUTPUT_SUBMITTED]
# Always produce `submitted` & `started` outputs first:
result: List[str] = [TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED]

conf_outputs = list(rtconfig['skip']['outputs'])

# Remove started or submitted from our list of outputs:
for out in {TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED}:
if out in conf_outputs:
conf_outputs.remove(out)

# Send the rest of our outputs, unless they are succeed or failed,
# Send the rest of our outputs, unless they are succeeded or failed,
# which we hold back, to prevent warnings about pre-requisites being
# unmet being shown because a "finished" output happens to come first.
for message in itask.state.outputs.iter_required_messages():
for message in itask.state.outputs.iter_required_messages(
exclude=(
'succeeded' if TASK_OUTPUT_FAILED in conf_outputs else 'failed')
):
trigger = itask.state.outputs._message_to_trigger[message]
# Send message unless it be succeeded/failed.
if (
trigger not in {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED}
trigger not in {
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
}
and (not conf_outputs or trigger in conf_outputs)
):
result.append(message)

# Add optional outputs specified in skip settings:
for message, trigger in itask.state.outputs._message_to_trigger.items():
if trigger in conf_outputs and trigger not in result:
result.append(message)

# Send succeeded/failed last.
if TASK_OUTPUT_FAILED in conf_outputs:
result.append(TASK_OUTPUT_FAILED)
else:
elif TASK_OUTPUT_SUCCEEDED and TASK_OUTPUT_SUCCEEDED not in result:
result.append(TASK_OUTPUT_SUCCEEDED)

return result
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ def process_message(

# ... but either way update the job ID in the job proxy (it only
# comes in via the submission message).
if itask.run_mode not in RunMode.JOBLESS_MODES.value:
if itask.run_mode not in RunMode.SIMULATION.value:
job_tokens = itask.tokens.duplicate(
job=str(itask.submit_num)
)
Expand Down Expand Up @@ -1464,7 +1464,7 @@ def _process_message_submitted(
)

itask.set_summary_time('submitted', event_time)
if itask.run_mode in RunMode.JOBLESS_MODES.value:
if itask.run_mode in RunMode.SIMULATION.value:
# Simulate job started as well.
itask.set_summary_time('started', event_time)
if itask.state_reset(TASK_STATUS_RUNNING, forced=forced):
Expand Down Expand Up @@ -1501,7 +1501,7 @@ def _process_message_submitted(
'submitted',
event_time,
)
if itask.run_mode in RunMode.JOBLESS_MODES.value:
if itask.run_mode in RunMode.SIMULATION.value:
# Simulate job started as well.
self.data_store_mgr.delta_job_time(
job_tokens,
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,6 @@ def _nonlive_submit_task_jobs(
else:
run_mode = rtconfig['run mode'] or workflow_run_mode
itask.run_mode = run_mode

# Submit nonlive tasks, or add live-like tasks to list
# of tasks to put through live submission pipeline -
# We decide based on the output of the submit method:
Expand Down
15 changes: 14 additions & 1 deletion cylc/flow/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

if TYPE_CHECKING:
from cylc.flow.taskdef import TaskDef
from typing_extensions import Literal


# Standard task output strings, used for triggering.
Expand Down Expand Up @@ -194,6 +195,7 @@ def get_completion_expression(tdef: 'TaskDef') -> str:
def get_optional_outputs(
expression: str,
outputs: Iterable[str],
exclude: "Optional[Literal['succeeded'], Literal['failed']]" = None
) -> Dict[str, Optional[bool]]:
"""Determine which outputs in an expression are optional.
Expand All @@ -202,6 +204,8 @@ def get_optional_outputs(
The completion expression.
outputs:
All outputs that apply to this task.
exclude:
Exclude this output.
Returns:
dict: compvar: is_optional
Expand Down Expand Up @@ -236,6 +240,9 @@ def get_optional_outputs(
# all completion variables which could appear in the expression
all_compvars = {trigger_to_completion_variable(out) for out in outputs}

# Allows exclusion of additional outcomes:
extra_excludes = {exclude: False} if exclude else {}

return { # output: is_optional
# the outputs that are used in the expression
**{
Expand All @@ -247,6 +254,7 @@ def get_optional_outputs(
# (pre-conditions are considered separately)
'expired': False,
'submit_failed': False,
**extra_excludes
},
)
for output in used_compvars
Expand Down Expand Up @@ -609,7 +617,10 @@ def _is_compvar_complete(self, compvar: str) -> Optional[bool]:
else:
raise KeyError(compvar)

def iter_required_messages(self) -> Iterator[str]:
def iter_required_messages(
self,
exclude=None
) -> Iterator[str]:
"""Yield task messages that are required for this task to be complete.
Note, in some cases tasks might not have any required messages,
Expand All @@ -618,7 +629,9 @@ def iter_required_messages(self) -> Iterator[str]:
for compvar, is_optional in get_optional_outputs(
self._completion_expression,
set(self._message_to_compvar.values()),
exclude=exclude
).items():
# breakpoint(header=f"=== {compvar=}, {is_optional=} ===")
if is_optional is False:
for message, _compvar in self._message_to_compvar.items():
if _compvar == compvar:
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1969,6 +1969,7 @@ def _set_outputs_itask(
rtconfig = bc_mgr.get_updated_rtconfig(itask)
outputs.remove(RunMode.SKIP.value)
skips = get_skip_mode_outputs(itask, rtconfig)
itask.run_mode = RunMode.SKIP.value
outputs = self._standardise_outputs(
itask.point, itask.tdef, outputs)
outputs = list(set(outputs + skips))
Expand Down
63 changes: 63 additions & 0 deletions tests/integration/run_modes/test_skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ async def test_broadcast_changes_set_skip_outputs(
):
"""When cylc set --out skip is used, task outputs are updated with
broadcasts.
Skip mode proposal point 4
https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
"""
wid = flow({
'scheduling': {'graph': {'R1': 'foo:expect_this'}},
Expand All @@ -89,3 +92,63 @@ async def test_broadcast_changes_set_skip_outputs(

assert 'expect_this' in foo_outputs
assert foo_outputs['expect_this'] == '(manually completed)'


async def test_skip_mode_outputs(
flow, scheduler, reftest,
):
"""Nearly a functional test of the output emission of skip mode tasks
Skip mode proposal point 2
https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
"""
graph = """
# By default, all required outputs will be generated
# plus succeeded if success is optional:
foo? & foo:required_out => success_if_optional & required_outs
# The outputs submitted and started are always produced
# and do not need to be defined in outputs:
foo:submitted => submitted_always
foo:started => started_always
# If outputs is specified and does not include either
# succeeded or failed then succeeded will be produced.
opt:optional_out? => optional_outs_produced
should_fail:fail => did_fail
"""
wid = flow({
'scheduling': {'graph': {'R1': graph}},
'runtime': {
'root': {
'run mode': 'skip',
'outputs': {
'required_out': 'the plans have been on display...',
'optional_out': 'its only four light years away...'
}
},
'opt': {
'skip': {
'outputs': 'optional_out'
}
},
'should_fail': {
'skip': {
'outputs': 'failed'
}
}
}
})
schd = scheduler(wid, run_mode='live', paused_start=False)
assert await reftest(schd) == {
('1/did_fail', ('1/should_fail',),),
('1/foo', None,),
('1/opt', None,),
('1/optional_outs_produced', ('1/opt',),),
('1/required_outs', ('1/foo', '1/foo',),),
('1/should_fail', None,),
('1/started_always', ('1/foo',),),
('1/submitted_always', ('1/foo',),),
('1/success_if_optional', ('1/foo', '1/foo',),),
}
6 changes: 2 additions & 4 deletions tests/unit/run_modes/test_skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,15 @@ def test_process_outputs(outputs, required, expect):
n.b: The real process message function sends the TASK_OUTPUT_STARTED
message for free, so there is no reference to that here.
"""


# Create a mocked up task-proxy:
rtconf = {'skip': {'outputs': outputs}}
itask = SimpleNamespace(
tdef=SimpleNamespace(
rtconfig=rtconf),
state=SimpleNamespace(
outputs=SimpleNamespace(
iter_required_messages=lambda: iter(required),
iter_required_messages=lambda exclude: iter(required),
_message_to_trigger={v: v for v in required}
)))

assert process_outputs(itask, rtconf) == ['submitted'] + expect
assert process_outputs(itask, rtconf) == ['submitted', 'started'] + expect

0 comments on commit afd62ab

Please sign in to comment.