Skip to content

Commit

Permalink
Fix: Use flow numbers to map jobs to tasks in 8.1.x workflows. (#5309)
Browse files Browse the repository at this point in the history
Fix: Use flow numbers to map jobs to tasks in 8.1.x workflows.
- Handle exception and warn user for 8.0.x workflows.
- Carry on as before for 7.x workflows.
  • Loading branch information
wxtim authored Feb 21, 2023
1 parent b5bd8f2 commit 6e829df
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
5 changes: 5 additions & 0 deletions lib/cylc/cylc-review/static/css/cylc-review.css
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,8 @@ th,
.livestamp {
white-space: nowrap;
}

.warning {
background-color: #FFBF00;
border-radius: 4px;
}
10 changes: 10 additions & 0 deletions lib/cylc/cylc-review/template/taskjobs.html
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@

<div class="container-fluid">
<div class="row">

{% if eight_point_zero == True %}
<div class="warning col-md-11">
<b>WARNING:</b> Cylc Review cannot display all jobs for Cylc 8.0.x Workflows, some jobs
may be hidden.
<br>
Upgrading to 8.1.x will fix this problem, but may not be possible for workflows with multiple "flows".
<br>
</div>
{% endif %}
<div class="col-md-11">
{% include "suite-state.html" -%}
</div>
Expand Down
11 changes: 8 additions & 3 deletions lib/cylc/review.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,14 @@ def taskjobs(
self.suite_dao.get_suite_state_summary(user, suite))
data["states"]["last_activity_time"] = (
self.get_last_activity_time(user, suite))
entries, of_n_entries = self.suite_dao.get_suite_job_entries(
user, suite, cycles, tasks, task_status, job_status, order,
per_page, (page - 1) * per_page)
(
entries, of_n_entries, eight_point_zero
) = self.suite_dao.get_suite_job_entries(
user, suite, cycles, tasks,
task_status, job_status, order,
per_page, (page - 1) * per_page
)
data["eight_point_zero"] = eight_point_zero
data["entries"] = entries
data["of_n_entries"] = of_n_entries
if per_page:
Expand Down
37 changes: 31 additions & 6 deletions lib/cylc/review_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import tarfile
import re
from glob import glob
from sqlite3 import OperationalError

from cylc.rundb import CylcSuiteDAO
from cylc.task_state import TASK_STATUS_GROUPS
Expand Down Expand Up @@ -95,6 +96,10 @@ class CylcReviewDAO(object):

REC_CYCLE_QUERY_OP = re.compile(r"\A(before |after |[<>]=?)(.+)\Z")
REC_SEQ_LOG = re.compile(r"\A(.+\.)([^\.]+)(\.[^\.]+)\Z")
CANNOT_JOIN_FLOW_NUMS = (
'cannot join using column flow_nums - '
'column not present in both tables'
)

def __init__(self):
self.daos = {}
Expand Down Expand Up @@ -169,7 +174,7 @@ def get_suite_broadcast_events(self, user_name, suite_name):

def get_suite_job_entries(
self, user_name, suite_name, cycles, tasks, task_status,
job_status, order, limit, offset):
job_status, order, limit, offset, flow_nums='flow_nums'):
"""Query suite runtime database to return a listing of task jobs.
user -- A string containing a valid user ID
suite -- A string containing a valid suite ID
Expand All @@ -189,6 +194,8 @@ def get_suite_job_entries(
the keys in CylcReviewDAO.ORDERS.
limit -- Limit number of returned entries
offset -- Offset entry number
flow_nums -- whether to use flow_nums
Return (entries, of_n_entries) where:
entries -- A list of matching entries
of_n_entries -- Total number of entries matching query
Expand All @@ -201,6 +208,8 @@ def get_suite_job_entries(
"out": {...},
"err": {...},
...}}
eight_zero_warning - boolean flag indicating that the database is
a Cylc 8.0 database and we can only get the latest task job.
"""
where_expr, where_args = self._get_suite_job_entries_where(
cycles, tasks, task_status, job_status)
Expand Down Expand Up @@ -236,7 +245,7 @@ def get_suite_job_entries(
" time_run, time_run_exit, run_signal, run_status," +
" platform_name, job_runner_name, job_id" +
" FROM task_states LEFT JOIN task_jobs USING " +
"(cycle, name, submit_num)" +
"(cycle, name, " + flow_nums + ") " +
where_expr +
" ORDER BY " +
self.JOB_ORDERS.get(order, self.JOB_ORDERS["time_desc"])
Expand Down Expand Up @@ -266,9 +275,25 @@ def get_suite_job_entries(
stmt += " LIMIT ? OFFSET ?"
limit_args = [limit, offset]

db_data = self._db_exec(
user_name, suite_name, stmt, where_args + limit_args
)
# Try except loop deals with case (Cylc 8.0) where the database
# doesn't contain enough information to identify multiple jobs
# belonging to the same task:
# https://github.com/cylc/cylc-flow/issues/5247
eight_zero_warning = False
try:
db_data = self._db_exec(
user_name, suite_name, stmt, where_args + limit_args
)
except OperationalError as exc:
if exc.message == self.CANNOT_JOIN_FLOW_NUMS:
stmt = stmt.replace('flow_nums', 'submit_num')
db_data = self._db_exec(
user_name, suite_name, stmt, where_args + limit_args
)
eight_zero_warning = True
else:
raise exc

for row in db_data:
(
cycle, name, submit_num, submit_num_max, task_status,
Expand Down Expand Up @@ -296,7 +321,7 @@ def get_suite_job_entries(
self._db_close(user_name, suite_name)
if entries:
self._get_job_logs(user_name, suite_name, entries, entry_of)
return (entries, of_n_entries)
return (entries, of_n_entries, eight_zero_warning)

def _get_suite_job_entries_where(
self, cycles, tasks, task_status, job_status):
Expand Down

0 comments on commit 6e829df

Please sign in to comment.