diff --git a/changes.d/5562.fix.md b/changes.d/5562.fix.md
new file mode 100644
index 00000000000..f1ff0d8cd5b
--- /dev/null
+++ b/changes.d/5562.fix.md
@@ -0,0 +1 @@
+Prevent the same message being logged multiple times when polled.
\ No newline at end of file
diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py
index c17fca165e6..50d7f0e31db 100644
--- a/cylc/flow/rundb.py
+++ b/cylc/flow/rundb.py
@@ -1088,3 +1088,25 @@ def select_jobs_for_datastore(
     def vacuum(self):
         """Vacuum to the database."""
         return self.connect().execute("VACUUM")
+
+    def message_in_db(self, itask, event_time, submit_num, message):
+        """Has this message been logged in the DB (task_events table)?
+
+        If the event is a standard task message (e.g. Submit) then
+        there will be no message, so we check the event column too.
+        """
+        task_name = itask.tokens['task']
+        cycle = itask.tokens['cycle']
+        stmt = r"""
+            SELECT EXISTS (
+                SELECT 1 FROM task_events WHERE
+                    name == ?
+                    AND cycle == ?
+                    AND (message == ? OR event == ?)
+                    AND submit_num == ?
+                    AND time == ?
+            )
+        """
+        stmt_args = [
+            task_name, cycle, message, message, submit_num, event_time]
+        return bool(self.connect().execute(stmt, stmt_args).fetchone()[0])
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 7449e7d274a..2379a044ae7 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1518,7 +1518,8 @@ def late_tasks_check(self):
                 itask.is_late = True
                 LOG.warning(f"[{itask}] {msg}")
                 self.task_events_mgr.setup_event_handlers(
-                    itask, self.task_events_mgr.EVENT_LATE, msg)
+                    itask, time2str(now),
+                    self.task_events_mgr.EVENT_LATE, msg)
                 self.workflow_db_mgr.put_insert_task_late_flags(itask)

     def reset_inactivity_timer(self):
@@ -1747,6 +1748,7 @@ async def main_loop(self) -> None:
             self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
             self.late_tasks_check()

+            self.process_workflow_db_queue()
             self.process_queued_task_messages()
             await self.process_command_queue()
             self.task_events_mgr.process_events(self)
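
The new `message_in_db` method reduces duplicate detection to a single
`SELECT EXISTS` against the `task_events` table. A minimal standalone sketch
of the same check, using an in-memory SQLite database and a simplified
schema (the column set here is inferred from the query; it is not the full
Cylc schema):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_events"
        " (name TEXT, cycle TEXT, time TEXT, submit_num INTEGER,"
        " event TEXT, message TEXT)"
    )
    conn.execute(
        "INSERT INTO task_events VALUES"
        " ('foo', '1', '2023-06-01T12:00:00Z', 1, 'started', '')"
    )

    def message_in_db(name, cycle, event_time, submit_num, message):
        # Standard task messages (e.g. "started") land in the event column
        # with an empty message, so match either column.
        stmt = """
            SELECT EXISTS (
                SELECT 1 FROM task_events WHERE
                    name == ?
                    AND cycle == ?
                    AND (message == ? OR event == ?)
                    AND submit_num == ?
                    AND time == ?
            )
        """
        args = [name, cycle, message, message, submit_num, event_time]
        return bool(conn.execute(stmt, args).fetchone()[0])

    print(message_in_db('foo', '1', '2023-06-01T12:00:00Z', 1, 'started'))  # True
    print(message_in_db('foo', '1', '2023-06-01T12:00:00Z', 2, 'started'))  # False
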
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 9a8a19ba18a..398da816ed6 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -75,6 +75,7 @@
     TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED)
 from cylc.flow.wallclock import (
     get_current_time_string,
+    get_time_string_from_unix_time,
     get_seconds_as_interval_string as intvl_as_str
 )
 from cylc.flow.workflow_events import (
@@ -408,7 +409,7 @@ def check_poll_time(itask, now=None):
         itask.poll_timer.next(no_exhaust=True)
         return True

-    def check_job_time(self, itask, now):
+    def check_job_time(self, itask, now, repeat_message):
         """Check/handle job timeout and poll timer"""
         can_poll = self.check_poll_time(itask, now)
         if itask.timeout is None or now <= itask.timeout:
@@ -426,7 +427,10 @@
             itask.timeout = None  # emit event only once
         if msg and event:
             LOG.warning(f"[{itask}] {msg}")
-            self.setup_event_handlers(itask, event, msg)
+            self.setup_event_handlers(
+                itask, get_time_string_from_unix_time(now), event, msg,
+                repeat_message,
+            )
             return True
         else:
             return can_poll
@@ -587,8 +591,20 @@ def process_message(
         # Any message represents activity.
         self.reset_inactivity_timer_func()

+        # In live mode check whether the message has been received and
+        # put in the database before:
+        repeat_message = False
+        if (
+            itask.tdef.run_mode == 'live'
+            and self.workflow_db_mgr.pri_dao.message_in_db(
+                itask, event_time, submit_num, message)
+        ):
+            repeat_message = True
+
         if not self._process_message_check(
-                itask, severity, message, event_time, flag, submit_num):
+                itask, severity, message, event_time, flag, submit_num,
+                quiet=repeat_message,
+        ):
             return None

         # always update the workflow state summary for latest message
@@ -596,14 +612,15 @@
         if self.FLAG_POLLED in message:
             new_msg = f'{message} {self.FLAG_POLLED}'
         else:
             new_msg = message
-        self.data_store_mgr.delta_job_msg(
-            itask.tokens.duplicate(job=str(submit_num)),
-            new_msg
-        )
+
+        if not repeat_message:
+            self.data_store_mgr.delta_job_msg(
+                itask.tokens.duplicate(job=str(submit_num)),
+                new_msg
+            )

         # Satisfy my output, if possible, and spawn children.
         # (first remove signal: failed/EXIT -> failed)
-        msg0 = message.split('/')[0]
         completed_trigger = itask.state.outputs.set_msg_trg_completion(
             message=msg0, is_completed=True)
@@ -615,7 +632,10 @@
                 self.EVENT_STARTED] and
                 not itask.state.outputs.is_completed(TASK_OUTPUT_STARTED)):
             self.setup_event_handlers(
-                itask, self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
+                itask, event_time,
+                self.EVENT_STARTED, f'job {self.EVENT_STARTED}',
+                repeat_message,
+            )
             self.spawn_func(itask, TASK_OUTPUT_STARTED)

         if message == self.EVENT_STARTED:
@@ -629,14 +649,15 @@
             # one, so assume that a successful submission occurred and act
             # accordingly. Note the submitted message is internal, whereas
             # the started message comes in on the network.
-            self._process_message_submitted(itask, event_time)
+            self._process_message_submitted(
+                itask, event_time, repeat_message)
             self.spawn_func(itask, TASK_OUTPUT_SUBMITTED)

-            self._process_message_started(itask, event_time)
+            self._process_message_started(itask, event_time, repeat_message)
             self.spawn_func(itask, TASK_OUTPUT_STARTED)

         elif message == self.EVENT_SUCCEEDED:
-            self._process_message_succeeded(itask, event_time)
+            self._process_message_succeeded(itask, event_time, repeat_message)
             self.spawn_func(itask, TASK_OUTPUT_SUCCEEDED)
         elif message == self.EVENT_FAILED:
             if (
@@ -645,7 +666,7 @@
             ):
                 return True
             if self._process_message_failed(
-                itask, event_time, self.JOB_FAILED):
+                itask, event_time, self.JOB_FAILED, repeat_message):
                 self.spawn_func(itask, TASK_OUTPUT_FAILED)
         elif message == self.EVENT_SUBMIT_FAILED:
             if (
@@ -656,7 +677,8 @@
             ):
                 return True
             if self._process_message_submit_failed(
                 itask,
                 event_time,
-                submit_num
+                submit_num,
+                repeat_message,
             ):
                 self.spawn_func(itask, TASK_OUTPUT_SUBMIT_FAILED)
         elif message == self.EVENT_SUBMITTED:
@@ -672,7 +694,8 @@
                 # If not in the preparing state we already assumed and handled
                 # job submission under the started event above...
                 # (sim mode does not have the job prep state)
-                self._process_message_submitted(itask, event_time)
+                self._process_message_submitted(
+                    itask, event_time, repeat_message)
                 self.spawn_func(itask, TASK_OUTPUT_SUBMITTED)

             # ... but either way update the job ID in the job proxy (it only
@@ -692,11 +715,12 @@
             ):
                 return True
             signal = message[len(FAIL_MESSAGE_PREFIX):]
-            self._db_events_insert(itask, "signaled", signal)
+            if not repeat_message:
+                self._db_events_insert(itask, event_time, "signaled", signal)
             self.workflow_db_mgr.put_update_task_jobs(
                 itask, {"run_signal": signal})
             if self._process_message_failed(
-                itask, event_time, self.JOB_FAILED):
+                itask, event_time, self.JOB_FAILED, repeat_message):
                 self.spawn_func(itask, TASK_OUTPUT_FAILED)
         elif message.startswith(ABORT_MESSAGE_PREFIX):
             # Task aborted with message
@@ -706,14 +730,18 @@
             ):
                 return True
             aborted_with = message[len(ABORT_MESSAGE_PREFIX):]
-            self._db_events_insert(itask, "aborted", message)
+            if not repeat_message:
+                self._db_events_insert(itask, event_time, "aborted", message)
             self.workflow_db_mgr.put_update_task_jobs(
                 itask, {"run_signal": aborted_with})
-            if self._process_message_failed(itask, event_time, aborted_with):
+            if self._process_message_failed(
+                itask, event_time, aborted_with, repeat_message
+            ):
                 self.spawn_func(itask, TASK_OUTPUT_FAILED)
         elif message.startswith(VACATION_MESSAGE_PREFIX):
             # Task job pre-empted into a vacation state
-            self._db_events_insert(itask, "vacated", message)
+            if not repeat_message:
+                self._db_events_insert(itask, event_time, "vacated", message)
             itask.set_summary_time('started')  # unset
             if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
                 itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0
@@ -728,11 +756,12 @@
             # this feature can only be used on the deprecated loadleveler
             # system, we should probably aim to remove support for job vacation
             # instead. Otherwise, we should have:
-            # self.setup_event_handlers(itask, 'vacated', message)
+            # self.setup_event_handlers(itask, event_time, 'vacated', message)
         elif completed_trigger:
             # Message of an as-yet unreported custom task output.
             # No state change.
-            self.setup_event_handlers(itask, completed_trigger, message)
+            self.setup_event_handlers(
+                itask, event_time, completed_trigger, message, repeat_message)
             self.spawn_func(itask, msg0)
         else:
             # Unhandled messages. These include:
@@ -741,11 +770,13 @@
             #  general non-output/progress messages
             #  poll messages that repeat previous results
             # Note that all messages are logged already at the top.
             # No state change.
             LOG.debug(f"[{itask}] unhandled: {message}")
-            self._db_events_insert(
-                itask, (f"message {lseverity}"), message)
+            if not repeat_message:
+                self._db_events_insert(
+                    itask, event_time, (f"message {lseverity}"), message)
             if lseverity in self.NON_UNIQUE_EVENTS:
                 itask.non_unique_events.update({lseverity: 1})
-                self.setup_event_handlers(itask, lseverity, message)
+                self.setup_event_handlers(
+                    itask, event_time, lseverity, message, repeat_message)
         return None

     def _process_message_check(
@@ -756,6 +787,7 @@ def _process_message_check(
         self,
         itask: 'TaskProxy',
         severity: str,
         message: str,
         event_time: str,
         flag: str,
         submit_num: int,
+        quiet: bool = False,
     ) -> bool:
         """Helper for `.process_message`.
@@ -767,6 +799,7 @@ def _process_message_check(
             timestamp = f" at {event_time}"
         else:
             timestamp = ""
+
         if flag == self.FLAG_RECEIVED and submit_num != itask.submit_num:
             # Ignore received messages from old jobs
             LOG.warning(
@@ -811,25 +844,38 @@
             )
             return False

-        severity = cast(int, LOG_LEVELS.get(severity, INFO))
-        # Demote log level to DEBUG if this is a message that duplicates what
-        # gets logged by itask state change anyway (and not manual poll)
-        if severity > DEBUG and flag != self.FLAG_POLLED and message in {
-            self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
-            self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
-        }:
-            severity = DEBUG
-        LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
+        if not quiet:
+            severity_int = cast(int, LOG_LEVELS.get(severity, INFO))
+            # Demote log level to DEBUG if this is a message that duplicates
+            # what gets logged by itask state change anyway
+            # (and not manual poll)
+            if (
+                severity_int > DEBUG
+                and flag != self.FLAG_POLLED
+                and message in {
+                    self.EVENT_SUBMITTED,
+                    self.EVENT_STARTED,
+                    self.EVENT_SUCCEEDED,
+                    self.EVENT_SUBMIT_FAILED,
+                    f'{FAIL_MESSAGE_PREFIX}ERR'
+                }
+            ):
+                severity_int = DEBUG
+            LOG.log(severity_int, f"[{itask}] {flag}{message}{timestamp}")
         return True

-    def setup_event_handlers(self, itask, event, message):
+    def setup_event_handlers(
+        self, itask: 'TaskProxy', event_time: str, event: str, message: str,
+        repeat_message: bool = False
+    ) -> None:
         """Set up handlers for a task event."""
         if itask.tdef.run_mode != 'live':
             return
         msg = ""
         if message != f"job {event}":
             msg = message
-        self._db_events_insert(itask, event, msg)
+        if not repeat_message:
+            self._db_events_insert(itask, event_time, event, msg)
         self._setup_job_logs_retrieval(itask, event)
         self._setup_event_mail(itask, event)
         self._setup_custom_event_handlers(itask, event, message)
@@ -843,10 +889,10 @@ def _custom_handler_callback(self, ctx, schd, id_key):
         else:
             self.unset_waiting_event_timer(id_key)

-    def _db_events_insert(self, itask, event="", message=""):
+    def _db_events_insert(self, itask, event_time, event="", message=""):
         """Record an event to the DB."""
         self.workflow_db_mgr.put_insert_task_events(itask, {
-            "time": get_current_time_string(),
+            "time": event_time,
             "event": event,
             "message": message})

@@ -1102,7 +1148,9 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
         if itask.state_reset(TASK_STATUS_WAITING):
             self.data_store_mgr.delta_task_state(itask)

-    def _process_message_failed(self, itask, event_time, message):
+    def _process_message_failed(
+        self, itask, event_time, message, repeat_message
+    ):
         """Helper for process_message, handle a failed message.

         Return True if no retries (hence go to the failed state).
@@ -1124,7 +1172,10 @@ def _process_message_failed(self, itask, event_time, message):
         ):
             # No retry lined up: definitive failure.
             if itask.state_reset(TASK_STATUS_FAILED):
-                self.setup_event_handlers(itask, self.EVENT_FAILED, message)
+                self.setup_event_handlers(
+                    itask, event_time, self.EVENT_FAILED, message,
+                    repeat_message
+                )
                 self.data_store_mgr.delta_task_state(itask)
             no_retries = True
         else:
@@ -1134,11 +1185,12 @@
             delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
             LOG.warning(f"[{itask}] {delay_msg}")
             msg = f"{self.JOB_FAILED}, {delay_msg}"
-            self.setup_event_handlers(itask, self.EVENT_RETRY, msg)
+            self.setup_event_handlers(
+                itask, event_time, self.EVENT_RETRY, msg, repeat_message)
         self._reset_job_timers(itask)
         return no_retries

-    def _process_message_started(self, itask, event_time):
+    def _process_message_started(self, itask, event_time, repeat_message):
         """Helper for process_message, handle a started message."""
         if itask.job_vacated:
             itask.job_vacated = False
@@ -1151,7 +1203,10 @@
                 "time_run": itask.summary['started_time_string']})
         if itask.state_reset(TASK_STATUS_RUNNING):
             self.setup_event_handlers(
-                itask, self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
+                itask, event_time,
+                self.EVENT_STARTED, f'job {self.EVENT_STARTED}',
+                repeat_message,
+            )
             self.data_store_mgr.delta_task_state(itask)
         self._reset_job_timers(itask)

@@ -1159,7 +1214,7 @@
         if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
             itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0

-    def _process_message_succeeded(self, itask, event_time):
+    def _process_message_succeeded(self, itask, event_time, repeat_message):
         """Helper for process_message, handle a succeeded message."""

         job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
@@ -1177,11 +1232,16 @@
                 itask.summary['started_time'])
         if itask.state_reset(TASK_STATUS_SUCCEEDED):
             self.setup_event_handlers(
-                itask, self.EVENT_SUCCEEDED, f"job {self.EVENT_SUCCEEDED}")
+                itask, event_time,
+                self.EVENT_SUCCEEDED, f"job {self.EVENT_SUCCEEDED}",
+                repeat_message,
+            )
             self.data_store_mgr.delta_task_state(itask)
         self._reset_job_timers(itask)

-    def _process_message_submit_failed(self, itask, event_time, submit_num):
+    def _process_message_submit_failed(
+        self, itask, event_time, submit_num, repeat_message
+    ):
         """Helper for process_message, handle a submit-failed message.

         Return True if no retries (hence go to the submit-failed state).
@@ -1204,8 +1264,10 @@
         no_retries = True
         if itask.state_reset(TASK_STATUS_SUBMIT_FAILED):
             self.setup_event_handlers(
-                itask, self.EVENT_SUBMIT_FAILED,
-                f'job {self.EVENT_SUBMIT_FAILED}')
+                itask, event_time, self.EVENT_SUBMIT_FAILED,
+                f'job {self.EVENT_SUBMIT_FAILED}',
+                repeat_message,
+            )
             self.data_store_mgr.delta_task_state(itask)
         else:
             # There is a submission retry lined up.
@@ -1214,7 +1276,10 @@
             timer = itask.try_timers[TimerFlags.SUBMISSION_RETRY]
             delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
             LOG.warning(f"[{itask}] {delay_msg}")
             msg = f"job {self.EVENT_SUBMIT_FAILED}, {delay_msg}"
-            self.setup_event_handlers(itask, self.EVENT_SUBMIT_RETRY, msg)
+            self.setup_event_handlers(
+                itask, event_time, self.EVENT_SUBMIT_RETRY,
+                msg, repeat_message
+            )
         # Register newly submit-failed job with the database and datastore.
         job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
@@ -1229,7 +1294,7 @@
         return no_retries

     def _process_message_submitted(
-        self, itask: 'TaskProxy', event_time: str
+        self, itask: 'TaskProxy', event_time: str, repeat_message: bool
     ) -> None:
         """Helper for process_message, handle a submit-succeeded message."""
         with suppress(KeyError):
@@ -1261,8 +1326,10 @@
                 itask.state_reset(is_queued=False)
             self.setup_event_handlers(
                 itask,
+                event_time,
                 self.EVENT_SUBMITTED,
                 f'job {self.EVENT_SUBMITTED}',
+                repeat_message,
             )
             self.data_store_mgr.delta_task_state(itask)
             self.data_store_mgr.delta_task_queued(itask)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 20ee7379d27..597600ae4ef 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -165,7 +165,7 @@ def check_task_jobs(self, workflow, task_pool):
         now = time()
         poll_tasks = set()
         for itask in task_pool.get_tasks():
-            if self.task_events_mgr.check_job_time(itask, now):
+            if self.task_events_mgr.check_job_time(itask, now, False):
                 poll_tasks.add(itask)
                 if itask.poll_timer.delay is not None:
                     LOG.info(
@@ -1009,7 +1009,7 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
                 self.get_simulation_job_conf(itask, workflow)
             )
             self.task_events_mgr.process_message(
-                itask, INFO, TASK_OUTPUT_SUBMITTED
+                itask, INFO, TASK_OUTPUT_SUBMITTED,
             )
         return itasks
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 697bc72a155..f2b987bf024 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -67,7 +67,8 @@
     serialise,
     deserialise
 )
-from cylc.flow.wallclock import get_current_time_string
+from cylc.flow.wallclock import (
+    get_current_time_string, get_time_string_from_unix_time)
 from cylc.flow.platforms import get_platform
 from cylc.flow.task_queues.independent import IndepQueueManager

@@ -1819,10 +1820,12 @@ def _set_expired_task(self, itask):
             itask.expire_time = (
                 itask.get_point_as_seconds() +
                 itask.get_offset_as_seconds(itask.tdef.expiration_offset))
-        if time() > itask.expire_time:
+        now = time()
+        if now > itask.expire_time:
             msg = 'Task expired (skipping job).'
             LOG.warning(f"[{itask}] {msg}")
-            self.task_events_mgr.setup_event_handlers(itask, "expired", msg)
+            self.task_events_mgr.setup_event_handlers(
+                itask, get_time_string_from_unix_time(now), "expired", msg)
             # TODO succeeded and expired states are useless due to immediate
             # removal under all circumstances (unhandled failed is still used).
             if itask.state_reset(TASK_STATUS_EXPIRED, is_held=False):
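
Taken together, the source changes above follow one pattern: compute
`repeat_message` once per incoming message, then gate every side effect that
would otherwise double up (the log line, the `task_events` insert, the
data-store job message) behind it, while state changes still run. A rough
sketch of that shape, with illustrative names rather than the real cylc-flow
API:

    import logging

    logging.basicConfig(level=logging.INFO)
    LOG = logging.getLogger("sketch")

    class EventRecorder:
        """Toy stand-in for TaskEventsManager plus the task_events table."""

        def __init__(self):
            self.db_rows = set()  # stand-in for the task_events table

        def process_message(self, name, event_time, submit_num, message):
            key = (name, event_time, submit_num, message)
            repeat_message = key in self.db_rows  # message_in_db equivalent
            if not repeat_message:
                LOG.info("[%s] %s", name, message)  # the quiet=False path
                self.db_rows.add(key)  # _db_events_insert equivalent
            # state changes, output completion etc. would still run here
            return repeat_message

    rec = EventRecorder()
    rec.process_message("1/foo", "2023-06-01T12:00:00Z", 1, "started")  # logged
    rec.process_message("1/foo", "2023-06-01T12:00:00Z", 1, "started")  # quiet
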
"$(dirname "$0")/test_header" -set_test_number 7 +set_test_number 8 install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" @@ -39,10 +39,11 @@ workflow_run_ok "${TEST_NAME}" cylc play --no-detach "${WORKFLOW_NAME}" TEST_NAME="${TEST_NAME_BASE}-restart" workflow_run_ok "${TEST_NAME}" cylc play --set="INCL_B_C=False" --no-detach "${WORKFLOW_NAME}" -grep_workflow_log_ok "grep-3" "\[1/a running job:01 flows:1\] (polled)started" -grep_workflow_log_ok "grep-4" "\[1/b failed job:01 flows:1\] (polled)failed" +grep_ok '\[jobs-poll out\] .* "run_status": 0' "${WORKFLOW_RUN_DIR}/log/job/1/a/NN/job-activity.log" +grep_ok '\[jobs-poll out\] .* "run_status": 1' "${WORKFLOW_RUN_DIR}/log/job/1/b/NN/job-activity.log" +grep_workflow_log_ok "grep-3" "\[1/b .* => failed" # Failed (but not incomplete) task c should not have been polled. -grep_fail "\[1/c failed job:01 flows:1\] (polled)failed" "${WORKFLOW_RUN_DIR}/log/scheduler/log" +grep_fail "jobs-poll" "${WORKFLOW_RUN_DIR}/log/job/1/c/NN/job-activity.log" purge diff --git a/tests/functional/restart/58-removed-task/flow.cylc b/tests/functional/restart/58-removed-task/flow.cylc index 94c5cf27b24..f1166b96824 100644 --- a/tests/functional/restart/58-removed-task/flow.cylc +++ b/tests/functional/restart/58-removed-task/flow.cylc @@ -25,8 +25,7 @@ cylc__job__poll_grep_workflow_log "1/b .*failed" cylc__job__poll_grep_workflow_log "1/c .*failed" cylc stop --now $CYLC_WORKFLOW_ID - cylc__job__poll_grep_workflow_log "1/a .*(polled)started" - cylc__job__poll_grep_workflow_log "1/b .*(polled)failed" + cylc__job__poll_grep_workflow_log "1/b .*failed" """ [[b, c]] script = "false" diff --git a/tests/functional/restart/58-waiting-manual-triggered.t b/tests/functional/restart/59-waiting-manual-triggered.t similarity index 100% rename from tests/functional/restart/58-waiting-manual-triggered.t rename to tests/functional/restart/59-waiting-manual-triggered.t diff --git a/tests/functional/restart/58-waiting-manual-triggered/flow.cylc b/tests/functional/restart/59-waiting-manual-triggered/flow.cylc similarity index 100% rename from tests/functional/restart/58-waiting-manual-triggered/flow.cylc rename to tests/functional/restart/59-waiting-manual-triggered/flow.cylc diff --git a/tests/functional/startup/00-state-summary.t b/tests/functional/startup/00-state-summary.t index a4a02208899..73b206f8d07 100644 --- a/tests/functional/startup/00-state-summary.t +++ b/tests/functional/startup/00-state-summary.t @@ -28,9 +28,9 @@ run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}" #------------------------------------------------------------------------------- # Workflow runs and shuts down with a failed task. cylc play --no-detach "${WORKFLOW_NAME}" > /dev/null 2>&1 + # Restart with a failed task and a succeeded task. -cylc play "${WORKFLOW_NAME}" -poll_grep_workflow_log -E '1/foo .* \(polled\)failed' +cylc play "${WORKFLOW_NAME}" > /dev/null 2>&1 cylc dump "${WORKFLOW_NAME}" > dump.out TEST_NAME=${TEST_NAME_BASE}-grep # State summary should not just say "Initializing..." 
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index 2f1494ee26b..c9fbd5000b9 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -17,6 +17,7 @@
 from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
 from cylc.flow.scheduler import Scheduler

+from pathlib import Path
 from typing import Any as Fixture


@@ -44,6 +45,37 @@ async def test_process_job_logs_retrieval_warns_no_platform(
     assert 'Unable to retrieve' in warning.msg


+async def test_process_message_no_repeat(
+    one_conf: Fixture, flow: Fixture, scheduler: Fixture, run: Fixture
+):
+    """Don't log a received message again if it is already in the DB."""
+    reg: str = flow(one_conf)
+    schd: 'Scheduler' = scheduler(reg, paused_start=True)
+    message: str = 'The dead swans lay in the stagnant pool'
+    message_time: str = 'Thursday Lunchtime'
+
+    async with run(schd) as log:
+        # Set up the database with a message already received:
+        itask = schd.pool.get_tasks()[0]
+        itask.tdef.run_mode = 'live'
+        schd.workflow_db_mgr.put_insert_task_events(
+            itask, {'time': message_time, 'event': '', 'message': message})
+        schd.process_workflow_db_queue()
+
+        # Task event manager returns None:
+        assert schd.task_events_mgr.process_message(
+            itask=itask, severity='comical', message=message,
+            event_time=message_time, submit_num=0,
+            flag=schd.task_events_mgr.FLAG_POLLED
+        ) is None
+
+        # Log doesn't contain a repeat message:
+        assert (
+            schd.task_events_mgr.FLAG_POLLED
+            not in log.records[-1].message
+        )
+
+
 async def test__reset_job_timers(
     one_conf: Fixture, flow: Fixture, scheduler: Fixture, start: Fixture,
     caplog: Fixture, mock_glbl_cfg: Fixture,
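
Note that the integration test above has to call
`schd.process_workflow_db_queue()` before `process_message`: inserts are
queued, and only committed rows are visible to `message_in_db`. That is also
why `main_loop` (see the scheduler.py hunk earlier) now flushes the DB queue
before processing queued task messages. The ordering in miniature, with
illustrative names only:

    pending_writes = []  # put_insert_task_events queues rows here
    db_rows = set()      # committed task_events rows

    def process_workflow_db_queue():
        while pending_writes:
            db_rows.add(pending_writes.pop())

    def process_message(msg):
        if msg in db_rows:           # message_in_db sees committed rows only
            return "repeat: quiet"
        pending_writes.append(msg)   # queued, not yet committed
        return "new: log and handle"

    print(process_message("1/foo started"))  # new: log and handle
    process_workflow_db_queue()              # the main loop flushes first...
    print(process_message("1/foo started"))  # ...so the repeat is caught
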
+ """ + assert setup_message_in_db.message_in_db(*query) is expect + + +def test_message_not_in_db(setup_message_in_db): + """Method correctly says if message is NOT in DB: + """ + def use_message_in_db(args_): + """Gather up boilerplate of setting up a fake itask and + providing args to message_in_db.""" + itask = SimpleNamespace( + tokens={'task': args_['name'], 'cycle': args_['cycle']}) + return setup_message_in_db.message_in_db( + itask, args_['event_time'], args_['submit'], args_['message']) + + # A list of args which _should_ be in the db: + args_ = { + 'name': "qux", + 'cycle': '-190', + 'event_time': "afternoon", + 'submit': 1, + 'message': 'Hello Rome', + } + + # Control - can we get True if all args are correct? + assert use_message_in_db(args_) is True + + # One at a time break each arg: + for key in args_: + old_arg = args_[key] + args_[key] = 'garbage' + assert use_message_in_db(args_) is False + args_[key] = old_arg