Skip to content

Commit

Permalink
Allow manual trigger when workflow paused.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jul 4, 2024
1 parent cc28eb3 commit 4c577e0
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
7 changes: 3 additions & 4 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,8 +1225,7 @@ def release_queued_tasks(self) -> bool:
"""
if (
not self.is_paused
and self.stop_mode is None
self.stop_mode is None
and self.auto_restart_time is None
and self.reload_pending is False
):
Expand Down Expand Up @@ -1883,7 +1882,7 @@ def pause_workflow(self, msg: Optional[str] = None) -> None:
if msg:
_msg += f': {msg}'
LOG.info(_msg)
self.is_paused = True
self.is_paused = self.pool.task_queue_mgr.is_paused = True
self.workflow_db_mgr.put_workflow_paused(True)
self.update_data_store()

Expand All @@ -1905,7 +1904,7 @@ def resume_workflow(self, quiet: bool = False) -> None:
return
if not quiet:
LOG.info("RESUMING the workflow now")
self.is_paused = False
self.is_paused = self.pool.task_queue_mgr.is_paused = False
self.workflow_db_mgr.put_workflow_paused(False)
self.update_data_store()

Expand Down
13 changes: 10 additions & 3 deletions cylc/flow/task_queues/independent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def push_task(self, itask: 'TaskProxy') -> None:
if itask.tdef.name in self.members:
self.deque.appendleft(itask)

def release(self, active: Counter[str]) -> List['TaskProxy']:
def release(
self, active: Counter[str], is_paused: bool = False
) -> List['TaskProxy']:
"""Release tasks if below the active limit."""
# The "active" argument counts active tasks by name.
released: List['TaskProxy'] = []
Expand All @@ -54,7 +56,10 @@ def release(self, active: Counter[str]) -> List['TaskProxy']:
except IndexError:
# deque empty
break
if itask.state.is_held:
if (
itask.state.is_held or
(is_paused and not itask.is_manual_submit)
):
held.append(itask)
else:
released.append(itask)
Expand Down Expand Up @@ -114,6 +119,8 @@ def __init__(self,
)

self.force_released: Set['TaskProxy'] = set()
# if paused don't task released unless they are manually triggered
self.is_paused = False

def push_task(self, itask: 'TaskProxy') -> None:
"""Push a task to the appropriate queue."""
Expand All @@ -124,7 +131,7 @@ def release_tasks(self, active: Counter[str]) -> List['TaskProxy']:
"""Release tasks up to the queue limits."""
released: List['TaskProxy'] = []
for queue in self.queues.values():
released += queue.release(active)
released += queue.release(active, self.is_paused)
if self.force_released:
released.extend(self.force_released)
self.force_released = set()
Expand Down

0 comments on commit 4c577e0

Please sign in to comment.