diff --git a/CHANGES.md b/CHANGES.md index 905528ea71b..ebe4b09083c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,6 +59,10 @@ enhancements to `cylc lint`: ### Fixes +[#5604](https://github.com/cylc/cylc-flow/pull/5604) - +Fix a possible issue where workflows started using +`cylc play --start-cycle-point` could hang during startup. + [#5524](https://github.com/cylc/cylc-flow/pull/5524) - Logging includes timestamps for `cylc play` when called by `cylc vip` or `cylc vr`. diff --git a/cylc/flow/cycling/iso8601.py b/cylc/flow/cycling/iso8601.py index 9e7032efdc9..7ceca4174b1 100644 --- a/cylc/flow/cycling/iso8601.py +++ b/cylc/flow/cycling/iso8601.py @@ -544,7 +544,10 @@ def get_next_point_on_sequence( return self.get_next_point_on_sequence(result) return result - def get_first_point(self, point): + def get_first_point( + self, + point: ISO8601Point + ) -> Optional[ISO8601Point]: """Return the first point >= to point, or None if out of bounds.""" with contextlib.suppress(KeyError): return ISO8601Point(self._cached_first_point_values[point.value]) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 7b6cfe6a44d..dd4b8efae74 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -699,8 +699,8 @@ async def start(self): self.server.thread.start() barrier.wait() - await self.configure() self._configure_contact() + await self.configure() except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: await self.handle_exception(exc) @@ -1807,7 +1807,11 @@ async def _shutdown(self, reason: BaseException) -> None: sys.stdout.flush() sys.stderr.flush() - if self.contact_data and self.task_job_mgr: + if ( + self.workflow_db_mgr.pri_path + and Path(self.workflow_db_mgr.pri_path).exists() + ): + # only attempt remote tidy if the workflow has been started self.task_job_mgr.task_remote_mgr.remote_tidy() try: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 2d2d7fd0926..ae193c044a4 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -312,12 +312,17 @@ def compute_runahead(self, force=False) -> bool: changed (needed if max_future_offset changed, or on reload). """ points: List['PointBase'] = [] + sequence_points: Set['PointBase'] if not self.main_pool: # Start at first point in each sequence, after the initial point. - points = list({ - seq.get_first_point(self.config.start_point) - for seq in self.config.sequences - }) + points = [ + point + for point in { + seq.get_first_point(self.config.start_point) + for seq in self.config.sequences + } + if point is not None + ] else: # Find the earliest point with unfinished tasks. for point, itasks in sorted(self.get_tasks_by_point().items()): @@ -793,7 +798,7 @@ def get_hidden_tasks(self) -> List[TaskProxy]: self._hidden_pool_list.extend(list(itask_id_maps.values())) return self._hidden_pool_list - def get_tasks_by_point(self): + def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]': """Return a map of task proxies by cycle point.""" point_itasks = {} for point, itask_id_map in self.main_pool.items(): diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 530e0063b70..4cf4ab78208 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -962,3 +962,30 @@ async def test_graph_change_prereq_satisfaction( schd.pool.reload_taskdefs() await test.asend(schd) + + +async def test_runahead_limit_for_sequence_before_start_cycle( + flow, + scheduler, + start, +): + """It should obey the runahead limit. + + Ensure the runahead limit is computed correctly for sequences before the start cycle + + See https://github.com/cylc/cylc-flow/issues/5603 + """ + id_ = flow({ + 'scheduler': {'allow implicit tasks': 'True'}, + 'scheduling': { + 'initial cycle point': '2000', + 'runahead limit': 'P2Y', + 'graph': { + 'R1/2000': 'a', + 'P1Y': 'b[-P1Y] => b', + }, + } + }) + schd = scheduler(id_, startcp='2005') + async with start(schd): + assert str(schd.pool.runahead_limit_point) == '20070101T0000Z'