Commit

Implement "cylc set" command.
A gazillion squashed commits (history got too messy).
hjoliver committed Feb 27, 2024
1 parent d862c0e commit 2dfc50b
Showing 169 changed files with 3,927 additions and 1,648 deletions.
1 change: 1 addition & 0 deletions changes.d/5658.feat.md
@@ -0,0 +1 @@
New "cylc set" command for setting task prerequisites and outputs.
5 changes: 3 additions & 2 deletions cylc/flow/data_store_mgr.py
@@ -2185,8 +2185,9 @@ def update_workflow(self, reloaded=False):
w_delta.n_edge_distance = self.n_edge_distance
delta_set = True

-        if self.schd.pool.main_pool:
-            pool_points = set(self.schd.pool.main_pool)
+        if self.schd.pool.active_tasks:
+            pool_points = set(self.schd.pool.active_tasks)

oldest_point = str(min(pool_points))
if w_data.oldest_active_cycle_point != oldest_point:
w_delta.oldest_active_cycle_point = oldest_point
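As an aside (not part of the diff): the oldest-point computation above works because cycle points are comparable objects, so `min()` picks the oldest active cycle. A minimal sketch with integer cycling:

```python
from cylc.flow.cycling.integer import IntegerPoint

pool_points = {IntegerPoint("3"), IntegerPoint("1"), IntegerPoint("2")}
oldest_point = str(min(pool_points))  # "1"
```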
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc
@@ -2,7 +2,7 @@

# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
-#
+#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
161 changes: 137 additions & 24 deletions cylc/flow/flow_mgr.py
@@ -20,50 +20,163 @@
import datetime

from cylc.flow import LOG
from cylc.flow.exceptions import InputError


if TYPE_CHECKING:
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


FlowNums = Set[int]
# Flow constants
FLOW_ALL = "all"
FLOW_NEW = "new"
FLOW_NONE = "none"

# For flow-related CLI options:
ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
ERR_OPT_FLOW_WAIT = (
f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)


def add_flow_opts(parser):
parser.add_option(
"--flow", action="append", dest="flow", metavar="FLOW",
help=f'Assign new tasks to all active flows ("{FLOW_ALL}");'
f' no flow ("{FLOW_NONE}"); a new flow ("{FLOW_NEW}");'
f' or a specific flow (e.g. "2"). The default is "{FLOW_ALL}".'
' Specific flow numbers can be new or existing.'
' Reuse the option to assign multiple flow numbers.'
)

parser.add_option(
"--meta", metavar="DESCRIPTION", action="store",
dest="flow_descr", default=None,
help=f"description of new flow (with --flow={FLOW_NEW})."
)

parser.add_option(
"--wait", action="store_true", default=False, dest="flow_wait",
help="Wait for merge with current active flows before flowing on."
)


def validate_flow_opts(options):
"""Check validity of flow-related CLI options."""
if options.flow is None:
# Default to all active flows
options.flow = [FLOW_ALL]

for val in options.flow:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
if len(options.flow) != 1:
raise InputError(ERR_OPT_FLOW_INT)
else:
try:
int(val)
except ValueError:
raise InputError(ERR_OPT_FLOW_VAL.format(val))

if options.flow_wait and options.flow[0] in [FLOW_NEW, FLOW_NONE]:
raise InputError(ERR_OPT_FLOW_WAIT)
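A minimal sketch of how these two helpers compose (not part of the commit; plain stdlib optparse stands in for Cylc's own option parser):

```python
from optparse import OptionParser

from cylc.flow.exceptions import InputError
from cylc.flow.flow_mgr import add_flow_opts, validate_flow_opts

parser = OptionParser()
add_flow_opts(parser)

# Multiple integer flow values are accepted...
options, _ = parser.parse_args(["--flow=2", "--flow=5"])
validate_flow_opts(options)  # passes: all values are integers

# ...but --wait cannot be combined with --flow=new or --flow=none.
options, _ = parser.parse_args(["--flow=new", "--wait"])
try:
    validate_flow_opts(options)
except InputError as exc:
    print(exc)  # --wait is not compatible with --flow=new or --flow=none
```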


def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str:
"""Return a string representation of a set of flow numbers
If the set contains only the original flow 1, return an empty string
so that users can disregard flows unless they trigger new ones.
Otherwise return e.g. "(1,2,3)".
Examples:
>>> stringify_flow_nums({})
'(flows=none)'
>>> stringify_flow_nums({1})
''
>>> stringify_flow_nums({1}, True)
'(flows=1)'
>>> stringify_flow_nums({1,2,3})
'(flows=1,2,3)'
"""
if not full and flow_nums == {1}:
return ""
else:
return (
"(flows="
f"{','.join(str(i) for i in flow_nums) or 'none'}"
")"
)


class FlowMgr:
"""Logic to manage flow counter and flow metadata."""

-    def __init__(self, db_mgr: "WorkflowDatabaseManager") -> None:
+    def __init__(
+        self,
+        db_mgr: "WorkflowDatabaseManager",
+        utc: bool = True
+    ) -> None:
"""Initialise the flow manager."""
self.db_mgr = db_mgr
self.flows: Dict[int, Dict[str, str]] = {}
self.counter: int = 0
self._timezone = datetime.timezone.utc if utc else None

-    def get_new_flow(self, description: Optional[str] = None) -> int:
-        """Increment flow counter, record flow metadata."""
-        self.counter += 1
-        # record start time to nearest second
-        now = datetime.datetime.now()
-        now_sec: str = str(
-            now - datetime.timedelta(microseconds=now.microsecond))
-        description = description or "no description"
-        self.flows[self.counter] = {
-            "description": description,
-            "start_time": now_sec
-        }
-        LOG.info(
-            f"New flow: {self.counter} "
-            f"({description}) "
-            f"{now_sec}"
-        )
-        self.db_mgr.put_insert_workflow_flows(
-            self.counter,
-            self.flows[self.counter]
-        )
-        return self.counter

+    def get_flow_num(
+        self,
+        flow_num: Optional[int] = None,
+        meta: Optional[str] = None
+    ) -> int:
+        """Return a valid flow number, and record a new flow if necessary.
+
+        If asked for a new flow:
+        - increment the automatic counter until we find an unused number
+
+        If given a flow number:
+        - record a new flow if the number is unused
+        - else return it, as an existing flow number.
+
+        The metadata string is only used if it is a new flow.
+
+        """
+        if flow_num is None:
+            self.counter += 1
+            while self.counter in self.flows:
+                # Skip manually-created out-of-sequence flows.
+                self.counter += 1
+            flow_num = self.counter
+
+        if flow_num in self.flows:
+            if meta is not None:
+                LOG.warning(
+                    f'Ignoring flow metadata "{meta}":'
+                    f' {flow_num} is not a new flow'
+                )
+        else:
+            # Record a new flow.
+            now_sec = datetime.datetime.now(tz=self._timezone).isoformat(
+                timespec="seconds"
+            )
+            meta = meta or "no description"
+            self.flows[flow_num] = {
+                "description": meta,
+                "start_time": now_sec
+            }
+            LOG.info(
+                f"New flow: {flow_num} ({meta}) {now_sec}"
+            )
+            self.db_mgr.put_insert_workflow_flows(
+                flow_num,
+                self.flows[flow_num]
+            )
+        return flow_num

def load_from_db(self, flow_nums: FlowNums) -> None:
"""Load flow data for scheduler restart.
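For illustration only (a stub stands in for the real database manager), the new counter logic in get_flow_num behaves as this sketch suggests:

```python
from unittest.mock import MagicMock

from cylc.flow.flow_mgr import FlowMgr, stringify_flow_nums

fmgr = FlowMgr(MagicMock(), utc=True)  # stub WorkflowDatabaseManager

n1 = fmgr.get_flow_num()                  # auto-counter: flow 1
n5 = fmgr.get_flow_num(5, meta="rerun")   # manual, out-of-sequence flow
n2 = fmgr.get_flow_num()                  # counter continues: flow 2
                                          # (it will skip 5 when it gets there)
n5b = fmgr.get_flow_num(5, meta="again")  # existing flow: meta ignored, warns

print(n1, n5, n2, n5b)              # 1 5 2 5
print(stringify_flow_nums({n1}))    # '' - the original flow is implicit
print(stringify_flow_nums({1, 5}))  # '(flows=1,5)'
```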
11 changes: 11 additions & 0 deletions cylc/flow/graph_parser.py
@@ -33,6 +33,7 @@
from cylc.flow.task_id import TaskID
from cylc.flow.task_trigger import TaskTrigger
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_FAILED,
@@ -41,6 +42,8 @@
TASK_OUTPUT_SUBMIT_FAILED
)
from cylc.flow.task_qualifiers import (
QUAL_FAM_EXPIRE_ALL,
QUAL_FAM_EXPIRE_ANY,
QUAL_FAM_SUCCEED_ALL,
QUAL_FAM_SUCCEED_ANY,
QUAL_FAM_FAIL_ALL,
@@ -124,6 +127,8 @@ class GraphParser:
# E.g. QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True) simply maps
# "FAM:start-all" to "MEMBER:started" and "-all" (all members).
fam_to_mem_trigger_map: Dict[str, Tuple[str, bool]] = {
QUAL_FAM_EXPIRE_ALL: (TASK_OUTPUT_EXPIRED, True),
QUAL_FAM_EXPIRE_ANY: (TASK_OUTPUT_EXPIRED, False),
QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True),
QUAL_FAM_START_ANY: (TASK_OUTPUT_STARTED, False),
QUAL_FAM_SUCCEED_ALL: (TASK_OUTPUT_SUCCEEDED, True),
@@ -140,6 +145,8 @@

# Map family pseudo triggers to affected member outputs.
fam_to_mem_output_map: Dict[str, List[str]] = {
QUAL_FAM_EXPIRE_ANY: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_EXPIRE_ALL: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_START_ANY: [TASK_OUTPUT_STARTED],
QUAL_FAM_START_ALL: [TASK_OUTPUT_STARTED],
QUAL_FAM_SUCCEED_ANY: [TASK_OUTPUT_SUCCEEDED],
@@ -738,6 +745,10 @@ def _set_output_opt(
if suicide:
return

if output == TASK_OUTPUT_EXPIRED and not optional:
raise GraphParseError(
f"Expired-output {name}:{output} must be optional")

if output == TASK_OUTPUT_FINISHED:
# Interpret :finish pseudo-output
if optional:
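A hedged sketch of the new expiry-trigger rule in action, assuming this commit's source tree (the ":expired" qualifier is the spelling of TASK_OUTPUT_EXPIRED):

```python
from cylc.flow.exceptions import GraphParseError
from cylc.flow.graph_parser import GraphParser

# Expiry can only be used as an optional output ("?")...
GraphParser().parse_graph("foo:expired? => cleanup")

# ...otherwise parsing fails with the new error.
try:
    GraphParser().parse_graph("foo:expired => cleanup")
except GraphParseError as exc:
    print(exc)  # Expired-output foo:expired must be optional
```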
87 changes: 41 additions & 46 deletions cylc/flow/id_match.py
@@ -76,7 +76,7 @@


def filter_ids(
-    pools: 'List[Pool]',
+    pool: 'Pool',
ids: 'Iterable[str]',
*,
warn: 'bool' = True,
@@ -145,28 +145,25 @@ def filter_ids(
if tokens.get(lowest_token.value):
break

-        # This needs to be a set to avoid getting two copies of matched tasks
-        # in cycle points that appear in both pools:
         cycles = set()
         tasks = []

         # filter by cycle
         if lowest_token == IDTokens.Cycle:
             cycle = tokens[IDTokens.Cycle.value]
             cycle_sel = tokens.get(IDTokens.Cycle.value + '_sel') or '*'
-            for pool in pools:
-                for icycle, itasks in pool.items():
-                    if not itasks:
-                        continue
-                    if not point_match(icycle, cycle, pattern_match):
-                        continue
-                    if cycle_sel == '*':
-                        cycles.add(icycle)
-                        continue
-                    for itask in itasks.values():
-                        if match(itask.state.status, cycle_sel):
-                            cycles.add(icycle)
-                            break
+            for icycle, itasks in pool.items():
+                if not itasks:
+                    continue
+                if not point_match(icycle, cycle, pattern_match):
+                    continue
+                if cycle_sel == '*':
+                    cycles.add(icycle)
+                    continue
+                for itask in itasks.values():
+                    if match(itask.state.status, cycle_sel):
+                        cycles.add(icycle)
+                        break

# filter by task
elif lowest_token == IDTokens.Task: # noqa SIM106
@@ -176,36 +173,35 @@
task = tokens[IDTokens.Task.value]
task_sel_raw = tokens.get(IDTokens.Task.value + '_sel')
task_sel = task_sel_raw or '*'
-            for pool in pools:
-                for icycle, itasks in pool.items():
-                    if not point_match(icycle, cycle, pattern_match):
-                        continue
-                    for itask in itasks.values():
-                        if (
-                            # check cycle selector
-                            (
-                                (
-                                    # disable cycle_sel if not defined if
-                                    # pattern matching is turned off
-                                    pattern_match is False
-                                    and cycle_sel_raw is None
-                                )
-                                or match(itask.state.status, cycle_sel)
-                            )
-                            # check namespace name
-                            and itask.name_match(task, match_func=match)
-                            # check task selector
-                            and (
-                                (
-                                    # disable task_sel if not defined if
-                                    # pattern matching is turned off
-                                    pattern_match is False
-                                    and task_sel_raw is None
-                                )
-                                or match(itask.state.status, task_sel)
-                            )
-                        ):
-                            tasks.append(itask)
+            for icycle, itasks in pool.items():
+                if not point_match(icycle, cycle, pattern_match):
+                    continue
+                for itask in itasks.values():
+                    if (
+                        # check cycle selector
+                        (
+                            (
+                                # disable cycle_sel if not defined if
+                                # pattern matching is turned off
+                                pattern_match is False
+                                and cycle_sel_raw is None
+                            )
+                            or match(itask.state.status, cycle_sel)
+                        )
+                        # check namespace name
+                        and itask.name_match(task, match_func=match)
+                        # check task selector
+                        and (
+                            (
+                                # disable task_sel if not defined if
+                                # pattern matching is turned off
+                                pattern_match is False
+                                and task_sel_raw is None
+                            )
+                            or match(itask.state.status, task_sel)
+                        )
+                    ):
+                        tasks.append(itask)

else:
raise NotImplementedError
@@ -226,10 +222,9 @@ def filter_ids(
})
ret = _cycles
elif out == IDTokens.Task:
-            for pool in pools:
-                for icycle in _cycles:
-                    if icycle in pool:
-                        _tasks.extend(pool[icycle].values())
+            for icycle in _cycles:
+                if icycle in pool:
+                    _tasks.extend(pool[icycle].values())
ret = _tasks
return ret, _not_matched

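Finally, a rough, untested sketch of the narrowed filter_ids API: it now takes the scheduler's single task pool, a dict of {cycle point: {task name: task proxy}}, rather than a list of pools. MagicMock and plain strings stand in for real TaskProxy and PointBase objects here:

```python
from unittest.mock import MagicMock

from cylc.flow.id_match import filter_ids

itask = MagicMock()
itask.state.status = "failed"
itask.name_match = lambda name, match_func: match_func("foo", name)

pool = {"1": {"foo": itask}}  # one pool, keyed by cycle point

# Match a relative task ID against the single active pool.
matched, not_matched = filter_ids(pool, ["1/foo"], warn=False)
```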