
don't record run times for non-live tasks
re-enable workflow mode = skip doing something useful

improve testing
wxtim committed Sep 5, 2024
1 parent 7b8e678 commit 660ca3a
Showing 15 changed files with 318 additions and 66 deletions.
4 changes: 2 additions & 2 deletions cylc/flow/cfgspec/workflow.py
@@ -1338,8 +1338,8 @@ def get_script_common_text(this: str, example: Optional[str] = None):
)
Conf(
'run mode', VDR.V_STRING,
options=list(RunMode.OVERRIDING_MODES.value),
default=RunMode.LIVE.value,
options=list(RunMode.OVERRIDING_MODES.value) + [''],
default='',
desc=f'''
For a workflow run in live mode run this task in skip
mode.
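A minimal sketch of the intended behaviour of the new blank default, assuming bare mode strings and a hypothetical `resolve_run_mode` helper (neither is Cylc API): an unset task-level run mode defers to the workflow run mode, while simulation and dummy workflow modes cannot be overridden per task.

```python
# Minimal sketch (not Cylc code) of how the new blank default behaves:
# an unset task-level run mode falls back to the workflow run mode,
# while simulation/dummy workflow modes cannot be overridden per task.
NON_OVERRIDABLE_MODES = {'simulation', 'dummy'}


def resolve_run_mode(workflow_mode: str, task_mode: str = '') -> str:
    """Return the mode a task would actually run in."""
    if workflow_mode in NON_OVERRIDABLE_MODES:
        return workflow_mode
    return task_mode or workflow_mode


assert resolve_run_mode('live') == 'live'              # blank default
assert resolve_run_mode('live', 'skip') == 'skip'      # task overrides
assert resolve_run_mode('skip', 'live') == 'live'      # and vice versa
assert resolve_run_mode('simulation', 'skip') == 'simulation'
```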
42 changes: 25 additions & 17 deletions cylc/flow/run_modes/skip.py
@@ -46,6 +46,10 @@ def submit_task_job(
Returns:
True - indicating that TaskJobManager need take no further action.
"""
# Don't do anything if task is held:
if itask.state.is_held:
return True

task_job_mgr._set_retry_timers(itask, rtconfig)
itask.summary['started_time'] = now[0]
itask.waiting_on_job_prep = False
@@ -56,14 +60,12 @@ def submit_task_job(
'install target': 'localhost',
'hosts': ['localhost'],
'disable task event handlers':
rtconfig['skip']['disable task event handlers']
rtconfig['skip']['disable task event handlers'],
'execution polling intervals': []
}
itask.platform['name'] = RunMode.SKIP.value
itask.summary['job_runner_name'] = RunMode.SKIP.value
itask.run_mode = RunMode.SKIP.value
task_job_mgr.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUBMITTED,
)
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
itask, {
'time_submit': now[1],
@@ -95,34 +97,40 @@ def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]:
A list of outputs to emit.
"""
# Always produce `submitted` output:
# (No need to produce `started` as this is automatically handled by
# task event manager for jobless modes)
result: List[str] = [TASK_OUTPUT_SUBMITTED]
# Always produce `submitted` & `started` outputs first:
result: List[str] = [TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED]

conf_outputs = list(rtconfig['skip']['outputs'])

# Remove started or submitted from our list of outputs:
for out in {TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED}:
if out in conf_outputs:
conf_outputs.remove(out)

# Send the rest of our outputs, unless they are succeed or failed,
# Send the rest of our outputs, unless they are succeeded or failed,
# which we hold back, to prevent warnings about unmet prerequisites
# being shown because a "finished" output happens to come first.
for message in itask.state.outputs.iter_required_messages():
for message in itask.state.outputs.iter_required_messages(
exclude=(
'succeeded' if TASK_OUTPUT_FAILED in conf_outputs else 'failed')
):
trigger = itask.state.outputs._message_to_trigger[message]
# Send the message unless it is succeeded/failed.
if (
trigger not in {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED}
trigger not in {
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
}
and (not conf_outputs or trigger in conf_outputs)
):
result.append(message)

# Add optional outputs specified in skip settings:
for message, trigger in itask.state.outputs._message_to_trigger.items():
if trigger in conf_outputs and trigger not in result:
result.append(message)

# Send succeeded/failed last.
if TASK_OUTPUT_FAILED in conf_outputs:
result.append(TASK_OUTPUT_FAILED)
else:
elif TASK_OUTPUT_SUCCEEDED and TASK_OUTPUT_SUCCEEDED not in result:
result.append(TASK_OUTPUT_SUCCEEDED)

return result
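The ordering enforced above — `submitted` and `started` first, configured custom outputs in the middle, and `succeeded`/`failed` held back until last — can be illustrated with a small standalone sketch; `order_skip_outputs` below is hypothetical, works on plain trigger strings, and ignores completion-expression handling.

```python
# Simplified sketch of skip-mode output ordering (not the real
# process_outputs): plain strings, no completion-expression logic.
from typing import List


def order_skip_outputs(required: List[str], configured: List[str]) -> List[str]:
    result = ['submitted', 'started']   # always emitted first
    finish = {'succeeded', 'failed'}
    # Custom outputs go in the middle, finish outputs are held back.
    for out in required + configured:
        if out not in finish and out not in result:
            result.append(out)
    # Emit failed only if configured, otherwise succeeded, and do it
    # last so "unmet prerequisite" warnings are not triggered early.
    result.append('failed' if 'failed' in configured else 'succeeded')
    return result


print(order_skip_outputs(['out1'], []))    # [..., 'out1', 'succeeded']
print(order_skip_outputs([], ['failed']))  # [..., 'failed']
```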
14 changes: 9 additions & 5 deletions cylc/flow/task_events_mgr.py
@@ -770,7 +770,7 @@ def process_message(

# ... but either way update the job ID in the job proxy (it only
# comes in via the submission message).
if itask.run_mode not in RunMode.JOBLESS_MODES.value:
if itask.run_mode != RunMode.SIMULATION.value:
job_tokens = itask.tokens.duplicate(
job=str(itask.submit_num)
)
@@ -1383,8 +1383,12 @@ def _process_message_succeeded(self, itask, event_time, forced):
"run_status": 0,
"time_run_exit": event_time,
})
# Update mean elapsed time only on task succeeded.
if itask.summary['started_time'] is not None:
# Update mean elapsed time only on task succeeded,
# and only if task is running in live mode:
if (
itask.summary['started_time'] is not None
and itask.run_mode == RunMode.LIVE.value
):
itask.tdef.elapsed_times.append(
itask.summary['finished_time'] -
itask.summary['started_time'])
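In other words, only genuine live runs now contribute samples to the task's mean elapsed time, so near-instant skip or simulation "runs" cannot skew it. A rough sketch of that guard, with a hypothetical `record_elapsed_time` helper and bare mode strings:

```python
# Sketch only: append an elapsed-time sample just for live runs.
from typing import List, Optional


def record_elapsed_time(
    elapsed_times: List[float],
    started: Optional[float],
    finished: float,
    run_mode: str,
) -> None:
    if started is not None and run_mode == 'live':
        elapsed_times.append(finished - started)


samples: List[float] = []
record_elapsed_time(samples, 0.0, 120.0, 'live')   # recorded
record_elapsed_time(samples, 50.0, 50.01, 'skip')  # ignored
print(samples)  # [120.0]
```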
@@ -1463,7 +1467,7 @@ def _process_message_submitted(
)

itask.set_summary_time('submitted', event_time)
if itask.run_mode in RunMode.JOBLESS_MODES.value:
if itask.run_mode == RunMode.SIMULATION.value:
# Simulate job started as well.
itask.set_summary_time('started', event_time)
if itask.state_reset(TASK_STATUS_RUNNING, forced=forced):
@@ -1500,7 +1504,7 @@ def _process_message_submitted(
'submitted',
event_time,
)
if itask.run_mode in RunMode.JOBLESS_MODES.value:
if itask.run_mode == RunMode.SIMULATION.value:
# Simulate job started as well.
self.data_store_mgr.delta_job_time(
job_tokens,
36 changes: 24 additions & 12 deletions cylc/flow/task_job_mgr.py
@@ -1009,14 +1009,21 @@ def _nonlive_submit_task_jobs(
workflow: str,
workflow_run_mode: str,
) -> 'Tuple[List[TaskProxy], List[TaskProxy]]':
"""Simulation mode task jobs submission.
"""Identify task mode and carry out alternative submission
paths if required:
* Simulation: Job submission.
* Skip: Entire job lifecycle happens here!
* Dummy: Pre-submission preparation (removing task script content)
  before returning to the live pathway.
* Live: Return to the main submission pathway without doing anything.
Returns:
lively_tasks:
A list of tasks which require subsequent
processing **as if** they were live mode tasks.
(This includes live and dummy mode tasks)
ghostly_tasks:
nonlive_tasks:
A list of tasks which require no further processing
because their apparent execution is done entirely inside
the scheduler. (This includes skip and simulation mode tasks).
@@ -1027,33 +1034,38 @@ def _nonlive_submit_task_jobs(
now = (now, get_time_string_from_unix_time(now))

for itask in itasks:
# Handle broadcasts:
# Get task config with broadcasts applied:
rtconfig = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig(
itask)

# Apply task run mode
if workflow_run_mode in RunMode.NON_OVERRIDABLE_MODES.value:
# Task run mode cannot override a simulation or dummy workflow mode:
run_mode = workflow_run_mode
else:
run_mode = rtconfig['run mode'] or workflow_run_mode
# If the task sets a run mode, it overrides an overridable
# (live or skip) workflow mode; otherwise use the workflow mode.
run_mode = rtconfig.get('run mode', None) or workflow_run_mode
# Store the run mode of this submission:
itask.run_mode = run_mode

# Submit nonlive tasks, or add live-like tasks to list
# of tasks to put through live submission pipeline -
# We decide based on the output of the submit method:
is_done = False
# Submit nonlive tasks, or add live-like (live or dummy)
# tasks to the list of tasks to put through the live
# submission pipeline - we decide based on the return
# value of the submit method:
is_nonlive = False
if run_mode == RunMode.DUMMY.value:
is_done = dummy_submit_task_job(
is_nonlive = dummy_submit_task_job(
self, itask, rtconfig, workflow, now)
elif run_mode == RunMode.SIMULATION.value:
is_done = simulation_submit_task_job(
is_nonlive = simulation_submit_task_job(
self, itask, rtconfig, workflow, now)
elif run_mode == RunMode.SKIP.value:
is_done = skip_submit_task_job(
is_nonlive = skip_submit_task_job(
self, itask, rtconfig, now)

# Assign task to list:
if is_done:
if is_nonlive:
nonlive_tasks.append(itask)
else:
lively_tasks.append(itask)
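To summarise the dispatch described in the docstring: skip and simulation tasks are handled entirely inside the scheduler, while live and dummy tasks carry on down the normal submission pathway. A rough, self-contained sketch of that split (the function and task names are invented for illustration; only the mode strings come from Cylc):

```python
# Illustrative sketch of the lively/nonlive split, not the real
# _nonlive_submit_task_jobs().
from typing import Iterable, List, Tuple


def split_by_run_mode(
    tasks: Iterable[Tuple[str, str]],   # (task name, resolved run mode)
) -> Tuple[List[str], List[str]]:
    lively: List[str] = []     # continue through live submission
    nonlive: List[str] = []    # "run" entirely inside the scheduler
    for name, mode in tasks:
        if mode in ('skip', 'simulation'):
            nonlive.append(name)
        else:                  # live and dummy
            lively.append(name)
    return lively, nonlive


print(split_by_run_mode([('a', 'live'), ('b', 'skip'), ('c', 'dummy')]))
# (['a', 'c'], ['b'])
```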
15 changes: 14 additions & 1 deletion cylc/flow/task_outputs.py
@@ -37,6 +37,7 @@

if TYPE_CHECKING:
from cylc.flow.taskdef import TaskDef
from typing_extensions import Literal


# Standard task output strings, used for triggering.
@@ -194,6 +195,7 @@ def get_completion_expression(tdef: 'TaskDef') -> str:
def get_optional_outputs(
expression: str,
outputs: Iterable[str],
exclude: "Optional[Literal['succeeded'], Literal['failed']]" = None
) -> Dict[str, Optional[bool]]:
"""Determine which outputs in an expression are optional.
@@ -202,6 +204,8 @@ def get_optional_outputs(
The completion expression.
outputs:
All outputs that apply to this task.
exclude:
Output to exclude when evaluating the completion expression.
Returns:
dict: compvar: is_optional
@@ -236,6 +240,9 @@ def get_optional_outputs(
# all completion variables which could appear in the expression
all_compvars = {trigger_to_completion_variable(out) for out in outputs}

# Allows exclusion of additional outcomes:
extra_excludes = {exclude: False} if exclude else {}

return { # output: is_optional
# the outputs that are used in the expression
**{
@@ -247,6 +254,7 @@
# (pre-conditions are considered separately)
'expired': False,
'submit_failed': False,
**extra_excludes
},
)
for output in used_compvars
@@ -609,7 +617,10 @@ def _is_compvar_complete(self, compvar: str) -> Optional[bool]:
else:
raise KeyError(compvar)

def iter_required_messages(self) -> Iterator[str]:
def iter_required_messages(
self,
exclude: "Optional[Literal['succeeded', 'failed']]" = None
) -> Iterator[str]:
"""Yield task messages that are required for this task to be complete.
Note, in some cases tasks might not have any required messages,
@@ -618,7 +629,9 @@ def iter_required_messages(self) -> Iterator[str]:
for compvar, is_optional in get_optional_outputs(
self._completion_expression,
set(self._message_to_compvar.values()),
exclude=exclude
).items():
if is_optional is False:
for message, _compvar in self._message_to_compvar.items():
if _compvar == compvar:
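The new `exclude` argument ultimately changes which outputs come out of the completion-expression evaluation as required: pinning, say, `failed` to false means `succeeded` can no longer be satisfied by the alternative branch and is reported as required. A toy sketch of that idea (using `eval` on a plain boolean expression; the real code evaluates Cylc's completion expression and also handles the expired/submit-failed pre-conditions):

```python
# Toy sketch of the exclude mechanism: an output is optional if the
# completion expression can still be satisfied with it switched off;
# an excluded output is pinned to False so it never satisfies it.
from typing import Dict, Iterable, Optional


def optional_outputs(
    expression: str,
    outputs: Iterable[str],
    exclude: Optional[str] = None,
) -> Dict[str, bool]:
    result: Dict[str, bool] = {}
    outputs = list(outputs)
    for output in outputs:
        env = {out: True for out in outputs}
        env[output] = False            # toggle the output under test
        if exclude:
            env[exclude] = False       # excluded output can't help
        result[output] = bool(eval(expression, {}, env))  # is_optional
    return result


print(optional_outputs('succeeded or failed', ['succeeded', 'failed']))
# {'succeeded': True, 'failed': True}   - both optional
print(optional_outputs(
    'succeeded or failed', ['succeeded', 'failed'], exclude='failed'))
# {'succeeded': False, 'failed': True}  - succeeded now required
```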
1 change: 1 addition & 0 deletions cylc/flow/task_pool.py
@@ -1969,6 +1969,7 @@ def _set_outputs_itask(
rtconfig = bc_mgr.get_updated_rtconfig(itask)
outputs.remove(RunMode.SKIP.value)
skips = get_skip_mode_outputs(itask, rtconfig)
itask.run_mode = RunMode.SKIP.value
outputs = self._standardise_outputs(
itask.point, itask.tdef, outputs)
outputs = list(set(outputs + skips))
26 changes: 13 additions & 13 deletions tests/functional/cylc-config/00-simple/section2.stdout
@@ -15,7 +15,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[meta]]]
title =
description =
@@ -94,7 +94,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[meta]]]
title =
description =
@@ -173,7 +173,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[meta]]]
title =
description =
@@ -252,7 +252,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -332,7 +332,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
@@ -412,7 +412,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -492,7 +492,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -572,7 +572,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
@@ -652,7 +652,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
@@ -732,7 +732,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -812,7 +812,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = serial
[[[meta]]]
@@ -892,7 +892,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
@@ -972,7 +972,7 @@
execution time limit =
submission polling intervals =
submission retry delays =
run mode = live
run mode =
[[[directives]]]
job_type = parallel
[[[meta]]]
