Skip to content

Commit

Permalink
Review fixes 2
Browse files Browse the repository at this point in the history
Co-authored-by: Ronnie Dutta <[email protected]>
  • Loading branch information
dwsutherland and MetRonnie committed Feb 15, 2024
1 parent ea31d2f commit c7cb74a
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 32 deletions.
9 changes: 5 additions & 4 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,11 +803,12 @@ def get_script_common_text(this: str, example: Optional[str] = None):
''')

Conf('sequential xtriggers default', VDR.V_BOOLEAN, False, desc='''
Set to ``True``, this allows for sequential spawning of associated
parentless tasks on xtrigger satisfaction.
Instead of out to the runahead limit (default: ``False``).
When set to ``True``, parentless tasks that trigger off xtriggers
will only spawn sequentially, i.e. on the satisfaction of the
xtriggers in order. Otherwise, these tasks will all spawn at the
same time up to the runahead limit.
This workflow wide default can be overridden by a reserved
This workflow-wide default can be overridden by a reserved
keyword argument in the xtrigger function declaration and/or
function (``sequential=True/False``).
Expand Down
14 changes: 6 additions & 8 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,9 +754,10 @@ def _get_spawned_or_merged_task(
def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
"""Spawn parentless task instances from point to runahead limit.
Sequentially checked xtriggers with spawn corresponding task out to the
next task with any xtrigger with the same behaviour, or to the runahead
limit (whichever occurs first).
Sequentially checked xtriggers will spawn the next occurrence of their
corresponding tasks. These tasks will keep spawning until they depend
on any unsatisfied xtrigger of the same sequential behavior, are no
longer parentless, and/or hit the runahead limit.
"""
if not flow_nums or point is None:
Expand Down Expand Up @@ -806,10 +807,7 @@ def remove(self, itask, reason=""):
msg += f" ({reason})"

if itask.is_xtrigger_sequential:
with suppress(ValueError):
self.xtrigger_mgr.sequential_spawn_next.remove(
itask.identity
)
self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity)

try:
del self.hidden_pool[itask.point][itask.identity]
Expand Down Expand Up @@ -1723,7 +1721,7 @@ def remove_tasks(self, items):
"""Remove tasks from the pool."""
itasks, _, bad_items = self.filter_task_proxies(items)
for itask in itasks:
# Spawn next occurance of xtrigger sequential task.
# Spawn next occurrence of xtrigger sequential task.
if itask.is_xtrigger_sequential:
self.spawn_to_rh_limit(
itask.tdef,
Expand Down
11 changes: 3 additions & 8 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,11 @@ def __init__(
self.state = TaskState(tdef, self.point, status, is_held)

# Set xtrigger checking type, which effects parentless spawning.
if (
self.is_xtrigger_sequential = bool(
sequential_xtrigger_labels
and self.tdef.is_parentless(start_point)
and set(self.state.xtriggers.keys()).intersection(
sequential_xtrigger_labels
)
):
self.is_xtrigger_sequential = True
else:
self.is_xtrigger_sequential = False
and sequential_xtrigger_labels.intersection(self.state.xtriggers)
)

# Determine graph children of this task (for spawning).
if data_mode:
Expand Down
16 changes: 11 additions & 5 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
import re
from copy import deepcopy
from time import time
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING
from typing import (
Any,
Dict,
Optional,
Set,
Tuple,
TYPE_CHECKING
)

from cylc.flow import LOG
from cylc.flow.exceptions import XtriggerConfigError
Expand Down Expand Up @@ -243,15 +250,15 @@ def __init__(
self.active: list = []

# Clock labels, to avoid repeated string comparisons
self.wall_clock_labels: set = set()
self.wall_clock_labels: Set[str] = set()

# Workflow wide default, used when not specified in xtrigger kwargs.
self.sequential_xtriggers_default = False
# Labels whose xtriggers are sequentially checked.
self.sequential_xtrigger_labels: set = set()
self.sequential_xtrigger_labels: Set[str] = set()
# Gather parentless tasks whose xtrigger(s) have been satisfied
# (these will be used to spawn the next occurance).
self.sequential_spawn_next: set = set()
self.sequential_spawn_next: Set[str] = set()

self.workflow_run_dir = workflow_run_dir

Expand Down Expand Up @@ -295,7 +302,6 @@ def validate_xtrigger(
* If the function module was not found.
* If the function was not found in the xtrigger module.
* If the function is not callable.
* If the function is not callable.
* If any string template in the function context
arguments are not present in the expected template values.
Expand Down
14 changes: 7 additions & 7 deletions tests/functional/xtriggers/04-sequential.t
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__'
):PT1S
[[graph]]
R1 = """
@clock_1 => a
b
"""
@clock_1 => a
b
"""
+P1Y/P1Y = """
@clock_2 => a
@clock_2 => b
@up_1 => c
"""
@clock_2 => a
@clock_2 => b
@up_1 => c
"""
__FLOW_CONFIG__

run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}"
Expand Down

0 comments on commit c7cb74a

Please sign in to comment.