Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger and set: fix default flow assignment for n=0 tasks. #6367

Merged
merged 17 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6367.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where `cylc trigger` and `cylc set` would assign active flows to existing tasks by default.
6 changes: 5 additions & 1 deletion cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
def flow_opts(flows: List[str], flow_wait: bool) -> None:
"""Check validity of flow-related CLI options.

Note the schema defaults flows to ["all"].
Note the schema defaults flows to [].

Examples:
Good:
>>> flow_opts([], False)
>>> flow_opts(["new"], False)
>>> flow_opts(["1", "2"], False)
>>> flow_opts(["1", "2"], True)
Expand All @@ -61,6 +62,9 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None:
cylc.flow.exceptions.InputError: ...

"""
if not flows:
return

for val in flows:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
Expand Down
23 changes: 13 additions & 10 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1998,17 +1998,20 @@ class Arguments:
class FlowMutationArguments:
flow = graphene.List(
graphene.NonNull(Flow),
default_value=[FLOW_ALL],
default_value=[],
description=sstrip(f'''
The flow(s) to trigger these tasks in.

This should be a list of flow numbers OR a single-item list
containing one of the following three strings:

* {FLOW_ALL} - Triggered tasks belong to all active flows
(default).
* {FLOW_NEW} - Triggered tasks are assigned to a new flow.
* {FLOW_NONE} - Triggered tasks do not belong to any flow.
The flow(s) to trigger/set these tasks in.

By default:
* active tasks (n=0) keep their existing flow assignment
* future tasks (n>0) get assigned all active flows

Otherwise you can assign (future tasks) or add to (active tasks):
* a list of integer flow numbers
or a single-item list containing one of the following strings:
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
* {FLOW_ALL} - all active flows
* {FLOW_NEW} - an automatically generated new flow number
* {FLOW_NONE} - (ignored for active tasks): no flow
''')
)
flow_wait = Boolean(
Expand Down
21 changes: 15 additions & 6 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

"""cylc trigger [OPTIONS] ARGS

Force tasks to run despite unsatisfied prerequisites.
Force tasks to run regardless of prerequisites.

* Triggering an unqueued waiting task queues it, regardless of prerequisites.
* Triggering a queued task submits it, regardless of queue limiting.
* Triggering an active task has no effect (it already triggered).

Incomplete and active-waiting tasks in the n=0 window already belong to a flow.
Triggering them queues them to run (or rerun) in the same flow.
Future tasks (n>0) do not already belong to a flow.
* by default they are assigned to all active flows
* otherwise, according to the --flow option
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

Beyond n=0, triggered tasks get all current active flow numbers by default, or
specified flow numbers via the --flow option. Those flows - if/when they catch
up - will see tasks that ran after triggering event as having run already.
Active tasks (n=0) already belong to a flow.
* by default they run in the same flow
* with --flow=all, they are assigned to all active flows
* with --flow=INT or --flow=new, the new flow merges with the old one
* --flow=none is ignored

Note --flow=new increments the global flow counter so if you need multiple
commands to start a single new flow only use --flow=new in the first command,
then use the actual new flow number (e.g. read it from the scheduler log).
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

Examples:
# trigger task foo in cycle 1234 in test
Expand All @@ -53,6 +60,7 @@
)
from cylc.flow.terminal import cli_function
from cylc.flow.flow_mgr import add_flow_opts
from cylc.flow.command_validation import flow_opts


if TYPE_CHECKING:
Expand Down Expand Up @@ -114,6 +122,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', *ids: str):
"""CLI for "cylc trigger"."""
flow_opts(options.flow, options.flow_wait)
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
rets = call_multi(
partial(run, options),
*ids,
Expand Down
102 changes: 63 additions & 39 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1908,26 +1908,33 @@ def set_prereqs_and_outputs(
flow_descr: description of new flow

"""
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
# Illegal flow command opts
return

hjoliver marked this conversation as resolved.
Show resolved Hide resolved
# Get matching pool tasks and future task definitions.
itasks, future_tasks, unmatched = self.filter_task_proxies(
items,
future=True,
warn=False,
)

if flow == [FLOW_NEW]:
# Translate --flow=new to an actual flow number now to avoid
# incrementing it twice below.
flow = [
str(
self.flow_mgr.get_flow_num(meta=flow_descr)
)
]

# Set existing task proxies.
flow_nums = self._get_flow_nums(flow, flow_descr, active=True)
for itask in itasks:
# Existing task proxies.
self.merge_flows(itask, flow_nums)
if prereqs:
self._set_prereqs_itask(itask, prereqs, flow_nums)
else:
self._set_outputs_itask(itask, outputs)

# Spawn and set future tasks.
flow_nums = self._get_flow_nums(flow, flow_descr, active=False)
for name, point in future_tasks:
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
tdef = self.config.get_taskdef(name)
if prereqs:
Expand Down Expand Up @@ -2066,34 +2073,47 @@ def _get_flow_nums(
self,
flow: List[str],
meta: Optional[str] = None,
) -> Optional[Set[int]]:
"""Get correct flow numbers given user command options."""
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
if len(flow) != 1:
LOG.warning(
f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}'
' cannot be used in combination with integer flow numbers.'
)
return None
if flow[0] == FLOW_ALL:
flow_nums = self._get_active_flow_nums()
elif flow[0] == FLOW_NEW:
flow_nums = {self.flow_mgr.get_flow_num(meta=meta)}
elif flow[0] == FLOW_NONE:
active: bool = False
) -> Set[int]:
"""Return flow numbers corresponding to user command options.

Arg should have been validated already during command validation.

Call this method separately for active (n=0) and future tasks.
- future tasks: assign the result to the new task
- active tasks: merge the result with existing flow numbers

Note if a single command results in two calls to this method (for
active and future tasks), translate --flow=new to an actual flow
number first, to avoid incrementing the flow counter twice.
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

The result is different in the default case (no --flow option):
- future tasks: return all active flows
- active tasks: stick with the existing flows (so return empty set).

"""
if not flow:
# default (i.e. no --flow option was used)
if active:
# active tasks: stick with the existing flow
flow_nums = set()
else:
# future tasks: assign to all active flows
flow_nums = self._get_active_flow_nums()
elif flow == [FLOW_NONE]:
flow_nums = set()
elif flow == [FLOW_ALL]:
flow_nums = self._get_active_flow_nums()
elif flow == [FLOW_NEW]:
flow_nums = {self.flow_mgr.get_flow_num(meta=meta)}
else:
try:
flow_nums = {
self.flow_mgr.get_flow_num(
flow_num=int(n), meta=meta
)
for n in flow
}
except ValueError:
LOG.warning(
f"Ignoring command: illegal flow values {flow}"
# specific flow numbers
flow_nums = {
self.flow_mgr.get_flow_num(
flow_num=int(n), meta=meta
)
return None
for n in flow
}
return flow_nums

def _force_trigger(self, itask):
Expand Down Expand Up @@ -2157,17 +2177,22 @@ def force_trigger_tasks(
unless flow-wait is set.

"""
# Get flow numbers for the tasks to be triggered.
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
return

# Get matching tasks proxies, and matching future task IDs.
existing_tasks, future_ids, unmatched = self.filter_task_proxies(
items, future=True, warn=False,
)

# Trigger existing tasks.
if flow == [FLOW_NEW]:
# Translate --flow=new to an actual flow number now to avoid
# incrementing it twice below.
flow = [
str(
self.flow_mgr.get_flow_num(meta=flow_descr)
)
]

# Trigger active tasks.
flow_nums = self._get_flow_nums(flow, flow_descr, active=True)
for itask in existing_tasks:
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
Expand All @@ -2176,11 +2201,10 @@ def force_trigger_tasks(
self._force_trigger(itask)

# Spawn and trigger future tasks.
flow_nums = self._get_flow_nums(flow, flow_descr, active=False)
for name, point in future_ids:

if not self.can_be_spawned(name, point):
continue

submit_num, _, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)
Expand Down
15 changes: 11 additions & 4 deletions tests/functional/triggering/08-fam-finish-any/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@
[[graph]]
R1 = """FAM:finish-any => foo"""
[runtime]
[[root]]
script = true
[[FAM]]
script = sleep 10
[[a,c]]
[[a]]
inherit = FAM
script = """
cylc__job__poll_grep_workflow_log -E "1/b.*succeeded"
"""
[[b]]
inherit = FAM
script = true
[[c]]
inherit = FAM
script = """
cylc__job__poll_grep_workflow_log -E "1/b.*succeeded"
"""
[[foo]]
script = true
2 changes: 1 addition & 1 deletion tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def test_delta_task_prerequisite(harness):
[t.identity for t in schd.pool.get_tasks()],
[(TASK_STATUS_SUCCEEDED,)],
[],
"all"
["all"]
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
)
assert all({
p.satisfied
Expand Down
8 changes: 5 additions & 3 deletions tests/integration/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import logging

from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE
from cylc.flow.command_validation import flow_opts
from cylc.flow.exceptions import InputError

import pytest
import time
Expand All @@ -34,11 +36,11 @@
)
)
async def test_trigger_invalid(mod_one, start, log_filter, flow_strs):
"""Ensure invalid flow values are rejected."""
"""Ensure invalid flow values are rejected during command validation."""
async with start(mod_one) as log:
log.clear()
assert mod_one.pool.force_trigger_tasks(['*'], flow_strs) is None
assert len(log_filter(log, level=logging.WARN)) == 1
with pytest.raises(InputError):
flow_opts(flow_strs, False)


async def test_trigger_no_flows(one, start, log_filter):
Expand Down