Skip to content

Commit

Permalink
Prevent task events manager logging the same message more than once
Browse files Browse the repository at this point in the history
Deals with a bug was returning every message ever sent by a task on
each poll.
  • Loading branch information
wxtim committed Jun 6, 2023
1 parent bbccb21 commit add64fe
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ for `cylc play` when called by `cylc vip` or `cylc vr`.
Enabled the "stop", "poll", "kill" and "message" commands to be issued from
the UI whilst the workflow is in the process of shutting down.

[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
message being logged multiple times when polled.


## __cylc-8.1.4 (<span actions:bind='release-date'>Released 2023-05-04</span>)__

### Fixes
Expand Down
12 changes: 11 additions & 1 deletion cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
from cylc.flow.parsec.config import ItemNotFoundError
from cylc.flow.pathutil import (
get_remote_workflow_run_job_dir,
get_workflow_run_job_dir)
get_workflow_run_job_dir,
get_workflow_run_scheduler_log_path,
)
from cylc.flow.subprocctx import SubFuncContext, SubProcContext
from cylc.flow.task_action_timer import (
TaskActionTimer,
Expand Down Expand Up @@ -819,6 +821,14 @@ def _process_message_check(
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
}:
severity = DEBUG

# If polling gets a message already in the log, don't log it:
if flag == self.FLAG_POLLED:
workflow_log = get_workflow_run_scheduler_log_path(self.workflow)
with open(workflow_log, "r") as handle:
if f'{message} at {event_time}' in handle.read():
return False

LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
return True

Expand Down
45 changes: 45 additions & 0 deletions tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler

from pathlib import Path
from typing import Any as Fixture


Expand All @@ -42,3 +43,47 @@ async def test_process_job_logs_retrieval_warns_no_platform(
warning = caplog.records[-1]
assert warning.levelname == 'WARNING'
assert 'Unable to retrieve' in warning.msg


async def test_process_message_no_repeat(
one_conf: Fixture, flow: Fixture, scheduler: Fixture, run: Fixture, start
):
"""Don't log received messages if they are found again."""
reg: str = flow(one_conf)
schd: 'Scheduler' = scheduler(reg)

async with start(schd):
# Setup `job-activity.log` path:
scheduler_log = (
Path(schd.workflow_run_dir)
/ 'log/scheduler/log'
)
scheduler_log.parent.mkdir(parents=True, exist_ok=True)
scheduler_log.touch()

args = {
'itask': schd.pool.get_tasks()[0],
'severity': 'comical',
'message': 'The dead swans lay in the stagnant pool',
'event_time': 'Thursday',
'flag': '(received)',
'submit_num': 0
}
# Process message should continue (i.e. check is True):
assert schd.task_events_mgr._process_message_check(**args) is True
# We have logged this message.
# assert schd.task_events_mgr.FLAG_RECEIVED in log.records[-1].message

args = {
'itask': schd.pool.get_tasks()[0],
'severity': 'comical',
'message': 'The dead swans lay in the stagnant pool',
'event_time': 'Thursday',
'flag': '(polled)',
'submit_num': 0
}
# Process message should not continue - we've seen it before,
# albeit with a different flag:
assert schd.task_events_mgr._process_message_check(**args) is True
# We haven't logged another message:
# assert schd.task_events_mgr.FLAG_RECEIVED in log.records[-1].message

0 comments on commit add64fe

Please sign in to comment.