diff --git a/cylc/flow/dbstatecheck.py b/cylc/flow/dbstatecheck.py index 2d5024f19b6..4a7d960f3fc 100644 --- a/cylc/flow/dbstatecheck.py +++ b/cylc/flow/dbstatecheck.py @@ -18,7 +18,7 @@ import os import sqlite3 import sys -from typing import Optional, Union +from typing import Optional, Tuple, Union from textwrap import dedent from cylc.flow.pathutil import expand_path @@ -68,18 +68,7 @@ def __init__(self, rund, workflow, db_path=None): if not os.path.exists(db_path): raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), db_path) self.conn = sqlite3.connect(db_path, timeout=10.0) - - # Get workflow point format. - try: - self.point_fmt = self._get_pt_fmt() - self.back_compat_mode = False - except sqlite3.OperationalError as exc: - # BACK COMPAT: Cylc 7 DB (see method below). - try: - self.point_fmt = self._get_pt_fmt_compat() - self.back_compat_mode = True - except sqlite3.OperationalError: - raise exc # original error + self.point_fmt, self.back_compat_mode = self._get_point() @staticmethod def display_maps(res): @@ -89,6 +78,22 @@ def display_maps(res): for row in res: sys.stdout.write((", ").join([str(s) for s in row]) + "\n") + def _get_point(self) -> Tuple[Union[None, str], bool]: + """Get point format irrespective of compat mode + + Returns: + (Cycle point format(None if int cycling), is_back_compat) + """ + # Get workflow point format. + try: + return (self._get_pt_fmt(), False) + except sqlite3.OperationalError as exc: + # BACK COMPAT: Cylc 7 DB (see method below). + try: + return (self._get_pt_fmt_compat(), True) + except sqlite3.OperationalError: + raise exc # original error + def _get_pt_fmt(self) -> Union[None, str]: """Query a workflow database for a 'cycle point format' entry diff --git a/tests/unit/test_db_compat.py b/tests/unit/test_db_compat.py index 9b9ca8541d3..955a23ef555 100644 --- a/tests/unit/test_db_compat.py +++ b/tests/unit/test_db_compat.py @@ -131,14 +131,18 @@ def test_cylc_7_db_wflow_params_table(_setup_db): db_file_name = _setup_db([create, insert]) checker = CylcWorkflowDBChecker('foo', 'bar', db_path=db_file_name) - with pytest.raises( - sqlite3.OperationalError, match="no such table: workflow_params" - ): - checker._get_pt_fmt() - + assert checker._get_point() == ('CCYY', True) assert checker.point_fmt == ptformat +def test_garbage_db_wflow_params_table(tmp_path): + """If our logic fails we fallback to sqlite error.""" + db_path = tmp_path / 'foo.db' + db_path.touch() + with pytest.raises(sqlite3.OperationalError): + CylcWorkflowDBChecker('foo', 'bar', db_path=db_path) + + def test_pre_830_task_action_timers(_setup_db): """Test back compat for task_action_timers table.