Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve xtrigger validation #60

Merged
merged 3 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.task_events_mgr import (
EventData,
get_event_handler_data
Expand Down Expand Up @@ -1735,14 +1734,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
# Generic xtrigger validation.
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)

# Specific xtrigger.validate(), if available.
with suppress(AttributeError, ImportError):
get_xtrig_func(xtrig.func_name, "validate", self.fdir)(
xtrig.func_args,
xtrig.func_kwargs,
xtrig.get_signature()
)

if self.xtrigger_mgr:
# (not available during validation)
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
Expand Down Expand Up @@ -2434,8 +2425,8 @@ def upgrade_clock_triggers(self):
# Derive an xtrigger label.
label = '_'.join(('_cylc', 'wall_clock', task_name))
# Define the xtrigger function.
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
args = [] if offset is None else [offset]
xtrig = SubFuncContext(label, 'wall_clock', args, {})
if self.xtrigger_mgr is None:
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
else:
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,12 @@ class XtriggerConfigError(WorkflowConfigError):

"""

def __init__(self, label: str, trigger: str, message: str):
def __init__(self, label: str, message: str):
self.label: str = label
self.trigger: str = trigger
self.message: str = message

def __str__(self):
return f'[{self.label}] {self.message}'
def __str__(self) -> str:
return f'[@{self.label}] {self.message}'


class ClientError(CylcError):
Expand Down
88 changes: 55 additions & 33 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.xtriggers.wall_clock import wall_clock
from cylc.flow.xtriggers.wall_clock import _wall_clock

if TYPE_CHECKING:
from inspect import BoundArguments
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.subprocctx import SubFuncContext
Expand Down Expand Up @@ -246,9 +247,11 @@ def check_xtrigger(
fctx: 'SubFuncContext',
fdir: str,
) -> None:
"""Generic xtrigger validation: check existence and string templates.
"""Generic xtrigger validation: check existence, string templates and
function signature.

Xtrigger modules may also supply a specific "validate" function.
Xtrigger modules may also supply a specific `validate` function
which will be run here.

Args:
label: xtrigger label
Expand All @@ -262,6 +265,7 @@ def check_xtrigger(
* If the function is not callable.
* If any string template in the function context
arguments are not present in the expected template values.
* If the arguments do not match the function signature.

"""
fname: str = fctx.func_name
Expand All @@ -270,32 +274,30 @@ def check_xtrigger(
func = get_xtrig_func(fname, fname, fdir)
except ImportError:
raise XtriggerConfigError(
label,
fname,
f"xtrigger module '{fname}' not found",
label, f"xtrigger module '{fname}' not found",
)
except AttributeError:
raise XtriggerConfigError(
label,
fname,
f"'{fname}' not found in xtrigger module '{fname}'",
label, f"'{fname}' not found in xtrigger module '{fname}'",
)

if not callable(func):
raise XtriggerConfigError(
label,
fname,
f"'{fname}' not callable in xtrigger module '{fname}'",
label, f"'{fname}' not callable in xtrigger module '{fname}'",
)
if func is not wall_clock:
# Validate args and kwargs against the function signature
# (but not for wall_clock because it's a special case).
try:
signature(func).bind(*fctx.func_args, **fctx.func_kwargs)
except TypeError as exc:
raise XtriggerConfigError(
label, fname, f"{fctx.get_signature()}: {exc}"
)

# Validate args and kwargs against the function signature
sig_str = fctx.get_signature()
try:
bound_args = signature(func).bind(
*fctx.func_args, **fctx.func_kwargs
)
except TypeError as exc:
raise XtriggerConfigError(label, f"{sig_str}: {exc}")
# Specific xtrigger.validate(), if available.
XtriggerManager.try_xtrig_validate_func(
label, fname, fdir, bound_args, sig_str
)

# Check any string templates in the function arg values (note this
# won't catch bad task-specific values - which are added dynamically).
Expand All @@ -311,9 +313,7 @@ def check_xtrigger(
template_vars.add(TemplateVariables(match))
except ValueError:
raise XtriggerConfigError(
label,
fname,
f"Illegal template in xtrigger: {match}",
label, f"Illegal template in xtrigger: {match}",
)

# check for deprecated template variables
Expand All @@ -329,6 +329,30 @@ def check_xtrigger(
f' {", ".join(t.value for t in deprecated_variables)}'
)

@staticmethod
def try_xtrig_validate_func(
label: str,
fname: str,
fdir: str,
bound_args: 'BoundArguments',
signature_str: str,
):
"""Call an xtrigger's `validate()` function if it is implemented.

Raise XtriggerConfigError if validation fails.
"""
try:
xtrig_validate_func = get_xtrig_func(fname, 'validate', fdir)
except (AttributeError, ImportError):
return
bound_args.apply_defaults()
try:
xtrig_validate_func(bound_args.arguments)
Comment on lines +348 to +350
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this approach, people writing xtrigger validate functions don't have to worry about whether an argument is positional and/or keyword (the original xrandom.validate example was flawed because it didn't account both cases under the previous system)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I like it.

except Exception as exc: # Note: catch all errors
raise XtriggerConfigError(
label, f"{signature_str} validation failed: {exc}"
)

def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.

Expand Down Expand Up @@ -410,20 +434,18 @@ def get_xtrig_ctx(
args = []
kwargs = {}
if ctx.func_name == "wall_clock":
if "trigger_time" in ctx.func_kwargs:
if "trigger_time" in ctx.func_kwargs: # noqa: SIM401 (readabilty)
# Internal (retry timer): trigger_time already set.
kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"]
elif "offset" in ctx.func_kwargs: # noqa: SIM106
else:
# External (clock xtrigger): convert offset to trigger_time.
# Datetime cycling only.
kwargs["trigger_time"] = itask.get_clock_trigger_time(
itask.point,
ctx.func_kwargs["offset"]
)
else:
# Should not happen!
raise ValueError(
"wall_clock function kwargs needs trigger time or offset"
ctx.func_kwargs.get(
"offset",
ctx.func_args[0] if ctx.func_args else None
)
)
else:
# Other xtrig functions: substitute template values.
Expand Down Expand Up @@ -455,7 +477,7 @@ def call_xtriggers_async(self, itask: 'TaskProxy'):
if sig in self.sat_xtrig:
# Already satisfied, just update the task
itask.state.xtriggers[label] = True
elif wall_clock(*ctx.func_args, **ctx.func_kwargs):
elif _wall_clock(*ctx.func_args, **ctx.func_kwargs):
# Newly satisfied
itask.state.xtriggers[label] = True
self.sat_xtrig[sig] = {}
Expand Down
18 changes: 10 additions & 8 deletions cylc/flow/xtriggers/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

"""A Cylc xtrigger function."""

from typing import Any, Dict, Tuple
from cylc.flow.exceptions import WorkflowConfigError

from typing import Tuple


def echo(*args, **kwargs) -> Tuple:
"""Print arguments to stdout, return kwargs['succeed'] and kwargs.
Expand Down Expand Up @@ -48,15 +47,18 @@ def echo(*args, **kwargs) -> Tuple:
return kwargs["succeed"], kwargs


def validate(f_args, f_kwargs, f_signature):

def validate(all_args: Dict[str, Any]):
"""
Validate the xtrigger function arguments parsed from the workflow config.

This is separate from the xtrigger to allow parse-time validation.

"""
if "succeed" not in f_kwargs or not isinstance(f_kwargs["succeed"], bool):
raise WorkflowConfigError(
f"Requires 'succeed=True/False' arg: {f_signature}"
)
# NOTE: with (*args, **kwargs) pattern, all_args looks like:
# {
# 'args': (arg1, arg2, ...),
# 'kwargs': {kwarg1: val, kwarg2: val, ...}
# }
succeed = all_args['kwargs'].get("succeed")
if not isinstance(succeed, bool):
raise WorkflowConfigError("Requires 'succeed=True/False' arg")
61 changes: 29 additions & 32 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,53 @@
"""xtrigger function to trigger off of a wall clock time."""

from time import time
from typing import Any, Dict
from cylc.flow.cycling.iso8601 import interval_parse
from cylc.flow.exceptions import WorkflowConfigError


def wall_clock(trigger_time=None):
"""Return True after the desired wall clock time, False.
def wall_clock(offset: str = 'PT0S'):
"""Trigger at a specific real "wall clock" time relative to the cycle point
in the graph.

Clock triggers, unlike other trigger functions, are executed synchronously
in the main process.

Args:
offset:
ISO 8601 interval to wait after the cycle point is reached in real
time before triggering. May be negative, in which case it will
trigger before the real time reaches the cycle point.
"""
# NOTE: This is just a placeholder for the actual implementation.
# This is only used for validating the signature and for autodocs.
...


def _wall_clock(trigger_time: int) -> bool:
"""Actual implementation of wall_clock.

Return True after the desired wall clock time, or False before.

Args:
trigger_time (int):
trigger_time:
Trigger time as seconds since Unix epoch.
"""
return time() > trigger_time


def validate(f_args, f_kwargs, f_signature):
def validate(args: Dict[str, Any]):
"""Validate and manipulate args parsed from the workflow config.

NOTE: the xtrigger signature is different to the function signature above

wall_clock() # infer zero interval
wall_clock(PT1H)
wall_clock(offset=PT1H)

The offset must be a valid ISO 8601 interval.

If f_args used, convert to f_kwargs for clarity.

"""

n_args = len(f_args)
n_kwargs = len(f_kwargs)

if n_args + n_kwargs > 1:
raise WorkflowConfigError(f"Too many args: {f_signature}")

if n_kwargs:
# sole kwarg must be "offset"
kw = next(iter(f_kwargs))
if kw != "offset":
raise WorkflowConfigError(f"Illegal arg '{kw}': {f_signature}")

elif n_args:
# convert to kwarg
f_kwargs["offset"] = f_args[0]
del f_args[0]

else:
# no args, infer zero interval
f_kwargs["offset"] = "P0Y"

# must be a valid interval
try:
interval_parse(f_kwargs["offset"])
interval_parse(args["offset"])
except (ValueError, AttributeError):
raise WorkflowConfigError(f"Invalid offset: {f_signature}")
raise WorkflowConfigError(f"Invalid offset: {args['offset']}")
Loading