From 01a84978d0b31d92475ebbfa84ab648051fae37a Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 26 Jul 2023 17:09:53 +0100 Subject: [PATCH 1/3] optional outputs extension **Implements:** https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html **Closes:** #5640 * Require at least one optional output to be generated in order for the task to be considered "completed" by default. * Add the ability for users to manually specify the completion expression. * Bring task expiry into the optional outputs model. Document in changelog. --- changes.d/5651.break.md | 9 + cylc/flow/cfgspec/workflow.py | 109 ++++ cylc/flow/config.py | 114 ++++- cylc/flow/exceptions.py | 13 + cylc/flow/graph_parser.py | 5 +- cylc/flow/task_outputs.py | 154 +++++- cylc/flow/task_pool.py | 41 +- cylc/flow/taskdef.py | 55 +- cylc/flow/unicode_rules.py | 4 +- cylc/flow/util.py | 203 ++++++++ .../04-dummy-mode-output/flow.cylc | 2 + .../clock-expire/00-basic/flow.cylc | 17 +- .../cylc-config/00-simple/section2.stdout | 13 + .../data-store/00-prune-optional-break.t | 15 +- .../11-garbage-platform-command/flow.cylc | 1 + .../02-no-stall-on-optional/flow.cylc | 33 +- tests/functional/restart/30-outputs/flow.cylc | 7 +- .../restart/58-removed-task/flow.cylc | 13 +- .../spawn-on-demand/09-set-outputs/flow.cylc | 2 + .../spawn-on-demand/11-abs-suicide/flow.cylc | 46 +- .../triggering/07-fam-fail-any/flow.cylc | 1 + tests/integration/test_config.py | 2 +- tests/integration/test_optional_outputs.py | 476 ++++++++++++++++++ tests/integration/validate/test_outputs.py | 3 +- tests/unit/test_graph_parser.py | 38 +- tests/unit/test_task_outputs.py | 264 +++++++++- tests/unit/test_xtrigger_mgr.py | 8 +- 27 files changed, 1493 insertions(+), 155 deletions(-) create mode 100644 changes.d/5651.break.md create mode 100644 tests/integration/test_optional_outputs.py diff --git a/changes.d/5651.break.md b/changes.d/5651.break.md new file mode 100644 index 00000000000..9edff066cf4 --- /dev/null +++ 
b/changes.d/5651.break.md @@ -0,0 +1,9 @@ +Refinements to optional outputs: + +* Require at least one optional output to be generated. This is a breaking + change for some workflows e.g. `a? => b` where `a:fail?` is not referenced + in the graph. To fix, set `[runtime][a]completion = succeeded or failed`. +* Allow the condition for task completion to be manually specified. +* Allow expiry events to be detected for tasks as soon as any of their + prerequisites are satisfied (rather than when all of their prerequisites + are satisfied). diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index 7ef99814af2..e10b7cc7264 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -961,6 +961,115 @@ def get_script_common_text(this: str, example: Optional[str] = None): can be explicitly configured to provide or override default settings for all tasks in the workflow. '''): + Conf('completion', VDR.V_STRING, desc=''' + Define the condition for task completion. + + The completion condition is evaluated when a task is finished, + it is a validation check which confirms that the task has + generated the outputs it was expected to. + + If the task fails this check it is considered + :term:`incomplete` and will cause the workflow to + :term:`stall` alerting you that something has gone wrong which + requires investigation. + + By default, the completion condition ensures that the + task succeeds. I.E. if the task fails, then the workflow will + stall. + + If :ref:`User Guide Optional Outputs` are defined in the graph, + then one or more of the optional outputs must also be generated + in order for the task to be completed. + + E.G. In this example, the task ``foo`` must succeed *and* + yield one of the outputs ``a`` or ``b`` in order to be + considered complete: + + .. code-block:: cylc-graph + + foo:a? => a + foo:b? => b + + In Python syntax that condition looks like this: + + .. 
code-block:: python + + # the task must succeed and yield at least one of the + # outputs a or b + succeeded and (a or b) + + The ``completion`` configuration allows you to override the + default completion to suit your needs. + + E.G. In this example, the task ``foo`` must yield both the + ``succeeded`` output and the :term:`custom output` + ``myoutput`` to be considered complete: + + .. code-block:: cylc + + [runtime] + [[foo]] + completion = succeeded and myoutput + [[[outputs]]] + myoutput = 'my custom task output' + + The completion condition uses Python syntax and may reference + any task outputs. + It can use the ``and`` & ``or`` operators, but cannot use + ``not``. + + .. note:: + + For the completion expression, hyphens in task outputs + are converted into underscores e.g: + + .. code-block:: cylc + + [runtime] + [[foo]] + completion = succeeded and my_output # underscore + [[[outputs]]] + my-output = 'my custom task output' # hyphen + + .. hint:: + + If task outputs are optional in the graph they must also + be optional in the completion condition and vice versa. + + .. code-block:: cylc + + [scheduling] + [[graph]] + R1 = """ + # ERROR: this should be "a? => b" + a => b + """ + [runtime] + [[a]] + # this completion condition implies that the + # succeeded output is optional + completion = succeeded or failed + + .. rubric:: Examples + + ``succeeded`` + The task must succeed. + ``succeeded or expired`` + The task can either succeed or + :ref:`expire `. + ``succeeded or (failed and some_error)`` + The task can fail, but only if it also yields the custom + output ``some_error``. + ``succeeded and (x or y or z)`` + The task must succeed and yield at least one of the + custom outputs, x, y or z. + ``(a and b) or (c and d)`` + One pair of these outputs must be yielded for the task + to be complete. + + .. 
versionadded:: 8.3.0 + + ''') Conf('platform', VDR.V_STRING, desc=''' The name of a compute resource defined in :cylc:conf:`global.cylc[platforms]` or diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 79d305da57a..57d72a6c322 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -88,8 +88,11 @@ ) from cylc.flow.task_id import TaskID from cylc.flow.task_outputs import ( + TASK_OUTPUT_EXPIRED, TASK_OUTPUT_SUCCEEDED, - TaskOutputs + TaskOutputs, + get_optional_outputs, + trigger_to_completion_variable, ) from cylc.flow.task_trigger import TaskTrigger, Dependency from cylc.flow.taskdef import TaskDef @@ -534,6 +537,7 @@ def __init__( self._check_implicit_tasks() self._check_sequence_bounds() + self.check_completion_expressions() self.validate_namespace_names() # Check that external trigger messages are only used once (they have to @@ -1127,6 +1131,100 @@ def check_param_env_tmpls(self): ) ) + def check_completion_expressions(self): + """Check any configured completion expressions. + + * Ensure completion expressions are not used in Cylc 7 compat mode. + * Ensure completion expressions do not reference missing task outputs. + * Ensure completion expressions are valid and do not use forbidden + syntax. + * Ensure optional/required outputs are consistent between the + completion expression and the graph. + + Raises: + WorkflowConfigError: If any of the above checks fail. + + """ + for name, rtcfg in self.cfg['runtime'].items(): + expr = rtcfg.get('completion') + if not expr: + # no expression to validate + continue + + if 'submit-failed' in expr: + raise WorkflowConfigError( + f'Error in [runtime][{name}]completion:' + f'\nUse "submit_failed" rather than "submit-failed"' + ' in completion expressions.' + ) + + # check completion expressions are not being used in compat mode + if cylc.flow.flags.cylc7_back_compat: + raise WorkflowConfigError( + '[runtime][]completion cannot be used' + ' in Cylc 7 compatibility mode.' 
+ ) + + # get the outputs and completion expression for this task + try: + outputs = self.taskdefs[name].outputs + except KeyError: + # this is a family + continue + + # get the optional/required outputs defined in the graph + graph_optionals = { + trigger_to_completion_variable(output): ( + None if is_required is None else not is_required + ) + for output, (_, is_required) + in outputs.items() + } + + # get the optional/required outputs defined in the expression + try: + # this involves running the expression which also validates it + expression_optionals = get_optional_outputs(expr, outputs) + except NameError as exc: + # expression references an output which has not been registered + raise WorkflowConfigError( + # NOTE: str(exc) == "name 'x' is not defined" + # tested in tests/integration/test_optional_outputs.py + f'Error in [runtime][{name}]completion:' + f'\nInput {str(exc)[5:]}' + ) + except Exception as exc: # includes InvalidCompletionExpression + # expression contains non-whitelisted syntax + # or any other error in the expression e.g. SyntaxError + raise WorkflowConfigError( + f'Error in [runtime][{name}]completion:' + f'\n{str(exc)}' + ) + + # ensure the optional/required outputs defined in the + # graph/expression + for output in (*graph_optionals, *expression_optionals): + graph_opt = graph_optionals.get(output) + expr_opt = expression_optionals.get(output) + if ( + # output is used in graph + graph_opt is not None + # output optionality does not match between graph and expr + and graph_opt != expr_opt + ): + if graph_opt is True: + raise WorkflowConfigError( + f'{name}:{output} is optional in the graph' + ' (? symbol), but required in the completion' + ' expression.' + ) + if expr_opt is True: + raise WorkflowConfigError( + f'{name}:{output} is optional in the completion' + ' expression, but required in the graph' + ' (? symbol).' 
+ ) + def filter_env(self): """Filter environment variables after sparse inheritance""" for ns in self.cfg['runtime'].values(): @@ -2101,7 +2199,12 @@ def load_graph(self): # Parse and process each graph section. task_triggers = {} - task_output_opt = {} + task_output_opt = { + # <task>:expired is inferred to be optional if <task> is + # clock-expired + (name, TASK_OUTPUT_EXPIRED): (True, True, True) + for name in self.expiration_offsets + } for section, graph in sections: try: seq = get_sequence(section, icp, fcp) @@ -2136,7 +2239,7 @@ def load_graph(self): raise WorkflowConfigError('task and @xtrigger names clash') for tdef in self.taskdefs.values(): - tdef.tweak_outputs() + tdef.set_implicit_required_outputs() def _proc_triggers(self, parser, seq, task_triggers): """Define graph edges, taskdefs, and triggers, from graph sections.""" @@ -2194,13 +2297,14 @@ def set_required_outputs( task_output_opt: {(task, output): (is-optional, default, is_set)} """ for name, taskdef in self.taskdefs.items(): + outputs = taskdef.outputs for output in taskdef.outputs: try: - optional, _, _ = task_output_opt[(name, output)] + optional_graph, *_ = task_output_opt[(name, output)] except KeyError: # Output not used in graph. continue - taskdef.set_required_output(output, not optional) + outputs[output] = (outputs[output][0], not optional_graph) def find_taskdefs(self, name: str) -> Set[TaskDef]: """Find TaskDef objects in family "name" or matching "name". diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index 79a726d7bbe..2c1555344a6 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -478,3 +478,16 @@ def __str__(self): ) else: return "Installed workflow is not compatible with Cylc 8." + + +class InvalidCompletionExpression(CylcError): + """For the [runtime][<namespace>]completion configuration. + + Raised when non-whitelisted syntax is present. 
+ """ + def __init__(self, message, expr=None): + self.message = message + self.expr = expr + + def __str__(self): + return self.message diff --git a/cylc/flow/graph_parser.py b/cylc/flow/graph_parser.py index f37954712c0..4396ea57959 100644 --- a/cylc/flow/graph_parser.py +++ b/cylc/flow/graph_parser.py @@ -37,7 +37,8 @@ TASK_OUTPUT_FAILED, TASK_OUTPUT_FINISHED, TASK_OUTPUT_SUBMITTED, - TASK_OUTPUT_SUBMIT_FAILED + TASK_OUTPUT_SUBMIT_FAILED, + TASK_OUTPUT_EXPIRED, ) from cylc.flow.task_qualifiers import ( QUAL_FAM_SUCCEED_ALL, @@ -782,6 +783,8 @@ def _set_output_opt( # Check opposite output where appropriate. for opposites in [ (TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED), + (TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_EXPIRED), + (TASK_OUTPUT_FAILED, TASK_OUTPUT_EXPIRED), (TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUBMIT_FAILED) ]: if output not in opposites: diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index d2c70f11780..2b3ab5be6b3 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -15,6 +15,12 @@ # along with this program. If not, see . """Task output message manager and constants.""" +import ast +import re + +from cylc.flow.exceptions import InvalidCompletionExpression +from cylc.flow.util import restricted_evaluator + # Standard task output strings, used for triggering. TASK_OUTPUT_EXPIRED = "expired" @@ -48,6 +54,119 @@ _IS_COMPLETED = 2 +# this evaluates task completion expressions +CompletionEvaluator = restricted_evaluator( + # expressions + ast.Expression, + # variables + ast.Name, ast.Load, + # operations + ast.BoolOp, ast.And, ast.Or, ast.BinOp, + error_class=InvalidCompletionExpression, +) + + +def trigger_to_completion_variable(output): + """Turn a trigger into something that can be used in an expression. 
+ + Examples: + >>> trigger_to_completion_variable('succeeded') + 'succeeded' + >>> trigger_to_completion_variable('submit-failed') + 'submit_failed' + + """ + return output.replace('-', '_') + + +def get_completion_expression(tdef): + """Return a task's completion expression.""" + expr = tdef.rtconfig.get('completion') + if expr: + # the expression has been explicitly provided in the configuration + return expr + + # default behaviour: + # * succeeded is a required output unless stated otherwise + # * if optional outputs are defined in the graph, one or more must be + # generated. + ands = [] + ors = [] + for trigger, (_message, required) in tdef.outputs.items(): + trig = trigger_to_completion_variable(trigger) + if required is True: + ands.append(trig) + if required is False: + ors.append(trig) + + if not ands and not ors: + # task is not used in the graph + # e.g. task has been removed by restart/reload + # we cannot tell what the completion condition was because it is not + # defined in the runtime section so we allow any completion status + return 'succeeded or failed or expired' + + # sort for stable output + ands.sort() + ors.sort() + + # join the lists of "ands" and "ors" into statements + _ands = '' + if ands: + _ands = ' and '.join(ands) + _ors = '' + if ors: + _ors = ' or '.join(ors) + + # join the statements of "ands" and "ors" into an expression + if ands and ors: + if len(ors) > 1: + expr = f'{_ands} and ({_ors})' + else: + expr = f'{_ands} and {_ors}' + elif ands: + expr = _ands + else: + expr = _ors + + return expr + + +def get_optional_outputs(expression, outputs): + """Determine which outputs are optional in an expression + + Raises: + NameError: + If an output referenced in the expression is not present in the + outputs dict provided. + InvalidCompletionExpression: + If any syntax used in the completion expression is not permitted. + Exception: + For errors executing the completion expression itself. 
+ + """ + _outputs = [trigger_to_completion_variable(o) for o in outputs] + return { # output: is_optional + output: CompletionEvaluator( + expression, + **{o: o != output for o in _outputs} + ) + for output in _outputs + } + + +def get_used_outputs(expression, outputs): + """Return all outputs which are used in the expression. + + Called on stall to determine what outputs weren't generated. + """ + return { + output + for output in outputs + if re.findall(rf'\b{output}\b', expression) + } + + class TaskOutputs: """Task output message manager. @@ -64,28 +183,26 @@ class TaskOutputs: """ # Memory optimization - constrain possible attributes to this list. - __slots__ = ["_by_message", "_by_trigger", "_required"] + __slots__ = ["_by_message", "_by_trigger", "_completion_expression"] def __init__(self, tdef): self._by_message = {} self._by_trigger = {} - self._required = set() + self._completion_expression = get_completion_expression(tdef) # Add outputs from task def. - for trigger, (message, required) in tdef.outputs.items(): - self._add(message, trigger, required=required) + for trigger, (message, _required) in tdef.outputs.items(): + self._add(message, trigger) - def _add(self, message, trigger, is_completed=False, required=False): + def _add(self, message, trigger, is_completed=False): """Add a new output message""" self._by_message[message] = [trigger, message, is_completed] self._by_trigger[trigger] = self._by_message[message] - if required: - self._required.add(trigger) def set_completed_by_msg(self, message): """For flow trigger --wait: set completed outputs from the DB.""" for trig, msg, _ in self._by_trigger.values(): if message == msg: - self._add(message, trig, True, trig in self._required) + self._add(message, trig, True) break def all_completed(self): @@ -190,19 +307,24 @@ def set_msg_trg_completion(self, message=None, trigger=None, def is_incomplete(self): """Return True if any required outputs are not complete.""" - return any( - not completed - and 
trigger in self._required + outputs = { + trigger_to_completion_variable(trigger): completed for trigger, (_, _, completed) in self._by_trigger.items() - ) + } + return not CompletionEvaluator(self._completion_expression, **outputs) def get_incomplete(self): - """Return a list of required outputs that are not complete.""" - return [ + """Return a list of outputs that are not complete.""" + used_outputs = get_used_outputs( + self._completion_expression, + self._by_trigger, + ) + return sorted( trigger for trigger, (_, _, is_completed) in self._by_trigger.items() - if not is_completed and trigger in self._required - ] + if not is_completed + and trigger in used_outputs + ) def get_item(self, message): """Return output item by message. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index e7d85f66938..64a8bb3e7f8 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -811,13 +811,22 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]': return point_itasks - def get_task(self, point, name): + def get_task( + self, + point: Union[str, 'PointBase'], + name: str + ) -> Optional[TaskProxy]: """Retrieve a task from the pool.""" rel_id = f'{point}/{name}' + if isinstance(point, str): + # convenient for tests, use PointBase instances for efficiency + # otherwise + point = get_point(point) for pool in (self.main_pool, self.hidden_pool): tasks = pool.get(point) if tasks and rel_id in tasks: return tasks[rel_id] + return None def _get_hidden_task_by_id(self, id_: str) -> Optional[TaskProxy]: """Return runahead pool task by ID if it exists, or None.""" @@ -1100,8 +1109,9 @@ def log_incomplete_tasks(self) -> bool: for itask in self.get_tasks(): if not itask.state(*TASK_STATUSES_FINAL): continue - outputs = itask.state.outputs.get_incomplete() - if outputs: + + if itask.state.outputs.is_incomplete(): + outputs = itask.state.outputs.get_incomplete() incomplete.append((itask.identity, outputs)) if incomplete: @@ -1374,12 +1384,11 @@ def 
remove_if_complete(self, itask): if self.compute_runahead(): self.release_runahead_tasks() else: - incomplete = itask.state.outputs.get_incomplete() - if incomplete: + if itask.state.outputs.is_incomplete(): # Retain as incomplete. LOG.warning( f"[{itask}] did not complete required outputs:" - f" {incomplete}" + f" {itask.state.outputs.get_incomplete()}" ) else: # Remove as completed. @@ -1763,8 +1772,14 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: return sim_task_state_changed def set_expired_tasks(self): + """Check whether any tasks have expired. + + Note, this must check tasks with partially-satisfied prerequisites, + see proposal point (6): + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-optional-output-extension.md + """ res = False - for itask in self.get_tasks(): + for itask in self.get_all_tasks(): if self._set_expired_task(itask): res = True return res @@ -1775,11 +1790,11 @@ def _set_expired_task(self, itask): Return True if task has expired. """ if ( - not itask.state( - TASK_STATUS_WAITING, - is_held=False - ) - or itask.tdef.expiration_offset is None + not itask.state( + TASK_STATUS_WAITING, + is_held=False + ) + or itask.tdef.expiration_offset is None ): return False if itask.expire_time is None: @@ -2048,7 +2063,7 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None: if ( itask.state(*TASK_STATUSES_FINAL) - and itask.state.outputs.get_incomplete() + and itask.state.outputs.is_incomplete() ): # Re-queue incomplete task to run again in the merged flow. LOG.info(f"[{itask}] incomplete task absorbed by new flow.") diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index a4ae681e2d9..b8057a41e4b 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -125,6 +125,31 @@ def generate_graph_parents(tdef, point, taskdefs): return graph_parents +def set_implicit_required_outputs(outputs): + """Set implicit "submitted" and "succeeded" required outputs. 
+ + * If :succeed or :fail not set, assume success is required. + * Unless submit (and submit-fail) is optional (don't stall + because of missing succeed if submit is optional). + + """ + if ( + outputs[TASK_OUTPUT_SUCCEEDED][1] is None + and outputs[TASK_OUTPUT_FAILED][1] is None + and outputs[TASK_OUTPUT_SUBMITTED][1] is not False + and outputs[TASK_OUTPUT_SUBMIT_FAILED][1] is not False + ): + outputs[TASK_OUTPUT_SUCCEEDED] = (TASK_OUTPUT_SUCCEEDED, True) + + # In Cylc 7 back compat mode, make all success outputs required. + if cylc.flow.flags.cylc7_back_compat: + for output in [ + TASK_OUTPUT_SUBMITTED, + TASK_OUTPUT_SUCCEEDED + ]: + outputs[output] = (output, True) + + class TaskDef: """Task definition.""" @@ -184,33 +209,9 @@ def _add_std_outputs(self): for output in SORT_ORDERS: self.outputs[output] = (output, None) - def set_required_output(self, output, required): - """Set outputs to required or optional.""" - # (Note outputs and associated messages already defined.) - message, _ = self.outputs[output] - self.outputs[output] = (message, required) - - def tweak_outputs(self): - """Output consistency checking and tweaking.""" - - # If :succeed or :fail not set, assume success is required. - # Unless submit (and submit-fail) is optional (don't stall - # because of missing succeed if submit is optional). - if ( - self.outputs[TASK_OUTPUT_SUCCEEDED][1] is None - and self.outputs[TASK_OUTPUT_FAILED][1] is None - and self.outputs[TASK_OUTPUT_SUBMITTED][1] is not False - and self.outputs[TASK_OUTPUT_SUBMIT_FAILED][1] is not False - ): - self.set_required_output(TASK_OUTPUT_SUCCEEDED, True) - - # In Cylc 7 back compat mode, make all success outputs required. 
- if cylc.flow.flags.cylc7_back_compat: - for output in [ - TASK_OUTPUT_SUBMITTED, - TASK_OUTPUT_SUCCEEDED - ]: - self.set_required_output(output, True) + def set_implicit_required_outputs(self): + """Set implicit "submitted" and "succeeded" required outputs.""" + set_implicit_required_outputs(self.outputs) def add_graph_child(self, trigger, taskname, sequence): """Record child task instances that depend on my outputs. diff --git a/cylc/flow/unicode_rules.py b/cylc/flow/unicode_rules.py index 4c71e190ce3..05cc065d19e 100644 --- a/cylc/flow/unicode_rules.py +++ b/cylc/flow/unicode_rules.py @@ -345,11 +345,11 @@ class TaskOutputValidator(UnicodeRuleChecker): RULES = [ # restrict outputs to sensible characters - allowed_characters(r'\w', r'\d', r'\-', r'\.'), + allowed_characters(r'\w', r'\d', r'\-'), # blacklist the _cylc prefix not_starts_with('_cylc'), # blacklist keywords - not_equals('required', 'optional', 'all'), + not_equals('required', 'optional', 'all', 'and', 'or'), # blacklist built-in task qualifiers not_equals(*TASK_QUALIFIERS), ] diff --git a/cylc/flow/util.py b/cylc/flow/util.py index 11cb83c5c25..a2988a62cda 100644 --- a/cylc/flow/util.py +++ b/cylc/flow/util.py @@ -15,12 +15,14 @@ # along with this program. If not, see . """Misc functionality.""" +import ast from contextlib import suppress from functools import partial import json import re from typing import ( Any, + Callable, List, Tuple, Union, @@ -134,3 +136,204 @@ def serialise(flow_nums: set): def deserialise(flow_num_str: str): """Converts string to set.""" return set(json.loads(flow_num_str)) + + +class _RestrictedEvalError(Exception): + """For internal use. + + Raised in the event non-whitelisted syntax is detected in an expression. + """ + + def __init__(self, node): + self.node = node + + +class RestrictedNodeVisitor(ast.NodeVisitor): + """AST node visitor which errors on non-whitelisted syntax. + + Raises _RestrictedEvalError if a non-whitelisted node is visited. 
+ """ + + def __init__(self, whitelist): + super().__init__() + self._whitelist: Tuple[type] = whitelist + + def visit(self, node): + if not isinstance(node, self._whitelist): + # only permit whitelisted operations + raise _RestrictedEvalError(node) + return super().visit(node) + + +def restricted_evaluator( + *whitelist: type, + error_class: Callable = ValueError, +) -> Callable: + """Returns a Python eval statement restricted to whitelisted operations. + + The "eval" function can be used to run arbitrary code. This is useful + but presents security issues. This returns an "eval" method which will + only allow whitelisted operations to be performed allowing it to be used + safely with user-provided input. + + The code passed into the evaluator will be parsed into an abstract syntax + tree (AST), then that tree will be executed using Python's internal logic. + The evaluator will check the type of each node before it is executed and + fail with a ValueError if it is not permitted. + + The node types are documented in the ast module: + https://docs.python.org/3/library/ast.html + + The evaluator returned is only as safe as the nodes you whitelist, read the + docs carefully. + + Note: + If you don't need to parse expressions, use ast.literal_eval instead. + + Args: + whitelist: + Types to permit e.g. `ast.Expression`, see the ast docs for + details. + error_class: + An Exception class or callable which returns an Exception instance. + This is called and its result raised in the event that an + expression contains non-whitelisted operations. It will be provided + with the error message as an argument, additionally the following + keyword arguments will be provided if defined: + expr: + The expression the evaluator was called with. + expr_node: + The AST node containing the parsed expression. + error_node: + The first non-whitelisted AST node in the expression. + E.G. `` for a `-` operator. + error_type: + error_node.__class__.__name__. + E.G. `Sub` for a `-` operator. 
+ + Returns: + An "eval" function restricted to the whitelisted nodes. + + Examples: + Optionally, provide an error class to be raised in the event of + non-whitelisted syntax (or you'll get ValueError): + >>> class RestrictedSyntaxError(Exception): + ... def __init__(self, message, error_node): + ... self.args = (str(error_node.__class__),) + + Create an evaluator, whitelisting allowed node types: + >>> evaluator = restricted_evaluator( + ... ast.Expression, # required for all uses + ... ast.BinOp, # an operation (e.g. addition or division) + ... ast.Add, # the "+" operator + ... ast.Constant, # required for literals e.g. "1" + ... ast.Name, # required for using variables in expressions + ... ast.Load, # required for accessing variable values + ... ast.Num, # for Python 3.7 compatibility + ... error_class=RestrictedSyntaxError, # error to raise + ... ) + + This will correctly evaluate intended expressions: + >>> evaluator('1 + 1') + 2 + + But will fail if a non-whitelisted node type is present: + >>> evaluator('1 - 1') + Traceback (most recent call last): + RestrictedSyntaxError: + >>> evaluator('my_function()') + Traceback (most recent call last): + RestrictedSyntaxError: + >>> evaluator('__import__("os")') + Traceback (most recent call last): + RestrictedSyntaxError: + + The evaluator cannot see the containing scope: + >>> a = b = 1 + >>> evaluator('a + b') + Traceback (most recent call last): + NameError: name 'a' is not defined + + To use variables you must explicitly pass them in: + >>> evaluator('a + b', a=1, b=2) + 3 + + """ + # the node visitor is called for each node in the AST, + # this is the bit which rejects types which are not whitelisted + visitor = RestrictedNodeVisitor(whitelist) + + def _eval(expr, **variables): + nonlocal visitor + + # parse the expression + try: + expr_node = ast.parse(expr.strip(), mode='eval') + except SyntaxError as exc: + raise _get_exception( + error_class, + f'{exc.msg}: {exc.text}', + {'expr': expr} + ) + + # check 
against whitelisted types + try: + visitor.visit(expr_node) + except _RestrictedEvalError as exc: + # non-whitelisted node detected in expression + # => raise exception + error_node = exc.args[0] + raise _get_exception( + error_class, + ( + f'Invalid expression: {expr}' + f'\n"{error_node.__class__.__name__}" not permitted' + ), + { + 'expr': expr, + 'expr_node': expr_node, + 'error_node': error_node, + 'error_type': error_node.__class__.__name__, + }, + ) + + # run the expression + # Note: this may raise runtime errors + return eval( # nosec + # acceptable use of eval as only whitelisted operations are + # permitted + compile(expr_node, '', 'eval'), + # deny access to builtins + {'__builtins__': {}}, + # provide access to explicitly provided variables + variables, + ) + + return _eval + + +def _get_exception( + error_class: Callable, + message: str, + context: dict +) -> Exception: + """Helper which returns exception instances. + + Filters the arguments in context by the parameters of the error_class. + + This allows the error_class to decide what fields it wants, and for us + to add/change these params in the future. 
+ """ + import inspect # no need to import unless errors occur + try: + params = dict(inspect.signature(error_class).parameters) + except ValueError: + params = {} + + context = { + key: value + for key, value in context.items() + if key in params + } + + return error_class(message, **context) diff --git a/tests/flakyfunctional/cylc-get-config/04-dummy-mode-output/flow.cylc b/tests/flakyfunctional/cylc-get-config/04-dummy-mode-output/flow.cylc index 4db81ada9ee..d366990eed6 100644 --- a/tests/flakyfunctional/cylc-get-config/04-dummy-mode-output/flow.cylc +++ b/tests/flakyfunctional/cylc-get-config/04-dummy-mode-output/flow.cylc @@ -22,10 +22,12 @@ default run length = PT0S [[foo]] script = true + completion = succeeded [[[outputs]]] meet = meet [[bar]] script = true + completion = succeeded [[[outputs]]] greet = greet [[baz]] diff --git a/tests/functional/clock-expire/00-basic/flow.cylc b/tests/functional/clock-expire/00-basic/flow.cylc index afeb7df356d..b44a3647d4c 100644 --- a/tests/functional/clock-expire/00-basic/flow.cylc +++ b/tests/functional/clock-expire/00-basic/flow.cylc @@ -3,7 +3,8 @@ [meta] title = task expire example workflow description = """ -Skip a daily post-processing workflow if the 'copy' task has expired.""" + Skip a daily post-processing workflow if the 'copy' task has expired. + """ [scheduler] cycle point format = %Y-%m-%dT%H @@ -12,6 +13,7 @@ Skip a daily post-processing workflow if the 'copy' task has expired.""" [[events]] abort on stall timeout = True stall timeout = PT1M + [scheduling] initial cycle point = now final cycle point = +P3D @@ -23,16 +25,17 @@ Skip a daily post-processing workflow if the 'copy' task has expired.""" # behind "now + 1 day". This makes the first two 'copy' tasks expire. [[graph]] P1D = """ - model[-P1D] => model => copy => proc + model[-P1D] => model => copy? => proc copy:expired? => !proc """ + [runtime] [[root]] script = true [[copy]] script = """ -# Abort if I run in either of the first two cycle points. 
-test "${CYLC_TASK_CYCLE_POINT}" != "${CYLC_WORKFLOW_INITIAL_CYCLE_POINT}" -P2D="$(cylc cyclepoint --offset='P1D' "${CYLC_WORKFLOW_INITIAL_CYCLE_POINT}")" -test "${CYLC_TASK_CYCLE_POINT}" != "${P2D}" -""" + # Abort if I run in either of the first two cycle points. + test "${CYLC_TASK_CYCLE_POINT}" != "${CYLC_WORKFLOW_INITIAL_CYCLE_POINT}" + P2D="$(cylc cyclepoint --offset='P1D' "${CYLC_WORKFLOW_INITIAL_CYCLE_POINT}")" + test "${CYLC_TASK_CYCLE_POINT}" != "${P2D}" + """ diff --git a/tests/functional/cylc-config/00-simple/section2.stdout b/tests/functional/cylc-config/00-simple/section2.stdout index 4d3989c8387..8413d2df9fc 100644 --- a/tests/functional/cylc-config/00-simple/section2.stdout +++ b/tests/functional/cylc-config/00-simple/section2.stdout @@ -1,5 +1,6 @@ [[root]] platform = + completion = inherit = init-script = env-script = @@ -76,6 +77,7 @@ [[[parameter environment templates]]] [[OPS]] script = echo "RUN: run-ops.sh" + completion = platform = inherit = init-script = @@ -152,6 +154,7 @@ [[[parameter environment templates]]] [[VAR]] script = echo "RUN: run-var.sh" + completion = platform = inherit = init-script = @@ -228,6 +231,7 @@ [[[parameter environment templates]]] [[SERIAL]] platform = + completion = inherit = init-script = env-script = @@ -305,6 +309,7 @@ [[[parameter environment templates]]] [[PARALLEL]] platform = + completion = inherit = init-script = env-script = @@ -382,6 +387,7 @@ [[[parameter environment templates]]] [[ops_s1]] script = echo "RUN: run-ops.sh" + completion = inherit = OPS, SERIAL platform = init-script = @@ -459,6 +465,7 @@ [[[parameter environment templates]]] [[ops_s2]] script = echo "RUN: run-ops.sh" + completion = inherit = OPS, SERIAL platform = init-script = @@ -536,6 +543,7 @@ [[[parameter environment templates]]] [[ops_p1]] script = echo "RUN: run-ops.sh" + completion = inherit = OPS, PARALLEL platform = init-script = @@ -613,6 +621,7 @@ [[[parameter environment templates]]] [[ops_p2]] script = echo "RUN: run-ops.sh" 
+ completion = inherit = OPS, PARALLEL platform = init-script = @@ -690,6 +699,7 @@ [[[parameter environment templates]]] [[var_s1]] script = echo "RUN: run-var.sh" + completion = inherit = VAR, SERIAL platform = init-script = @@ -767,6 +777,7 @@ [[[parameter environment templates]]] [[var_s2]] script = echo "RUN: run-var.sh" + completion = inherit = VAR, SERIAL platform = init-script = @@ -844,6 +855,7 @@ [[[parameter environment templates]]] [[var_p1]] script = echo "RUN: run-var.sh" + completion = inherit = VAR, PARALLEL platform = init-script = @@ -921,6 +933,7 @@ [[[parameter environment templates]]] [[var_p2]] script = echo "RUN: run-var.sh" + completion = inherit = VAR, PARALLEL platform = init-script = diff --git a/tests/functional/data-store/00-prune-optional-break.t b/tests/functional/data-store/00-prune-optional-break.t index 39de0225e34..8148a15ca2b 100755 --- a/tests/functional/data-store/00-prune-optional-break.t +++ b/tests/functional/data-store/00-prune-optional-break.t @@ -27,19 +27,22 @@ init_workflow "${TEST_NAME_BASE}" << __FLOW__ final cycle point = 1 [[graph]] P1 = """ -a? => b? => c? -d => e -""" + a? => b? => c? 
+ d => e + """ + [runtime] + [[a,b,c]] + completion = succeeded or failed [[a,c,e]] script = true [[b]] script = false [[d]] script = """ -cylc workflow-state \$CYLC_WORKFLOW_ID --task=b --point=1 --status=failed --interval=2 -cylc pause \$CYLC_WORKFLOW_ID -""" + cylc workflow-state \$CYLC_WORKFLOW_ID --task=b --point=1 --status=failed --interval=2 + cylc pause \$CYLC_WORKFLOW_ID + """ __FLOW__ # run workflow diff --git a/tests/functional/job-submission/11-garbage-platform-command/flow.cylc b/tests/functional/job-submission/11-garbage-platform-command/flow.cylc index 1955c1e81ad..f4f306fea9b 100644 --- a/tests/functional/job-submission/11-garbage-platform-command/flow.cylc +++ b/tests/functional/job-submission/11-garbage-platform-command/flow.cylc @@ -10,6 +10,7 @@ [[t1]] script = true platform = badhost + completion = submit_failed or succeeded [[t2]] script = """ cylc broadcast "${CYLC_WORKFLOW_ID}" \ diff --git a/tests/functional/optional-outputs/02-no-stall-on-optional/flow.cylc b/tests/functional/optional-outputs/02-no-stall-on-optional/flow.cylc index 46d72690e85..5db368c700a 100644 --- a/tests/functional/optional-outputs/02-no-stall-on-optional/flow.cylc +++ b/tests/functional/optional-outputs/02-no-stall-on-optional/flow.cylc @@ -1,19 +1,22 @@ # Should not stall at runhead limit with incomplete foo [scheduler] - [[events]] - abort on stall timeout = True - stall timeout = PT0S + [[events]] + abort on stall timeout = True + stall timeout = PT0S + [scheduling] - cycling mode = integer - initial cycle point = 1 - final cycle point = 2 - runahead limit = P0 - [[graph]] - P1 = "foo:y? => bar" + cycling mode = integer + initial cycle point = 1 + final cycle point = 2 + runahead limit = P0 + [[graph]] + P1 = "foo:y? 
=> bar" + [runtime] - [[foo]] - script = "cylc message x" - [[[outputs]]] - x = x - y = y - [[bar]] + [[foo]] + script = "cylc message x" + completion = succeeded + [[[outputs]]] + x = x + y = y + [[bar]] diff --git a/tests/functional/restart/30-outputs/flow.cylc b/tests/functional/restart/30-outputs/flow.cylc index 7a2168dac6a..f8e24f50e0a 100644 --- a/tests/functional/restart/30-outputs/flow.cylc +++ b/tests/functional/restart/30-outputs/flow.cylc @@ -8,12 +8,13 @@ [scheduling] [[graph]] R1 = """ -t1:hello => t2 -t1:greet? => t3 -""" + t1:hello => t2 + t1:greet? => t3 + """ [runtime] [[t1]] script = cylc message 'hello' + completion = succeeded and hello [[[outputs]]] hello = hello greet = greeting diff --git a/tests/functional/restart/58-removed-task/flow.cylc b/tests/functional/restart/58-removed-task/flow.cylc index 94c5cf27b24..67278ea377e 100644 --- a/tests/functional/restart/58-removed-task/flow.cylc +++ b/tests/functional/restart/58-removed-task/flow.cylc @@ -15,9 +15,9 @@ [[graph]] R1 = """ a - {% if INCL_B_C %} - b & c? - {% endif %} +{% if INCL_B_C %} + b & c? +{% endif %} """ [runtime] [[a]] @@ -28,5 +28,8 @@ cylc__job__poll_grep_workflow_log "1/a .*(polled)started" cylc__job__poll_grep_workflow_log "1/b .*(polled)failed" """ - [[b, c]] - script = "false" + [[b]] + script = false + [[c]] + script = false + completion = succeeded or failed diff --git a/tests/functional/spawn-on-demand/09-set-outputs/flow.cylc b/tests/functional/spawn-on-demand/09-set-outputs/flow.cylc index 1d1d7e7e061..dafeaef158f 100644 --- a/tests/functional/spawn-on-demand/09-set-outputs/flow.cylc +++ b/tests/functional/spawn-on-demand/09-set-outputs/flow.cylc @@ -37,8 +37,10 @@ script = """ cylc__job__poll_grep_workflow_log -E "1/setter .* => succeeded" """ + completion = succeeded [[bar]] script = true + completion = succeeded [[setter]] # (To the rescue). 
script = """ diff --git a/tests/functional/spawn-on-demand/11-abs-suicide/flow.cylc b/tests/functional/spawn-on-demand/11-abs-suicide/flow.cylc index a58d65e36b4..67881e56402 100644 --- a/tests/functional/spawn-on-demand/11-abs-suicide/flow.cylc +++ b/tests/functional/spawn-on-demand/11-abs-suicide/flow.cylc @@ -4,28 +4,30 @@ # cleanly. Otherwise it will abort on stall with unsatisfied prerequisites. [scheduler] - [[events]] - stall timeout = PT0S - abort on stall timeout = True - expected task failures = 1/a + [[events]] + stall timeout = PT0S + abort on stall timeout = True + expected task failures = 1/a [scheduling] - cycling mode = integer - initial cycle point = 1 - final cycle point = 5 - runahead limit = P4 - [[graph]] - R1 = "a?" - P1 = """ - a[^]? => c & !x - a[^]:fail? => x & !c - b => c & x - """ + cycling mode = integer + initial cycle point = 1 + final cycle point = 5 + runahead limit = P4 + [[graph]] + R1 = "a?" + P1 = """ + a[^]? => c & !x + a[^]:fail? => x & !c + b => c & x + """ + [runtime] - [[b,c,x]] - [[a]] - # Fail after c is spawned out to the runahead limit. - script = """ - cylc__job__poll_grep_workflow_log "spawned 5/c" - false - """ + [[b,c,x]] + [[a]] + # Fail after c is spawned out to the runahead limit. 
+ script = """ + cylc__job__poll_grep_workflow_log "spawned 5/c" + false + """ + completion = succeeded or failed diff --git a/tests/functional/triggering/07-fam-fail-any/flow.cylc b/tests/functional/triggering/07-fam-fail-any/flow.cylc index cf50c7000ad..b1ab8c6f56b 100644 --- a/tests/functional/triggering/07-fam-fail-any/flow.cylc +++ b/tests/functional/triggering/07-fam-fail-any/flow.cylc @@ -8,6 +8,7 @@ [runtime] [[FAM]] script = "true" + completion = succeeded or failed [[a,c]] inherit = FAM [[b]] diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py index acf24d17eaf..56c16759b85 100644 --- a/tests/integration/test_config.py +++ b/tests/integration/test_config.py @@ -239,7 +239,7 @@ def test_parse_special_tasks_families(flow, scheduler, validate, section): section: 'FOO(P1D)', }, 'graph': { - 'R1': 'foo & foot', + 'R1': 'foo? & foot?', } }, 'runtime': { diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py new file mode 100644 index 00000000000..630a8980f4e --- /dev/null +++ b/tests/integration/test_optional_outputs.py @@ -0,0 +1,476 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test optional output functionality. 
+ +See the proposals: +* https://cylc.github.io/cylc-admin/proposal-new-output-syntax.html +* https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html +""" + +from unittest.mock import Mock + +import pytest + +from cylc.flow.exceptions import ( + GraphParseError, + WorkflowConfigError, +) +from cylc.flow.task_outputs import ( + TASK_OUTPUT_EXPIRED, + TASK_OUTPUT_FAILED, + TASK_OUTPUT_SUCCEEDED, + get_completion_expression, +) +from cylc.flow.task_state import TASK_STATUS_EXPIRED + + +async def test_workflow_stalls_if_no_opt_output_generated_1( + flow, + scheduler, + start, + log_filter, +): + """It should stall if zero optional outputs are generated. + + Example 1: succeeded and (x or y) + + Tests proposal point (2) + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-optional-output-extension.md + """ + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': ''' + a:x? => x + a:y? => y + ''' + } + }, + 'runtime': { + 'a': { + 'outputs': { + 'x': 'x', + 'y': 'y', + } + }, + 'x, y': {}, + } + }) + schd = scheduler(id_) + async with start(schd) as log: + # set 1/a to succeeded without generating x or y + task_a = schd.pool.get_task('1', 'a') + task_a.state_reset(TASK_OUTPUT_SUCCEEDED) + + # the workflow should be stalled + assert schd.pool.is_stalled() + + # this can get logged by two different interfaces + assert log_filter( + # automatically - when the task finishes + log, + contains="1/a did not complete required outputs: ['x', 'y']", + ) + log.clear() + schd.pool.log_incomplete_tasks() + assert log_filter( + # via log_incomplete_tasks - called on shutdown + log, + contains="1/a did not complete required outputs: ['x', 'y']", + ) + + # set output x + schd.pool.spawn_on_output(task_a, 'x') + + # the workflow should be unstalled + schd.is_stalled = None # reset stall status + assert not schd.pool.is_stalled() + + +async def test_workflow_stalls_if_no_opt_output_generated_2( + flow, + scheduler, + start, +): + """It should stall if zero 
optional outputs are generated. + + Example 2: succeeded # but we actually want "succeeded or failed" + + Because there is no failure pathway defined in this example, Cylc will + stall as it cannot determine the user's intention. To avoid this a failure + pathway must be provided in the graph, if there is no failure pathway + (i.e. `a:fail => null`) a completion expression can be written to tolerate + task failure. + + Tests proposal point (2) + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-optional-output-extension.md + """ + raw = { + 'scheduling': { + 'graph': { + 'R1': ''' + a? => x + ''' + }, + }, + 'runtime': {'a': {}, 'x': {}} + } + + id_ = flow(raw) + schd = scheduler(id_) + async with start(schd): + # one or more optional outputs must be completed so, in the absence of + # an alternative pathway "succeeded" is required + task_a = schd.pool.get_task('1', 'a') + assert task_a.state.outputs._completion_expression == 'succeeded' + + # if 1/a fails, the workflow should stall + task_a.state_reset(TASK_OUTPUT_FAILED) + assert schd.pool.is_stalled() + + # to avoid stall in the event of failure the user must configure a failure + # pathway, either by `a:fail? => something` or by using a completion + # expression + raw['runtime']['a']['completion'] = 'succeeded or failed' + + # the workflow should *not* stall if 1/a fails + id_ = flow(raw) + schd = scheduler(id_) + async with start(schd): + # Cylc now knows that failure is expected and ok + task_a = schd.pool.get_task('1', 'a') + assert task_a.state.outputs._completion_expression == ( + 'succeeded or failed' + ) + + # so the workflow should not stall if 1/a fails + task_a.state_reset(TASK_OUTPUT_FAILED) + assert not schd.pool.is_stalled() + + +async def test_completion_expiression( + flow, + scheduler, + start, + monkeypatch, +): + """The task completion condition should be customisable. 
+ + Tests proposal point (3) + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-optional-output-extension.md + """ + mock_remove = Mock() + monkeypatch.setattr( + 'cylc.flow.task_pool.TaskPool.remove', + mock_remove, + ) + + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'a?', + }, + }, + 'runtime': { + 'a': { + 'completion': '(succeeded and x) or (failed and y)', + 'outputs': {'x': 'x', 'y': 'y'}, + }, + }, + }) + schd = scheduler(id_) + async with start(schd): + # complete the "succeeded" output + itask = schd.pool.get_task('1', 'a') + itask.state.outputs.set_completion(TASK_OUTPUT_SUCCEEDED, True) + + # the task should *not* be removed from the pool as complete + schd.pool.remove_if_complete(itask) + assert mock_remove.call_count == 0 + + # complete the "x" output + itask.state.outputs.set_completion('x', True) + + # the task *should* be removed from the pool as complete + schd.pool.remove_if_complete(itask) + assert mock_remove.call_count == 1 + + +def test_validate_completion_expression( + flow, + validate, + monkeypatch, +): + """It should validate the completion expression itself. 
+ + Tests the validation side of proposal point (3) + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-optional-output-extension.md + """ + config = { + 'scheduling': { + 'graph': { + 'R1': 'a?', + }, + }, + 'runtime': { + 'a': { + 'completion': 'succeeded or failed' + } + } + } + id_ = flow(config) + + # it should fail validation in back-compat mode + monkeypatch.setattr('cylc.flow.flags.cylc7_back_compat', True) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert 'compatibility mode' in str(exc_ctx.value) + + # but validate fine otherwise + monkeypatch.setattr('cylc.flow.flags.cylc7_back_compat', False) + validate(id_) + + # it should fail if we reference outputs which haven't been registered + config['runtime']['a']['completion'] = 'succeeded or (failed and x)' + id_ = flow(config) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert '[runtime][a]completion' in str(exc_ctx.value) + assert "Input 'x' is not defined" in str(exc_ctx.value) + + # it should be happy once we're registered those outputs + config['runtime']['a']['outputs'] = {'x': 'x'} + id_ = flow(config) + validate(id_) + + # it should error on invalid syntax + config['runtime']['a']['completion'] = '!succeeded' + id_ = flow(config) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert '[runtime][a]completion' in str(exc_ctx.value) + assert "invalid syntax" in str(exc_ctx.value) + assert "!succeeded" in str(exc_ctx.value) + + # it should error on restricted syntax + config['runtime']['a']['completion'] = '__import__("os")' + id_ = flow(config) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert '[runtime][a]completion' in str(exc_ctx.value) + assert '__import__("os")' in str(exc_ctx.value) + assert '"Call" not permitted' in str(exc_ctx.value) + + # it should give a helpful error message when submit-failed is used + config['runtime']['a']['completion'] = 'submit-failed' + id_ = 
flow(config) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert '[runtime][a]completion' in str(exc_ctx.value) + assert ( + 'Use "submit_failed" rather than "submit-failed"' + ) in str(exc_ctx.value) + + # it should accept "submit_failed" + config['runtime']['a']['completion'] = 'submit_failed' + id_ = flow(config) + validate(id_) + + +def test_clock_expire_infers_optional_expiry( + flow, + validate, +): + """Expiry must be an optional output when clock-expiry is configured. + + Tests proposal point (5) + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-optional-output-extension.md + """ + config = { + 'scheduling': { + 'initial cycle point': '2000', + 'special tasks': { + 'clock-expire': 'a', + }, + 'graph': { + # * a:expired? is not referenced in the graph + # but is still optional because it's clock-expired + # * a:succeeded cannot be required if a:expired is optional + 'P1D': 'a' + }, + }, + 'runtime': { + 'a': {}, + } + } + + # it should fail validation because "a:succeeded" must be optional + id_ = flow(config) + with pytest.raises(GraphParseError) as exc_ctx: + validate(id_) + assert ( + 'Opposite outputs a:succeeded and a:expired must both be optional' + ) in str(exc_ctx.value) + + # when we correct this it should pass validation + config['scheduling']['graph']['P1D'] = 'a?' + id_ = flow(config) + cfg = validate(id_) + taskdef = cfg.taskdefs['a'] + + # expired and succeeded should both be optional outputs + assert taskdef.outputs[TASK_OUTPUT_EXPIRED][1] is False + assert taskdef.outputs[TASK_OUTPUT_SUCCEEDED][1] is False + assert get_completion_expression(taskdef) == ( + 'expired or succeeded' + ) + + +async def test_expiry_is_considered_for_tasks_with_partially_sat_prereqs( + flow, + scheduler, + start, + monkeypatch, +): + """We should capture expiry events as soon as a task is spawned. + + To allow expiry events to be captured early (as opposed to at the time of + submission) e.g. 
for catchup logic, we need to consider tasks for expiry + as soon as any of their prereqs are satisfied (subject to runahead + constraints). + + Tests proposal point (6): + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-optional-output-extension.md + """ + # prevent expired tasks being removed from the pool for ease of testing + monkeypatch.setattr( + 'cylc.flow.task_pool.TaskPool.remove', + Mock(), + ) + + start_year = 2000 + runahead_limit = 3 + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'initial cycle point': str(start_year), + 'runahead limit': f'P{runahead_limit}', + 'special tasks': { + 'clock-expire': 'a', + }, + 'graph': { + 'P1Y': ''' + # this parentless task ensures "a" gets spawned when it is + # within the runahead window + head_of_cycle + a[-P1Y]? & head_of_cycle => a? + a:expire? => catchup + a? => continue + ''' + }, + } + }) + schd = scheduler(id_) + async with start(schd): + # all cycle points *within* the runahead limit + runahead_window = [ + str(year) + for year in range(start_year, start_year + runahead_limit) + ] + + # mark the "head_of_cycle" task as succeeded for each cycle within the + # runahead window + for cycle in runahead_window: + itask = schd.pool.get_task(cycle, 'head_of_cycle') + schd.pool.spawn_on_output(itask, TASK_OUTPUT_SUCCEEDED) + + # run the expiry logic + assert schd.pool.set_expired_tasks() + + # check which tasks were expired + assert { + # tasks marked as expired in the pool + itask.tokens.relative_id + for itask in schd.pool.get_all_tasks() + if itask.state(TASK_STATUS_EXPIRED) + } == { + # tasks we would expect to have been expired + schd.tokens.duplicate(cycle=cycle, task='a').relative_id + for cycle in runahead_window + } + + +def test_validate_optional_outputs_graph_and_completion( + flow, + validate, +): + """It should error if optional outputs are required elsewhere. 
+ + Ensure the use of optional outputs is consistent between the graph and + completion expression if defined. + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + }, + 'scheduling': { + 'graph': { + 'R1': ''' + a:succeeded? => s + a:expired? => e + ''' + } + }, + 'runtime': { + 'a': { + 'completion': 'expired', + } + } + }) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert str(exc_ctx.value) == ( + 'a:expired is optional in the graph (? symbol),' + ' but required in the completion expression.' + ) + + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + }, + 'scheduling': { + 'graph': { + 'R1': ''' + a:succeeded => s + ''' + } + }, + 'runtime': { + 'a': { + 'completion': 'succeeded or expired', + } + } + }) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert str(exc_ctx.value) == ( + 'a:succeeded is optional in the completion expression,' + ' but required in the graph (? symbol).' + ) diff --git a/tests/integration/validate/test_outputs.py b/tests/integration/validate/test_outputs.py index a91393366b5..a7ab3cf9a9c 100644 --- a/tests/integration/validate/test_outputs.py +++ b/tests/integration/validate/test_outputs.py @@ -32,7 +32,6 @@ 'foo', 'foo-bar', 'foo_bar', - 'foo.bar', '0foo0', '123', ], @@ -151,7 +150,7 @@ def test_messages(messages, valid, flow, validate): 'runtime': { 'foo': { 'outputs': { - str(random()): message + str(random()).replace('.', ''): message for message in messages } } diff --git a/tests/unit/test_graph_parser.py b/tests/unit/test_graph_parser.py index 97b7fb45483..f3136301d6d 100644 --- a/tests/unit/test_graph_parser.py +++ b/tests/unit/test_graph_parser.py @@ -17,7 +17,7 @@ import logging import pytest -from itertools import product +from itertools import product, permutations from pytest import param from types import SimpleNamespace @@ -29,7 +29,8 @@ TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_FAILED + 
TASK_OUTPUT_FAILED, + TASK_OUTPUT_EXPIRED, ) @@ -840,8 +841,33 @@ def test_fail_family_triggers_on_tasks(ftrig): gp = GraphParser() with pytest.raises(GraphParseError) as cm: gp.parse_graph(f"foo:{ftrig} => bar") - assert ( - str(cm.value).startswith( - "family trigger on non-family namespace" - ) + assert ( + str(cm.value).startswith( + "family trigger on non-family namespace" ) + ) + + +@pytest.mark.parametrize( + 'output1, output2', + permutations( + [ + TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUT_FAILED, + TASK_OUTPUT_EXPIRED, + ], + 2 + ) +) +def test_opposite_outputs(output1, output2): + """Succeeded, failed and expired are three orthogonal completion outcomes. + + The graph parser should error if one is required and another is optional. + + See proposal point (4) + https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html + """ + gp = GraphParser() + with pytest.raises(GraphParseError) as cm: + gp.parse_graph(f'a:{output1}? | a:{output2} => b') + assert 'Opposite outputs' in str(cm.value) diff --git a/tests/unit/test_task_outputs.py b/tests/unit/test_task_outputs.py index 4f61e696fbc..7777c4a48be 100644 --- a/tests/unit/test_task_outputs.py +++ b/tests/unit/test_task_outputs.py @@ -13,30 +13,254 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
+ import random -import unittest +from types import SimpleNamespace + +import pytest + +from cylc.flow.taskdef import ( + set_implicit_required_outputs, +) +from cylc.flow.task_outputs import ( + TASK_OUTPUTS, + CompletionEvaluator, + TaskOutputs, + get_completion_expression, + get_used_outputs, +) + + +TEST_MESSAGES = [ + ['expired', 'expired', False], + ['submitted', 'submitted', False], + ['submit-failed', 'submit-failed', False], + ['started', 'started', False], + ['succeeded', 'succeeded', False], + ['failed', 'failed', False], + [None, None, False], + ['foo', 'bar', False], + ['foot', 'bart', False], + # NOTE: [None, 'bar', False] is unstable under Python2 +] + + +def test_sorting(): + messages = list(TEST_MESSAGES) + for _ in range(5): + random.shuffle(messages) + output = sorted(messages, key=TaskOutputs.msg_sort_key) + assert output == TEST_MESSAGES + -from cylc.flow.task_outputs import TaskOutputs +def make_tdef( + opt_outputs=None, + req_outputs=None, + completion=None, + set_implicit=True, +): + """Return a TaskDef-like object for completion expression purposes. + Args: + opt_outputs: List of optional outputs or None. + req_outputs: List of required outputs or None. + completion: Custom completion expression or None. + set_implicit: Set implicit outputs (not done for removed tasks). -class TestMessageSorting(unittest.TestCase): + Returns: + Something with enough information to pass as a TaskDef. 
- TEST_MESSAGES = [ - ['expired', 'expired', False], - ['submitted', 'submitted', False], - ['submit-failed', 'submit-failed', False], - ['started', 'started', False], - ['succeeded', 'succeeded', False], - ['failed', 'failed', False], - [None, None, False], - ['foo', 'bar', False], - ['foot', 'bart', False], - # NOTE: [None, 'bar', False] is unstable under Python2 + """ + opt_outputs = opt_outputs or [] + req_outputs = req_outputs or [] + tdef = SimpleNamespace( + rtconfig={ + 'completion': completion, + }, + outputs={ + **{output: (output, None) for output in TASK_OUTPUTS}, + **{output: (output, False) for output in opt_outputs}, + **{output: (output, True) for output in req_outputs}, + }, + ) + if set_implicit: + set_implicit_required_outputs(tdef.outputs) + return tdef + + +@pytest.mark.parametrize( + 'opt_outputs, req_outputs, completion, expression', + [ + pytest.param( + None, + None, + None, + 'succeeded', + id='default', + ), + pytest.param( + ['x'], + None, + None, + 'succeeded and x', + id='single-optional-output' + ), + pytest.param( + ['x', 'y', 'z'], + None, + None, + 'succeeded and (x or y or z)', + id='multiple-optional-outputs' + ), + pytest.param( + None, + ['succeeded', 'x', 'y', 'z'], + None, + 'succeeded and x and y and z', + id='multiple-required-outputs' + ), + pytest.param( + ['succeeded', 'failed', 'expired'], + None, + None, + 'expired or failed or succeeded', + id='multiple-optional-outputs-no-required-outputs' + ), + pytest.param( + ['a', 'b', 'c'], + ['succeeded', 'x', 'y', 'z'], + None, + 'succeeded and x and y and z and (a or b or c)', + id='multiple-optional-and-required-outputs' + ), + pytest.param( + None, + None, + '(succeeded and x) or (failed and y)', + '(succeeded and x) or (failed and y)', + id='custom-completion-expression' + ), ] +) +def test_completion_expression( + opt_outputs, + req_outputs, + completion, + expression +): + """It should derive a completion expression where not explicitly stated. 
+ + See proposal point (2) for spec: + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-optional-output-extension.md#proposal + """ + expr = get_completion_expression( + make_tdef(opt_outputs, req_outputs, completion) + ) + assert expr == expression + + +def test_completed_expression_removed_task(): + """It should handle tasks not present in the graph. + + If an active task is removed by restart/reload, then we may be left with a + task which does not exist in the graph and might not even have a + definition in the config. + + In these cases we should allow any final outcome. + """ + assert get_completion_expression(make_tdef(set_implicit=False)) == ( + 'succeeded or failed or expired' + ) - def test_sorting(self): - messages = list(self.TEST_MESSAGES) - for _ in range(5): - random.shuffle(messages) - output = sorted(messages, key=TaskOutputs.msg_sort_key) - self.assertEqual(output, self.TEST_MESSAGES, output) + +@pytest.mark.parametrize( + 'expression, outputs, outcome', + [ + ( + 'succeeded', + {'succeeded': False}, + False, + ), + ( + 'succeeded', + {'succeeded': True}, + True, + ), + ( + '(succeeded and x) or (failed and y)', + {'succeeded': True, 'x': True, 'failed': False, 'y': False}, + True, + ), + ] +) +def test_completion_evaluator(expression, outputs, outcome): + """It should evaluate completion expressions against output states.""" + assert CompletionEvaluator(expression, **outputs) == outcome + + +def test_output_is_incomplete(): + """It should report when outputs are complete. + + And which outputs are incomplete. + """ + tdef = make_tdef( + opt_outputs=['x', 'y', 'z'], + req_outputs=['succeeded', 'a'], + ) + outputs = TaskOutputs(tdef) + + # the outputs should be incomplete on initialisation + assert outputs.is_incomplete() + + # only the referenced outputs should be returned as incomplete + # (i.e. 
submitted, started, expired, etc should be filtered out) + assert outputs.get_incomplete() == ['a', 'succeeded', 'x', 'y', 'z'] + + # set 'a' and 'succeeded' as completed + outputs.set_completion('a', True) + outputs.set_completion('succeeded', True) + + # the outputs should still be incomplete + assert outputs.is_incomplete() + assert outputs.get_incomplete() == ['x', 'y', 'z'] + + # set the 'x' output as completed + outputs.set_completion('x', True) + + # the outputs should now be complete + assert not outputs.is_incomplete() + assert outputs.get_incomplete() == ['y', 'z'] + + +@pytest.mark.parametrize( + 'expression, outputs, used', + [ + # the 'y' output is not used + pytest.param( + 'succeeded and x', + {'succeeded', 'x', 'y'}, + {'succeeded', 'x'}, + id='1', + ), + # the 'x' output is not used + # (but some used outputs begin with the letter 'x') + pytest.param( + 'xy or xz', + {'x', 'xy', 'xz'}, + {'xy', 'xz'}, + id='2', + ), + pytest.param( + 'a or (b and c) or (d and (e or f))', + {'a', 'b', 'c', 'd', 'e', 'f'}, + {'a', 'b', 'c', 'd', 'e', 'f'}, + id='3', + ), + ] +) +def test_get_used_outputs(expression, outputs, used): + """It should return outputs referenced in the completion expression.""" + assert get_used_outputs( + expression, + ['submitted', 'started', *outputs], + ) == used diff --git a/tests/unit/test_xtrigger_mgr.py b/tests/unit/test_xtrigger_mgr.py index 89cc0024ddf..5b87f3497a1 100644 --- a/tests/unit/test_xtrigger_mgr.py +++ b/tests/unit/test_xtrigger_mgr.py @@ -150,7 +150,7 @@ def test_housekeeping_with_xtrigger_satisfied(xtrigger_mgr): xtrig.out = "[\"True\", {\"name\": \"Yossarian\"}]" tdef = TaskDef( name="foo", - rtcfg=None, + rtcfg={}, run_mode="live", start_point=1, initial_point=1 @@ -194,7 +194,7 @@ def test__call_xtriggers_async(xtrigger_mgr): # create a task tdef = TaskDef( name="foo", - rtcfg=None, + rtcfg={}, run_mode="live", start_point=1, initial_point=1 @@ -298,7 +298,7 @@ def test_check_xtriggers(xtrigger_mgr): get_name.out = 
"[\"True\", {\"name\": \"Yossarian\"}]" tdef1 = TaskDef( name="foo", - rtcfg=None, + rtcfg={}, run_mode="live", start_point=1, initial_point=1 @@ -322,7 +322,7 @@ def test_check_xtriggers(xtrigger_mgr): # create a task tdef2 = TaskDef( name="foo", - rtcfg=None, + rtcfg={}, run_mode="live", start_point=1, initial_point=1 From ba42cc0deafa18e28adbb0fb5e471f2d1e810e1f Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 27 Jul 2023 11:28:20 +0100 Subject: [PATCH 2/3] host select: move to new restricted_evaluator interface --- cylc/flow/host_select.py | 145 ++++++++++++++++++--------------------- 1 file changed, 65 insertions(+), 80 deletions(-) diff --git a/cylc/flow/host_select.py b/cylc/flow/host_select.py index 83861d043d4..df3b0c01c75 100644 --- a/cylc/flow/host_select.py +++ b/cylc/flow/host_select.py @@ -14,7 +14,51 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Functionality for selecting a host from pre-defined list.""" +"""Functionality for selecting a host from pre-defined list. + +Ranking/filtering hosts can be achieved using Python expressions which work +with the `psutil` interfaces. + +These expressions are user-defined, but run in a restricted evaluation environment +where only certain whitelisted operations are permitted. + +Examples: + >>> RankingExpressionEvaluator('1 + 1') + 2 + >>> RankingExpressionEvaluator('1 * -1') + -1 + >>> RankingExpressionEvaluator('1 < a', a=2) + True + >>> RankingExpressionEvaluator('1 in (1, 2, 3)') + True + >>> import psutil + >>> RankingExpressionEvaluator( + ... 'a.available > 0', + ... a=psutil.virtual_memory() + ... 
) + True + + If you try to get it to do something you're not allowed to: + >>> RankingExpressionEvaluator('open("foo")') + Traceback (most recent call last): + ValueError: + >>> RankingExpressionEvaluator('import sys') + Traceback (most recent call last): + ValueError: invalid syntax: import sys + + If you try to get hold of something you aren't supposed to: + >>> answer = 42 # only variables explicitly passed in should work + >>> RankingExpressionEvaluator('answer') + Traceback (most recent call last): + NameError: name 'answer' is not defined + + If you try to do something which doesn't make sense: + >>> RankingExpressionEvaluator('a.b.c') # no value "a.b.c" + Traceback (most recent call last): + NameError: name 'a' is not defined + +""" + import ast from collections import namedtuple from functools import lru_cache @@ -35,6 +79,22 @@ from cylc.flow.hostuserutil import get_fqdn_by_host, is_remote_host from cylc.flow.remote import run_cmd, cylc_server_cmd from cylc.flow.terminal import parse_dirty_json +from cylc.flow.util import restricted_evaluator + + +# evaluates ranking expressions +# (see module docstring for examples) +RankingExpressionEvaluator = restricted_evaluator( + ast.Expression, + # variables + ast.Name, ast.Load, ast.Attribute, ast.Subscript, ast.Index, + # opers + ast.BinOp, ast.operator, ast.UnaryOp, ast.unaryop, + # types + ast.Num, ast.Str, + # comparisons + ast.Compare, ast.cmpop, ast.List, ast.Tuple, +) GLBL_CFG_STR = 'global.cylc[scheduler][run hosts]ranking' @@ -301,7 +361,10 @@ def _filter_by_ranking(hosts, rankings, results, data=None): for key, expression in rankings: item = _reformat_expr(key, expression) try: - result = _simple_eval(expression, RESULT=results[host][key]) + result = RankingExpressionEvaluator( + expression, + RESULT=results[host][key], + ) except Exception as exc: raise GlobalConfigError( 'Invalid host ranking expression' @@ -334,84 +397,6 @@ def _filter_by_ranking(hosts, rankings, results, data=None): ) -class 
SimpleVisitor(ast.NodeVisitor): - """Abstract syntax tree node visitor for simple safe operations.""" - - def visit(self, node): - if not isinstance(node, self.whitelist): - # permit only whitelisted operations - raise ValueError(type(node)) - return super().visit(node) - - whitelist = ( - ast.Expression, - # variables - ast.Name, ast.Load, ast.Attribute, ast.Subscript, ast.Index, - # opers - ast.BinOp, ast.operator, ast.UnaryOp, ast.unaryop, - # types - ast.Num, ast.Str, - # comparisons - ast.Compare, ast.cmpop, ast.List, ast.Tuple, - ) - - -def _simple_eval(expr, **variables): - """Safely evaluates simple python expressions. - - Supports a minimal subset of Python operators: - * Binary operations - * Simple comparisons - - Supports a minimal subset of Python data types: - * Numbers - * Strings - * Tuples - * Lists - - Examples: - >>> _simple_eval('1 + 1') - 2 - >>> _simple_eval('1 * -1') - -1 - >>> _simple_eval('1 < a', a=2) - True - >>> _simple_eval('1 in (1, 2, 3)') - True - >>> import psutil - >>> _simple_eval('a.available > 0', a=psutil.virtual_memory()) - True - - If you try to get it to do something you're not allowed to: - >>> _simple_eval('open("foo")') - Traceback (most recent call last): - ValueError: - >>> _simple_eval('import sys') - Traceback (most recent call last): - SyntaxError: ... 
- - If you try to get hold of something you aren't supposed to: - >>> answer = 42 # only variables explicitly passed in should work - >>> _simple_eval('answer') - Traceback (most recent call last): - NameError: name 'a' is not defined - - If you try to do something which doesn't make sense: - >>> _simple_eval('a.b.c') # no value "a.b.c" - Traceback (most recent call last): - NameError: name 'answer' is not defined - - """ - node = ast.parse(expr.strip(), mode='eval') - SimpleVisitor().visit(node) - # acceptable use of eval due to restricted language features - return eval( # nosec - compile(node, '', 'eval'), - {'__builtins__': {}}, - variables - ) - - def _get_rankings(string): """Yield parsed ranking expressions. From 6a710c0d6d524361075b64fa3f564f74882bb7c4 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 24 Aug 2023 10:03:31 +0100 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Hilary James Oliver Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- changes.d/5651.break.md | 2 +- cylc/flow/cfgspec/workflow.py | 22 +++++++++++----------- cylc/flow/config.py | 7 +++++++ cylc/flow/host_select.py | 4 ++-- cylc/flow/task_outputs.py | 8 ++------ cylc/flow/util.py | 4 ++-- tests/integration/test_optional_outputs.py | 14 +++++++++++--- 7 files changed, 36 insertions(+), 25 deletions(-) diff --git a/changes.d/5651.break.md b/changes.d/5651.break.md index 9edff066cf4..6308121c072 100644 --- a/changes.d/5651.break.md +++ b/changes.d/5651.break.md @@ -1,6 +1,6 @@ Refinements to optional outputs: -* Require at least one optional output to be generated. This is a breaking +* If a task has optional outputs, require at least one of them to be generated. This is a breaking change for some workflows e.g. `a? => b` where `a:fail?` is not referenced in the graph. To fix, set `[runtime][a]completion = succeeded or failed`. * Allow the condition for task completion to be manually specified. 
diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index e10b7cc7264..802384b1b19 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -964,24 +964,24 @@ def get_script_common_text(this: str, example: Optional[str] = None): Conf('completion', VDR.V_STRING, desc=''' Define the condition for task completion. - The completion condition is evaluated when a task is finished, - it is a validation check which confirms that the task has + The completion condition is evaluated when a task is finished. + It is a validation check which confirms that the task has generated the outputs it was expected to. If the task fails this check it is considered - :term:`incomplete` and will cause the workflow to - :term:`stall` altering you that something has gone wrong which + :term:`incomplete` and may cause the workflow to + :term:`stall`, alerting you that something has gone wrong which requires investigation. By default, the completion condition ensures that the - task succeeds. I.E. if the task fails, then the workflow will + task succeeds - i.e., if the task fails, then the workflow will stall. If :ref:`User Guide Optional Outputs` are defined in the graph, - then one or more of the optional outputs must also be generated - in order for the task to be completed. + then by default one or more of the optional outputs must also + be generated in order for the task to be completed. - E.G. In this example, the task ``foo`` must succeed *and* + E.g., in this example, the task ``foo`` must succeed *and* yield one of the outputs ``a`` or ``b`` in order to be considered complete: @@ -1001,7 +1001,7 @@ def get_script_common_text(this: str, example: Optional[str] = None): The ``completion`` configuration allows you to override the default completion to suit your needs. - E.G. 
In this example, the task ``foo`` must yield both the + E.g., in this example, the task ``foo`` must yield both the ``succeeded`` output and the :term:`custom output` ``myoutput`` to be considered complete: @@ -1046,7 +1046,7 @@ def get_script_common_text(this: str, example: Optional[str] = None): """ [runtime] [[a]] - # this completion condition infers that the + # this completion condition implies that the # succeeded output is optional completion = succeeded or failed @@ -1057,7 +1057,7 @@ def get_script_common_text(this: str, example: Optional[str] = None): ``succeeded or expired`` The task can either succeed or :ref:`expire `. - ``succeeded or (failed and some_error)`` + ``succeeded or (failed and my_error)`` The task can fail, but only if it also yields the custom output ``my_error``. ``succeeded and (x or y or z)`` diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 57d72a6c322..b24f3ee0919 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1157,6 +1157,13 @@ def check_completion_expressions(self): f'\nUse "submit_failed" rather than "submit-failed"' ' in completion expressions.' ) + elif '-' in expr: + raise WorkflowConfigError( + f'Error in [runtime][{name}]completion:' + f'\n {expr}' + '\nReplace hyphens with underscores in task outputs when' + ' used in completion expressions.' + ) # check completion expressions are not being used in compat mode if cylc.flow.flags.cylc7_back_compat: diff --git a/cylc/flow/host_select.py b/cylc/flow/host_select.py index df3b0c01c75..324030f3b9c 100644 --- a/cylc/flow/host_select.py +++ b/cylc/flow/host_select.py @@ -19,8 +19,8 @@ Ranking/filtering hosts can be achieved using Python expressions which work with the `psutil` interfaces. -These expressions are used-defined, buy run a restricted evluation environment -where only certain whitelisted operations are permitted. 
+These expressions are user-defined, but run in a restricted evaluation +environment where only certain whitelisted operations are permitted. Examples: >>> RankingExpressionEvaluator('1 + 1') diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index 2b3ab5be6b3..b129193a933 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -111,12 +111,8 @@ def get_completion_expression(tdef): ors.sort() # join the lists of "ands" and "ors" into statements - _ands = '' - if ands: - _ands = ' and '.join(ands) - _ors = '' - if ors: - _ors = ' or '.join(ors) + _ands = ' and '.join(ands) + _ors = ' or '.join(ors) # join the statements of "ands" and "ors" into an expression if ands and ors: diff --git a/cylc/flow/util.py b/cylc/flow/util.py index a2988a62cda..2cb84bdeb05 100644 --- a/cylc/flow/util.py +++ b/cylc/flow/util.py @@ -149,7 +149,7 @@ def __init__(self, node): class RestrictedNodeVisitor(ast.NodeVisitor): - """AST node visitor which error on non-whitelisted syntax. + """AST node visitor which errors on non-whitelisted syntax. Raises _RestrictedEvalError if a non-whitelisted node is visited. """ @@ -177,7 +177,7 @@ def restricted_evaluator( safely with user-provided input. The code passed into the evaluator will be parsed into an abstract syntax - tree AST), then that tree will be executed using Python's internal logic. + tree (AST), then that tree will be executed using Python's internal logic. The evaluator will check the type of each node before it is executed and fail with a ValueError if it is not permitted. 
diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py index 630a8980f4e..90845308006 100644 --- a/tests/integration/test_optional_outputs.py +++ b/tests/integration/test_optional_outputs.py @@ -162,7 +162,7 @@ async def test_workflow_stalls_if_no_opt_output_generated_2( assert not schd.pool.is_stalled() -async def test_completion_expiression( +async def test_completion_expression( flow, scheduler, start, @@ -205,7 +205,7 @@ async def test_completion_expiression( # complete the "x" output itask.state.outputs.set_completion('x', True) - # the task *should* not be removed from the pool as complete + # the task *should* be removed from the pool as complete schd.pool.remove_if_complete(itask) assert mock_remove.call_count == 1 @@ -290,8 +290,16 @@ def test_validate_completion_expression( id_ = flow(config) validate(id_) + # it should give a generic error when hyphens are used + config['runtime']['a']['completion'] = 'a-b - c' + id_ = flow(config) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert '[runtime][a]completion' in str(exc_ctx.value) + assert 'Replace hyphens with underscores' in str(exc_ctx.value) + -def test_clock_expire_infers_optional_expiry( +def test_clock_expire_implies_optional_expiry( flow, validate, ):