Prevent task events manager logging the same message more than once #5562

Draft: wants to merge 18 commits into base: 8.2.x (showing changes from 16 commits)
1 change: 1 addition & 0 deletions changes.d/5562.fix.md
@@ -0,0 +1 @@
Prevent the same message being logged multiple times when polled.
22 changes: 22 additions & 0 deletions cylc/flow/rundb.py
@@ -1081,3 +1081,25 @@ def select_jobs_for_datastore(
def vacuum(self):
"""Vacuum to the database."""
return self.connect().execute("VACUUM")

def message_in_db(self, itask, event_time, submit_num, message):
"""Has this message been logged in the DB (task_events table)?

If the event is a standard task message (e.g. Submit) then
there will be no message, so we check the event column too.
"""
task_name = itask.tokens['task']
cycle = itask.tokens['cycle']
stmt = r"""
SELECT EXISTS (
SELECT 1 FROM task_events WHERE
name == ?
AND cycle == ?
AND (message == ? OR event == ?)
AND submit_num == ?
AND time == ?
)
"""
stmt_args = [
task_name, cycle, message, message, submit_num, event_time]
return bool(self.connect().execute(stmt, stmt_args).fetchone()[0])
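
For context, a minimal sketch (not part of this diff) of how a caller might use the new `message_in_db` method to avoid re-logging an event that is already recorded; the `log_event_once` helper and its `print` stand-in for the scheduler's logger are illustrative assumptions.

```python
# Illustrative sketch only: suppress logging of an event that is already
# recorded in the task_events table, using the message_in_db method added
# in this diff. The helper name and the print() stand-in are assumptions.
from cylc.flow.rundb import CylcWorkflowDAO


def log_event_once(dao: CylcWorkflowDAO, itask, event_time, submit_num, message):
    """Log the message only if it has not already been recorded in the DB."""
    if dao.message_in_db(itask, event_time, submit_num, message):
        return False  # duplicate - already in the DB, log nothing
    print(f"[{itask}] {message}")  # stand-in for LOG.info(...)
    return True
```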
4 changes: 3 additions & 1 deletion cylc/flow/scheduler.py
@@ -1508,7 +1508,8 @@ def late_tasks_check(self):
itask.is_late = True
LOG.warning(f"[{itask}] {msg}")
self.task_events_mgr.setup_event_handlers(
itask, self.task_events_mgr.EVENT_LATE, msg)
itask, time2str(now),
self.task_events_mgr.EVENT_LATE, msg)
self.workflow_db_mgr.put_insert_task_late_flags(itask)

def reset_inactivity_timer(self):
@@ -1737,6 +1738,7 @@ async def main_loop(self) -> None:
self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()

self.process_workflow_db_queue()
self.process_queued_task_messages()
await self.process_command_queue()
self.task_events_mgr.process_events(self)
167 changes: 117 additions & 50 deletions cylc/flow/task_events_mgr.py

Large diffs are not rendered by default.
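
Since this diff is not rendered, here is a rough sketch of the kind of duplicate check the integration test below exercises (it expects `process_message` to return `None` and log nothing when a polled message is already in the database); the standalone helper and the `workflow_db_mgr.pri_dao` attribute path are assumptions, not the actual change.

```python
# Hypothetical sketch - the real task_events_mgr.py change is not shown above.
def is_duplicate_polled_message(
    task_events_mgr, itask, event_time, submit_num, message, flag
):
    """Return True if a polled message has already been logged to the DB."""
    return (
        flag == task_events_mgr.FLAG_POLLED
        and task_events_mgr.workflow_db_mgr.pri_dao.message_in_db(
            itask, event_time, submit_num, message
        )
    )
```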

4 changes: 2 additions & 2 deletions cylc/flow/task_job_mgr.py
@@ -165,7 +165,7 @@ def check_task_jobs(self, workflow, task_pool):
now = time()
poll_tasks = set()
for itask in task_pool.get_tasks():
if self.task_events_mgr.check_job_time(itask, now):
if self.task_events_mgr.check_job_time(itask, now, False):
poll_tasks.add(itask)
if itask.poll_timer.delay is not None:
LOG.info(
@@ -1009,7 +1009,7 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
self.get_simulation_job_conf(itask, workflow)
)
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUBMITTED
itask, INFO, TASK_OUTPUT_SUBMITTED,
)

return itasks
9 changes: 6 additions & 3 deletions cylc/flow/task_pool.py
@@ -66,7 +66,8 @@
serialise,
deserialise
)
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.wallclock import (
get_current_time_string, get_time_string_from_unix_time)
from cylc.flow.platforms import get_platform
from cylc.flow.task_queues.independent import IndepQueueManager

@@ -1786,10 +1787,12 @@ def _set_expired_task(self, itask):
itask.expire_time = (
itask.get_point_as_seconds() +
itask.get_offset_as_seconds(itask.tdef.expiration_offset))
if time() > itask.expire_time:
now = time()
if now > itask.expire_time:
msg = 'Task expired (skipping job).'
LOG.warning(f"[{itask}] {msg}")
self.task_events_mgr.setup_event_handlers(itask, "expired", msg)
self.task_events_mgr.setup_event_handlers(
itask, get_time_string_from_unix_time(now), "expired", msg)
# TODO succeeded and expired states are useless due to immediate
# removal under all circumstances (unhandled failed is still used).
if itask.state_reset(TASK_STATUS_EXPIRED, is_held=False):
1 change: 1 addition & 0 deletions tests/functional/cylc-message/01-newline.t
@@ -45,6 +45,7 @@ workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --debug --no-detach "${WORKFLO
LOG="${WORKFLOW_RUN_DIR}/log/job/1/foo/01/job-activity.log"
sed -n '/event-handler-00/,$p' "${LOG}" >'edited-job-activity.log'
sed -i '/job-logs-retrieve/d' 'edited-job-activity.log'
sed -i '/.*succeeded.*/d' 'edited-job-activity.log'

cmp_ok 'edited-job-activity.log' - <<__LOG__
[(('event-handler-00', 'custom-1'), 1) cmd]
1 change: 0 additions & 1 deletion tests/functional/cylc-poll/15-job-st-file-no-batch.t
@@ -31,4 +31,3 @@ run_ok "${TEST_NAME_BASE}-log-2" \
grep -E '1/t1 running .*\(polled\)failed' "${LOG}"

purge
exit
7 changes: 3 additions & 4 deletions tests/functional/restart/58-removed-task.t
@@ -22,7 +22,7 @@

. "$(dirname "$0")/test_header"

set_test_number 7
set_test_number 6

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

@@ -39,10 +39,9 @@ workflow_run_ok "${TEST_NAME}" cylc play --no-detach "${WORKFLOW_NAME}"
TEST_NAME="${TEST_NAME_BASE}-restart"
workflow_run_ok "${TEST_NAME}" cylc play --set="INCL_B_C=False" --no-detach "${WORKFLOW_NAME}"

grep_workflow_log_ok "grep-3" "\[1/a running job:01 flows:1\] (polled)started"
grep_workflow_log_ok "grep-4" "\[1/b failed job:01 flows:1\] (polled)failed"
grep_workflow_log_ok "grep-3" "=> failed"

# Failed (but not incomplete) task c should not have been polled.
grep_fail "\[1/c failed job:01 flows:1\] (polled)failed" "${WORKFLOW_RUN_DIR}/log/scheduler/log"
grep_fail "\[1/c failed job:01 flows:1\] failed" "${WORKFLOW_RUN_DIR}/log/scheduler/log"

purge
3 changes: 1 addition & 2 deletions tests/functional/restart/58-removed-task/flow.cylc
@@ -25,8 +25,7 @@
cylc__job__poll_grep_workflow_log "1/b .*failed"
cylc__job__poll_grep_workflow_log "1/c .*failed"
cylc stop --now $CYLC_WORKFLOW_ID
cylc__job__poll_grep_workflow_log "1/a .*(polled)started"
cylc__job__poll_grep_workflow_log "1/b .*(polled)failed"
cylc__job__poll_grep_workflow_log "1/b .*failed"
"""
[[b, c]]
script = "false"
4 changes: 2 additions & 2 deletions tests/functional/startup/00-state-summary.t
@@ -28,9 +28,9 @@ run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
# Workflow runs and shuts down with a failed task.
cylc play --no-detach "${WORKFLOW_NAME}" > /dev/null 2>&1

# Restart with a failed task and a succeeded task.
cylc play "${WORKFLOW_NAME}"
poll_grep_workflow_log -E '1/foo .* \(polled\)failed'
cylc play "${WORKFLOW_NAME}" > /dev/null 2>&1
cylc dump "${WORKFLOW_NAME}" > dump.out
TEST_NAME=${TEST_NAME_BASE}-grep
# State summary should not just say "Initializing..."
32 changes: 32 additions & 0 deletions tests/integration/test_task_events_mgr.py
@@ -17,6 +17,7 @@
from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler

from pathlib import Path
from typing import Any as Fixture


@@ -42,3 +43,34 @@ async def test_process_job_logs_retrieval_warns_no_platform(
warning = caplog.records[-1]
assert warning.levelname == 'WARNING'
assert 'Unable to retrieve' in warning.msg


async def test_process_message_no_repeat(
one_conf: Fixture, flow: Fixture, scheduler: Fixture, run: Fixture
):
"""Don't log received messages if they are found again when."""
reg: str = flow(one_conf)
schd: 'Scheduler' = scheduler(reg, paused_start=True)
message: str = 'The dead swans lay in the stagnant pool'
message_time: str = 'Thursday Lunchtime'

async with run(schd) as log:
# Set up the database with a message already received:
itask = schd.pool.get_tasks()[0]
itask.tdef.run_mode = 'live'
schd.workflow_db_mgr.put_insert_task_events(
itask, {'time': message_time, 'event': '', 'message': message})
schd.process_workflow_db_queue()

# Task event manager returns None:
assert schd.task_events_mgr.process_message(
itask=itask, severity='comical', message=message,
event_time=message_time, submit_num=0,
flag=schd.task_events_mgr.FLAG_POLLED
) is None

# Log doesn't contain a repeat message:
assert (
schd.task_events_mgr.FLAG_POLLED
not in log.records[-1].message
)
79 changes: 79 additions & 0 deletions tests/unit/test_rundb.py
@@ -24,6 +24,7 @@
from types import SimpleNamespace

import pytest
from pytest import param

from cylc.flow.exceptions import PlatformLookupError
from cylc.flow.rundb import CylcWorkflowDAO
@@ -175,3 +176,81 @@ def callback(index, row):
match='not defined.*\n.*foo.*\n.*bar'
):
dao.select_task_pool_for_restart(callback)


@pytest.fixture(scope='module')
def setup_message_in_db(tmp_path_factory):
"""Create a database for use when testing message_in_db"""
tmp_path = tmp_path_factory.mktemp('message_in_db')
db_file = tmp_path / 'db'
setup_stmt = r"""
INSERT INTO task_events
VALUES
(
"qux", "-191",
"morning", "1", "started", ""),
(
"qux", "-190",
"afternoon", "1", "message critical", "Hello Rome");
"""
with CylcWorkflowDAO(db_file, create_tables=True) as dao:
dao.connect().execute(setup_stmt)
yield dao


@pytest.mark.parametrize(
'query, expect',
(
param(
(
SimpleNamespace(tokens={'task': 'qux', 'cycle': '-191'}),
"morning", 1, "started",
),
True,
id="event-name-in-db"
),
param(
(
SimpleNamespace(tokens={'task': 'qux', 'cycle': '-190'}),
"afternoon", 1, "Hello Rome",
),
True,
id="message-in-db"
),
)
)
def test_message_in_db(setup_message_in_db, query, expect):
"""Method correctly says if message is in DB.
"""
assert setup_message_in_db.message_in_db(*query) is expect


def test_message_not_in_db(setup_message_in_db):
"""Method correctly says if message is NOT in DB:
"""
def use_message_in_db(args_):
"""Gather up boilerplate of setting up a fake itask and
providing args to message_in_db."""
itask = SimpleNamespace(
tokens={'task': args_['name'], 'cycle': args_['cycle']})
return setup_message_in_db.message_in_db(
itask, args_['event_time'], args_['submit'], args_['message'])

# A list of args which _should_ be in the db:
args_ = {
'name': "qux",
'cycle': '-190',
'event_time': "afternoon",
'submit': 1,
'message': 'Hello Rome',
}

# Control - can we get True if all args are correct?
assert use_message_in_db(args_) is True

# One at a time break each arg:
for key in args_:
old_arg = args_[key]
args_[key] = 'garbage'
assert use_message_in_db(args_) is False
args_[key] = old_arg