From 6e829dfcecc064b2254edab821422015b4a62afe Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 21 Feb 2023 11:29:26 +0000 Subject: [PATCH] Fix: Use flow numbers to map jobs to tasks in 8.1.x workflows. (#5309) Fix: Use flow numbers to map jobs to tasks in 8.1.x workflows. - Handle exception and warn user for 8.0.x workflows. - Carry on as before for 7.x workflows. --- .../cylc-review/static/css/cylc-review.css | 5 +++ lib/cylc/cylc-review/template/taskjobs.html | 10 +++++ lib/cylc/review.py | 11 ++++-- lib/cylc/review_dao.py | 37 ++++++++++++++++--- 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/lib/cylc/cylc-review/static/css/cylc-review.css b/lib/cylc/cylc-review/static/css/cylc-review.css index aab95adf27c..0a21b1b922d 100644 --- a/lib/cylc/cylc-review/static/css/cylc-review.css +++ b/lib/cylc/cylc-review/static/css/cylc-review.css @@ -113,3 +113,8 @@ th, .livestamp { white-space: nowrap; } + +.warning { + background-color: #FFBF00; + border-radius: 4px; +} diff --git a/lib/cylc/cylc-review/template/taskjobs.html b/lib/cylc/cylc-review/template/taskjobs.html index 41551fcca80..b206452cdd7 100644 --- a/lib/cylc/cylc-review/template/taskjobs.html +++ b/lib/cylc/cylc-review/template/taskjobs.html @@ -180,6 +180,16 @@
+ + {% if eight_point_zero == True %} +
+ WARNING: Cylc Review cannot display all jobs for Cylc 8.0.x Workflows, some jobs + may be hidden. +
+ Upgrading to 8.1.x will fix this problem, but may not be possible for workflows with multiple "flows". +
+
+ {% endif %}
{% include "suite-state.html" -%}
diff --git a/lib/cylc/review.py b/lib/cylc/review.py index a298657de17..98543dde8bf 100644 --- a/lib/cylc/review.py +++ b/lib/cylc/review.py @@ -379,9 +379,14 @@ def taskjobs( self.suite_dao.get_suite_state_summary(user, suite)) data["states"]["last_activity_time"] = ( self.get_last_activity_time(user, suite)) - entries, of_n_entries = self.suite_dao.get_suite_job_entries( - user, suite, cycles, tasks, task_status, job_status, order, - per_page, (page - 1) * per_page) + ( + entries, of_n_entries, eight_point_zero + ) = self.suite_dao.get_suite_job_entries( + user, suite, cycles, tasks, + task_status, job_status, order, + per_page, (page - 1) * per_page + ) + data["eight_point_zero"] = eight_point_zero data["entries"] = entries data["of_n_entries"] = of_n_entries if per_page: diff --git a/lib/cylc/review_dao.py b/lib/cylc/review_dao.py index e455154d3ea..e52bddae666 100644 --- a/lib/cylc/review_dao.py +++ b/lib/cylc/review_dao.py @@ -22,6 +22,7 @@ import tarfile import re from glob import glob +from sqlite3 import OperationalError from cylc.rundb import CylcSuiteDAO from cylc.task_state import TASK_STATUS_GROUPS @@ -95,6 +96,10 @@ class CylcReviewDAO(object): REC_CYCLE_QUERY_OP = re.compile(r"\A(before |after |[<>]=?)(.+)\Z") REC_SEQ_LOG = re.compile(r"\A(.+\.)([^\.]+)(\.[^\.]+)\Z") + CANNOT_JOIN_FLOW_NUMS = ( + 'cannot join using column flow_nums - ' + 'column not present in both tables' + ) def __init__(self): self.daos = {} @@ -169,7 +174,7 @@ def get_suite_broadcast_events(self, user_name, suite_name): def get_suite_job_entries( self, user_name, suite_name, cycles, tasks, task_status, - job_status, order, limit, offset): + job_status, order, limit, offset, flow_nums='flow_nums'): """Query suite runtime database to return a listing of task jobs. user -- A string containing a valid user ID suite -- A string containing a valid suite ID @@ -189,6 +194,8 @@ def get_suite_job_entries( the keys in CylcReviewDAO.ORDERS. limit -- Limit number of returned entries offset -- Offset entry number + flow_nums -- whether to use flow_nums + Return (entries, of_n_entries) where: entries -- A list of matching entries of_n_entries -- Total number of entries matching query @@ -201,6 +208,8 @@ def get_suite_job_entries( "out": {...}, "err": {...}, ...}} + eight_zero_warning - boolean flag indicating that the database is + a Cylc 8.0 database and we can only get the latest task job. """ where_expr, where_args = self._get_suite_job_entries_where( cycles, tasks, task_status, job_status) @@ -236,7 +245,7 @@ def get_suite_job_entries( " time_run, time_run_exit, run_signal, run_status," + " platform_name, job_runner_name, job_id" + " FROM task_states LEFT JOIN task_jobs USING " + - "(cycle, name, submit_num)" + + "(cycle, name, " + flow_nums + ") " + where_expr + " ORDER BY " + self.JOB_ORDERS.get(order, self.JOB_ORDERS["time_desc"]) @@ -266,9 +275,25 @@ def get_suite_job_entries( stmt += " LIMIT ? OFFSET ?" limit_args = [limit, offset] - db_data = self._db_exec( - user_name, suite_name, stmt, where_args + limit_args - ) + # Try except loop deals with case (Cylc 8.0) where the database + # doesn't contain enough information to identify multiple jobs + # belonging to the same task: + # https://github.com/cylc/cylc-flow/issues/5247 + eight_zero_warning = False + try: + db_data = self._db_exec( + user_name, suite_name, stmt, where_args + limit_args + ) + except OperationalError as exc: + if exc.message == self.CANNOT_JOIN_FLOW_NUMS: + stmt = stmt.replace('flow_nums', 'submit_num') + db_data = self._db_exec( + user_name, suite_name, stmt, where_args + limit_args + ) + eight_zero_warning = True + else: + raise exc + for row in db_data: ( cycle, name, submit_num, submit_num_max, task_status, @@ -296,7 +321,7 @@ def get_suite_job_entries( self._db_close(user_name, suite_name) if entries: self._get_job_logs(user_name, suite_name, entries, entry_of) - return (entries, of_n_entries) + return (entries, of_n_entries, eight_zero_warning) def _get_suite_job_entries_where( self, cycles, tasks, task_status, job_status):