From 2dddba84577e37b7fa5c310fbf59f96c29b8726f Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Thu, 29 Aug 2024 15:17:22 +1200 Subject: [PATCH 1/2] Fix duplicate job submissions. --- changes.d/6337.fix.md | 1 + cylc/flow/task_queues/__init__.py | 4 ++-- cylc/flow/task_queues/independent.py | 12 +++++------- 3 files changed, 8 insertions(+), 9 deletions(-) create mode 100644 changes.d/6337.fix.md diff --git a/changes.d/6337.fix.md b/changes.d/6337.fix.md new file mode 100644 index 00000000000..6a6fd72757f --- /dev/null +++ b/changes.d/6337.fix.md @@ -0,0 +1 @@ +Fix potential duplicate job submissions when manually triggering unqueued active tasks. diff --git a/cylc/flow/task_queues/__init__.py b/cylc/flow/task_queues/__init__.py index 32983789632..e19b68c25e7 100644 --- a/cylc/flow/task_queues/__init__.py +++ b/cylc/flow/task_queues/__init__.py @@ -53,8 +53,8 @@ def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]': pass @abstractmethod - def remove_task(self, itask: 'TaskProxy') -> None: - """Remove a task from the queueing system.""" + def remove_task(self, itask: 'TaskProxy') -> bool: + """Try to remove a task from the queues. Return True if done.""" pass @abstractmethod diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py index 185edee4f2b..3bac42fe0f5 100644 --- a/cylc/flow/task_queues/independent.py +++ b/cylc/flow/task_queues/independent.py @@ -130,19 +130,17 @@ def release_tasks(self, active: Counter[str]) -> List['TaskProxy']: self.force_released = set() return released - def remove_task(self, itask: 'TaskProxy') -> None: - """Remove a task from whichever queue it belongs to.""" - for queue in self.queues.values(): - if queue.remove(itask): - break + def remove_task(self, itask: 'TaskProxy') -> bool: + """Try to remove a task from the queues. Return True if done.""" + return any(queue.remove(itask) for queue in self.queues.values()) def force_release_task(self, itask: 'TaskProxy') -> None: """Remove a task from whichever queue it belongs to. To be returned when release_tasks() is next called. """ - self.remove_task(itask) - self.force_released.add(itask) + if self.remove_task(itask): + self.force_released.add(itask) def adopt_tasks(self, orphans: List[str]) -> None: """Adopt orphaned tasks to the default group.""" From 85e280082d8da334689a5a4706d017f8403119b6 Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Thu, 29 Aug 2024 19:07:05 +1200 Subject: [PATCH 2/2] Add integration test. --- tests/integration/test_task_pool.py | 43 ++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index beba9075bd3..9f349c7e78c 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -2085,7 +2085,7 @@ async def test_set_future_flow(flow, scheduler, start, log_filter): # set b:succeeded in flow 2 and check downstream spawning schd.pool.set_prereqs_and_outputs(['1/b'], prereqs=[], outputs=[], flow=[2]) assert schd.pool.get_task(IntegerPoint("1"), "c1") is None, '1/c1 (flow 2) should not be spawned after 1/b:succeeded' - assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded' + assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded' async def test_trigger_queue(one, run, db_select, complete): @@ -2109,3 +2109,44 @@ async def test_trigger_queue(one, run, db_select, complete): one.resume_workflow() await complete(one, timeout=2) assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)] + + +async def test_trigger_unqueued(flow, scheduler, start): + """Test triggering an unqueued active task. + + It should not add to the force_released list. + See https://github.com/cylc/cylc-flow/pull/6337 + + """ + conf = { + 'scheduler': {'allow implicit tasks': 'True'}, + 'scheduling': { + 'graph': { + 'R1': 'a & b => c' + } + } + } + schd = scheduler( + flow(conf), + run_mode='simulation', + paused_start=False + ) + + async with start(schd): + # Release tasks 1/a and 1/b + schd.pool.release_runahead_tasks() + schd.release_queued_tasks() + assert pool_get_task_ids(schd.pool) == ['1/a', '1/b'] + + # Mark 1/a as succeeded and spawn 1/c + task_a = schd.pool.get_task(IntegerPoint("1"), "a") + schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded') + assert pool_get_task_ids(schd.pool) == ['1/b', '1/c'] + + # Trigger the partially satisified (and not queued) task 1/c + schd.pool.force_trigger_tasks(['1/c'], [FLOW_ALL]) + + # It should not add to the queue managers force_released list. + assert not schd.pool.task_queue_mgr.force_released, ( + "Triggering an unqueued task should not affect the force_released list" + )