diff --git a/CHANGES.md b/CHANGES.md index 1edbb2b04d..061d935a01 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,18 @@ $ towncrier create ..md --content "Short description" +## __cylc-8.3.2 (Released 2024-07-10)__ + +### 🔧 Fixes + +[#6178](https://github.com/cylc/cylc-flow/pull/6178) - Fix an issue where Tui could hang when closing. + +[#6186](https://github.com/cylc/cylc-flow/pull/6186) - Fixed bug where using flow numbers with `cylc set` would not work correctly. + +[#6200](https://github.com/cylc/cylc-flow/pull/6200) - Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused + +[#6206](https://github.com/cylc/cylc-flow/pull/6206) - Fixes the spawning of multiple parentless tasks off the same sequential wall-clock xtrigger. + ## __cylc-8.3.1 (Released 2024-07-04)__ ### 🔧 Fixes diff --git a/changes.d/6200.fix.md b/changes.d/6200.fix.md deleted file mode 100644 index 3b4cf8012c..0000000000 --- a/changes.d/6200.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused \ No newline at end of file diff --git a/changes.d/6206.fix.md b/changes.d/6206.fix.md deleted file mode 100644 index fef5fb1ec2..0000000000 --- a/changes.d/6206.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixes the spawning of multiple parentless tasks off the same sequential wall-clock xtrigger. \ No newline at end of file diff --git a/changes.d/fix.6178.md b/changes.d/fix.6178.md deleted file mode 100644 index 7d1b9b0f3f..0000000000 --- a/changes.d/fix.6178.md +++ /dev/null @@ -1 +0,0 @@ -Fix an issue where Tui could hang when closing. diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py index 3489713750..71a45f6bbf 100644 --- a/cylc/flow/__init__.py +++ b/cylc/flow/__init__.py @@ -15,9 +15,8 @@ # along with this program. If not, see . 
"""Set up the cylc environment.""" -import os import logging - +import os CYLC_LOG = 'cylc' diff --git a/cylc/flow/scripts/set.py b/cylc/flow/scripts/set.py index 6c69d7235f..b64cf74aba 100755 --- a/cylc/flow/scripts/set.py +++ b/cylc/flow/scripts/set.py @@ -90,7 +90,7 @@ from functools import partial import sys -from typing import Tuple, TYPE_CHECKING +from typing import Iterable, TYPE_CHECKING from cylc.flow.exceptions import InputError from cylc.flow.network.client_factory import get_client @@ -177,7 +177,7 @@ def get_option_parser() -> COP: return parser -def validate_tokens(tokens_list: Tuple['Tokens']) -> None: +def validate_tokens(tokens_list: Iterable['Tokens']) -> None: """Check the cycles/tasks provided. This checks that cycle/task selectors have not been provided in the IDs. @@ -214,7 +214,7 @@ def validate_tokens(tokens_list: Tuple['Tokens']) -> None: async def run( options: 'Values', workflow_id: str, - *tokens_list + *tokens_list: 'Tokens' ): validate_tokens(tokens_list) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 21893103f8..7bdca66a1c 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1578,8 +1578,8 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool: def _get_task_history( self, name: str, point: 'PointBase', flow_nums: Set[int] - ) -> Tuple[bool, int, str, bool]: - """Get history of previous submits for this task. + ) -> Tuple[int, Optional[str], bool]: + """Get submit_num, status, flow_wait for point/name in flow_nums. 
Args: name: task name @@ -1587,41 +1587,33 @@ def _get_task_history( flow_nums: task flow numbers Returns: - never_spawned: if task never spawned before - submit_num: submit number of previous submit - prev_status: task status of previous sumbit - prev_flow_wait: if previous submit was a flow-wait task + (submit_num, status, flow_wait) + If no matching history, status will be None """ + submit_num: int = 0 + status: Optional[str] = None + flow_wait = False + info = self.workflow_db_mgr.pri_dao.select_prev_instances( name, str(point) ) - try: - submit_num: int = max(s[0] for s in info) - except ValueError: - # never spawned in any flow - submit_num = 0 - never_spawned = True - else: - never_spawned = False - # (submit_num could still be zero, if removed before submit) - - prev_status: str = TASK_STATUS_WAITING - prev_flow_wait = False + with suppress(ValueError): + submit_num = max(s[0] for s in info) - for _snum, f_wait, old_fnums, status in info: + for _snum, f_wait, old_fnums, old_status in info: if set.intersection(flow_nums, old_fnums): # matching flows - prev_status = status - prev_flow_wait = f_wait + status = old_status + flow_wait = f_wait - if prev_status in TASK_STATUSES_FINAL: + if status in TASK_STATUSES_FINAL: # task finished break # Else continue: there may be multiple entries with flow # overlap due to merges (they'll have the same snum and # f_wait); keep going to find the finished one, if any. 
- return never_spawned, submit_num, prev_status, prev_flow_wait + return submit_num, status, flow_wait def _load_historical_outputs(self, itask: 'TaskProxy') -> None: """Load a task's historical outputs from the DB.""" @@ -1631,8 +1623,11 @@ def _load_historical_outputs(self, itask: 'TaskProxy') -> None: # task never ran before self.db_add_new_flow_rows(itask) else: + flow_seen = False for outputs_str, fnums in info.items(): if itask.flow_nums.intersection(fnums): + # DB row has overlap with itask's flows + flow_seen = True # BACK COMPAT: In Cylc >8.0.0,<8.3.0, only the task # messages were stored in the DB as a list. # from: 8.0.0 @@ -1649,6 +1644,9 @@ def _load_historical_outputs(self, itask: 'TaskProxy') -> None: # [message] - always the full task message for msg in outputs: itask.state.outputs.set_message_complete(msg) + if not flow_seen: + # itask never ran before in its assigned flows + self.db_add_new_flow_rows(itask) def spawn_task( self, @@ -1658,44 +1656,52 @@ def spawn_task( force: bool = False, flow_wait: bool = False, ) -> Optional[TaskProxy]: - """Return task proxy if not completed in this flow, or if forced. + """Return a new task proxy for the given flow if possible. - If finished previously with flow wait, just try to spawn children. + We need to hit the DB for: + - submit number + - task status + - flow-wait + - completed outputs (e.g. via "cylc set") - Note finished tasks may be incomplete, but we don't automatically - re-run incomplete tasks in the same flow. + If history records a final task status (for this flow): + - if not flow wait, don't spawn (return None) + - if flow wait, don't spawn (return None) but do spawn children + - if outputs are incomplete, don't auto-rerun it (return None) - For every task spawned, we need a DB lookup for submit number, - and flow-wait. + Otherwise, spawn the task and load any completed outputs. 
""" - if not self.can_be_spawned(name, point): - return None - - never_spawned, submit_num, prev_status, prev_flow_wait = ( + submit_num, prev_status, prev_flow_wait = ( self._get_task_history(name, point, flow_nums) ) - if ( - not never_spawned and - not prev_flow_wait and - submit_num == 0 - ): - # Previous instance removed before completing any outputs. - LOG.debug(f"Not spawning {point}/{name} - task removed") - return None - + # Create the task proxy with any completed outputs loaded. itask = self._get_task_proxy_db_outputs( point, self.config.get_taskdef(name), flow_nums, - status=prev_status, + status=prev_status or TASK_STATUS_WAITING, submit_num=submit_num, flow_wait=flow_wait, ) if itask is None: return None + if ( + prev_status is not None + and not itask.state.outputs.get_completed_outputs() + ): + # If itask has any history in this flow but no completed outputs + # we can infer it was deliberately removed, so don't respawn it. + # TODO (follow-up work): + # - this logic fails if task removed after some outputs completed + # - this is does not conform to future "cylc remove" flow-erasure + # behaviour which would result in respawning of the removed task + # See github.com/cylc/cylc-flow/pull/6186/#discussion_r1669727292 + LOG.debug(f"Not respawning {point}/{name} - task was removed") + return None + if prev_status in TASK_STATUSES_FINAL: # Task finished previously. msg = f"[{point}/{name}:{prev_status}] already finished" @@ -1878,7 +1884,6 @@ def set_prereqs_and_outputs( - future tasks must be specified individually - family names are not expanded to members - Uses a transient task proxy to spawn children. (Even if parent was previously spawned in this flow its children might not have been). 
@@ -1963,6 +1968,7 @@ def _set_outputs_itask( self.data_store_mgr.delta_task_outputs(itask) self.workflow_db_mgr.put_update_task_state(itask) self.workflow_db_mgr.put_update_task_outputs(itask) + self.workflow_db_mgr.process_queued_ops() def _set_prereqs_itask( self, @@ -2168,10 +2174,9 @@ def force_trigger_tasks( if not self.can_be_spawned(name, point): continue - _, submit_num, _prev_status, prev_fwait = ( + submit_num, _, prev_fwait = ( self._get_task_history(name, point, flow_nums) ) - itask = TaskProxy( self.tokens, self.config.get_taskdef(name), diff --git a/tests/integration/test_dbstatecheck.py b/tests/integration/test_dbstatecheck.py index 08fd59aa0e..94de81fbef 100644 --- a/tests/integration/test_dbstatecheck.py +++ b/tests/integration/test_dbstatecheck.py @@ -16,10 +16,7 @@ """Tests for the backend method of workflow_state""" - -from asyncio import sleep import pytest -from textwrap import dedent from cylc.flow.dbstatecheck import CylcWorkflowDBChecker from cylc.flow.scheduler import Scheduler @@ -36,13 +33,15 @@ async def checker( """ wid = mod_flow({ 'scheduling': { - 'graph': {'P1Y': dedent(''' - good:succeeded - bad:failed? - output:custom_output - ''')}, 'initial cycle point': '1000', - 'final cycle point': '1001' + 'final cycle point': '1001', + 'graph': { + 'P1Y': ''' + good:succeeded + bad:failed? 
+ output:custom_output + ''' + }, }, 'runtime': { 'bad': {'simulation': {'fail cycle points': '1000'}}, @@ -51,11 +50,17 @@ async def checker( }) schd: Scheduler = mod_scheduler(wid, paused_start=False) async with mod_run(schd): + # allow a cycle of the main loop to pass so that flow 2 can be + # added to db await mod_complete(schd) + + # trigger a new task in flow 2 schd.pool.force_trigger_tasks(['1000/good'], ['2']) - # Allow a cycle of the main loop to pass so that flow 2 can be - # added to db - await sleep(1) + + # update the database + schd.process_workflow_db_queue() + + # yield a DB checker with CylcWorkflowDBChecker( 'somestring', 'utterbunkum', schd.workflow_db_mgr.pub_path ) as _checker: @@ -73,7 +78,7 @@ def test_basic(checker): ['output', '10000101T0000Z', 'succeeded'], ['output', '10010101T0000Z', 'succeeded'], ['good', '10000101T0000Z', 'waiting', '(flows=2)'], - ] + ['good', '10010101T0000Z', 'waiting', '(flows=2)'], ] assert result == expect @@ -131,5 +136,6 @@ def test_flownum(checker): result = checker.workflow_state_query(flow_num=2) expect = [ ['good', '10000101T0000Z', 'waiting', '(flows=2)'], + ['good', '10010101T0000Z', 'waiting', '(flows=2)'], ] assert result == expect diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 183532e060..a7d205c135 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1893,7 +1893,7 @@ async def test_fast_respawn( # attempt to spawn it again itask = task_pool.spawn_task("foo", IntegerPoint("1"), {1}) assert itask is None - assert "Not spawning 1/foo - task removed" in caplog.text + assert "Not respawning 1/foo - task was removed" in caplog.text async def test_remove_active_task( @@ -2019,9 +2019,50 @@ async def test_remove_no_respawn(flow, scheduler, start, log_filter): # respawned as a result schd.pool.spawn_on_output(b1, TASK_OUTPUT_SUCCEEDED) assert log_filter( - log, contains='Not spawning 1/z - task removed' + log, 
contains='Not respawning 1/z - task was removed' ) z1 = schd.pool.get_task(IntegerPoint("1"), "z") assert ( z1 is None ), '1/z should have stayed removed (but has been added back into the pool' + + +async def test_set_future_flow(flow, scheduler, start, log_filter): + """Manually-set outputs for new flow num must be recorded in the DB. + + See https://github.com/cylc/cylc-flow/pull/6186 + + To trigger the bug, the flow must be new but the task must have been + spawned before in an earlier flow. + + """ + # Scenario: after flow 1, set c1:succeeded in a future flow so + # when b succeeds in the new flow it will spawn c2 but not c1. + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': True + }, + 'scheduling': { + 'cycling mode': 'integer', + 'graph': { + 'R1': 'b => c1 & c2', + }, + }, + }) + schd: 'Scheduler' = scheduler(id_) + async with start(schd, level=logging.DEBUG) as log: + + assert schd.pool.get_task(IntegerPoint("1"), "b") is not None, '1/b should be spawned on startup' + + # set b, c1, c2 succeeded in flow 1 + schd.pool.set_prereqs_and_outputs(['1/b', '1/c1', '1/c2'], prereqs=[], outputs=[], flow=[1]) + schd.workflow_db_mgr.process_queued_ops() + + # set task c1:succeeded in flow 2 + schd.pool.set_prereqs_and_outputs(['1/c1'], prereqs=[], outputs=[], flow=[2]) + schd.workflow_db_mgr.process_queued_ops() + + # set b:succeeded in flow 2 and check downstream spawning + schd.pool.set_prereqs_and_outputs(['1/b'], prereqs=[], outputs=[], flow=[2]) + assert schd.pool.get_task(IntegerPoint("1"), "c1") is None, '1/c1 (flow 2) should not be spawned after 1/b:succeeded' + assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded'