Skip to content

Commit

Permalink
pool: fix possible infinite loop bug
Browse files Browse the repository at this point in the history
* Closes #5603
* Fixes an infinite loop which could occur where one or more task sequences
  terminate before the start point.
* This triggered a bug whereby the runahead limit was disabled on
  startup causing an infinite spawning bug.
  • Loading branch information
oliver-sanders committed Jun 28, 2023
1 parent 9a0f5f8 commit 4d66301
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ enhancements to `cylc lint`:

### Fixes

[#5604](https://github.com/cylc/cylc-flow/pull/5604) -
Fix a possible issue where workflows started using
`cylc play --start-cycle-point` could hang during startup.

[#5524](https://github.com/cylc/cylc-flow/pull/5524) - Logging includes timestamps
for `cylc play` when called by `cylc vip` or `cylc vr`.

Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,10 @@ def get_next_point_on_sequence(
return self.get_next_point_on_sequence(result)
return result

def get_first_point(self, point):
def get_first_point(
self,
point: ISO8601Point
) -> Optional[ISO8601Point]:
"""Return the first point >= to point, or None if out of bounds."""
with contextlib.suppress(KeyError):
return ISO8601Point(self._cached_first_point_values[point.value])
Expand Down
15 changes: 10 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,17 @@ def compute_runahead(self, force=False) -> bool:
changed (needed if max_future_offset changed, or on reload).
"""
points: List['PointBase'] = []
sequence_points: Set['PointBase']
if not self.main_pool:
# Start at first point in each sequence, after the initial point.
points = list({
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
})
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
else:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
Expand Down Expand Up @@ -793,7 +798,7 @@ def get_hidden_tasks(self) -> List[TaskProxy]:
self._hidden_pool_list.extend(list(itask_id_maps.values()))
return self._hidden_pool_list

def get_tasks_by_point(self):
def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':
"""Return a map of task proxies by cycle point."""
point_itasks = {}
for point, itask_id_map in self.main_pool.items():
Expand Down
27 changes: 27 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,30 @@ async def test_graph_change_prereq_satisfaction(
schd.pool.reload_taskdefs()

await test.asend(schd)


async def test_runahead_limit_for_sequence_before_start_cycle(
flow,
scheduler,
start,
):
"""It should obey the runahead limit.
Ensure the runahead limit is computed correctly for sequences before the start cycle
See https://github.com/cylc/cylc-flow/issues/5603
"""
id_ = flow({
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
'initial cycle point': '2000',
'runahead limit': 'P2Y',
'graph': {
'R1/2000': 'a',
'P1Y': 'b[-P1Y] => b',
},
}
})
schd = scheduler(id_, startcp='2005')
async with start(schd):
assert str(schd.pool.runahead_limit_point) == '20070101T0000Z'

0 comments on commit 4d66301

Please sign in to comment.