Skip to content

Commit

Permalink
task_pool: fix compute_runahead calculation
Browse files Browse the repository at this point in the history
* Closes cylc#5825
* Cycles were considered active if they contained runahead limited
  tasks.
* This could cause the runahead limit to be bumped forwards whenever the
  limit calculation was forced to update, e.g. on reload.
* This filters out tasks at or beyond the runahead limit and straigntens
  out the task status checks to match Cylc 7 behaviour in compat mode.
  • Loading branch information
oliver-sanders committed Nov 21, 2023
1 parent 88b2798 commit 40db21c
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 17 deletions.
49 changes: 32 additions & 17 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_FINAL,
TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED,
TASK_STATUS_FAILED,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
TASK_STATUS_WAITING,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUCCEEDED,
Expand Down Expand Up @@ -362,20 +362,35 @@ def compute_runahead(self, force=False) -> bool:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
if (
points # got the limit already so this point too
or any(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
)
or (
# For Cylc 7 back-compat, ignore incomplete tasks.
# (Success is required in back-compat mode, so
# failedtasks end up as incomplete; and Cylc 7
# ignores failed tasks in computing the limit).
itask.state.outputs.is_incomplete()
and not cylc.flow.flags.cylc7_back_compat
any(
# filter out runahead tasks
itask.state(is_runahead=False)
and (
# waiting tasks
# * tasks with partially satisfied prereqs
# * tasks with incomplete xtriggers
# * held tasks
itask.state(TASK_STATUS_WAITING)

# unfinished tasks (back-compat mode)
# * Cylc 7 runahead logic considered a cycle to be
# active if it had "unfinished" tasks
or (
cylc.flow.flags.cylc7_back_compat
and not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED,
)
)

# incomplete tasks (optional outputs)
# * tasks with one or more required outputs
# incomplete
or (
not cylc.flow.flags.cylc7_back_compat
and itask.state.outputs.is_incomplete()
)
)
for itask in itasks
)
Expand Down
88 changes: 88 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1246,3 +1246,91 @@ async def test_detect_incomplete_tasks(
assert log_filter(log, contains=f"[{itask}] did not complete required outputs:")
# the task should not have been removed
assert itask in schd.pool.get_tasks()


@pytest.mark.parametrize('back_compat', [True, False])
@pytest.mark.parametrize('integer', [True, False])
async def test_compute_runahead(
integer,
back_compat,
flow,
scheduler,
start,
monkeypatch,
):
"""Test the calculation of the runahead limit.
This test ensures that:
* Runahead tasks are excluded from computations
see https://github.com/cylc/cylc-flow/issues/5825
* Tasks are initiated with the correct is_runahead status on statup.
* Behaviour is the same in compat/regular modes.
* Behaviour is the same for integer/datetime cycling modes.
"""
if integer:
# cycling mode = integer
config = {
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'initial cycle point': '1',
'cycling mode': 'integer',
'runahead limit': 'P3',
'graph': {
'P1': 'a'
},
}
}
point = lambda point: IntegerPoint(str(int(point)))
else:
# cycling mode = gregorian
config = {
'scheduler': {
'allow implicit tasks': 'True',
'cycle point format': 'CCYY',
},
'scheduling': {
'initial cycle point': '0001',
'runahead limit': 'P3Y',
'graph': {
'P1Y': 'a'
},
}
}
point = ISO8601Point

monkeypatch.setattr('cylc.flow.flags.cylc7_back_compat', back_compat)

id_ = flow(config)
schd = scheduler(id_)
async with start(schd):
schd.pool.compute_runahead(force=True)
assert int(str(schd.pool.runahead_limit_point)) == 4

# ensure task states are initiated with is_runahead status
assert schd.pool.get_task(point('0001'), 'a').state(is_runahead=False)
assert schd.pool.get_task(point('0005'), 'a').state(is_runahead=True)

# mark the first three cycles as running
for cycle in range(1, 4):
schd.pool.get_task(point(f'{cycle:04}'), 'a').state.reset(
TASK_STATUS_RUNNING
)
schd.pool.compute_runahead(force=True)
assert int(str(schd.pool.runahead_limit_point)) == 4 # no change

# mark cycle 1 as incomplete (but finished)
schd.pool.get_task(point('0001'), 'a').state.reset(
TASK_STATUS_SUBMIT_FAILED
)
schd.pool.compute_runahead(force=True)
assert int(str(schd.pool.runahead_limit_point)) == 4 # no change

# mark cycle 1 as complete
schd.pool.get_task(point('0001'), 'a').state.reset(
TASK_STATUS_SUCCEEDED
)
schd.pool.compute_runahead(force=True)
assert int(str(schd.pool.runahead_limit_point)) == 5 # +1

0 comments on commit 40db21c

Please sign in to comment.