Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optional outputs extension #5651

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changes.d/5651.break.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Refinements to optional outputs:

* If a task has optional outputs, require at least one of them to be generated. This is a breaking
change for some workflows, e.g. `a? => b` where `a:fail?` is not referenced
in the graph. To fix, set `[runtime][a]completion = succeeded or failed`.
* Allow the condition for task completion to be manually specified.
* Allow expiry events to be detected for tasks as soon as any of their
prerequisites are satisfied (rather than when all of their prerequisites
are satisfied).
109 changes: 109 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,115 @@ def get_script_common_text(this: str, example: Optional[str] = None):
can be explicitly configured to provide or override default
settings for all tasks in the workflow.
'''):
Conf('completion', VDR.V_STRING, desc='''
Define the condition for task completion.

The completion condition is evaluated when a task is finished.
It is a validation check which confirms that the task has
generated the outputs it was expected to.

If the task fails this check it is considered
:term:`incomplete` and may cause the workflow to
:term:`stall`, alerting you that something has gone wrong which
requires investigation.

By default, the completion condition ensures that the
task succeeds - i.e., if the task fails, then the workflow will
stall.

If :ref:`User Guide Optional Outputs` are defined in the graph,
then by default one or more of the optional outputs must also
be generated in order for the task to be completed.

E.g., in this example, the task ``foo`` must succeed *and*
yield one of the outputs ``a`` or ``b`` in order to be
considered complete:

.. code-block:: cylc-graph

foo:a? => a
foo:b? => b

In Python syntax that condition looks like this:

.. code-block:: python

# the task must succeed and yield at least one of the
# outputs a or b
succeeded and (a or b)

The ``completion`` configuration allows you to override the
default completion to suit your needs.

E.g., in this example, the task ``foo`` must yield both the
``succeeded`` output and the :term:`custom output`
``myoutput`` to be considered complete:

.. code-block:: cylc

[runtime]
[[foo]]
completion = succeeded and myoutput
[[[outputs]]]
myoutput = 'my custom task output'

The completion condition uses Python syntax and may reference
any task outputs.
It can use the ``and`` & ``or`` operators, but cannot use
``not``.

.. note::

For the completion expression, hyphens in task outputs
are converted into underscores, e.g.:

.. code-block:: cylc

[runtime]
[[foo]]
completion = succeeded and my_output # underscore
[[[outputs]]]
my-output = 'my custom task output' # hyphen

.. hint::

If task outputs are optional in the graph they must also
be optional in the completion condition and vice versa.

.. code-block:: cylc

[scheduling]
[[graph]]
R1 = """
# ERROR: this should be "a? => b"
a => b
"""
[runtime]
[[a]]
# this completion condition implies that the
# succeeded output is optional
completion = succeeded or failed

.. rubric:: Examples

``succeeded``
The task must succeed.
``succeeded or expired``
The task can either succeed or
:ref:`expire <ClockExpireTasks>`.
``succeeded or (failed and my_error)``
The task can fail, but only if it also yields the custom
output ``my_error``.
``succeeded and (x or y or z)``
The task must succeed and yield at least one of the
custom outputs, x, y or z.
``(a and b) or (c and d)``
One pair of these outputs must be yielded for the task
to be complete.

.. versionadded:: 8.3.0

''')
Conf('platform', VDR.V_STRING, desc='''
The name of a compute resource defined in
:cylc:conf:`global.cylc[platforms]` or
Expand Down
121 changes: 116 additions & 5 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@
)
from cylc.flow.task_id import TaskID
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUCCEEDED,
TaskOutputs
TaskOutputs,
get_optional_outputs,
trigger_to_completion_variable,
)
from cylc.flow.task_trigger import TaskTrigger, Dependency
from cylc.flow.taskdef import TaskDef
Expand Down Expand Up @@ -534,6 +537,7 @@ def __init__(

self._check_implicit_tasks()
self._check_sequence_bounds()
self.check_completion_expressions()
self.validate_namespace_names()

# Check that external trigger messages are only used once (they have to
Expand Down Expand Up @@ -1127,6 +1131,107 @@ def check_param_env_tmpls(self):
)
)

def check_completion_expressions(self):
    """Check any configured completion expressions.

    Iterates over every namespace in ``self.cfg['runtime']`` and, for
    each one that defines a non-empty ``completion`` expression:

    * Ensure hyphens are not used in the expression (output names must
      be written with underscores, e.g. ``submit_failed``).
    * Ensure completion expressions are not used in Cylc 7 compat mode.
    * Ensure completion expressions do not reference missing task outputs.
    * Ensure completion expressions are valid and do not use forbidden
      syntax.
    * Ensure optional/required outputs are consistent between the
      completion expression and the graph.

    Namespaces which have no entry in ``self.taskdefs`` (families) are
    only subject to the hyphen and compat-mode checks.

    Raises:
        WorkflowConfigError: If any of the above checks fail.

    """
    for name, rtcfg in self.cfg['runtime'].items():
        expr = rtcfg.get('completion')
        if not expr:
            # no expression to validate
            continue

        # hyphens are not valid in the expression; give a more specific
        # hint for the built-in "submit-failed" output
        if 'submit-failed' in expr:
            raise WorkflowConfigError(
                f'Error in [runtime][{name}]completion:'
                f'\nUse "submit_failed" rather than "submit-failed"'
                ' in completion expressions.'
            )
        elif '-' in expr:
            raise WorkflowConfigError(
                f'Error in [runtime][{name}]completion:'
                f'\n {expr}'
                '\nReplace hyphens with underscores in task outputs when'
                ' used in completion expressions.'
            )

        # check completion expressions are not being used in compat mode
        if cylc.flow.flags.cylc7_back_compat:
            raise WorkflowConfigError(
                '[runtime][<namespace>]completion cannot be used'
                ' in Cylc 7 compatibility mode.'
            )

        # get the outputs and completion expression for this task
        try:
            outputs = self.taskdefs[name].outputs
        except KeyError:
            # this is a family
            continue

        # get the optional/required outputs defined in the graph
        # (True = optional, False = required, None = is_required unset)
        graph_optionals = {
            trigger_to_completion_variable(output): (
                None if is_required is None else not is_required
            )
            for output, (_, is_required)
            in outputs.items()
        }

        # get the optional/required outputs defined in the expression
        try:
            # this involves running the expression which also validates it
            expression_optionals = get_optional_outputs(expr, outputs)
        except NameError as exc:
            # expression references an output which has not been registered
            # (the original error text is embedded in the new message, so
            # the chained traceback is suppressed)
            raise WorkflowConfigError(
                # NOTE: str(exc) == "name 'x' is not defined"
                # tested in tests/integration/test_optional_outputs.py
                f'Error in [runtime][{name}]completion:'
                f'\nInput {str(exc)[5:]}'
            ) from None
        except Exception as exc:  # includes InvalidCompletionExpression
            # expression contains non-whitelisted syntax
            # or any other error in the expression e.g. SyntaxError
            raise WorkflowConfigError(
                f'Error in [runtime][{name}]completion:'
                f'\n{str(exc)}'
            ) from exc

        # ensure the optional/required outputs defined in the
        # graph and in the expression are consistent with each other
        for output in (*graph_optionals, *expression_optionals):
            graph_opt = graph_optionals.get(output)
            expr_opt = expression_optionals.get(output)
            if (
                # output is used in graph
                graph_opt is not None
                # output optionality does not match between graph and expr
                and graph_opt != expr_opt
            ):
                if graph_opt is True:
                    raise WorkflowConfigError(
                        f'{name}:{output} is optional in the graph'
                        ' (? symbol), but required in the completion'
                        ' expression.'
                    )
                if expr_opt is True:
                    raise WorkflowConfigError(
                        f'{name}:{output} is optional in the completion'
                        ' expression, but required in the graph'
                        ' (? symbol).'
                    )

def filter_env(self):
"""Filter environment variables after sparse inheritance"""
for ns in self.cfg['runtime'].values():
Expand Down Expand Up @@ -2101,7 +2206,12 @@ def load_graph(self):

# Parse and process each graph section.
task_triggers = {}
task_output_opt = {}
task_output_opt = {
# <task>:expired is inferred to be optional if <task> is
# clock-expired
(name, TASK_OUTPUT_EXPIRED): (True, True, True)
for name in self.expiration_offsets
}
for section, graph in sections:
try:
seq = get_sequence(section, icp, fcp)
Expand Down Expand Up @@ -2136,7 +2246,7 @@ def load_graph(self):
raise WorkflowConfigError('task and @xtrigger names clash')

for tdef in self.taskdefs.values():
tdef.tweak_outputs()
tdef.set_implicit_required_outputs()

def _proc_triggers(self, parser, seq, task_triggers):
"""Define graph edges, taskdefs, and triggers, from graph sections."""
Expand Down Expand Up @@ -2194,13 +2304,14 @@ def set_required_outputs(
task_output_opt: {(task, output): (is-optional, default, is_set)}
"""
for name, taskdef in self.taskdefs.items():
outputs = taskdef.outputs
for output in taskdef.outputs:
try:
optional, _, _ = task_output_opt[(name, output)]
optional_graph, *_ = task_output_opt[(name, output)]
except KeyError:
# Output not used in graph.
continue
taskdef.set_required_output(output, not optional)
outputs[output] = (outputs[output][0], not optional_graph)

def find_taskdefs(self, name: str) -> Set[TaskDef]:
"""Find TaskDef objects in family "name" or matching "name".
Expand Down
13 changes: 13 additions & 0 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,16 @@ def __str__(self):
)
else:
return "Installed workflow is not compatible with Cylc 8."


class InvalidCompletionExpression(CylcError):
    """Error in a ``[runtime][<namespace>]completion`` expression.

    Raised when the expression contains syntax which has not been
    whitelisted.

    Args:
        message: Description of the problem; returned verbatim by
            ``str()`` on the exception.
        expr: Optional value stored on the instance as ``expr``
            (defaults to ``None``).
    """

    def __init__(self, message, expr=None):
        # keep both values as plain attributes for callers to inspect
        self.expr = expr
        self.message = message

    def __str__(self):
        # only the message is shown; ``expr`` is not included
        return self.message
5 changes: 4 additions & 1 deletion cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
TASK_OUTPUT_FAILED,
TASK_OUTPUT_FINISHED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUBMIT_FAILED
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_EXPIRED,
)
from cylc.flow.task_qualifiers import (
QUAL_FAM_SUCCEED_ALL,
Expand Down Expand Up @@ -782,6 +783,8 @@ def _set_output_opt(
# Check opposite output where appropriate.
for opposites in [
(TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED),
(TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_EXPIRED),
(TASK_OUTPUT_FAILED, TASK_OUTPUT_EXPIRED),
(TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUBMIT_FAILED)
]:
if output not in opposites:
Expand Down
Loading
Loading