Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit a3da49837f2b60e768784c416c511fe1d1b6d720
Merge: e793ca1 c07392d
Author: Hilary James Oliver <[email protected]>
Date:   Tue Jan 16 15:32:58 2024 +1300

    Merge branch 'master' into cylc-set-task

commit e793ca1
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 19 17:49:33 2023 +1300

    working on flow_wait reset ...[skip ci]

commit f6a4f33
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 19 15:46:11 2023 +1300

    Handle flow-number skipping.

commit 2f8536a
Merge: 3fddad0 ca8637e
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 19 12:12:41 2023 +1300

    Merge pull request #35 from oliver-sanders/cylc-set-stuff

    Cylc set stuff

commit ca8637e
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 19 12:12:19 2023 +1300

    Apply suggestions from code review [skip ci]

commit 3fddad0
Author: Hilary James Oliver <[email protected]>
Date:   Sun Dec 17 13:05:48 2023 +1300

    Fix wrong-outputs warning.

commit 7c75fbc
Author: Oliver Sanders <[email protected]>
Date:   Fri Dec 15 10:28:35 2023 +0000

    tui: support the "cylc set" command with default opts

commit cd4a3e3
Author: Oliver Sanders <[email protected]>
Date:   Fri Dec 15 10:18:49 2023 +0000

    set: add note about CLI completion

commit 06fbdea
Author: Hilary James Oliver <[email protected]>
Date:   Fri Dec 15 16:24:39 2023 +1300

    Fix C7 back-compat task removal.

commit ad6e5d7
Merge: 9d320a5 6f62691
Author: Hilary James Oliver <[email protected]>
Date:   Fri Dec 15 15:32:00 2023 +1300

    Merge branch 'master' into cylc-set-task

commit 9d320a5
Author: Hilary James Oliver <[email protected]>
Date:   Fri Dec 15 13:43:05 2023 +1300

    Final functionality tweaks.

commit 8f5c0aa
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 14 21:33:08 2023 +1300

    Fix select prev instance from task_states table.

commit 8c34c75
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 14 17:50:41 2023 +1300

    Fix functional tests.

commit e65f2c6
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 14 16:37:24 2023 +1300

    Logging tweaks.

commit 1148487
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 14 16:15:10 2023 +1300

    Fix an integration test.

commit 50a5b49
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 14 15:55:34 2023 +1300

    Transient task proxy comment.

commit 4dc5a55
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 14 10:40:46 2023 +1300

    Tweak satisfy_me methods.

commit cceb740
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 14 10:10:17 2023 +1300

    Remove duplicate block.

commit 6503ac5
Author: Hilary James Oliver <[email protected]>
Date:   Wed Dec 13 16:29:24 2023 +1300

    Demote warning to debug.

commit 70fd970
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 12 22:25:10 2023 +1300

    Code review tweaks 2.

commit 1e93707
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 12 16:03:40 2023 +1300

    Update cylc/flow/task_pool.py [skip ci]

    Co-authored-by: Oliver Sanders <[email protected]>

commit 9c8b64a
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 12 15:29:33 2023 +1300

    Update cylc/flow/task_proxy.py [skip ci]

    Co-authored-by: Oliver Sanders <[email protected]>

commit f592516
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 12 13:23:00 2023 +1300

    code review tweaks [skip ci]

commit d2839b4
Author: Hilary James Oliver <[email protected]>
Date:   Fri Dec 8 14:32:48 2023 +1300

    Update tui test.

commit 08938bd
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 7 18:24:50 2023 +1300

    Tweak wording of set --help. [skip ci]

commit 6e0f050
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 7 16:30:12 2023 +1300

    Don't spawn parentless if removing after flow-stop.

commit 452c05d
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 7 14:30:36 2023 +1300

    New func test.

commit b22b817
Author: Hilary James Oliver <[email protected]>
Date:   Thu Dec 7 14:29:59 2023 +1300

    Handle removal of parentless runahead tasks.

commit d9be15c
Author: Hilary James Oliver <[email protected]>
Date:   Wed Dec 6 19:41:43 2023 +1300

    Remove useless func test.

commit 69f2923
Author: Hilary James Oliver <[email protected]>
Date:   Wed Dec 6 18:51:04 2023 +1300

    Record task completion in DB, for spawning.

commit a66b50e
Author: Hilary James Oliver <[email protected]>
Date:   Wed Dec 6 18:09:28 2023 +1300

    Separate flows_nums stringify function.

commit 759faf9
Author: Hilary James Oliver <[email protected]>
Date:   Wed Dec 6 13:04:36 2023 +1300

    Fix some func tests.

commit 365a9c7
Author: Hilary James Oliver <[email protected]>
Date:   Wed Dec 6 12:17:41 2023 +1300

    expired tasks: dequeue and don't log as incomplete

commit acefbba
Author: Hilary James Oliver <[email protected]>
Date:   Tue Dec 5 17:21:58 2023 +1300

    Better handling of implied outputs.

commit 9fa07f9
Author: Hilary James Oliver <[email protected]>
Date:   Mon Dec 4 15:41:07 2023 +1300

    working on tui test...

commit 9702fd8
Author: Hilary James Oliver <[email protected]>
Date:   Mon Dec 4 14:42:28 2023 +1300

    Fix a tui test.

commit a79de36
Author: Hilary James Oliver <[email protected]>
Date:   Mon Dec 4 14:06:53 2023 +1300

    Simplify and tidy.

commit a7ee355
Author: Hilary James Oliver <[email protected]>
Date:   Mon Dec 4 10:42:28 2023 +1300

    mypy fix

commit cf197a1
Author: Hilary James Oliver <[email protected]>
Date:   Fri Dec 1 15:23:58 2023 +1300

    Add cylc-set func tests TEMP

commit d0e3c78
Author: Hilary James Oliver <[email protected]>
Date:   Thu Nov 23 13:29:19 2023 +1300

    cylc-set: glob in pool for now.

commit 78d5599
Author: Hilary James Oliver <[email protected]>
Date:   Sun Nov 19 11:40:59 2023 +1300

    Revert CI change.

commit ec954c2
Author: Hilary James Oliver <[email protected]>
Date:   Thu Nov 16 20:22:10 2023 +1300

    Fix command logging and test.

commit c61476f
Author: Hilary James Oliver <[email protected]>
Date:   Thu Nov 16 18:11:36 2023 +1300

    set command: dead end, not alias.

commit 6c308d8
Author: Hilary James Oliver <[email protected]>
Date:   Wed Nov 15 15:32:05 2023 +1300

    cylc set --pre: infer succeeded

commit 44d84df
Author: Hilary James Oliver <[email protected]>
Date:   Wed Nov 15 15:08:45 2023 +1300

    Clean up command logging.

commit cf8a26a
Author: Oliver Sanders <[email protected]>
Date:   Tue Nov 14 14:48:41 2023 +0000

    completion_server: support "cylc set" arguments

    * Support the `--pre` and `--out` arguments to `cylc set`.
    * This requires the task ID(s) to be provided *before* the `--pre` /
      `--out` option because otherwise we don't have the required
      information to complete the arguments.
    * This lists prereqs/outputs from `cylc show` which is currently
      restricted to n=1 tasks.
    * This does not support completing comma separared prereqs/outputs, use
      the `--pre` / `--out` options multiple times to do this.

commit a01a56b
Author: Hilary James Oliver <[email protected]>
Date:   Tue Nov 14 20:23:17 2023 +1300

    Fix simulation job stuff.

commit e81caf4
Author: Hilary James Oliver <[email protected]>
Date:   Tue Nov 14 18:34:51 2023 +1300

    Fix tests.

commit a7cbdd1
Author: Hilary James Oliver <[email protected]>
Date:   Tue Nov 14 16:14:48 2023 +1300

    Better command logging.

commit da2ced9
Author: Hilary James Oliver <[email protected]>
Date:   Tue Nov 14 15:30:53 2023 +1300

    Don't log state changes for transient tasks.

commit 34fc997
Author: Hilary James Oliver <[email protected]>
Date:   Tue Nov 14 10:35:34 2023 +1300

    Some fixes; and remove cylc-set functional tests.

commit 8acb090
Author: Hilary James Oliver <[email protected]>
Date:   Sun Nov 12 16:41:29 2023 +1300

    Add integration tests.

commit eac5276
Author: Hilary James Oliver <[email protected]>
Date:   Sun Nov 12 09:42:48 2023 +1300

    Clean up future trigger comments.

commit ba759c9
Author: Hilary James Oliver <[email protected]>
Date:   Sat Nov 11 23:04:24 2023 +1300

    tiny docstring tweak

commit c100520
Author: Hilary James Oliver <[email protected]>
Date:   Sat Nov 11 15:12:56 2023 +1300

    cylc-set doctests

commit 93152f2
Author: Hilary James Oliver <[email protected]>
Date:   Sat Nov 11 15:01:33 2023 +1300

    Handle missed started events.

commit 799f6ff
Author: Hilary James Oliver <[email protected]>
Date:   Sat Nov 11 13:57:13 2023 +1300

    Remove duplicate blocks, post merge from master.

commit 8cea473
Author: Hilary James Oliver <[email protected]>
Date:   Fri Nov 10 10:36:17 2023 +1300

    Squash "cylc set" dev branch. (History too messy!)
  • Loading branch information
hjoliver committed Jan 16, 2024
1 parent c07392d commit 1b0940e
Show file tree
Hide file tree
Showing 164 changed files with 3,330 additions and 1,481 deletions.
1 change: 1 addition & 0 deletions changes.d/5658.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New "cylc set" command for setting task prerequisites and outputs.
5 changes: 3 additions & 2 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2185,8 +2185,9 @@ def update_workflow(self, reloaded=False):
w_delta.n_edge_distance = self.n_edge_distance
delta_set = True

if self.schd.pool.main_pool:
pool_points = set(self.schd.pool.main_pool)
if self.schd.pool.active_tasks:
pool_points = set(self.schd.pool.active_tasks)

oldest_point = str(min(pool_points))
if w_data.oldest_active_cycle_point != oldest_point:
w_delta.oldest_active_cycle_point = oldest_point
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
Expand Down
153 changes: 130 additions & 23 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,99 @@
import datetime

from cylc.flow import LOG
from cylc.flow.exceptions import InputError


if TYPE_CHECKING:
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


FlowNums = Set[int]
# Flow constants
FLOW_ALL = "all"
FLOW_NEW = "new"
FLOW_NONE = "none"

# For flow-related CLI options:
ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
ERR_OPT_FLOW_WAIT = (
f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)


def add_flow_opts(parser):
parser.add_option(
"--flow", action="append", dest="flow", metavar="FLOW",
help=f'Assign new tasks to all active flows ("{FLOW_ALL}");'
f' no flow ("{FLOW_NONE}"); a new flow ("{FLOW_NEW}");'
f' or a specific flow (e.g. "2"). The default is "{FLOW_ALL}".'
' Specific flow numbers can be new or existing.'
' Reuse the option to assign multiple flow numbers.'
)

parser.add_option(
"--meta", metavar="DESCRIPTION", action="store",
dest="flow_descr", default=None,
help=f"description of new flow (with --flow={FLOW_NEW})."
)

parser.add_option(
"--wait", action="store_true", default=False, dest="flow_wait",
help="Wait for merge with current active flows before flowing on."
)


def validate_flow_opts(options):
"""Check validity of flow-related CLI options."""
if options.flow is None:
# Default to all active flows
options.flow = [FLOW_ALL]

for val in options.flow:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
if len(options.flow) != 1:
raise InputError(ERR_OPT_FLOW_INT)
else:
try:
int(val)
except ValueError:
raise InputError(ERR_OPT_FLOW_VAL.format(val))

if options.flow_wait and options.flow[0] in [FLOW_NEW, FLOW_NONE]:
raise InputError(ERR_OPT_FLOW_WAIT)


def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str:
"""Return a string representation of a set of flow numbers
If the set contains only the original flow 1, return an empty string
so that users can disregard flows unless they trigger new ones.
Otherwise return e.g. "(1,2,3)".
Examples:
>>> stringify_flow_nums({})
'(none)'
>>> stringify_flow_nums({1})
''
>>> stringify_flow_nums({1}, True)
'(1)'
>>> stringify_flow_nums({1,2,3})
'(1,2,3)'
"""
if not full and flow_nums == {1}:
return ""
return (
"("
f"{','.join(str(i) for i in flow_nums) or 'none'}"
")"
)


class FlowMgr:
"""Logic to manage flow counter and flow metadata."""
Expand All @@ -42,28 +123,54 @@ def __init__(self, db_mgr: "WorkflowDatabaseManager") -> None:
self.flows: Dict[int, Dict[str, str]] = {}
self.counter: int = 0

def get_new_flow(self, description: Optional[str] = None) -> int:
"""Increment flow counter, record flow metadata."""
self.counter += 1
# record start time to nearest second
now = datetime.datetime.now()
now_sec: str = str(
now - datetime.timedelta(microseconds=now.microsecond))
description = description or "no description"
self.flows[self.counter] = {
"description": description,
"start_time": now_sec
}
LOG.info(
f"New flow: {self.counter} "
f"({description}) "
f"{now_sec}"
)
self.db_mgr.put_insert_workflow_flows(
self.counter,
self.flows[self.counter]
)
return self.counter
def get_flow_num(
self,
flow_num: Optional[int] = None,
meta: Optional[str] = None
) -> int:
"""Return a valid flow number, and record a new flow if necessary.
If asked for a new flow:
- increment the automatic counter until we find an unused number
If given a flow number:
- record a new flow if the number is unused
- else return it, as an existing flow number.
The metadata string is only used if it is a new flow.
"""
if flow_num is None:
self.counter += 1
while self.counter in self.flows:
# Skip manually-created out-of-sequence flows.
self.counter += 1
flow_num = self.counter

if flow_num in self.flows:
if meta is not None:
LOG.warning(
f'Ignoring flow metadata "{meta}":'
f' {flow_num} is not a new flow'
)
else:
# Record a new flow.
now = datetime.datetime.now()
now_sec: str = str(
now - datetime.timedelta(microseconds=now.microsecond))
meta = meta or "no description"
self.flows[flow_num] = {
"description": meta,
"start_time": now_sec
}
LOG.info(
f"New flow: {flow_num} ({meta}) {now_sec}"
)
self.db_mgr.put_insert_workflow_flows(
flow_num,
self.flows[flow_num]
)
return flow_num

def load_from_db(self, flow_nums: FlowNums) -> None:
"""Load flow data for scheduler restart.
Expand Down
11 changes: 11 additions & 0 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from cylc.flow.task_id import TaskID
from cylc.flow.task_trigger import TaskTrigger
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_FAILED,
Expand All @@ -41,6 +42,8 @@
TASK_OUTPUT_SUBMIT_FAILED
)
from cylc.flow.task_qualifiers import (
QUAL_FAM_EXPIRE_ALL,
QUAL_FAM_EXPIRE_ANY,
QUAL_FAM_SUCCEED_ALL,
QUAL_FAM_SUCCEED_ANY,
QUAL_FAM_FAIL_ALL,
Expand Down Expand Up @@ -124,6 +127,8 @@ class GraphParser:
# E.g. QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True) simply maps
# "FAM:start-all" to "MEMBER:started" and "-all" (all members).
fam_to_mem_trigger_map: Dict[str, Tuple[str, bool]] = {
QUAL_FAM_EXPIRE_ALL: (TASK_OUTPUT_EXPIRED, True),
QUAL_FAM_EXPIRE_ANY: (TASK_OUTPUT_EXPIRED, False),
QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True),
QUAL_FAM_START_ANY: (TASK_OUTPUT_STARTED, False),
QUAL_FAM_SUCCEED_ALL: (TASK_OUTPUT_SUCCEEDED, True),
Expand All @@ -140,6 +145,8 @@ class GraphParser:

# Map family pseudo triggers to affected member outputs.
fam_to_mem_output_map: Dict[str, List[str]] = {
QUAL_FAM_EXPIRE_ANY: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_EXPIRE_ALL: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_START_ANY: [TASK_OUTPUT_STARTED],
QUAL_FAM_START_ALL: [TASK_OUTPUT_STARTED],
QUAL_FAM_SUCCEED_ANY: [TASK_OUTPUT_SUCCEEDED],
Expand Down Expand Up @@ -738,6 +745,10 @@ def _set_output_opt(
if suicide:
return

if output == TASK_OUTPUT_EXPIRED and not optional:
raise GraphParseError(
f"Expired-output {name}:{output} must be optional")

if output == TASK_OUTPUT_FINISHED:
# Interpret :finish pseudo-output
if optional:
Expand Down
9 changes: 9 additions & 0 deletions cylc/flow/id.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)

from cylc.flow import LOG
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED


class IDTokens(Enum):
Expand Down Expand Up @@ -388,6 +389,14 @@ def update(self, other):
"""
return self.update_tokens(**other)

def to_prereq_tuple(self) -> Tuple[str, str, str]:
"""Return (cycle, task, selector) as used for task prerequisites."""
return (
self['cycle'],
self['task'],
self['task_sel'] or TASK_OUTPUT_SUCCEEDED
)

def duplicate(
self,
tokens: 'Optional[Tokens]' = None,
Expand Down
87 changes: 41 additions & 46 deletions cylc/flow/id_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@


def filter_ids(
pools: 'List[Pool]',
pool: 'Pool',
ids: 'Iterable[str]',
*,
warn: 'bool' = True,
Expand Down Expand Up @@ -145,28 +145,25 @@ def filter_ids(
if tokens.get(lowest_token.value):
break

# This needs to be a set to avoid getting two copies of matched tasks
# in cycle points that appear in both pools:
cycles = set()
tasks = []

# filter by cycle
if lowest_token == IDTokens.Cycle:
cycle = tokens[IDTokens.Cycle.value]
cycle_sel = tokens.get(IDTokens.Cycle.value + '_sel') or '*'
for pool in pools:
for icycle, itasks in pool.items():
if not itasks:
continue
if not point_match(icycle, cycle, pattern_match):
continue
if cycle_sel == '*':
for icycle, itasks in pool.items():
if not itasks:
continue
if not point_match(icycle, cycle, pattern_match):
continue
if cycle_sel == '*':
cycles.add(icycle)
continue
for itask in itasks.values():
if match(itask.state.status, cycle_sel):
cycles.add(icycle)
continue
for itask in itasks.values():
if match(itask.state.status, cycle_sel):
cycles.add(icycle)
break
break

# filter by task
elif lowest_token == IDTokens.Task: # noqa SIM106
Expand All @@ -176,36 +173,35 @@ def filter_ids(
task = tokens[IDTokens.Task.value]
task_sel_raw = tokens.get(IDTokens.Task.value + '_sel')
task_sel = task_sel_raw or '*'
for pool in pools:
for icycle, itasks in pool.items():
if not point_match(icycle, cycle, pattern_match):
continue
for itask in itasks.values():
if (
# check cycle selector
for icycle, itasks in pool.items():
if not point_match(icycle, cycle, pattern_match):
continue
for itask in itasks.values():
if (
# check cycle selector
(
(
(
# disable cycle_sel if not defined if
# pattern matching is turned off
pattern_match is False
and cycle_sel_raw is None
)
or match(itask.state.status, cycle_sel)
# disable cycle_sel if not defined if
# pattern matching is turned off
pattern_match is False
and cycle_sel_raw is None
)
# check namespace name
and itask.name_match(task, match_func=match)
# check task selector
and (
(
# disable task_sel if not defined if
# pattern matching is turned off
pattern_match is False
and task_sel_raw is None
)
or match(itask.state.status, task_sel)
or match(itask.state.status, cycle_sel)
)
# check namespace name
and itask.name_match(task, match_func=match)
# check task selector
and (
(
# disable task_sel if not defined if
# pattern matching is turned off
pattern_match is False
and task_sel_raw is None
)
):
tasks.append(itask)
or match(itask.state.status, task_sel)
)
):
tasks.append(itask)

else:
raise NotImplementedError
Expand All @@ -226,10 +222,9 @@ def filter_ids(
})
ret = _cycles
elif out == IDTokens.Task:
for pool in pools:
for icycle in _cycles:
if icycle in pool:
_tasks.extend(pool[icycle].values())
for icycle in _cycles:
if icycle in pool:
_tasks.extend(pool[icycle].values())
ret = _tasks
return ret, _not_matched

Expand Down
Loading

0 comments on commit 1b0940e

Please sign in to comment.