Skip to content

Commit

Permalink
group-trigger-tweak.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Dec 22, 2024
1 parent 244d191 commit da6b375
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2189,6 +2189,7 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False):
itask.is_manual_submit = True
itask.reset_try_timers()

LOG.info(f"[{itask}] - force trigger")
if itask.state_reset(TASK_STATUS_WAITING):
# (could also be unhandled failed)
self.data_store_mgr.delta_task_state(itask)
Expand Down Expand Up @@ -2238,6 +2239,7 @@ def _force_trigger_if_ready(self, itask, on_resume):
itask.is_manual_submit = True
itask.reset_try_timers()
self.data_store_mgr.delta_task_prerequisite(itask)
# TODO this is only called here now
self._force_trigger(itask, on_resume)

def force_trigger_tasks(
Expand Down Expand Up @@ -2274,7 +2276,7 @@ def force_trigger_tasks(
items, inactive=True, warn_no_active=False,
)
all_ids = (
list(inactive) +
[(tdef.name, point) for (tdef, point) in inactive] +
[(itask.tdef.name, itask.point) for itask in existing_tasks]
)

Expand All @@ -2291,16 +2293,21 @@ def force_trigger_tasks(
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.error(f"[{itask}] ignoring trigger - already active")
continue

for pre in itask.state.prerequisites:
# satisfy off-group prerequisites
for (
p_point, p_name, p_out
), p_state in pre._satisfied.items():
if (
not p_state and
(p_name, get_point(p_point)) not in all_ids
(p_name, p_point) not in all_ids
):
# off-group
LOG.info(
f"[{itask}] - force satisfying off-group"
f" prerequisite {p_point}/{p_name}:{p_out}"
)
itask.satisfy_me(
[
Tokens(
Expand All @@ -2319,6 +2326,7 @@ def force_trigger_tasks(
flow_nums = self._get_active_flow_nums()

for tdef, point in inactive:
jtask: Optional[TaskProxy] = None
if tdef.is_parentless(point):
# parentless: set pre=all to spawn into task pool
jtask = self._set_prereqs_tdef(
Expand All @@ -2330,11 +2338,18 @@ def force_trigger_tasks(
for pid in tdef.get_triggers(point):
p_point = pid.get_point(point)
p_name = pid.task_name
if (p_name, get_point(p_point)) not in all_ids:
off_flow_prereqs.append(f"{p_point}/{p_name}")
jtask = self._set_prereqs_tdef(
point, tdef, off_flow_prereqs, flow_nums, flow_wait
)
p_out = pid.output
if (p_name, p_point) not in all_ids:
off_flow_prereqs.append(f"{p_point}/{p_name}:{p_out}")
LOG.info(
f"[{point}/{tdef.name}] - force satisfying off-"
f"group prerequisite {p_point}/{p_name}:{p_out}"
)

if off_flow_prereqs:
jtask = self._set_prereqs_tdef(
point, tdef, off_flow_prereqs, flow_nums, flow_wait
)
if jtask is not None:
self._force_trigger_if_ready(jtask, on_resume)

Expand Down

0 comments on commit da6b375

Please sign in to comment.