Skip to content

Commit

Permalink
Merge pull request #6353 from oliver-sanders/6284
Browse files Browse the repository at this point in the history
expiry: prevent expired tasks from retrying automatically
  • Loading branch information
oliver-sanders authored Sep 11, 2024
2 parents ddddfcd + 3216d9d commit 2ec0cb3
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 0 deletions.
1 change: 1 addition & 0 deletions changes.d/6353.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent clock-expired tasks from being automatically retried.
1 change: 1 addition & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2239,6 +2239,7 @@ def clock_expire_tasks(self):
# check if this task is clock expired
and itask.clock_expire()
):
self.task_queue_mgr.remove_task(itask)
self.task_events_mgr.process_message(
itask,
logging.WARNING,
Expand Down
48 changes: 48 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2150,3 +2150,51 @@ async def test_trigger_unqueued(flow, scheduler, start):
assert not schd.pool.task_queue_mgr.force_released, (
"Triggering an unqueued task should not affect the force_released list"
)


@pytest.mark.parametrize('expire_type', ['clock-expire', 'manual'])
async def test_expire_dequeue_with_retries(flow, scheduler, start, expire_type):
"""An expired waiting task should be removed from any queues.
See https://github.com/cylc/cylc-flow/issues/6284
"""
conf = {
'scheduling': {
'initial cycle point': '2000',

'graph': {
'R1': 'foo'
},
},
'runtime': {
'foo': {
'execution retry delays': 'PT0S'
}
}
}

if expire_type == 'clock-expire':
conf['scheduling']['special tasks'] = {'clock-expire': 'foo(PT0S)'}
method = lambda schd: schd.pool.clock_expire_tasks()
else:
method = lambda schd: schd.pool.set_prereqs_and_outputs(
['2000/foo'], prereqs=[], outputs=['expired'], flow=['1']
)

id_ = flow(conf)
schd = scheduler(id_)
schd: Scheduler
async with start(schd):
itask = schd.pool.get_tasks()[0]

# the task should start as "waiting(queued)"
assert itask.state(TASK_STATUS_WAITING, is_queued=True)

# expire the task via whichever method we are testing
method(schd)

# the task should enter the "expired" state
assert itask.state(TASK_STATUS_EXPIRED, is_queued=False)

# the task should also have been removed from the queue
assert not schd.pool.task_queue_mgr.remove_task(itask)

0 comments on commit 2ec0cb3

Please sign in to comment.