Skip to content

Commit

Permalink
Trigger and set: fix default flow assignment for n=0 tasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 11, 2024
1 parent ddddfcd commit e7c8c1a
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 50 deletions.
6 changes: 5 additions & 1 deletion cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
def flow_opts(flows: List[str], flow_wait: bool) -> None:
"""Check validity of flow-related CLI options.
Note the schema defaults flows to ["all"].
Note the schema defaults flows to [].
Examples:
Good:
>>> flow_opts([], False)
>>> flow_opts(["new"], False)
>>> flow_opts(["1", "2"], False)
>>> flow_opts(["1", "2"], True)
Expand All @@ -61,6 +62,9 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None:
cylc.flow.exceptions.InputError: ...
"""
if not flows:
return

for val in flows:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1998,15 +1998,15 @@ class Arguments:
class FlowMutationArguments:
flow = graphene.List(
graphene.NonNull(Flow),
default_value=[FLOW_ALL],
default_value=[],
description=sstrip(f'''
The flow(s) to trigger these tasks in.
This should be a list of flow numbers OR a single-item list
containing one of the following three strings:
* {FLOW_ALL} - Triggered tasks belong to all active flows
(default).
* (nothing) - Triggered tasks keep flow, or {FLOW_ALL} (default)
* {FLOW_ALL} - Triggered tasks belong to all active flows.
* {FLOW_NEW} - Triggered tasks are assigned to a new flow.
* {FLOW_NONE} - Triggered tasks do not belong to any flow.
''')
Expand Down
22 changes: 15 additions & 7 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,26 @@

"""cylc trigger [OPTIONS] ARGS
Force tasks to run despite unsatisfied prerequisites.
Force tasks to run regardless of prerequisites.
* Triggering an unqueued waiting task queues it, regardless of prerequisites.
* Triggering a queued task submits it, regardless of queue limiting.
* Triggering an active task has no effect (it already triggered).
Incomplete and active-waiting tasks in the n=0 window already belong to a flow.
Triggering them queues them to run (or rerun) in the same flow.
Future tasks (n>0) do not already belong to a flow.
* by default, they are assigned to all active flows
* otherwise they are assigned to flows according to the --flow option
Beyond n=0, triggered tasks get all current active flow numbers by default, or
specified flow numbers via the --flow option. Those flows - if/when they catch
up - will see tasks that ran after triggering event as having run already.
Active tasks (n=0) already belong to a flow.
* by default, they run in the same flow
* with --flow=all, they are assigned to all active flows
* with --flow=INT, the new flow is merged with with the existing one
* --flow=none is ignored, to avoid erasing the existing flow
Note that --flow=new increments the global flow number counter so it
should only be used if a new flow can be triggered with a single command.
Once you have triggered a new flow, get the new flow number (e.g. from the
scheduler log) and use that in subsequent commands to target that flow.
Examples:
# trigger task foo in cycle 1234 in test
Expand Down Expand Up @@ -115,7 +123,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', *ids: str):
"""CLI for "cylc trigger"."""
command_validation.flow_opts(options.flow or ['all'], options.flow_wait)
command_validation.flow_opts(options.flow, options.flow_wait)
rets = call_multi(
partial(run, options),
*ids,
Expand Down
78 changes: 39 additions & 39 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1908,26 +1908,24 @@ def set_prereqs_and_outputs(
flow_descr: description of new flow
"""
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
# Illegal flow command opts
return

# Get matching pool tasks and future task definitions.
itasks, future_tasks, unmatched = self.filter_task_proxies(
items,
future=True,
warn=False,
)

# Set existing task proxies.
flow_nums = self._get_flow_nums(flow, flow_descr, active=True)
for itask in itasks:
# Existing task proxies.
self.merge_flows(itask, flow_nums)
if prereqs:
self._set_prereqs_itask(itask, prereqs, flow_nums)
else:
self._set_outputs_itask(itask, outputs)

# Spawn and set future tasks.
flow_nums = self._get_flow_nums(flow, flow_descr, active=False)
for name, point in future_tasks:
tdef = self.config.get_taskdef(name)
if prereqs:
Expand Down Expand Up @@ -2066,34 +2064,38 @@ def _get_flow_nums(
self,
flow: List[str],
meta: Optional[str] = None,
) -> Optional[Set[int]]:
"""Get correct flow numbers given user command options."""
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
if len(flow) != 1:
LOG.warning(
f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}'
' cannot be used in combination with integer flow numbers.'
)
return None
if flow[0] == FLOW_ALL:
flow_nums = self._get_active_flow_nums()
elif flow[0] == FLOW_NEW:
flow_nums = {self.flow_mgr.get_flow_num(meta=meta)}
elif flow[0] == FLOW_NONE:
active: bool = False
) -> Set[int]:
"""Return flow numbers corresponding to user command options.
Must be called separately for active (n=0) and future tasks.
Future tasks: assign the result to the new task.
Active tasks: merge the result with its existing flow numbers
Option validity is pre-checked during command validation.
"""
if flow == []:
if active:
# active tasks: stick with the existing flow
flow_nums = set()
else:
# future tasks: assign to all active flows
flow_nums = self._get_active_flow_nums()
elif flow == [FLOW_NONE]:
flow_nums = set()
elif flow == [FLOW_ALL]:
flow_nums = self._get_active_flow_nums()
elif flow == [FLOW_NEW]:
flow_nums = {self.flow_mgr.get_flow_num(meta=meta)}
else:
try:
flow_nums = {
self.flow_mgr.get_flow_num(
flow_num=int(n), meta=meta
)
for n in flow
}
except ValueError:
LOG.warning(
f"Ignoring command: illegal flow values {flow}"
# specific flow numbers
flow_nums = {
self.flow_mgr.get_flow_num(
flow_num=int(n), meta=meta
)
return None
for n in flow
}
return flow_nums

def _force_trigger(self, itask):
Expand Down Expand Up @@ -2156,18 +2158,17 @@ def force_trigger_tasks(
- just spawn (if not already spawned in this flow)
unless flow-wait is set.
Existing tasks already belong to a flow.
- "all" (default) or "none": stick with the existing flow
- "new" or specific: merge existing with new or specific flow
"""
# Get flow numbers for the tasks to be triggered.
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
return

# Get matching tasks proxies, and matching future task IDs.
existing_tasks, future_ids, unmatched = self.filter_task_proxies(
items, future=True, warn=False,
)

# Trigger existing tasks.
# Trigger active tasks.
flow_nums = self._get_flow_nums(flow, flow_descr, active=True)
for itask in existing_tasks:
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
Expand All @@ -2176,11 +2177,10 @@ def force_trigger_tasks(
self._force_trigger(itask)

# Spawn and trigger future tasks.
flow_nums = self._get_flow_nums(flow, flow_descr, active=False)
for name, point in future_ids:

if not self.can_be_spawned(name, point):
continue

submit_num, _, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)
Expand Down

0 comments on commit e7c8c1a

Please sign in to comment.