Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🤖 Merge 8.2.x-sync into master #5817

Merged
merged 11 commits into from
Nov 13, 2023
12 changes: 12 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description"

<!-- towncrier release notes start -->

## __cylc-8.2.3 (Released 2023-11-02)__

### 🔧 Fixes

[#5660](https://github.com/cylc/cylc-flow/pull/5660) - Re-worked graph n-window algorithm for better efficiency.

[#5753](https://github.com/cylc/cylc-flow/pull/5753) - Fixed bug where execution time limit polling intervals could end up incorrectly applied

[#5776](https://github.com/cylc/cylc-flow/pull/5776) - Ensure that submit-failed tasks are marked as incomplete (so remain visible) when running in back-compat mode.

[#5791](https://github.com/cylc/cylc-flow/pull/5791) - fix a bug where if multiple clock triggers are set for a task only one was being satisfied.

## __cylc-8.2.2 (Released 2023-10-05)__

### 🚀 Enhancements
Expand Down
1 change: 0 additions & 1 deletion changes.d/5660.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5753.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5776.fix.md

This file was deleted.

1 change: 1 addition & 0 deletions changes.d/5801.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix traceback when using parentheses on right hand side of graph trigger.
4 changes: 2 additions & 2 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import List, Optional, Tuple, Any, Union

from contextlib import suppress
from packaging.version import parse as parse_version, Version
from packaging.version import Version

from cylc.flow import LOG
from cylc.flow import __version__ as CYLC_VERSION
Expand Down Expand Up @@ -1866,7 +1866,7 @@ def get_version_hierarchy(version: str) -> List[str]:
['', '8', '8.0', '8.0.1', '8.0.1a2', '8.0.1a2.dev']

"""
smart_ver: Version = parse_version(version)
smart_ver = Version(version)
base = [str(i) for i in smart_ver.release]
hierarchy = ['']
hierarchy += ['.'.join(base[:i]) for i in range(1, len(base) + 1)]
Expand Down
37 changes: 22 additions & 15 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
Dict,
List,
Tuple,
Optional
Optional,
Union
)

import cylc.flow.flags
Expand Down Expand Up @@ -85,10 +86,10 @@ class GraphParser:
store dependencies for the whole workflow (call parse_graph multiple times
and key results by graph section).

The general form of a dependency is "EXPRESSION => NODE", where:
* On the right, NODE is a task or family name
The general form of a dependency is "LHS => RHS", where:
* On the left, an EXPRESSION of nodes involving parentheses, and
logical operators '&' (AND), and '|' (OR).
* On the right, an EXPRESSION of nodes NOT involving '|'
* Node names may be parameterized (any number of parameters):
NODE<i,j,k>
NODE<i=0,j,k> # specific parameter value
Expand Down Expand Up @@ -517,32 +518,33 @@ def _proc_dep_pair(
"Suicide markers must be"
f" on the right of a trigger: {left}")

# Check that parentheses match.
mismatch_msg = 'Mismatched parentheses in: "{}"'
if left and left.count("(") != left.count(")"):
raise GraphParseError(mismatch_msg.format(left))
if right.count("(") != right.count(")"):
raise GraphParseError(mismatch_msg.format(right))

# Ignore cycle point offsets on the right side.
# (Note we can't ban this; all nodes get process as left and right.)
if '[' in right:
return

# Check that parentheses match.
if left and left.count("(") != left.count(")"):
raise GraphParseError(
"Mismatched parentheses in: \"" + left + "\"")

# Split right side on AND.
rights = right.split(self.__class__.OP_AND)
if '' in rights or right and not all(rights):
raise GraphParseError(
f"Null task name in graph: {left} => {right}")

lefts: Union[List[str], List[Optional[str]]]
if not left or (self.__class__.OP_OR in left or '(' in left):
# Treat conditional or bracketed expressions as a single entity.
# Treat conditional or parenthesised expressions as a single entity
# Can get [None] or [""] here
lefts: List[Optional[str]] = [left]
lefts = [left]
else:
# Split non-conditional left-side expressions on AND.
# Can get [""] here too
# TODO figure out how to handle this wih mypy:
# assign List[str] to List[Optional[str]]
lefts = left.split(self.__class__.OP_AND) # type: ignore
lefts = left.split(self.__class__.OP_AND)
if '' in lefts or left and not all(lefts):
raise GraphParseError(
f"Null task name in graph: {left} => {right}")
Expand Down Expand Up @@ -847,9 +849,14 @@ def _compute_triggers(
trigs += [f"{name}{offset}:{trigger}"]

for right in rights:
right = right.strip('()') # parentheses don't matter
m = self.__class__.REC_RHS_NODE.match(right)
# This will match, bad nodes are detected earlier (type ignore):
suicide_char, name, output, opt_char = m.groups() # type: ignore
if not m:
# Bad nodes should have been detected earlier; fail loudly
raise ValueError( # pragma: no cover
f"Unexpected graph expression: '{right}'"
)
suicide_char, name, output, opt_char = m.groups()
suicide = (suicide_char == self.__class__.SUICIDE)
optional = (opt_char == self.__class__.OPTIONAL)
if output:
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sys
from typing import TYPE_CHECKING

from packaging.version import parse as parse_version
from packaging.version import Version

from cylc.flow import LOG, __version__
from cylc.flow.exceptions import (
Expand Down Expand Up @@ -468,7 +468,7 @@ def _version_check(
if not db_file.is_file():
# not a restart
return True
this_version = parse_version(__version__)
this_version = Version(__version__)
last_run_version = WorkflowDatabaseManager.check_db_compatibility(db_file)

for itt, (this, that) in enumerate(zip_longest(
Expand Down
42 changes: 24 additions & 18 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@
)

if TYPE_CHECKING:
from cylc.flow.id import Tokens
from cylc.flow.cycling import PointBase
from cylc.flow.task_action_timer import TaskActionTimer
from cylc.flow.taskdef import TaskDef
from cylc.flow.id import Tokens


class TaskProxy:
"""Represent an instance of a cycling task in a running workflow.

Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for wall_clock xtrigger).
.clock_trigger_times:
Memoization of clock trigger times (Used for wall_clock xtrigger):
{offset string: seconds from epoch}
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -152,7 +152,7 @@ class TaskProxy:

# Memory optimization - constrain possible attributes to this list.
__slots__ = [
'clock_trigger_time',
'clock_trigger_times',
'expire_time',
'identity',
'is_late',
Expand Down Expand Up @@ -244,7 +244,7 @@ def __init__(
self.try_timers: Dict[str, 'TaskActionTimer'] = {}
self.non_unique_events = Counter() # type: ignore # TODO: figure out

self.clock_trigger_time: Optional[float] = None
self.clock_trigger_times: Dict[str, int] = {}
self.expire_time: Optional[float] = None
self.late_time: Optional[float] = None
self.is_late = is_late
Expand Down Expand Up @@ -355,25 +355,31 @@ def get_point_as_seconds(self):
self.point_as_seconds += utc_offset_in_seconds
return self.point_as_seconds

def get_clock_trigger_time(self, offset_str):
"""Compute, cache, and return trigger time relative to cycle point.
def get_clock_trigger_time(
self,
point: 'PointBase', offset_str: Optional[str] = None
) -> int:
"""Compute, cache and return trigger time relative to cycle point.

Args:
offset_str: ISO8601Interval string, e.g. "PT2M".
Can be None for zero offset.
point: Task's cycle point.
offset_str: ISO8601 interval string, e.g. "PT2M".
Can be None for zero offset.
Returns:
Absolute trigger time in seconds since Unix epoch.

"""
if self.clock_trigger_time is None:
if offset_str is None:
trigger_time = self.point
offset_str = offset_str if offset_str else 'P0Y'
if offset_str not in self.clock_trigger_times:
if offset_str == 'P0Y':
trigger_time = point
else:
trigger_time = self.point + ISO8601Interval(offset_str)
self.clock_trigger_time = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch
)
return self.clock_trigger_time
trigger_time = point + ISO8601Interval(offset_str)

offset = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch)
self.clock_trigger_times[offset_str] = offset
return self.clock_trigger_times[offset_str]

def get_try_num(self):
"""Return the number of automatic tries (try number)."""
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ def get_xtrig_ctx(
# External (clock xtrigger): convert offset to trigger_time.
# Datetime cycling only.
kwargs["trigger_time"] = itask.get_clock_trigger_time(
itask.point,
ctx.func_kwargs["offset"]
)
else:
Expand Down
67 changes: 67 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the behaviour of xtrigger manager.
"""


async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
offsets that xtrigger manager gets both of them.

https://github.com/cylc/cylc-flow/issues/5783

n.b. Clock 3 exists to check the memoization path is followed,
and causing this test to give greater coverage.
"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
'clock_2': 'wall_clock(offset=P10Y)',
'clock_3': 'wall_clock(offset=P10Y)',
},
'graph': {
'R1': '@clock_1 & @clock_2 & @clock_3 => foo'
}
}
})
schd = scheduler(id_)
async with start(schd):
foo_proxy = schd.pool.get_tasks()[0]
clock_1_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_1')
clock_2_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')
clock_3_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')

assert clock_1_ctx.func_kwargs['trigger_time'] == task_point
assert clock_2_ctx.func_kwargs['trigger_time'] == ten_years_ahead
assert clock_3_ctx.func_kwargs['trigger_time'] == ten_years_ahead

schd.xtrigger_mgr.call_xtriggers_async(foo_proxy)
assert foo_proxy.state.xtriggers == {
'clock_1': True,
'clock_2': False,
'clock_3': False,
}
Loading
Loading