From 06553d04f3cd066553fe8319f913350e20fdbe67 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Tue, 25 Jul 2023 16:43:36 +0100
Subject: [PATCH 01/17] Bump dev version 8.3.0.dev
---
cylc/flow/__init__.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py
index ac1743a704f..bc9f24aa573 100644
--- a/cylc/flow/__init__.py
+++ b/cylc/flow/__init__.py
@@ -53,7 +53,7 @@ def environ_init():
environ_init()
-__version__ = '8.2.1.dev'
+__version__ = '8.3.0.dev'
def iter_entry_points(entry_point_name):
From 85d4e06936253bcc101fac96ed89e95140ff8197 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Fri, 14 Jul 2023 12:56:54 +0100
Subject: [PATCH 02/17] Prevent logging of messages already logged
---
CHANGES.md | 4 +
cylc/flow/rundb.py | 24 ++++++
cylc/flow/scheduler.py | 4 +-
cylc/flow/task_events_mgr.py | 66 +++++++++++-----
cylc/flow/task_job_mgr.py | 2 +-
cylc/flow/task_pool.py | 9 ++-
tests/functional/cylc-message/01-newline.t | 1 +
.../functional/restart/28-execution-timeout.t | 7 +-
tests/functional/restart/58-removed-task.t | 7 +-
.../restart/58-removed-task/flow.cylc | 3 +-
...ggered.t => 59-waiting-manual-triggered.t} | 0
.../flow.cylc | 0
tests/functional/startup/00-state-summary.t | 4 +-
tests/integration/test_task_events_mgr.py | 32 ++++++++
tests/unit/test_rundb.py | 79 +++++++++++++++++++
15 files changed, 202 insertions(+), 40 deletions(-)
rename tests/functional/restart/{58-waiting-manual-triggered.t => 59-waiting-manual-triggered.t} (100%)
rename tests/functional/restart/{58-waiting-manual-triggered => 59-waiting-manual-triggered}/flow.cylc (100%)
diff --git a/CHANGES.md b/CHANGES.md
index 483023c0dd4..65ee51bd89a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -126,6 +126,10 @@ resuming a workflow.
[#5625](https://github.com/cylc/cylc-flow/pull/5625) - Exclude `setuptools`
version (v67) which results in dependency check failure with editable installs.
+[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
+message being logged multiple times when polled.
+
+
## __cylc-8.1.4 (Released 2023-05-04)__
### Fixes
diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py
index f7632b8997e..07d15c013ee 100644
--- a/cylc/flow/rundb.py
+++ b/cylc/flow/rundb.py
@@ -1081,3 +1081,27 @@ def select_jobs_for_datastore(
def vacuum(self):
"""Vacuum to the database."""
return self.connect().execute("VACUUM")
+
+ def message_in_db(self, itask, event_time, submit, message):
+ """Has this message been logged in the DB (task_events table)?
+
+ SQL Query in plain English:
+ Say whether the inner query returns any results:
+
+ If the event is a standard task message (e.g. Submit) then
+ there will be no message, so we chect the event column too.
+ """
+ task_name = itask.tokens['task']
+ cycle = itask.tokens['cycle']
+ stmt = r"""
+ SELECT EXISTS (
+ SELECT 1 FROM task_events WHERE
+ name == ?
+ AND cycle == ?
+ AND (message == ? OR event == ?)
+ AND submit_num == ?
+ AND time == ?
+ )
+ """
+ stmt_args = [task_name, cycle, message, message, submit, event_time]
+ return bool(self.connect().execute(stmt, stmt_args).fetchone()[0])
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index f73e197ab1b..043e6d4a086 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1508,7 +1508,8 @@ def late_tasks_check(self):
itask.is_late = True
LOG.warning(f"[{itask}] {msg}")
self.task_events_mgr.setup_event_handlers(
- itask, self.task_events_mgr.EVENT_LATE, msg)
+ itask, time2str(now),
+ self.task_events_mgr.EVENT_LATE, msg)
self.workflow_db_mgr.put_insert_task_late_flags(itask)
def reset_inactivity_timer(self):
@@ -1737,6 +1738,7 @@ async def main_loop(self) -> None:
self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()
+ self.process_workflow_db_queue()
self.process_queued_task_messages()
await self.process_command_queue()
self.task_events_mgr.process_events(self)
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 010181ab3af..ce489231238 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -75,6 +75,7 @@
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED)
from cylc.flow.wallclock import (
get_current_time_string,
+ get_time_string_from_unix_time,
get_seconds_as_interval_string as intvl_as_str
)
from cylc.flow.workflow_events import (
@@ -426,7 +427,8 @@ def check_job_time(self, itask, now):
itask.timeout = None # emit event only once
if msg and event:
LOG.warning(f"[{itask}] {msg}")
- self.setup_event_handlers(itask, event, msg)
+ self.setup_event_handlers(
+ itask, get_time_string_from_unix_time(now), event, msg)
return True
else:
return can_poll
@@ -587,8 +589,18 @@ def process_message(
# Any message represents activity.
self.reset_inactivity_timer_func()
- if not self._process_message_check(
- itask, severity, message, event_time, flag, submit_num):
+ message_already_received = self.workflow_db_mgr.pri_dao.message_in_db(
+ itask, event_time, submit_num, message)
+ if (
+ itask.tdef.run_mode == 'live'
+ and message_already_received
+ ):
+ return None
+
+ if (
+ not self._process_message_check(
+ itask, severity, message, event_time, flag, submit_num)
+ ):
return None
# always update the workflow state summary for latest message
@@ -615,7 +627,8 @@ def process_message(
self.EVENT_STARTED]
and not itask.state.outputs.is_completed(TASK_OUTPUT_STARTED)):
self.setup_event_handlers(
- itask, self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
+ itask, event_time,
+ self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
self.spawn_func(itask, TASK_OUTPUT_STARTED)
if message == self.EVENT_STARTED:
@@ -692,7 +705,7 @@ def process_message(
):
return True
signal = message[len(FAIL_MESSAGE_PREFIX):]
- self._db_events_insert(itask, "signaled", signal)
+ self._db_events_insert(itask, event_time, "signaled", signal)
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": signal})
if self._process_message_failed(
@@ -706,14 +719,14 @@ def process_message(
):
return True
aborted_with = message[len(ABORT_MESSAGE_PREFIX):]
- self._db_events_insert(itask, "aborted", message)
+ self._db_events_insert(itask, event_time, "aborted", message)
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": aborted_with})
if self._process_message_failed(itask, event_time, aborted_with):
self.spawn_func(itask, TASK_OUTPUT_FAILED)
elif message.startswith(VACATION_MESSAGE_PREFIX):
# Task job pre-empted into a vacation state
- self._db_events_insert(itask, "vacated", message)
+ self._db_events_insert(itask, event_time, "vacated", message)
itask.set_summary_time('started') # unset
if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0
@@ -728,11 +741,12 @@ def process_message(
# this feature can only be used on the deprecated loadleveler
# system, we should probably aim to remove support for job vacation
# instead. Otherwise, we should have:
- # self.setup_event_handlers(itask, 'vacated', message)
+ # self.setup_event_handlers(itask, event_time, 'vacated', message)
elif completed_trigger:
# Message of an as-yet unreported custom task output.
# No state change.
- self.setup_event_handlers(itask, completed_trigger, message)
+ self.setup_event_handlers(
+ itask, event_time, completed_trigger, message)
self.spawn_func(itask, msg0)
else:
# Unhandled messages. These include:
@@ -742,10 +756,10 @@ def process_message(
# No state change.
LOG.debug(f"[{itask}] unhandled: {message}")
self._db_events_insert(
- itask, (f"message {lseverity}"), message)
+ itask, event_time, (f"message {lseverity}"), message)
if lseverity in self.NON_UNIQUE_EVENTS:
itask.non_unique_events.update({lseverity: 1})
- self.setup_event_handlers(itask, lseverity, message)
+ self.setup_event_handlers(itask, event_time, lseverity, message)
return None
def _process_message_check(
@@ -767,6 +781,7 @@ def _process_message_check(
timestamp = f" at {event_time}"
else:
timestamp = ""
+
if flag == self.FLAG_RECEIVED and submit_num != itask.submit_num:
# Ignore received messages from old jobs
LOG.warning(
@@ -819,17 +834,20 @@ def _process_message_check(
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
}:
severity = DEBUG
+
LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
return True
- def setup_event_handlers(self, itask, event, message):
+ def setup_event_handlers(
+ self, itask: 'TaskProxy', event_time: str, event: str, message: str
+ ) -> None:
"""Set up handlers for a task event."""
if itask.tdef.run_mode != 'live':
return
msg = ""
if message != f"job {event}":
msg = message
- self._db_events_insert(itask, event, msg)
+ self._db_events_insert(itask, event_time, event, msg)
self._setup_job_logs_retrieval(itask, event)
self._setup_event_mail(itask, event)
self._setup_custom_event_handlers(itask, event, message)
@@ -843,10 +861,10 @@ def _custom_handler_callback(self, ctx, schd, id_key):
else:
self.unset_waiting_event_timer(id_key)
- def _db_events_insert(self, itask, event="", message=""):
+ def _db_events_insert(self, itask, event_time, event="", message=""):
"""Record an event to the DB."""
self.workflow_db_mgr.put_insert_task_events(itask, {
- "time": get_current_time_string(),
+ "time": event_time,
"event": event,
"message": message})
@@ -1124,7 +1142,8 @@ def _process_message_failed(self, itask, event_time, message):
):
# No retry lined up: definitive failure.
if itask.state_reset(TASK_STATUS_FAILED):
- self.setup_event_handlers(itask, self.EVENT_FAILED, message)
+ self.setup_event_handlers(
+ itask, event_time, self.EVENT_FAILED, message)
self.data_store_mgr.delta_task_state(itask)
no_retries = True
else:
@@ -1134,7 +1153,8 @@ def _process_message_failed(self, itask, event_time, message):
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
LOG.warning(f"[{itask}] {delay_msg}")
msg = f"{self.JOB_FAILED}, {delay_msg}"
- self.setup_event_handlers(itask, self.EVENT_RETRY, msg)
+ self.setup_event_handlers(
+ itask, event_time, self.EVENT_RETRY, msg)
self._reset_job_timers(itask)
return no_retries
@@ -1151,7 +1171,8 @@ def _process_message_started(self, itask, event_time):
"time_run": itask.summary['started_time_string']})
if itask.state_reset(TASK_STATUS_RUNNING):
self.setup_event_handlers(
- itask, self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
+ itask, event_time,
+ self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
self.data_store_mgr.delta_task_state(itask)
self._reset_job_timers(itask)
@@ -1177,7 +1198,8 @@ def _process_message_succeeded(self, itask, event_time):
itask.summary['started_time'])
if itask.state_reset(TASK_STATUS_SUCCEEDED):
self.setup_event_handlers(
- itask, self.EVENT_SUCCEEDED, f"job {self.EVENT_SUCCEEDED}")
+ itask, event_time,
+ self.EVENT_SUCCEEDED, f"job {self.EVENT_SUCCEEDED}")
self.data_store_mgr.delta_task_state(itask)
self._reset_job_timers(itask)
@@ -1204,7 +1226,7 @@ def _process_message_submit_failed(self, itask, event_time, submit_num):
no_retries = True
if itask.state_reset(TASK_STATUS_SUBMIT_FAILED):
self.setup_event_handlers(
- itask, self.EVENT_SUBMIT_FAILED,
+ itask, event_time, self.EVENT_SUBMIT_FAILED,
f'job {self.EVENT_SUBMIT_FAILED}')
self.data_store_mgr.delta_task_state(itask)
else:
@@ -1214,7 +1236,8 @@ def _process_message_submit_failed(self, itask, event_time, submit_num):
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
LOG.warning(f"[{itask}] {delay_msg}")
msg = f"job {self.EVENT_SUBMIT_FAILED}, {delay_msg}"
- self.setup_event_handlers(itask, self.EVENT_SUBMIT_RETRY, msg)
+ self.setup_event_handlers(
+ itask, event_time, self.EVENT_SUBMIT_RETRY, msg)
# Register newly submit-failed job with the database and datastore.
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
@@ -1261,6 +1284,7 @@ def _process_message_submitted(
itask.state_reset(is_queued=False)
self.setup_event_handlers(
itask,
+ event_time,
self.EVENT_SUBMITTED,
f'job {self.EVENT_SUBMITTED}',
)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 524ac6d5b93..f7b5df86c30 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -1009,7 +1009,7 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
self.get_simulation_job_conf(itask, workflow)
)
self.task_events_mgr.process_message(
- itask, INFO, TASK_OUTPUT_SUBMITTED
+ itask, INFO, TASK_OUTPUT_SUBMITTED,
)
return itasks
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index e7d85f66938..dfe3ce7b8f6 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -66,7 +66,8 @@
serialise,
deserialise
)
-from cylc.flow.wallclock import get_current_time_string
+from cylc.flow.wallclock import (
+ get_current_time_string, get_time_string_from_unix_time)
from cylc.flow.platforms import get_platform
from cylc.flow.task_queues.independent import IndepQueueManager
@@ -1786,10 +1787,12 @@ def _set_expired_task(self, itask):
itask.expire_time = (
itask.get_point_as_seconds() +
itask.get_offset_as_seconds(itask.tdef.expiration_offset))
- if time() > itask.expire_time:
+ now = time()
+ if now > itask.expire_time:
msg = 'Task expired (skipping job).'
LOG.warning(f"[{itask}] {msg}")
- self.task_events_mgr.setup_event_handlers(itask, "expired", msg)
+ self.task_events_mgr.setup_event_handlers(
+ itask, get_time_string_from_unix_time(now), "expired", msg)
# TODO succeeded and expired states are useless due to immediate
# removal under all circumstances (unhandled failed is still used).
if itask.state_reset(TASK_STATUS_EXPIRED, is_held=False):
diff --git a/tests/functional/cylc-message/01-newline.t b/tests/functional/cylc-message/01-newline.t
index a0fceca4c57..2c984bbe124 100755
--- a/tests/functional/cylc-message/01-newline.t
+++ b/tests/functional/cylc-message/01-newline.t
@@ -45,6 +45,7 @@ workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --debug --no-detach "${WORKFLO
LOG="${WORKFLOW_RUN_DIR}/log/job/1/foo/01/job-activity.log"
sed -n '/event-handler-00/,$p' "${LOG}" >'edited-job-activity.log'
sed -i '/job-logs-retrieve/d' 'edited-job-activity.log'
+sed -i '/.*succeeded.*/d' 'edited-job-activity.log'
cmp_ok 'edited-job-activity.log' - <<__LOG__
[(('event-handler-00', 'custom-1'), 1) cmd]
diff --git a/tests/functional/restart/28-execution-timeout.t b/tests/functional/restart/28-execution-timeout.t
index e0b51fd4304..c26296b5429 100755
--- a/tests/functional/restart/28-execution-timeout.t
+++ b/tests/functional/restart/28-execution-timeout.t
@@ -17,17 +17,12 @@
#-------------------------------------------------------------------------------
# Test restart with running task with execution timeout.
. "$(dirname "$0")/test_header"
-set_test_number 4
+set_test_number 3
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach
workflow_run_ok "${TEST_NAME_BASE}-restart" \
cylc play "${WORKFLOW_NAME}" --no-detach --reference-test
-contains_ok "${WORKFLOW_RUN_DIR}/log/job/1/foo/NN/job-activity.log" <<'__LOG__'
-[(('event-handler-00', 'execution timeout'), 1) cmd] echo 1/foo 'execution timeout'
-[(('event-handler-00', 'execution timeout'), 1) ret_code] 0
-[(('event-handler-00', 'execution timeout'), 1) out] 1/foo execution timeout
-__LOG__
purge
exit
diff --git a/tests/functional/restart/58-removed-task.t b/tests/functional/restart/58-removed-task.t
index 17dc19f626e..59839c751c1 100755
--- a/tests/functional/restart/58-removed-task.t
+++ b/tests/functional/restart/58-removed-task.t
@@ -22,7 +22,7 @@
. "$(dirname "$0")/test_header"
-set_test_number 7
+set_test_number 6
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
@@ -39,10 +39,9 @@ workflow_run_ok "${TEST_NAME}" cylc play --no-detach "${WORKFLOW_NAME}"
TEST_NAME="${TEST_NAME_BASE}-restart"
workflow_run_ok "${TEST_NAME}" cylc play --set="INCL_B_C=False" --no-detach "${WORKFLOW_NAME}"
-grep_workflow_log_ok "grep-3" "\[1/a running job:01 flows:1\] (polled)started"
-grep_workflow_log_ok "grep-4" "\[1/b failed job:01 flows:1\] (polled)failed"
+grep_workflow_log_ok "grep-3" "=> failed"
# Failed (but not incomplete) task c should not have been polled.
-grep_fail "\[1/c failed job:01 flows:1\] (polled)failed" "${WORKFLOW_RUN_DIR}/log/scheduler/log"
+grep_fail "\[1/c failed job:01 flows:1\] failed" "${WORKFLOW_RUN_DIR}/log/scheduler/log"
purge
diff --git a/tests/functional/restart/58-removed-task/flow.cylc b/tests/functional/restart/58-removed-task/flow.cylc
index 94c5cf27b24..f1166b96824 100644
--- a/tests/functional/restart/58-removed-task/flow.cylc
+++ b/tests/functional/restart/58-removed-task/flow.cylc
@@ -25,8 +25,7 @@
cylc__job__poll_grep_workflow_log "1/b .*failed"
cylc__job__poll_grep_workflow_log "1/c .*failed"
cylc stop --now $CYLC_WORKFLOW_ID
- cylc__job__poll_grep_workflow_log "1/a .*(polled)started"
- cylc__job__poll_grep_workflow_log "1/b .*(polled)failed"
+ cylc__job__poll_grep_workflow_log "1/b .*failed"
"""
[[b, c]]
script = "false"
diff --git a/tests/functional/restart/58-waiting-manual-triggered.t b/tests/functional/restart/59-waiting-manual-triggered.t
similarity index 100%
rename from tests/functional/restart/58-waiting-manual-triggered.t
rename to tests/functional/restart/59-waiting-manual-triggered.t
diff --git a/tests/functional/restart/58-waiting-manual-triggered/flow.cylc b/tests/functional/restart/59-waiting-manual-triggered/flow.cylc
similarity index 100%
rename from tests/functional/restart/58-waiting-manual-triggered/flow.cylc
rename to tests/functional/restart/59-waiting-manual-triggered/flow.cylc
diff --git a/tests/functional/startup/00-state-summary.t b/tests/functional/startup/00-state-summary.t
index a4a02208899..73b206f8d07 100644
--- a/tests/functional/startup/00-state-summary.t
+++ b/tests/functional/startup/00-state-summary.t
@@ -28,9 +28,9 @@ run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
# Workflow runs and shuts down with a failed task.
cylc play --no-detach "${WORKFLOW_NAME}" > /dev/null 2>&1
+
# Restart with a failed task and a succeeded task.
-cylc play "${WORKFLOW_NAME}"
-poll_grep_workflow_log -E '1/foo .* \(polled\)failed'
+cylc play "${WORKFLOW_NAME}" > /dev/null 2>&1
cylc dump "${WORKFLOW_NAME}" > dump.out
TEST_NAME=${TEST_NAME_BASE}-grep
# State summary should not just say "Initializing..."
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index baa133cd6cd..2bb566dc3dc 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -17,6 +17,7 @@
from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler
+from pathlib import Path
from typing import Any as Fixture
@@ -42,3 +43,34 @@ async def test_process_job_logs_retrieval_warns_no_platform(
warning = caplog.records[-1]
assert warning.levelname == 'WARNING'
assert 'Unable to retrieve' in warning.msg
+
+
+async def test_process_message_no_repeat(
+ one_conf: Fixture, flow: Fixture, scheduler: Fixture, run: Fixture
+):
+    """Don't log received messages if they are found again when polled."""
+ reg: str = flow(one_conf)
+ schd: 'Scheduler' = scheduler(reg, paused_start=True)
+ message: str = 'The dead swans lay in the stagnant pool'
+ message_time: str = 'Thursday Lunchtime'
+
+ async with run(schd) as log:
+ # Set up the database with a message already received:
+ itask = schd.pool.get_tasks()[0]
+ itask.tdef.run_mode = 'live'
+ schd.workflow_db_mgr.put_insert_task_events(
+ itask, {'time': message_time, 'event': '', 'message': message})
+ schd.process_workflow_db_queue()
+
+ # Task event manager returns None:
+ assert schd.task_events_mgr.process_message(
+ itask=itask, severity='comical', message=message,
+ event_time=message_time, submit_num=0,
+ flag=schd.task_events_mgr.FLAG_POLLED
+ ) is None
+
+ # Log doesn't contain a repeat message:
+ assert (
+ schd.task_events_mgr.FLAG_POLLED
+ not in log.records[-1].message
+ )
diff --git a/tests/unit/test_rundb.py b/tests/unit/test_rundb.py
index 372162b1387..32ca9c3a4b8 100644
--- a/tests/unit/test_rundb.py
+++ b/tests/unit/test_rundb.py
@@ -24,6 +24,7 @@
from types import SimpleNamespace
import pytest
+from pytest import param
from cylc.flow.exceptions import PlatformLookupError
from cylc.flow.rundb import CylcWorkflowDAO
@@ -175,3 +176,81 @@ def callback(index, row):
match='not defined.*\n.*foo.*\n.*bar'
):
dao.select_task_pool_for_restart(callback)
+
+
+@pytest.fixture(scope='module')
+def setup_message_in_db(tmp_path_factory):
+ """Create a database for use when testing message_in_db"""
+ tmp_path = tmp_path_factory.mktemp('message_in_db')
+ db_file = tmp_path / 'db'
+ dao = CylcWorkflowDAO(db_file, create_tables=True)
+ setup_stmt = r"""
+ INSERT INTO task_events
+ VALUES
+ (
+ "qux", "-191",
+ "morning", "1", "started", ""),
+ (
+ "qux", "-190",
+ "afternoon", "1", "message critical", "Hello Rome");
+ """
+ dao.connect().execute(setup_stmt)
+ yield dao
+
+
+@pytest.mark.parametrize(
+ 'query, expect',
+ (
+ param(
+ (
+ SimpleNamespace(tokens={'task': 'qux', 'cycle': '-191'}),
+ "morning", 1, "started",
+ ),
+ True,
+ id="event-name-in-db"
+ ),
+ param(
+ (
+ SimpleNamespace(tokens={'task': 'qux', 'cycle': '-190'}),
+ "afternoon", 1, "Hello Rome",
+ ),
+ True,
+ id="message-in-db"
+ ),
+ )
+)
+def test_message_in_db(setup_message_in_db, query, expect):
+ """Method correctly says if message is in DB.
+ """
+ assert setup_message_in_db.message_in_db(*query) is expect
+
+
+def test_message_not_in_db(setup_message_in_db):
+ """Method correctly says if message is NOT in DB:
+ """
+ def use_message_in_db(args_):
+ """Gather up boilerplate of setting up a fake itask and
+ providing args to message_in_db."""
+ itask = SimpleNamespace(
+ tokens={'task': args_['name'], 'cycle': args_['cycle']})
+ return setup_message_in_db.message_in_db(
+ itask, args_['event_time'], args_['submit'], args_['message'])
+
+ # A list of args which _should_ be in the db:
+ args_ = {
+ 'name': "qux",
+ 'cycle': '-190',
+ 'event_time': "afternoon",
+ 'submit': 1,
+ 'message': 'Hello Rome',
+ }
+
+ # Control - can we get True if all args are correct?
+ assert use_message_in_db(args_) is True
+
+ # One at a time break each arg:
+ for key in args_:
+ old_arg = args_[key]
+ args_[key] = 'garbage'
+ assert use_message_in_db(args_) is False
+ args_[key] = old_arg
From e628d430298df86149f0c9bbfc9f4e5c8a997f08 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 19 Jul 2023 08:36:09 +0100
Subject: [PATCH 03/17] Response to review
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
---
cylc/flow/rundb.py | 12 +++++-------
tests/unit/test_rundb.py | 6 +++---
2 files changed, 8 insertions(+), 10 deletions(-)
diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py
index 07d15c013ee..3ca9b53d770 100644
--- a/cylc/flow/rundb.py
+++ b/cylc/flow/rundb.py
@@ -1082,14 +1082,11 @@ def vacuum(self):
"""Vacuum to the database."""
return self.connect().execute("VACUUM")
- def message_in_db(self, itask, event_time, submit, message):
+ def message_in_db(self, itask, event_time, submit_num, message):
"""Has this message been logged in the DB (task_events table)?
- SQL Query in plain English:
- Say whether the inner query returns any results:
-
- If the event is a standard task message (e.g. Submit) then
- there will be no message, so we chect the event column too.
+ If the event is a standard task message (e.g. Submit) then
+ there will be no message, so we check the event column too.
"""
task_name = itask.tokens['task']
cycle = itask.tokens['cycle']
@@ -1103,5 +1100,6 @@ def message_in_db(self, itask, event_time, submit, message):
AND time == ?
)
"""
- stmt_args = [task_name, cycle, message, message, submit, event_time]
+ stmt_args = [
+ task_name, cycle, message, message, submit_num, event_time]
return bool(self.connect().execute(stmt, stmt_args).fetchone()[0])
diff --git a/tests/unit/test_rundb.py b/tests/unit/test_rundb.py
index 32ca9c3a4b8..382709b94a5 100644
--- a/tests/unit/test_rundb.py
+++ b/tests/unit/test_rundb.py
@@ -183,7 +183,6 @@ def setup_message_in_db(tmp_path_factory):
"""Create a database for use when testing message_in_db"""
tmp_path = tmp_path_factory.mktemp('message_in_db')
db_file = tmp_path / 'db'
- dao = CylcWorkflowDAO(db_file, create_tables=True)
setup_stmt = r"""
INSERT INTO task_events
VALUES
@@ -194,8 +193,9 @@ def setup_message_in_db(tmp_path_factory):
"qux", "-190",
"afternoon", "1", "message critical", "Hello Rome");
"""
- dao.connect().execute(setup_stmt)
- yield dao
+ with CylcWorkflowDAO(db_file, create_tables=True) as dao:
+ dao.connect().execute(setup_stmt)
+ yield dao
@pytest.mark.parametrize(
From 647bd55bbd424b5ab5ce40ae33cf7d51f5a26856 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 25 Jul 2023 13:02:07 +0100
Subject: [PATCH 04/17] put test back
---
cylc/flow/task_events_mgr.py | 19 ++++++++++---------
.../functional/restart/28-execution-timeout.t | 7 ++++++-
2 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index ce489231238..59f504de575 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -589,17 +589,16 @@ def process_message(
# Any message represents activity.
self.reset_inactivity_timer_func()
- message_already_received = self.workflow_db_mgr.pri_dao.message_in_db(
- itask, event_time, submit_num, message)
+ quiet_msg_check = False
if (
itask.tdef.run_mode == 'live'
- and message_already_received
+ and self.workflow_db_mgr.pri_dao.message_in_db(
+ itask, event_time, submit_num, message)
):
- return None
-
- if (
- not self._process_message_check(
- itask, severity, message, event_time, flag, submit_num)
+ quiet_msg_check = True
+ if not self._process_message_check(
+ itask, severity, message, event_time, flag, submit_num,
+ quiet=quiet_msg_check,
):
return None
@@ -770,6 +769,7 @@ def _process_message_check(
event_time: str,
flag: str,
submit_num: int,
+ quiet: bool = False,
) -> bool:
"""Helper for `.process_message`.
@@ -835,7 +835,8 @@ def _process_message_check(
}:
severity = DEBUG
- LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
+ if not quiet:
+ LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
return True
def setup_event_handlers(
diff --git a/tests/functional/restart/28-execution-timeout.t b/tests/functional/restart/28-execution-timeout.t
index c26296b5429..e0b51fd4304 100755
--- a/tests/functional/restart/28-execution-timeout.t
+++ b/tests/functional/restart/28-execution-timeout.t
@@ -17,12 +17,17 @@
#-------------------------------------------------------------------------------
# Test restart with running task with execution timeout.
. "$(dirname "$0")/test_header"
-set_test_number 3
+set_test_number 4
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach
workflow_run_ok "${TEST_NAME_BASE}-restart" \
cylc play "${WORKFLOW_NAME}" --no-detach --reference-test
+contains_ok "${WORKFLOW_RUN_DIR}/log/job/1/foo/NN/job-activity.log" <<'__LOG__'
+[(('event-handler-00', 'execution timeout'), 1) cmd] echo 1/foo 'execution timeout'
+[(('event-handler-00', 'execution timeout'), 1) ret_code] 0
+[(('event-handler-00', 'execution timeout'), 1) out] 1/foo execution timeout
+__LOG__
purge
exit
From 26a37cae9509a0f7a13fc7d0ccf298486a5f5028 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 25 Jul 2023 13:36:04 +0100
Subject: [PATCH 05/17] update changelog
---
CHANGES.md | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 65ee51bd89a..34851588c61 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,12 @@ $ towncrier create ..md --content "Short description"
+## __cylc-8.2.1 (Upcoming)__
+# Fixes
+
+[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
+message being logged multiple times when polled.
+
## __cylc-8.2.0 (Released 2023-07-21)__
### Breaking Changes
@@ -126,10 +132,6 @@ resuming a workflow.
[#5625](https://github.com/cylc/cylc-flow/pull/5625) - Exclude `setuptools`
version (v67) which results in dependency check failure with editable installs.
-[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
-message being logged multiple times when polled.
-
-
## __cylc-8.1.4 (Released 2023-05-04)__
### Fixes
From 0ef9f2d7aed033d77102cb8c6a85f3c6d20d079d Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 26 Jul 2023 09:53:18 +0100
Subject: [PATCH 06/17] another attempt to fix this problem
---
cylc/flow/task_events_mgr.py | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 59f504de575..ebb2d66f6ec 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -589,16 +589,19 @@ def process_message(
# Any message represents activity.
self.reset_inactivity_timer_func()
- quiet_msg_check = False
+        # In live mode check whether the message has been received and
+ # put in the database before:
+ repeat_message = False
if (
itask.tdef.run_mode == 'live'
and self.workflow_db_mgr.pri_dao.message_in_db(
itask, event_time, submit_num, message)
):
- quiet_msg_check = True
+ repeat_message = True
+
if not self._process_message_check(
itask, severity, message, event_time, flag, submit_num,
- quiet=quiet_msg_check,
+ quiet=repeat_message,
):
return None
@@ -607,14 +610,15 @@ def process_message(
new_msg = f'{message} {self.FLAG_POLLED}'
else:
new_msg = message
- self.data_store_mgr.delta_job_msg(
- itask.tokens.duplicate(job=str(submit_num)),
- new_msg
- )
+
+ if not repeat_message:
+ self.data_store_mgr.delta_job_msg(
+ itask.tokens.duplicate(job=str(submit_num)),
+ new_msg
+ )
# Satisfy my output, if possible, and spawn children.
# (first remove signal: failed/EXIT -> failed)
-
msg0 = message.split('/')[0]
completed_trigger = itask.state.outputs.set_msg_trg_completion(
message=msg0, is_completed=True)
From 70b739358a2a72917116fb4323913f4173b002e8 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 26 Jul 2023 11:12:11 +0100
Subject: [PATCH 07/17] convert to towncrier
---
CHANGES.md | 6 ------
changes.d/5562.fix.md | 1 +
2 files changed, 1 insertion(+), 6 deletions(-)
create mode 100644 changes.d/5562.fix.md
diff --git a/CHANGES.md b/CHANGES.md
index 34851588c61..483023c0dd4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,12 +11,6 @@ $ towncrier create ..md --content "Short description"
-## __cylc-8.2.1 (Upcoming)__
-# Fixes
-
-[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
-message being logged multiple times when polled.
-
## __cylc-8.2.0 (Released 2023-07-21)__
### Breaking Changes
diff --git a/changes.d/5562.fix.md b/changes.d/5562.fix.md
new file mode 100644
index 00000000000..f1ff0d8cd5b
--- /dev/null
+++ b/changes.d/5562.fix.md
@@ -0,0 +1 @@
+Prevent the same message being logged multiple times when polled.
\ No newline at end of file
From f6cc2ca30df86c015d40816496d2e5a06b6c1376 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Fri, 14 Jul 2023 12:56:54 +0100
Subject: [PATCH 08/17] Prevent logging of messages already logged
---
CHANGES.md | 4 ++++
tests/functional/restart/28-execution-timeout.t | 7 +------
2 files changed, 5 insertions(+), 6 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 483023c0dd4..65ee51bd89a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -126,6 +126,10 @@ resuming a workflow.
[#5625](https://github.com/cylc/cylc-flow/pull/5625) - Exclude `setuptools`
version (v67) which results in dependency check failure with editable installs.
+[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
+message being logged multiple times when polled.
+
+
## __cylc-8.1.4 (Released 2023-05-04)__
### Fixes
diff --git a/tests/functional/restart/28-execution-timeout.t b/tests/functional/restart/28-execution-timeout.t
index e0b51fd4304..c26296b5429 100755
--- a/tests/functional/restart/28-execution-timeout.t
+++ b/tests/functional/restart/28-execution-timeout.t
@@ -17,17 +17,12 @@
#-------------------------------------------------------------------------------
# Test restart with running task with execution timeout.
. "$(dirname "$0")/test_header"
-set_test_number 4
+set_test_number 3
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach
workflow_run_ok "${TEST_NAME_BASE}-restart" \
cylc play "${WORKFLOW_NAME}" --no-detach --reference-test
-contains_ok "${WORKFLOW_RUN_DIR}/log/job/1/foo/NN/job-activity.log" <<'__LOG__'
-[(('event-handler-00', 'execution timeout'), 1) cmd] echo 1/foo 'execution timeout'
-[(('event-handler-00', 'execution timeout'), 1) ret_code] 0
-[(('event-handler-00', 'execution timeout'), 1) out] 1/foo execution timeout
-__LOG__
purge
exit
From abf393b3c43a14cf838f5881e445968c32a978e8 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 25 Jul 2023 13:02:07 +0100
Subject: [PATCH 09/17] put test back
---
tests/functional/restart/28-execution-timeout.t | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/tests/functional/restart/28-execution-timeout.t b/tests/functional/restart/28-execution-timeout.t
index c26296b5429..e0b51fd4304 100755
--- a/tests/functional/restart/28-execution-timeout.t
+++ b/tests/functional/restart/28-execution-timeout.t
@@ -17,12 +17,17 @@
#-------------------------------------------------------------------------------
# Test restart with running task with execution timeout.
. "$(dirname "$0")/test_header"
-set_test_number 3
+set_test_number 4
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach
workflow_run_ok "${TEST_NAME_BASE}-restart" \
cylc play "${WORKFLOW_NAME}" --no-detach --reference-test
+contains_ok "${WORKFLOW_RUN_DIR}/log/job/1/foo/NN/job-activity.log" <<'__LOG__'
+[(('event-handler-00', 'execution timeout'), 1) cmd] echo 1/foo 'execution timeout'
+[(('event-handler-00', 'execution timeout'), 1) ret_code] 0
+[(('event-handler-00', 'execution timeout'), 1) out] 1/foo execution timeout
+__LOG__
purge
exit
From 376f1600d0292c039f3f7470dc04025c60c3edb9 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 25 Jul 2023 13:36:04 +0100
Subject: [PATCH 10/17] update changelog
---
CHANGES.md | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 65ee51bd89a..34851588c61 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,12 @@ $ towncrier create ..md --content "Short description"
+## __cylc-8.2.1 (Upcoming)__
+# Fixes
+
+[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
+message being logged multiple times when polled.
+
## __cylc-8.2.0 (Released 2023-07-21)__
### Breaking Changes
@@ -126,10 +132,6 @@ resuming a workflow.
[#5625](https://github.com/cylc/cylc-flow/pull/5625) - Exclude `setuptools`
version (v67) which results in dependency check failure with editable installs.
-[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
-message being logged multiple times when polled.
-
-
## __cylc-8.1.4 (Released 2023-05-04)__
### Fixes
From bbacf0dcf34f9a32e0de576abc28fc2ab8f7cb38 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 26 Jul 2023 11:12:48 +0100
Subject: [PATCH 11/17] Remove changelog entry from CHANGES.md (now covered by towncrier fragment)
---
CHANGES.md | 6 ------
1 file changed, 6 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 34851588c61..483023c0dd4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,12 +11,6 @@ $ towncrier create ..md --content "Short description"
-## __cylc-8.2.1 (Upcoming)__
-# Fixes
-
-[#5652](https://github.com/cylc/cylc-flow/pull/5562) - Prevent the same
-message being logged multiple times when polled.
-
## __cylc-8.2.0 (Released 2023-07-21)__
### Breaking Changes
From 81dbabe5d42516186ed20f0b6a153f4d45e97d16 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 26 Jul 2023 11:15:10 +0100
Subject: [PATCH 12/17] Undo accidental update of version
---
cylc/flow/__init__.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py
index bc9f24aa573..ac1743a704f 100644
--- a/cylc/flow/__init__.py
+++ b/cylc/flow/__init__.py
@@ -53,7 +53,7 @@ def environ_init():
environ_init()
-__version__ = '8.3.0.dev'
+__version__ = '8.2.1.dev'
def iter_entry_points(entry_point_name):
From e36a254bf7318a926994137bf06b137849f4c6dc Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 22 Aug 2023 10:08:41 +0100
Subject: [PATCH 13/17] Only run severity recalculation if required
---
cylc/flow/task_events_mgr.py | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index ebb2d66f6ec..25292472389 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -830,16 +830,15 @@ def _process_message_check(
)
return False
- severity = cast(int, LOG_LEVELS.get(severity, INFO))
- # Demote log level to DEBUG if this is a message that duplicates what
- # gets logged by itask state change anyway (and not manual poll)
- if severity > DEBUG and flag != self.FLAG_POLLED and message in {
- self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
- self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
- }:
- severity = DEBUG
-
if not quiet:
+ severity = cast(int, LOG_LEVELS.get(severity, INFO))
+ # Demote log level to DEBUG if this is a message that duplicates what
+ # gets logged by itask state change anyway (and not manual poll)
+ if severity > DEBUG and flag != self.FLAG_POLLED and message in {
+ self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
+ self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
+ }:
+ severity = DEBUG
LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
return True
From bd18821776d47eaabac716cc665817a7bc10cb61 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 22 Aug 2023 10:38:40 +0100
Subject: [PATCH 14/17] do not write to db if message repeated
---
cylc/flow/task_events_mgr.py | 92 ++++++++++++++++++++++++------------
1 file changed, 61 insertions(+), 31 deletions(-)
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 25292472389..ac0c5d085f4 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -631,7 +631,9 @@ def process_message(
and not itask.state.outputs.is_completed(TASK_OUTPUT_STARTED)):
self.setup_event_handlers(
itask, event_time,
- self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
+ self.EVENT_STARTED, f'job {self.EVENT_STARTED}',
+ repeat_message,
+ )
self.spawn_func(itask, TASK_OUTPUT_STARTED)
if message == self.EVENT_STARTED:
@@ -645,14 +647,15 @@ def process_message(
# one, so assume that a successful submission occurred and act
# accordingly. Note the submitted message is internal, whereas
# the started message comes in on the network.
- self._process_message_submitted(itask, event_time)
+ self._process_message_submitted(
+ itask, event_time, repeat_message)
self.spawn_func(itask, TASK_OUTPUT_SUBMITTED)
- self._process_message_started(itask, event_time)
+ self._process_message_started(itask, event_time, repeat_message)
self.spawn_func(itask, TASK_OUTPUT_STARTED)
elif message == self.EVENT_SUCCEEDED:
- self._process_message_succeeded(itask, event_time)
+ self._process_message_succeeded(itask, event_time, repeat_message)
self.spawn_func(itask, TASK_OUTPUT_SUCCEEDED)
elif message == self.EVENT_FAILED:
if (
@@ -661,7 +664,7 @@ def process_message(
):
return True
if self._process_message_failed(
- itask, event_time, self.JOB_FAILED):
+ itask, event_time, self.JOB_FAILED, repeat_message):
self.spawn_func(itask, TASK_OUTPUT_FAILED)
elif message == self.EVENT_SUBMIT_FAILED:
if (
@@ -672,7 +675,8 @@ def process_message(
if self._process_message_submit_failed(
itask,
event_time,
- submit_num
+ submit_num,
+ repeat_message,
):
self.spawn_func(itask, TASK_OUTPUT_SUBMIT_FAILED)
elif message == self.EVENT_SUBMITTED:
@@ -688,7 +692,8 @@ def process_message(
# If not in the preparing state we already assumed and handled
# job submission under the started event above...
# (sim mode does not have the job prep state)
- self._process_message_submitted(itask, event_time)
+ self._process_message_submitted(
+ itask, event_time, repeat_message)
self.spawn_func(itask, TASK_OUTPUT_SUBMITTED)
# ... but either way update the job ID in the job proxy (it only
@@ -708,11 +713,12 @@ def process_message(
):
return True
signal = message[len(FAIL_MESSAGE_PREFIX):]
- self._db_events_insert(itask, event_time, "signaled", signal)
+ if not repeat_message:
+ self._db_events_insert(itask, event_time, "signaled", signal)
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": signal})
if self._process_message_failed(
- itask, event_time, self.JOB_FAILED):
+ itask, event_time, self.JOB_FAILED, repeat_message):
self.spawn_func(itask, TASK_OUTPUT_FAILED)
elif message.startswith(ABORT_MESSAGE_PREFIX):
# Task aborted with message
@@ -722,14 +728,18 @@ def process_message(
):
return True
aborted_with = message[len(ABORT_MESSAGE_PREFIX):]
- self._db_events_insert(itask, event_time, "aborted", message)
+ if not repeat_message:
+ self._db_events_insert(itask, event_time, "aborted", message)
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": aborted_with})
- if self._process_message_failed(itask, event_time, aborted_with):
+ if self._process_message_failed(
+ itask, event_time, aborted_with, repeat_message
+ ):
self.spawn_func(itask, TASK_OUTPUT_FAILED)
elif message.startswith(VACATION_MESSAGE_PREFIX):
# Task job pre-empted into a vacation state
- self._db_events_insert(itask, event_time, "vacated", message)
+ if not repeat_message:
+ self._db_events_insert(itask, event_time, "vacated", message)
itask.set_summary_time('started') # unset
if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0
@@ -749,7 +759,7 @@ def process_message(
# Message of an as-yet unreported custom task output.
# No state change.
self.setup_event_handlers(
- itask, event_time, completed_trigger, message)
+ itask, event_time, completed_trigger, message, repeat_message)
self.spawn_func(itask, msg0)
else:
# Unhandled messages. These include:
@@ -758,11 +768,13 @@ def process_message(
# Note that all messages are logged already at the top.
# No state change.
LOG.debug(f"[{itask}] unhandled: {message}")
- self._db_events_insert(
- itask, event_time, (f"message {lseverity}"), message)
+ if not repeat_message:
+ self._db_events_insert(
+ itask, event_time, (f"message {lseverity}"), message)
if lseverity in self.NON_UNIQUE_EVENTS:
itask.non_unique_events.update({lseverity: 1})
- self.setup_event_handlers(itask, event_time, lseverity, message)
+ self.setup_event_handlers(
+ itask, event_time, lseverity, message, repeat_message)
return None
def _process_message_check(
@@ -832,8 +844,9 @@ def _process_message_check(
if not quiet:
severity = cast(int, LOG_LEVELS.get(severity, INFO))
- # Demote log level to DEBUG if this is a message that duplicates what
- # gets logged by itask state change anyway (and not manual poll)
+ # Demote log level to DEBUG if this is a message that duplicates
+ # what gets logged by itask state change anyway
+ # (and not manual poll)
if severity > DEBUG and flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
@@ -843,7 +856,8 @@ def _process_message_check(
return True
def setup_event_handlers(
- self, itask: 'TaskProxy', event_time: str, event: str, message: str
+ self, itask: 'TaskProxy', event_time: str, event: str, message: str,
+ repeat_message: bool
) -> None:
"""Set up handlers for a task event."""
if itask.tdef.run_mode != 'live':
@@ -851,7 +865,8 @@ def setup_event_handlers(
msg = ""
if message != f"job {event}":
msg = message
- self._db_events_insert(itask, event_time, event, msg)
+ if not repeat_message:
+ self._db_events_insert(itask, event_time, event, msg)
self._setup_job_logs_retrieval(itask, event)
self._setup_event_mail(itask, event)
self._setup_custom_event_handlers(itask, event, message)
@@ -1124,7 +1139,9 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
if itask.state_reset(TASK_STATUS_WAITING):
self.data_store_mgr.delta_task_state(itask)
- def _process_message_failed(self, itask, event_time, message):
+ def _process_message_failed(
+ self, itask, event_time, message, repeat_message
+ ):
"""Helper for process_message, handle a failed message.
Return True if no retries (hence go to the failed state).
@@ -1147,7 +1164,9 @@ def _process_message_failed(self, itask, event_time, message):
# No retry lined up: definitive failure.
if itask.state_reset(TASK_STATUS_FAILED):
self.setup_event_handlers(
- itask, event_time, self.EVENT_FAILED, message)
+ itask, event_time, self.EVENT_FAILED, message,
+ repeat_message
+ )
self.data_store_mgr.delta_task_state(itask)
no_retries = True
else:
@@ -1158,11 +1177,11 @@ def _process_message_failed(self, itask, event_time, message):
LOG.warning(f"[{itask}] {delay_msg}")
msg = f"{self.JOB_FAILED}, {delay_msg}"
self.setup_event_handlers(
- itask, event_time, self.EVENT_RETRY, msg)
+ itask, event_time, self.EVENT_RETRY, msg, repeat_message)
self._reset_job_timers(itask)
return no_retries
- def _process_message_started(self, itask, event_time):
+ def _process_message_started(self, itask, event_time, repeat_message):
"""Helper for process_message, handle a started message."""
if itask.job_vacated:
itask.job_vacated = False
@@ -1176,7 +1195,9 @@ def _process_message_started(self, itask, event_time):
if itask.state_reset(TASK_STATUS_RUNNING):
self.setup_event_handlers(
itask, event_time,
- self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
+ self.EVENT_STARTED, f'job {self.EVENT_STARTED}',
+ repeat_message,
+ )
self.data_store_mgr.delta_task_state(itask)
self._reset_job_timers(itask)
@@ -1184,7 +1205,7 @@ def _process_message_started(self, itask, event_time):
if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0
- def _process_message_succeeded(self, itask, event_time):
+ def _process_message_succeeded(self, itask, event_time, repeat_message):
"""Helper for process_message, handle a succeeded message."""
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
@@ -1203,11 +1224,15 @@ def _process_message_succeeded(self, itask, event_time):
if itask.state_reset(TASK_STATUS_SUCCEEDED):
self.setup_event_handlers(
itask, event_time,
- self.EVENT_SUCCEEDED, f"job {self.EVENT_SUCCEEDED}")
+ self.EVENT_SUCCEEDED, f"job {self.EVENT_SUCCEEDED}",
+ repeat_message,
+ )
self.data_store_mgr.delta_task_state(itask)
self._reset_job_timers(itask)
- def _process_message_submit_failed(self, itask, event_time, submit_num):
+ def _process_message_submit_failed(
+ self, itask, event_time, submit_num, repeat_message
+ ):
"""Helper for process_message, handle a submit-failed message.
Return True if no retries (hence go to the submit-failed state).
@@ -1231,7 +1256,9 @@ def _process_message_submit_failed(self, itask, event_time, submit_num):
if itask.state_reset(TASK_STATUS_SUBMIT_FAILED):
self.setup_event_handlers(
itask, event_time, self.EVENT_SUBMIT_FAILED,
- f'job {self.EVENT_SUBMIT_FAILED}')
+ f'job {self.EVENT_SUBMIT_FAILED}',
+ repeat_message,
+ )
self.data_store_mgr.delta_task_state(itask)
else:
# There is a submission retry lined up.
@@ -1241,7 +1268,9 @@ def _process_message_submit_failed(self, itask, event_time, submit_num):
LOG.warning(f"[{itask}] {delay_msg}")
msg = f"job {self.EVENT_SUBMIT_FAILED}, {delay_msg}"
self.setup_event_handlers(
- itask, event_time, self.EVENT_SUBMIT_RETRY, msg)
+ itask, event_time, self.EVENT_SUBMIT_RETRY,
+ msg, repeat_message
+ )
# Register newly submit-failed job with the database and datastore.
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
@@ -1256,7 +1285,7 @@ def _process_message_submit_failed(self, itask, event_time, submit_num):
return no_retries
def _process_message_submitted(
- self, itask: 'TaskProxy', event_time: str
+ self, itask: 'TaskProxy', event_time: str, repeat_message: bool
) -> None:
"""Helper for process_message, handle a submit-succeeded message."""
with suppress(KeyError):
@@ -1291,6 +1320,7 @@ def _process_message_submitted(
event_time,
self.EVENT_SUBMITTED,
f'job {self.EVENT_SUBMITTED}',
+ repeat_message,
)
self.data_store_mgr.delta_task_state(itask)
self.data_store_mgr.delta_task_queued(itask)
From 001cc12df41c5beccb745e8e1f5ee6623a878092 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 22 Aug 2023 10:55:20 +0100
Subject: [PATCH 15/17] Fix mypy errors (avoid re-binding str severity to int)
---
cylc/flow/task_events_mgr.py | 29 ++++++++++++-------
cylc/flow/task_job_mgr.py | 2 +-
.../cylc-poll/15-job-st-file-no-batch.t | 2 +-
3 files changed, 21 insertions(+), 12 deletions(-)
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index ac0c5d085f4..5ae04fdfca4 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -409,7 +409,7 @@ def check_poll_time(itask, now=None):
itask.poll_timer.next(no_exhaust=True)
return True
- def check_job_time(self, itask, now):
+ def check_job_time(self, itask, now, repeat_message):
"""Check/handle job timeout and poll timer"""
can_poll = self.check_poll_time(itask, now)
if itask.timeout is None or now <= itask.timeout:
@@ -428,7 +428,9 @@ def check_job_time(self, itask, now):
if msg and event:
LOG.warning(f"[{itask}] {msg}")
self.setup_event_handlers(
- itask, get_time_string_from_unix_time(now), event, msg)
+ itask, get_time_string_from_unix_time(now), event, msg,
+ repeat_message,
+ )
return True
else:
return can_poll
@@ -843,21 +845,28 @@ def _process_message_check(
return False
if not quiet:
- severity = cast(int, LOG_LEVELS.get(severity, INFO))
+ severity_int = cast(int, LOG_LEVELS.get(severity, INFO))
# Demote log level to DEBUG if this is a message that duplicates
# what gets logged by itask state change anyway
# (and not manual poll)
- if severity > DEBUG and flag != self.FLAG_POLLED and message in {
- self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
- self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
- }:
- severity = DEBUG
- LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
+ if (
+ severity_int > DEBUG
+ and flag != self.FLAG_POLLED
+ and message in {
+ self.EVENT_SUBMITTED,
+ self.EVENT_STARTED,
+ self.EVENT_SUCCEEDED,
+ self.EVENT_SUBMIT_FAILED,
+ f'{FAIL_MESSAGE_PREFIX}ERR'
+ }
+ ):
+ severity_int = DEBUG
+ LOG.log(severity_int, f"[{itask}] {flag}{message}{timestamp}")
return True
def setup_event_handlers(
self, itask: 'TaskProxy', event_time: str, event: str, message: str,
- repeat_message: bool
+ repeat_message: bool = False
) -> None:
"""Set up handlers for a task event."""
if itask.tdef.run_mode != 'live':
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index f7b5df86c30..5f589d751e9 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -165,7 +165,7 @@ def check_task_jobs(self, workflow, task_pool):
now = time()
poll_tasks = set()
for itask in task_pool.get_tasks():
- if self.task_events_mgr.check_job_time(itask, now):
+ if self.task_events_mgr.check_job_time(itask, now, False):
poll_tasks.add(itask)
if itask.poll_timer.delay is not None:
LOG.info(
diff --git a/tests/functional/cylc-poll/15-job-st-file-no-batch.t b/tests/functional/cylc-poll/15-job-st-file-no-batch.t
index 4331cab1b63..6831c677041 100755
--- a/tests/functional/cylc-poll/15-job-st-file-no-batch.t
+++ b/tests/functional/cylc-poll/15-job-st-file-no-batch.t
@@ -30,5 +30,5 @@ run_ok "${TEST_NAME_BASE}-log-1" \
run_ok "${TEST_NAME_BASE}-log-2" \
grep -E '1/t1 running .*\(polled\)failed' "${LOG}"
-purge
+# purge
exit
From b2c1c548c1f6fb38b5d2c341e8f099c4e18e5200 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 23 Aug 2023 11:16:27 +0100
Subject: [PATCH 16/17] Update
tests/functional/cylc-poll/15-job-st-file-no-batch.t
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
---
tests/functional/cylc-poll/15-job-st-file-no-batch.t | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/tests/functional/cylc-poll/15-job-st-file-no-batch.t b/tests/functional/cylc-poll/15-job-st-file-no-batch.t
index 6831c677041..8ca0d77d398 100755
--- a/tests/functional/cylc-poll/15-job-st-file-no-batch.t
+++ b/tests/functional/cylc-poll/15-job-st-file-no-batch.t
@@ -30,5 +30,4 @@ run_ok "${TEST_NAME_BASE}-log-1" \
run_ok "${TEST_NAME_BASE}-log-2" \
grep -E '1/t1 running .*\(polled\)failed' "${LOG}"
-# purge
-exit
+purge
From 31f8b7cbea97a677bdabfd3a299375cbd357ebff Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 23 Aug 2023 16:18:20 +0100
Subject: [PATCH 17/17] Apply suggestions from code review
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
---
tests/functional/cylc-message/01-newline.t | 1 -
tests/functional/restart/58-removed-task.t | 8 +++++---
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/tests/functional/cylc-message/01-newline.t b/tests/functional/cylc-message/01-newline.t
index 2c984bbe124..a0fceca4c57 100755
--- a/tests/functional/cylc-message/01-newline.t
+++ b/tests/functional/cylc-message/01-newline.t
@@ -45,7 +45,6 @@ workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --debug --no-detach "${WORKFLO
LOG="${WORKFLOW_RUN_DIR}/log/job/1/foo/01/job-activity.log"
sed -n '/event-handler-00/,$p' "${LOG}" >'edited-job-activity.log'
sed -i '/job-logs-retrieve/d' 'edited-job-activity.log'
-sed -i '/.*succeeded.*/d' 'edited-job-activity.log'
cmp_ok 'edited-job-activity.log' - <<__LOG__
[(('event-handler-00', 'custom-1'), 1) cmd]
diff --git a/tests/functional/restart/58-removed-task.t b/tests/functional/restart/58-removed-task.t
index 59839c751c1..eee6b8e4aa5 100755
--- a/tests/functional/restart/58-removed-task.t
+++ b/tests/functional/restart/58-removed-task.t
@@ -22,7 +22,7 @@
. "$(dirname "$0")/test_header"
-set_test_number 6
+set_test_number 8
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
@@ -39,9 +39,11 @@ workflow_run_ok "${TEST_NAME}" cylc play --no-detach "${WORKFLOW_NAME}"
TEST_NAME="${TEST_NAME_BASE}-restart"
workflow_run_ok "${TEST_NAME}" cylc play --set="INCL_B_C=False" --no-detach "${WORKFLOW_NAME}"
-grep_workflow_log_ok "grep-3" "=> failed"
+grep_ok '\[jobs-poll out\] .* "run_status": 0' "${WORKFLOW_RUN_DIR}/log/job/1/a/NN/job-activity.log"
+grep_ok '\[jobs-poll out\] .* "run_status": 1' "${WORKFLOW_RUN_DIR}/log/job/1/b/NN/job-activity.log"
+grep_workflow_log_ok "grep-3" "\[1/b .* => failed"
# Failed (but not incomplete) task c should not have been polled.
-grep_fail "\[1/c failed job:01 flows:1\] failed" "${WORKFLOW_RUN_DIR}/log/scheduler/log"
+grep_fail "jobs-poll" "${WORKFLOW_RUN_DIR}/log/job/1/c/NN/job-activity.log"
purge