diff --git a/cylc/flow/dbstatecheck.py b/cylc/flow/dbstatecheck.py index 569aa835a8d..b9a3a40e85e 100644 --- a/cylc/flow/dbstatecheck.py +++ b/cylc/flow/dbstatecheck.py @@ -19,9 +19,9 @@ import os import sqlite3 import sys -from typing import Optional, List -from textwrap import dedent +from typing import Dict, Iterable, Optional, List, Union +from cylc.flow import LOG from cylc.flow.exceptions import InputError from cylc.flow.cycling.util import add_offset from cylc.flow.cycling.integer import ( @@ -33,13 +33,20 @@ from cylc.flow.rundb import CylcWorkflowDAO from cylc.flow.task_outputs import ( TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_FAILED + TASK_OUTPUT_FAILED, + TASK_OUTPUT_FINISHED, ) from cylc.flow.util import deserialise_set from metomi.isodatetime.parsers import TimePointParser from metomi.isodatetime.exceptions import ISO8601SyntaxError +output_fallback_msg = ( + "Unable to filter by task output label for tasks run in Cylc versions " + "between 8.0.0-8.3.0. Falling back to filtering by task message instead." +) + + class CylcWorkflowDBChecker: """Object for querying a workflow database.""" @@ -58,12 +65,12 @@ def __init__(self, rund, workflow, db_path=None): # Get workflow point format. try: self.db_point_fmt = self._get_db_point_format() - self.back_compat_mode = False + self.c7_back_compat_mode = False except sqlite3.OperationalError as exc: # BACK COMPAT: Cylc 7 DB (see method below). try: self.db_point_fmt = self._get_db_point_format_compat() - self.back_compat_mode = True + self.c7_back_compat_mode = True except sqlite3.OperationalError: raise exc # original error @@ -182,7 +189,7 @@ def workflow_state_query( ] For an output query: [ - [name, cycle, "[out1: msg1, out2: msg2, ...]"], + [name, cycle, "{out1: msg1, out2: msg2, ...}"], ... ] """ @@ -196,16 +203,16 @@ def workflow_state_query( target_table = CylcWorkflowDAO.TABLE_TASK_STATES mask = "name, cycle, status" - if not self.back_compat_mode: + if not self.c7_back_compat_mode: # Cylc 8 DBs only mask += ", flow_nums" - stmt = dedent(rf''' + stmt = rf''' SELECT {mask} FROM {target_table} - ''') # nosec + ''' # nosec # * mask is hardcoded # * target_table is a code constant @@ -229,7 +236,10 @@ def workflow_state_query( stmt_wheres.append("cycle==?") stmt_args.append(cycle) - if selector is not None and not (is_output or is_message): + if ( + selector is not None + and target_table == CylcWorkflowDAO.TABLE_TASK_STATES + ): # Can select by status in the DB but not outputs. stmt_wheres.append("status==?") stmt_args.append(selector) @@ -237,12 +247,9 @@ def workflow_state_query( if stmt_wheres: stmt += "WHERE\n " + (" AND ").join(stmt_wheres) - if not (is_output or is_message): + if target_table == CylcWorkflowDAO.TABLE_TASK_STATES: # (outputs table doesn't record submit number) - stmt += dedent(""" - ORDER BY - submit_num - """) + stmt += r"ORDER BY submit_num" # Query the DB and drop incompatible rows. db_res = [] @@ -252,7 +259,7 @@ def workflow_state_query( if row[2] is None: # status can be None in Cylc 7 DBs continue - if not self.back_compat_mode: + if not self.c7_back_compat_mode: flow_nums = deserialise_set(row[3]) if flow_num is not None and flow_num not in flow_nums: # skip result, wrong flow @@ -262,34 +269,37 @@ def workflow_state_query( res.append(fstr) db_res.append(res) - if not (is_output or is_message): + if target_table == CylcWorkflowDAO.TABLE_TASK_STATES: return db_res + warn_output_fallback = is_output results = [] for row in db_res: - outputs_map = json.loads(row[2]) - if self.back_compat_mode or is_message: - # task message - try: - outputs = list(outputs_map.values()) - except AttributeError: - # Cylc 8 pre 8.3.0 back-compat: only output messages stored - outputs = list(outputs_map) + outputs: Union[Dict[str, str], List[str]] = json.loads(row[2]) + if isinstance(outputs, dict): + messages: Iterable[str] = outputs.values() else: - # task output - outputs = list(outputs_map) + if warn_output_fallback: + LOG.warning(output_fallback_msg) + warn_output_fallback = False + messages = outputs if ( selector is None or + (is_message and selector in messages) or selector in outputs or - ( - selector in ("finished", "finish") - and ( - TASK_OUTPUT_SUCCEEDED in outputs - or TASK_OUTPUT_FAILED in outputs - ) - ) + self._is_finished(selector, outputs) ): results.append(row[:2] + [str(outputs)] + row[3:]) return results + + @staticmethod + def _is_finished(selector: str, outputs: Iterable[str]) -> bool: + return ( + selector in (TASK_OUTPUT_FINISHED, "finish") + and ( + TASK_OUTPUT_SUCCEEDED in outputs + or TASK_OUTPUT_FAILED in outputs + ) + ) diff --git a/cylc/flow/scripts/workflow_state.py b/cylc/flow/scripts/workflow_state.py index 427f992f32d..77842e26eaf 100755 --- a/cylc/flow/scripts/workflow_state.py +++ b/cylc/flow/scripts/workflow_state.py @@ -113,6 +113,27 @@ INTERVAL = 5 +def unquote(s: str) -> str: + """Remove leading & trailing quotes from a string. + + Examples: + >>> unquote('"foo"') + 'foo' + >>> unquote("'foo'") + 'foo' + >>> unquote('foo') + 'foo' + >>> unquote("'tis a fine morning") + "'tis a fine morning" + """ + if ( + s.startswith('"') and s.endswith('"') + or s.startswith("'") and s.endswith("'") + ): + return s[1:-1] + return s + + class WorkflowPoller(Poller): """An object that polls for task states or outputs in a workflow DB.""" @@ -137,6 +158,8 @@ def __init__( tokens = Tokens(self.id_) self.workflow_id_raw = tokens.workflow_id self.task_sel = tokens["task_sel"] or default_status + if self.task_sel: + self.task_sel = unquote(self.task_sel) self.cycle_raw = tokens["cycle"] self.task = tokens["task"] diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index b5dd12dd1bc..5da3c7a415e 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -586,7 +586,7 @@ def put_insert_task_outputs(self, itask): itask, { "flow_nums": serialise_set(itask.flow_nums), - "outputs": json.dumps([]) + "outputs": json.dumps({}) } ) diff --git a/tests/functional/workflow-state/08-integer.t b/tests/functional/workflow-state/08-integer.t index 21451207760..ff045db4338 100755 --- a/tests/functional/workflow-state/08-integer.t +++ b/tests/functional/workflow-state/08-integer.t @@ -45,9 +45,9 @@ TEST_NAME="${TEST_NAME_BASE}_check_1_outputs" run_ok "${TEST_NAME}" cylc workflow-state --max-polls=1 --output "${WORKFLOW_NAME}" contains_ok "${TEST_NAME}.stdout" <<__END__ -1/foo:['submitted', 'started', 'succeeded', 'x'] -2/foo:[] -1/bar:['submitted', 'started', 'succeeded'] +1/foo:{'submitted': 'submitted', 'started': 'started', 'succeeded': 'succeeded', 'x': 'hello'} +2/foo:{} +1/bar:{'submitted': 'submitted', 'started': 'started', 'succeeded': 'succeeded'} __END__ TEST_NAME="${TEST_NAME_BASE}_poll_fail" diff --git a/tests/functional/workflow-state/09-datetime.t b/tests/functional/workflow-state/09-datetime.t index a773883eaab..8d7d3153abd 100755 --- a/tests/functional/workflow-state/09-datetime.t +++ b/tests/functional/workflow-state/09-datetime.t @@ -47,9 +47,9 @@ TEST_NAME="${TEST_NAME_BASE}_check_1_outputs" run_ok "${TEST_NAME}" cylc workflow-state --max-polls=1 --output "${WORKFLOW_NAME}" contains_ok "${TEST_NAME}.stdout" <<__END__ -2051/foo:['submitted', 'started', 'succeeded', 'x'] -2052/foo:[] -2051/bar:['submitted', 'started', 'succeeded'] +2051/foo:{'submitted': 'submitted', 'started': 'started', 'succeeded': 'succeeded', 'x': 'hello'} +2052/foo:{} +2051/bar:{'submitted': 'submitted', 'started': 'started', 'succeeded': 'succeeded'} __END__ TEST_NAME="${TEST_NAME_BASE}_poll_fail" diff --git a/tests/functional/workflow-state/10-backcompat.t b/tests/functional/workflow-state/10-backcompat.t index ae2f6162c9d..b22c1d33228 100755 --- a/tests/functional/workflow-state/10-backcompat.t +++ b/tests/functional/workflow-state/10-backcompat.t @@ -33,7 +33,7 @@ contains_ok "${TEST_NAME}.stdout" <<__END__ __END__ # recreate Cylc 7 DB with one NULL status -rm "${WORKFLOW_RUN_DIR}/log/db" +rm "${WORKFLOW_RUN_DIR}/log/db" run_ok "create-db" sqlite3 "${WORKFLOW_RUN_DIR}/log/db" < schema-2.sql TEST_NAME="${TEST_NAME_BASE}_compat_2" @@ -43,12 +43,12 @@ contains_ok "${TEST_NAME}.stdout" <<__END__ 2051/foo:succeeded __END__ -# Cylc 7 DB only contains custom outputs, and only the task message. +# Cylc 7 DB only contains custom outputs TEST_NAME="${TEST_NAME_BASE}_outputs" run_ok "${TEST_NAME}" cylc workflow-state --max-polls=1 --output "${WORKFLOW_NAME}" contains_ok "${TEST_NAME}.stdout" <<__END__ -2051/foo:['the quick brown fox'] +2051/foo:{'x': 'the quick brown fox'} __END__ purge diff --git a/tests/unit/xtriggers/test_workflow_state.py b/tests/unit/xtriggers/test_workflow_state.py index 64ed79130b6..ce00fde0c6b 100644 --- a/tests/unit/xtriggers/test_workflow_state.py +++ b/tests/unit/xtriggers/test_workflow_state.py @@ -14,20 +14,28 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import logging import sqlite3 -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from shutil import copytree, rmtree +import pytest + +from cylc.flow.dbstatecheck import output_fallback_msg +from cylc.flow.rundb import CylcWorkflowDAO from cylc.flow.workflow_files import WorkflowFiles -from cylc.flow.xtriggers.workflow_state import workflow_state +from cylc.flow.xtriggers.workflow_state import ( + _workflow_state_backcompat, + workflow_state, +) +from cylc.flow.xtriggers.suite_state import suite_state if TYPE_CHECKING: - from typing import Callable - from pytest import CaptureFixture + from typing import Any, Callable from pathlib import Path -def test_inferred_run(tmp_run_dir: 'Callable', capsys: 'CaptureFixture'): +def test_inferred_run(tmp_run_dir: 'Callable', capsys: pytest.CaptureFixture): """Test that the workflow_state xtrigger infers the run number. Method: the faked run-dir has no DB to connect to, but the WorkflowPoller @@ -58,7 +66,7 @@ def test_inferred_run(tmp_run_dir: 'Callable', capsys: 'CaptureFixture'): assert expected_workflow_id in capsys.readouterr().err -def test_back_compat(tmp_run_dir: 'Callable', caplog: 'CaptureFixture'): +def test_c7_db_back_compat(tmp_run_dir: 'Callable'): """Test workflow_state xtrigger backwards compatibility with Cylc 7 database.""" id_ = 'celebrimbor' @@ -80,6 +88,11 @@ def test_back_compat(tmp_run_dir: 'Callable', caplog: 'CaptureFixture'): submit_num INTEGER, status TEXT, PRIMARY KEY(name, cycle) ); """) + conn.execute(r""" + CREATE TABLE task_outputs( + cycle TEXT, name TEXT, outputs TEXT, PRIMARY KEY(cycle, name) + ); + """) conn.executemany( r'INSERT INTO "suite_params" VALUES(?,?);', [('cylc_version', '7.8.12'), @@ -87,9 +100,14 @@ def test_back_compat(tmp_run_dir: 'Callable', caplog: 'CaptureFixture'): ('cycle_point_tz', 'Z')] ) conn.execute(r""" - INSERT INTO "task_states" VALUES( - 'mithril','2012','2023-01-30T18:19:15Z','2023-01-30T18:19:15Z', - 0,'succeeded' + INSERT INTO "task_states" VALUES( + 'mithril','2012','2023-01-30T18:19:15Z','2023-01-30T18:19:15Z', + 0,'succeeded' + ); + """) + conn.execute(r""" + INSERT INTO "task_outputs" VALUES( + '2012','mithril','{"frodo": "bag end"}' ); """) conn.commit() @@ -97,14 +115,148 @@ def test_back_compat(tmp_run_dir: 'Callable', caplog: 'CaptureFixture'): conn.close() # Test workflow_state function - satisfied, _ = workflow_state(id_ + '//2012/mithril') + satisfied, _ = workflow_state(f'{id_}//2012/mithril') + assert satisfied + satisfied, _ = workflow_state(f'{id_}//2012/mithril:succeeded') + assert satisfied + satisfied, _ = workflow_state(f'{id_}//2012/mithril:frodo', is_output=True) assert satisfied + satisfied, _ = workflow_state( + f'{id_}//2012/mithril:"bag end"', is_message=True + ) + assert satisfied + satisfied, _ = workflow_state(f'{id_}//2012/mithril:pippin') + assert not satisfied satisfied, _ = workflow_state(id_ + '//2012/arkenstone') assert not satisfied # Test back-compat (old suite_state function) - from cylc.flow.xtriggers.suite_state import suite_state satisfied, _ = suite_state(suite=id_, task='mithril', point='2012') assert satisfied + satisfied, _ = suite_state( + suite=id_, task='mithril', point='2012', status='succeeded' + ) + assert satisfied + satisfied, _ = suite_state( + suite=id_, task='mithril', point='2012', message='bag end' + ) + assert satisfied satisfied, _ = suite_state(suite=id_, task='arkenstone', point='2012') assert not satisfied + + +def test_c8_db_back_compat( + tmp_run_dir: 'Callable', + caplog: pytest.LogCaptureFixture, + log_filter: 'Callable', +): + """Test workflow_state xtrigger backwards compatibility with Cylc < 8.3.0 + database.""" + id_ = 'nazgul' + run_dir: Path = tmp_run_dir(id_) + db_file = run_dir / 'log' / 'db' + db_file.parent.mkdir(exist_ok=True) + # Note: don't use CylcWorkflowDAO here as DB should be frozen + conn = sqlite3.connect(str(db_file)) + try: + conn.execute(r""" + CREATE TABLE workflow_params( + key TEXT, value TEXT, PRIMARY KEY(key) + ); + """) + conn.execute(r""" + CREATE TABLE task_states( + name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, + time_updated TEXT, submit_num INTEGER, status TEXT, + flow_wait INTEGER, is_manual_submit INTEGER, + PRIMARY KEY(name, cycle, flow_nums) + ); + """) + conn.execute(r""" + CREATE TABLE task_outputs( + cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, + PRIMARY KEY(cycle, name, flow_nums) + ); + """) + conn.executemany( + r'INSERT INTO "workflow_params" VALUES(?,?);', + [('cylc_version', '8.2.7'), + ('cycle_point_format', '%Y'), + ('cycle_point_tz', 'Z')] + ) + conn.execute(r""" + INSERT INTO "task_states" VALUES( + 'gimli','2012','[1]','2023-01-30T18:19:15Z', + '2023-01-30T18:19:15Z',1,'succeeded',0,0 + ); + """) + conn.execute(r""" + INSERT INTO "task_outputs" VALUES( + '2012','gimli','[1]', + '["submitted", "started", "succeeded", "axe"]' + ); + """) + conn.commit() + finally: + conn.close() + + gimli = f'{id_}//2012/gimli' + + satisfied, _ = workflow_state(gimli) + assert satisfied + satisfied, _ = workflow_state(f'{gimli}:succeeded') + assert satisfied + caplog.clear() + satisfied, _ = workflow_state(f'{gimli}:axe', is_message=True) + assert satisfied + assert not caplog.records + # Output label selector falls back to message + # (won't work if messsage != output label) + caplog.clear() + satisfied, _ = workflow_state(f'{gimli}:axe', is_output=True) + assert satisfied + assert log_filter( + caplog, level=logging.WARNING, exact_match=output_fallback_msg + ) + + +def test__workflow_state_backcompat(tmp_run_dir: 'Callable'): + """Test the _workflow_state_backcompat & suite_state functions on a + *current* Cylc database.""" + id_ = 'dune' + run_dir: Path = tmp_run_dir(id_) + db_file = run_dir / 'log' / 'db' + db_file.parent.mkdir(exist_ok=True) + with CylcWorkflowDAO(db_file, create_tables=True) as dao: + conn = dao.connect() + conn.executemany( + r'INSERT INTO "workflow_params" VALUES(?,?);', + [('cylc_version', '8.3.0'), + ('cycle_point_format', '%Y'), + ('cycle_point_tz', 'Z')] + ) + conn.execute(r""" + INSERT INTO "task_states" VALUES( + 'arrakis','2012','[1]','2023-01-30T18:19:15Z', + '2023-01-30T18:19:15Z',1,'succeeded',0,0 + ); + """) + conn.execute(r""" + INSERT INTO "task_outputs" VALUES( + '2012','arrakis','[1]', + '{"submitted": "submitted", "started": "started", "succeeded": "succeeded", "paul": "lisan al-gaib"}' + ); + """) + conn.commit() + + func: Any + for func in (_workflow_state_backcompat, suite_state): + satisfied, _ = func(id_, 'arrakis', '2012') + assert satisfied + satisfied, _ = func(id_, 'arrakis', '2012', status='succeeded') + assert satisfied + # Both output label and message work + satisfied, _ = func(id_, 'arrakis', '2012', message='paul') + assert satisfied + satisfied, _ = func(id_, 'arrakis', '2012', message='lisan al-gaib') + assert satisfied