Skip to content

Commit

Permalink
Remove unused argument
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Sep 5, 2024
1 parent 61789a1 commit a6ce75d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 31 deletions.
8 changes: 5 additions & 3 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
NamedTuple,
Expand Down Expand Up @@ -437,6 +438,9 @@ class TaskEventsManager():

workflow_cfg: Dict[str, Any]
uuid_str: str
# To be set by the task pool:
spawn_func: Callable[['TaskProxy', str], Any]

mail_interval: float = 0
mail_smtp: Optional[str] = None
mail_footer: Optional[str] = None
Expand All @@ -459,8 +463,6 @@ def __init__(
self._event_timers: Dict[EventKey, Any] = {}
# NOTE: flag for DB use
self.event_timers_updated = True
# To be set by the task pool:
self.spawn_func = None
self.timestamp = timestamp
self.bad_hosts = bad_hosts

Expand Down Expand Up @@ -1966,7 +1968,7 @@ def reset_bad_hosts(self):
)
self.bad_hosts.clear()

def spawn_children(self, itask, output):
def spawn_children(self, itask: 'TaskProxy', output: str) -> None:
# update DB task outputs
self.workflow_db_mgr.put_update_task_outputs(itask)
# spawn child-tasks
Expand Down
12 changes: 3 additions & 9 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,9 +1329,7 @@ def check_abort_on_task_fails(self):
"""
return self.abort_task_failed

def spawn_on_output(
self, itask: TaskProxy, output: str, forced: bool = False
) -> None:
def spawn_on_output(self, itask: TaskProxy, output: str) -> None:
"""Spawn child-tasks of given output, into the pool.
Remove the parent task from the pool if complete.
Expand All @@ -1352,7 +1350,6 @@ def spawn_on_output(
Args:
output: output to spawn on.
forced: True if called from manual set task command
"""
if (
Expand All @@ -1363,7 +1360,7 @@ def spawn_on_output(
self.abort_task_failed = True

children = []
if itask.flow_nums or forced:
if itask.flow_nums:
with suppress(KeyError):
children = itask.graph_children[output]

Expand Down Expand Up @@ -1394,10 +1391,7 @@ def spawn_on_output(
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
):
elif c_task is None and itask.flow_nums:
# If child is not in the pool already, and parent belongs to a
# flow (so it can spawn children), and parent is not waiting
# for an upcoming flow merge before spawning ... then spawn it.
Expand Down
28 changes: 9 additions & 19 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,12 +1085,9 @@ async def test_no_flow_tasks_dont_spawn(
'R1': 'a => b => c'
}
},
'scheduler': {
'allow implicit tasks': 'true',
},
})

schd = scheduler(id_)
schd: Scheduler = scheduler(id_)
async with start(schd):
task_a = schd.pool.get_tasks()[0]

Expand All @@ -1099,37 +1096,30 @@ async def test_no_flow_tasks_dont_spawn(

# Set as completed: should not spawn children.
schd.pool.set_prereqs_and_outputs(
[task_a.identity], None, None, [FLOW_NONE])
[task_a.identity], [], [], [FLOW_NONE]
)
assert not schd.pool.get_tasks()

for flow_nums, force, pool in (
for flow_nums, expected_pool in (
# outputs yielded from a no-flow task should not spawn downstreams
(set(), False, []),
# forced spawning downstream of a no-flow task should spawn
# downstreams with flow_nums={}
(set(), True, [('1/b', set())]),
(set(), []),
# outputs yielded from a task with flow numbers should spawn
# downstreams in the same flow
({1}, False, [('1/b', {1})]),
# forced spawning should work in the same way
({1}, True, [('1/b', {1})]),
({1}, [('1/b', {1})]),
):
# set the flow-nums on 1/a
task_a.flow_nums = flow_nums

# spawn on the succeeded output
schd.pool.spawn_on_output(
task_a,
TASK_OUTPUT_SUCCEEDED,
forced=force,
)
schd.pool.spawn_on_output(task_a, TASK_OUTPUT_SUCCEEDED)

schd.pool.spawn_on_all_outputs(task_a)

# ensure the pool is as expected
assert [
(itask.identity, itask.flow_nums)
for itask in schd.pool.get_tasks()
] == pool
] == expected_pool


async def test_task_proxy_remove_from_queues(
Expand Down

0 comments on commit a6ce75d

Please sign in to comment.