Skip to content

Commit

Permalink
that's got it... [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed May 20, 2024
1 parent 4cf9b73 commit 7772bbc
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 150 deletions.
106 changes: 93 additions & 13 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1729,13 +1729,75 @@ def get_script_common_text(this: str, example: Optional[str] = None):
{REPLACES}``[runtime][task][events]mail to``
''')

with Conf('suite state polling', desc=f'''
Deprecated: this supports Cylc 7 automatic suite state polling
tasks. However the "user" and "host" items are not supported.
Please upgrade to use the workflow_state xtrigger instead.
Configure automatic workflow polling tasks as described in
:ref:`WorkflowStatePolling`.
The config items here reflect the options of the
``cylc workflow-state`` command, except that the workflow ID
and the ``--task``, ``--cycle``, and ``--status`` options are
taken from the graph notation.
.. deprecated:: 8.0.0
.. seealso::
workflow state polling
the workflow_state xtrigger
'''):
Conf('user', VDR.V_STRING, desc='''
Username of your account on the workflow host.
The polling
``cylc workflow-state`` command will be
run on the remote account.
''')
Conf('host', VDR.V_STRING, desc='''
The hostname of the target workflow.
The polling
``cylc workflow-state`` command will be run there.
''')
Conf('interval', VDR.V_INTERVAL, desc='''
Polling interval.
''')
Conf('max-polls', VDR.V_INTEGER, desc='''
The maximum number of polls before timing out and entering
the "failed" state.
''')
Conf('message', VDR.V_STRING, desc='''
Wait for the task in the target workflow to receive a
specified message rather than achieve a state.
''')
Conf('run-dir', VDR.V_STRING, desc='''
Specify the location of the top level cylc-run directory
for the other workflow.
For your own workflows, there is no need to set this as it
is always ``~/cylc-run/``. But for other workflows,
(e.g those owned by others), or mirrored workflow databases
use this item to specify the location of the top level
cylc run directory (the database should be in a the same
place relative to this location for each workflow).
''')
Conf('verbose mode', VDR.V_BOOLEAN, desc='''
Run the polling ``cylc workflow-state`` command in verbose
output mode.
''')
with Conf('workflow state polling', desc=f'''
Deprecated: please use the workflow_state xtrigger instead.
Configure automatic workflow polling tasks as described in
:ref:`WorkflowStatePolling`.
The config items in this section reflect the options of the
``cylc workflow-state`` command, but with the target workflow ID
and status or output taken from the graph syntax.
``cylc workflow-state`` command, but the target workflow ID
and task status or output are taken from the graph syntax.
.. versionchanged:: 8.0.0
Expand All @@ -1753,6 +1815,9 @@ def get_script_common_text(this: str, example: Optional[str] = None):
Conf('output', VDR.V_STRING, desc='''
Target task output.
''')
Conf('message', VDR.V_STRING, desc='''
Target task message.
''')
Conf('alt-cylc-run-dir', VDR.V_STRING, desc='''
The cylc-run directory location of the target workflow.
Expand Down Expand Up @@ -1936,9 +2001,6 @@ def upg(cfg, descr):
"""
u = upgrader(cfg, descr)
u.obsolete(
'7.8.0',
['runtime', '__MANY__', 'suite state polling', 'template'])
u.obsolete('7.8.1', ['cylc', 'events', 'reset timer'])
u.obsolete('7.8.1', ['cylc', 'events', 'reset inactivity timer'])
u.obsolete('8.0.0', ['cylc', 'force run mode'])
Expand Down Expand Up @@ -1974,6 +2036,12 @@ def upg(cfg, descr):
['cylc', 'mail', 'task event batch interval'],
silent=cylc.flow.flags.cylc7_back_compat,
)
u.obsolete(
'8.3.0', ['runtime', '__MANY__', 'suite state polling', 'host'],
)
u.obsolete(
'8.3.0', ['runtime', '__MANY__', 'suite state polling', 'owner'],
)
u.deprecate(
'8.0.0',
['cylc', 'parameters'],
Expand Down Expand Up @@ -2041,14 +2109,6 @@ def upg(cfg, descr):
silent=cylc.flow.flags.cylc7_back_compat,
)

u.deprecate(
'8.0.0',
['runtime', '__MANY__', 'suite state polling'],
['runtime', '__MANY__', 'workflow state polling'],
silent=cylc.flow.flags.cylc7_back_compat,
is_section=True
)

for job_setting in [
'execution polling intervals',
'execution retry delays',
Expand Down Expand Up @@ -2124,6 +2184,8 @@ def upg(cfg, descr):
)
u.upgrade()

upgrade_suite_state_polling(cfg)

upgrade_graph_section(cfg, descr)
upgrade_param_env_templates(cfg, descr)

Expand All @@ -2133,6 +2195,24 @@ def upg(cfg, descr):
return u


def upgrade_suite_state_polling(cfg: Dict[str, Any]) -> None:
""" ... """
for ns, ns_cfg in cfg['runtime'].items():
if 'suite suite polling' not in ns_cfg:
continue
if 'workflow state polling' in ns_cfg:
raise UpgradeError("xxxxxx")

cfg['runtime'][ns]['workflow state polling'] = {}

if 'owner' in ns_cfg:
raise UpgradeError("Blargh")
if 'host' in ns_cfg:
raise UpgradeError("Flargh")

cfg['runtime'][ns]['workflow state polling'] = ns_cfg


def upgrade_graph_section(cfg: Dict[str, Any], descr: str) -> None:
"""Upgrade Cylc 7 `[scheduling][dependencies][X]graph` format to
`[scheduling][graph]X`."""
Expand Down
123 changes: 32 additions & 91 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,14 @@
from cylc.flow.pathutil import expand_path
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
)
from cylc.flow.task_state import (
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
TASK_STATUSES_ORDERED
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED
)
from cylc.flow.util import deserialise_set
from metomi.isodatetime.parsers import TimePointParser
from metomi.isodatetime.exceptions import ISO8601SyntaxError


# map transient states to outputs
TRANSIENT_STATUSES = {
TASK_STATUS_SUBMITTED: TASK_OUTPUT_SUBMITTED,
TASK_STATUS_RUNNING: TASK_OUTPUT_STARTED
}


class CylcWorkflowDBChecker:
"""Object for querying a workflow database."""

Expand Down Expand Up @@ -172,44 +158,12 @@ def _get_db_point_format_compat(self):
):
return row[0]

def status_or_output(self, task_sel):
"""Determine whether to query task status or outputs.
For transient statuses, query the corresponding output
instead to avoid missing it between polls.
xtrigger defaults to succeeded.
CLI does not, in order to allow non-specific queries.
"""
status = None
output = None

if task_sel in TRANSIENT_STATUSES:
if self.back_compat_mode:
# Cylc 7 only stored custom outputs.
status = task_sel
else:
output = TRANSIENT_STATUSES[task_sel]

elif task_sel in TASK_STATUSES_ORDERED:
status = task_sel

elif task_sel in ("finished", "finish"):
status = "finished" # handled by query construction

else:
# Custom output
output = task_sel

return (status, output)

def workflow_state_query(
self,
task: Optional[str] = None,
cycle: Optional[str] = None,
status: Optional[str] = None,
output: Optional[str] = None,
selector: Optional[str] = None,
is_output: Optional[bool] = False,
flow_num: Optional[int] = None,
print_outputs: bool = False
):
Expand Down Expand Up @@ -239,7 +193,7 @@ def workflow_state_query(
stmt_args = []
stmt_wheres = []

if output or (status is None and print_outputs):
if is_output:
target_table = CylcWorkflowDAO.TABLE_TASK_OUTPUTS
mask = "name, cycle, outputs"
else:
Expand Down Expand Up @@ -279,21 +233,16 @@ def workflow_state_query(
stmt_wheres.append("cycle==?")
stmt_args.append(cycle)

if status:
stmt_frags = []
if status == "finished":
for state in (TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED):
stmt_args.append(state)
stmt_frags.append("status==?")
stmt_wheres.append("(" + (" OR ").join(stmt_frags) + ")")
else:
stmt_wheres.append("status==?")
stmt_args.append(status)
if selector is not None and not is_output:
# Can select by status in the DB but not outputs.
stmt_wheres.append("status==?")
stmt_args.append(selector)

if stmt_wheres:
stmt += "WHERE\n " + (" AND ").join(stmt_wheres)

if status:
if not is_output:
# (outputs table doesn't record submit number)
stmt += dedent("""
ORDER BY
submit_num
Expand All @@ -317,38 +266,30 @@ def workflow_state_query(
res.append(fstr)
db_res.append(res)

if (
status is not None
or (output is None and not print_outputs)
):
if not is_output:
return db_res

results = []
for row in db_res:
outputs = list(json.loads(row[2]))
if output is not None and output not in outputs:
continue
results.append(row[:2] + [str(outputs)] + row[3:])
outputs_map = json.loads(row[2])
if self.back_compat_mode:
# task message
outputs = list(outputs_map.values())
else:
# task output
outputs = list(outputs_map)

if (
selector is None or
selector in outputs or
(
selector in ("finished", "finish")
and (
TASK_OUTPUT_SUCCEEDED in outputs
or TASK_OUTPUT_FAILED in outputs
)
)
):
results.append(row[:2] + [str(outputs)] + row[3:])

return results

def task_state_met(
self,
task: str,
cycle: str,
status: Optional[str] = None,
output: Optional[str] = None,
flow_num: Optional[int] = None
):
"""Return True if cycle/task has achieved status or output.
Call when polling for a task status or output.
"""
# Default to flow 1 for polling a specific task.
if flow_num is None:
flow_num = 1

return bool(
self.workflow_state_query(task, cycle, status, output, flow_num)
)
Loading

0 comments on commit 7772bbc

Please sign in to comment.