Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool: fix infinite loop with --start-cycle-point #5604

Merged
merged 2 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ enhancements to `cylc lint`:

### Fixes

[#5604](https://github.com/cylc/cylc-flow/pull/5604) -
Fix a possible issue where workflows started using
`cylc play --start-cycle-point` could hang during startup.

[#5524](https://github.com/cylc/cylc-flow/pull/5524) - Logging includes timestamps
for `cylc play` when called by `cylc vip` or `cylc vr`.

Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,10 @@ def get_next_point_on_sequence(
return self.get_next_point_on_sequence(result)
return result

def get_first_point(self, point):
def get_first_point(
self,
point: ISO8601Point
) -> Optional[ISO8601Point]:
"""Return the first point >= to point, or None if out of bounds."""
with contextlib.suppress(KeyError):
return ISO8601Point(self._cached_first_point_values[point.value])
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,8 @@ async def start(self):
self.server.thread.start()
barrier.wait()

await self.configure()
self._configure_contact()
await self.configure()
Comment on lines 702 to +703
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated: Swap the order of these two so that the contact file gets written earlier.

This reduces the window of opportunity for two schedulers to start up for the same workflow. Taking a look at the contact file fields, I can't see anything set during Scheduler.configure which is used in _configure_contact so the order should be arbitrary.

except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
await self.handle_exception(exc)

Expand Down Expand Up @@ -1807,7 +1807,11 @@ async def _shutdown(self, reason: BaseException) -> None:
sys.stdout.flush()
sys.stderr.flush()

if self.contact_data and self.task_job_mgr:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to the previous, because contact_data is set a little earlier, this test needed bodging a little as we could get into the situation where the workflow failed to startup, but did configure its contact file triggering remote-tidy to run on shutdown which caused the database to be created when it shouldn't have been.

Test changed to only run when the database is present, which is a better test, as remote-tidy needs database info to function.

if (
self.workflow_db_mgr.pri_path
and Path(self.workflow_db_mgr.pri_path).exists()
):
# only attempt remote tidy if the workflow has been started
self.task_job_mgr.task_remote_mgr.remote_tidy()

try:
Expand Down
15 changes: 10 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,17 @@ def compute_runahead(self, force=False) -> bool:
changed (needed if max_future_offset changed, or on reload).
"""
points: List['PointBase'] = []
sequence_points: Set['PointBase']
if not self.main_pool:
# Start at first point in each sequence, after the initial point.
points = list({
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
})
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem was caused by a None value in this list. Due to the logic lower down, this caused the runahead limit point to be set to None.

]
else:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
Expand Down Expand Up @@ -793,7 +798,7 @@ def get_hidden_tasks(self) -> List[TaskProxy]:
self._hidden_pool_list.extend(list(itask_id_maps.values()))
return self._hidden_pool_list

def get_tasks_by_point(self):
def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':
"""Return a map of task proxies by cycle point."""
point_itasks = {}
for point, itask_id_map in self.main_pool.items():
Expand Down
27 changes: 27 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,30 @@ async def test_graph_change_prereq_satisfaction(
schd.pool.reload_taskdefs()

await test.asend(schd)


async def test_runahead_limit_for_sequence_before_start_cycle(
flow,
scheduler,
start,
):
"""It should obey the runahead limit.

Ensure the runahead limit is computed correctly for sequences before the start cycle

See https://github.com/cylc/cylc-flow/issues/5603
"""
id_ = flow({
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
'initial cycle point': '2000',
'runahead limit': 'P2Y',
'graph': {
'R1/2000': 'a',
'P1Y': 'b[-P1Y] => b',
},
}
})
schd = scheduler(id_, startcp='2005')
async with start(schd):
assert str(schd.pool.runahead_limit_point) == '20070101T0000Z'