Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🤖 Merge 8.3.x-sync into master #6217

Merged
merged 8 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description"

<!-- towncrier release notes start -->

## __cylc-8.3.2 (Released 2024-07-10)__

### 🔧 Fixes

[#6178](https://github.com/cylc/cylc-flow/pull/6178) - Fix an issue where Tui could hang when closing.

[#6186](https://github.com/cylc/cylc-flow/pull/6186) - Fixed bug where using flow numbers with `cylc set` would not work correctly.

[#6200](https://github.com/cylc/cylc-flow/pull/6200) - Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused.

[#6206](https://github.com/cylc/cylc-flow/pull/6206) - Fixes the spawning of multiple parentless tasks off the same sequential wall-clock xtrigger.

## __cylc-8.3.1 (Released 2024-07-04)__

### 🔧 Fixes
Expand Down
1 change: 0 additions & 1 deletion changes.d/6200.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6206.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/fix.6178.md

This file was deleted.

2 changes: 1 addition & 1 deletion cylc/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

environ_init()

__version__ = '8.3.2.dev'
__version__ = '8.3.3.dev'

Check warning on line 56 in cylc/flow/__init__.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/__init__.py#L56

Added line #L56 was not covered by tests


def iter_entry_points(entry_point_name):
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/scripts/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@

from functools import partial
import sys
from typing import Tuple, TYPE_CHECKING
from typing import Iterable, TYPE_CHECKING

from cylc.flow.exceptions import InputError
from cylc.flow.network.client_factory import get_client
Expand Down Expand Up @@ -177,7 +177,7 @@ def get_option_parser() -> COP:
return parser


def validate_tokens(tokens_list: Tuple['Tokens']) -> None:
def validate_tokens(tokens_list: Iterable['Tokens']) -> None:
"""Check the cycles/tasks provided.

This checks that cycle/task selectors have not been provided in the IDs.
Expand Down Expand Up @@ -214,7 +214,7 @@ def validate_tokens(tokens_list: Tuple['Tokens']) -> None:
async def run(
options: 'Values',
workflow_id: str,
*tokens_list
*tokens_list: 'Tokens'
):
validate_tokens(tokens_list)

Expand Down
97 changes: 51 additions & 46 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1578,50 +1578,42 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:

def _get_task_history(
self, name: str, point: 'PointBase', flow_nums: Set[int]
) -> Tuple[bool, int, str, bool]:
"""Get history of previous submits for this task.
) -> Tuple[int, Optional[str], bool]:
"""Get submit_num, status, flow_wait for point/name in flow_nums.

Args:
name: task name
point: task cycle point
flow_nums: task flow numbers

Returns:
never_spawned: if task never spawned before
submit_num: submit number of previous submit
prev_status: task status of previous submit
prev_flow_wait: if previous submit was a flow-wait task
(submit_num, status, flow_wait)
If no matching history, status will be None

"""
submit_num: int = 0
status: Optional[str] = None
flow_wait = False

info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num: int = max(s[0] for s in info)
except ValueError:
# never spawned in any flow
submit_num = 0
never_spawned = True
else:
never_spawned = False
# (submit_num could still be zero, if removed before submit)

prev_status: str = TASK_STATUS_WAITING
prev_flow_wait = False
with suppress(ValueError):
submit_num = max(s[0] for s in info)

for _snum, f_wait, old_fnums, status in info:
for _snum, f_wait, old_fnums, old_status in info:
if set.intersection(flow_nums, old_fnums):
# matching flows
prev_status = status
prev_flow_wait = f_wait
if prev_status in TASK_STATUSES_FINAL:
status = old_status
flow_wait = f_wait
if status in TASK_STATUSES_FINAL:
# task finished
break
# Else continue: there may be multiple entries with flow
# overlap due to merges (they'll have the same snum and
# f_wait); keep going to find the finished one, if any.

return never_spawned, submit_num, prev_status, prev_flow_wait
return submit_num, status, flow_wait

def _load_historical_outputs(self, itask: 'TaskProxy') -> None:
"""Load a task's historical outputs from the DB."""
Expand All @@ -1631,8 +1623,11 @@ def _load_historical_outputs(self, itask: 'TaskProxy') -> None:
# task never ran before
self.db_add_new_flow_rows(itask)
else:
flow_seen = False
for outputs_str, fnums in info.items():
if itask.flow_nums.intersection(fnums):
# DB row has overlap with itask's flows
flow_seen = True
# BACK COMPAT: In Cylc >8.0.0,<8.3.0, only the task
# messages were stored in the DB as a list.
# from: 8.0.0
Expand All @@ -1649,6 +1644,9 @@ def _load_historical_outputs(self, itask: 'TaskProxy') -> None:
# [message] - always the full task message
for msg in outputs:
itask.state.outputs.set_message_complete(msg)
if not flow_seen:
# itask never ran before in its assigned flows
self.db_add_new_flow_rows(itask)

def spawn_task(
self,
Expand All @@ -1658,44 +1656,52 @@ def spawn_task(
force: bool = False,
flow_wait: bool = False,
) -> Optional[TaskProxy]:
"""Return task proxy if not completed in this flow, or if forced.
"""Return a new task proxy for the given flow if possible.

If finished previously with flow wait, just try to spawn children.
We need to hit the DB for:
- submit number
- task status
- flow-wait
- completed outputs (e.g. via "cylc set")

Note finished tasks may be incomplete, but we don't automatically
re-run incomplete tasks in the same flow.
If history records a final task status (for this flow):
- if not flow wait, don't spawn (return None)
- if flow wait, don't spawn (return None) but do spawn children
- if outputs are incomplete, don't auto-rerun it (return None)

For every task spawned, we need a DB lookup for submit number,
and flow-wait.
Otherwise, spawn the task and load any completed outputs.

"""
if not self.can_be_spawned(name, point):
return None

never_spawned, submit_num, prev_status, prev_flow_wait = (
submit_num, prev_status, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)

if (
not never_spawned and
not prev_flow_wait and
submit_num == 0
):
# Previous instance removed before completing any outputs.
LOG.debug(f"Not spawning {point}/{name} - task removed")
return None

# Create the task proxy with any completed outputs loaded.
itask = self._get_task_proxy_db_outputs(
point,
self.config.get_taskdef(name),
flow_nums,
status=prev_status,
status=prev_status or TASK_STATUS_WAITING,
submit_num=submit_num,
flow_wait=flow_wait,
)
if itask is None:
return None

if (
prev_status is not None
and not itask.state.outputs.get_completed_outputs()
):
# If itask has any history in this flow but no completed outputs
# we can infer it was deliberately removed, so don't respawn it.
# TODO (follow-up work):
# - this logic fails if task removed after some outputs completed
# - this does not conform to future "cylc remove" flow-erasure
# behaviour which would result in respawning of the removed task
# See github.com/cylc/cylc-flow/pull/6186/#discussion_r1669727292
LOG.debug(f"Not respawning {point}/{name} - task was removed")
return None

if prev_status in TASK_STATUSES_FINAL:
# Task finished previously.
msg = f"[{point}/{name}:{prev_status}] already finished"
Expand Down Expand Up @@ -1878,7 +1884,6 @@ def set_prereqs_and_outputs(
- future tasks must be specified individually
- family names are not expanded to members


Uses a transient task proxy to spawn children. (Even if parent was
previously spawned in this flow its children might not have been).

Expand Down Expand Up @@ -1963,6 +1968,7 @@ def _set_outputs_itask(
self.data_store_mgr.delta_task_outputs(itask)
self.workflow_db_mgr.put_update_task_state(itask)
self.workflow_db_mgr.put_update_task_outputs(itask)
self.workflow_db_mgr.process_queued_ops()

def _set_prereqs_itask(
self,
Expand Down Expand Up @@ -2168,10 +2174,9 @@ def force_trigger_tasks(
if not self.can_be_spawned(name, point):
continue

_, submit_num, _prev_status, prev_fwait = (
submit_num, _, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)

itask = TaskProxy(
self.tokens,
self.config.get_taskdef(name),
Expand Down
32 changes: 19 additions & 13 deletions tests/integration/test_dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

"""Tests for the backend method of workflow_state"""


from asyncio import sleep
import pytest
from textwrap import dedent

from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.scheduler import Scheduler
Expand All @@ -36,13 +33,15 @@ async def checker(
"""
wid = mod_flow({
'scheduling': {
'graph': {'P1Y': dedent('''
good:succeeded
bad:failed?
output:custom_output
''')},
'initial cycle point': '1000',
'final cycle point': '1001'
'final cycle point': '1001',
'graph': {
'P1Y': '''
good:succeeded
bad:failed?
output:custom_output
'''
},
},
'runtime': {
'bad': {'simulation': {'fail cycle points': '1000'}},
Expand All @@ -51,11 +50,17 @@ async def checker(
})
schd: Scheduler = mod_scheduler(wid, paused_start=False)
async with mod_run(schd):
# allow a cycle of the main loop to pass so that flow 2 can be
# added to db
await mod_complete(schd)

# trigger a new task in flow 2
schd.pool.force_trigger_tasks(['1000/good'], ['2'])
# Allow a cycle of the main loop to pass so that flow 2 can be
# added to db
await sleep(1)

# update the database
schd.process_workflow_db_queue()

# yield a DB checker
with CylcWorkflowDBChecker(
'somestring', 'utterbunkum', schd.workflow_db_mgr.pub_path
) as _checker:
Expand All @@ -73,7 +78,7 @@ def test_basic(checker):
['output', '10000101T0000Z', 'succeeded'],
['output', '10010101T0000Z', 'succeeded'],
['good', '10000101T0000Z', 'waiting', '(flows=2)'],
]
['good', '10010101T0000Z', 'waiting', '(flows=2)'], ]
assert result == expect


Expand Down Expand Up @@ -131,5 +136,6 @@ def test_flownum(checker):
result = checker.workflow_state_query(flow_num=2)
expect = [
['good', '10000101T0000Z', 'waiting', '(flows=2)'],
['good', '10010101T0000Z', 'waiting', '(flows=2)'],
]
assert result == expect
45 changes: 43 additions & 2 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1893,7 +1893,7 @@ async def test_fast_respawn(
# attempt to spawn it again
itask = task_pool.spawn_task("foo", IntegerPoint("1"), {1})
assert itask is None
assert "Not spawning 1/foo - task removed" in caplog.text
assert "Not respawning 1/foo - task was removed" in caplog.text


async def test_remove_active_task(
Expand Down Expand Up @@ -2019,9 +2019,50 @@ async def test_remove_no_respawn(flow, scheduler, start, log_filter):
# respawned as a result
schd.pool.spawn_on_output(b1, TASK_OUTPUT_SUCCEEDED)
assert log_filter(
log, contains='Not spawning 1/z - task removed'
log, contains='Not respawning 1/z - task was removed'
)
z1 = schd.pool.get_task(IntegerPoint("1"), "z")
assert (
z1 is None
), '1/z should have stayed removed (but has been added back into the pool'


async def test_set_future_flow(flow, scheduler, start, log_filter):
    """Manually-set outputs for new flow num must be recorded in the DB.

    See https://github.com/cylc/cylc-flow/pull/6186

    To trigger the bug, the flow must be new but the task must have been
    spawned before in an earlier flow.

    """
    # Scenario: after flow 1, set c1:succeeded in a future flow so
    # when b succeeds in the new flow it will spawn c2 but not c1.
    id_ = flow({
        'scheduler': {
            'allow implicit tasks': True
        },
        'scheduling': {
            'cycling mode': 'integer',
            'graph': {
                'R1': 'b => c1 & c2',
            },
        },
    })
    schd: 'Scheduler' = scheduler(id_)
    async with start(schd, level=logging.DEBUG) as log:

        assert schd.pool.get_task(IntegerPoint("1"), "b") is not None, '1/b should be spawned on startup'

        # set b, c1, c2 succeeded in flow 1
        schd.pool.set_prereqs_and_outputs(['1/b', '1/c1', '1/c2'], prereqs=[], outputs=[], flow=[1])
        # flush queued DB ops so flow-1 history is visible to later lookups
        schd.workflow_db_mgr.process_queued_ops()

        # set task c1:succeeded in flow 2 (a flow that does not exist yet)
        schd.pool.set_prereqs_and_outputs(['1/c1'], prereqs=[], outputs=[], flow=[2])
        schd.workflow_db_mgr.process_queued_ops()

        # set b:succeeded in flow 2 and check downstream spawning
        schd.pool.set_prereqs_and_outputs(['1/b'], prereqs=[], outputs=[], flow=[2])
        assert schd.pool.get_task(IntegerPoint("1"), "c1") is None, '1/c1 (flow 2) should not be spawned after 1/b:succeeded'
        assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded'
Loading