Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task_job_mgr: make try/except more strict #6338

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
PlatformLookupError,
WorkflowConfigError,
)
import cylc.flow.flags
from cylc.flow.hostuserutil import (
get_host,
is_remote_platform
Expand Down Expand Up @@ -756,20 +757,35 @@ def _manip_task_jobs_callback(
for line in out.splitlines(True):
for prefix, callback in handlers:
if line.startswith(prefix):
# process one line of the output
line = line[len(prefix):].strip()
exc = None
try:
# TODO this massive try block should be unpacked.
path = line.split("|", 2)[1] # timestamp, path, status
point, name, submit_num = path.split(os.sep, 2)
except IndexError as _exc:
exc = _exc
else:
if prefix == self.job_runner_mgr.OUT_PREFIX_SUMMARY:
del bad_tasks[(point, name, submit_num)]
itask = tasks[(point, name, submit_num)]
callback(workflow, itask, ctx, line)
except (LookupError, ValueError) as exc:
# (Note this catches KeyError too).
try:
del bad_tasks[(point, name, submit_num)]
except KeyError as _exc:
exc = _exc
else:
itask = tasks[(point, name, submit_num)]
# NOTE: no error catching for the callback
callback(workflow, itask, ctx, line)

if exc:
# log any non-fatal errors that occurred
LOG.warning(
'Unhandled %s output: %s', ctx.cmd_key, line)
LOG.warning(str(exc))
'Unhandled %s output: %s\n%s: %s',
ctx.cmd_key,
line,
exc.__class__.__name__,
exc,
)

# Task jobs that are in the original command but did not get a status
# in the output. Handle as failures.
for key, itask in sorted(bad_tasks.items()):
Expand Down
152 changes: 152 additions & 0 deletions tests/integration/test_task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
import logging
from typing import Any as Fixture

import pytest

from cylc.flow import CYLC_LOG
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.job_runner_mgr import JobRunnerManager
from cylc.flow.scheduler import Scheduler
from cylc.flow.subprocctx import SubProcContext
from cylc.flow.task_state import TASK_STATUS_RUNNING

OUT_PREFIX = JobRunnerManager.OUT_PREFIX_SUMMARY


async def test_run_job_cmd_no_hosts_error(
Expand Down Expand Up @@ -187,3 +193,149 @@ async def test__prep_submit_task_job_impl_handles_execution_time_limit(
schd.task_job_mgr._prep_submit_task_job(
schd.workflow, task_a)
assert not task_a.summary.get('execution_time_limit', '')


async def test_manip_task_jobs_callback(flow, scheduler, start, log_filter):
    """Exercise ``_manip_task_jobs_callback`` with good and bad output.

    Things the function is expected to handle and log:
    * Lines of command output which cannot be parsed.
    * Output relating to tasks it was not expecting to find.
    * Expected tasks for which no output was received.

    Things the function is expected NOT to handle or log:
    * Exceptions raised inside callbacks, these are internal errors which
      should cause a crash.
    """
    # a stand-in for a completed subprocess call
    ctx = SubProcContext('my-key', 'my-cmd')
    ctx.ret_code = 0

    # every callback invocation is recorded here as (args, kwargs)
    recorded = []

    def callback(*args, **kwargs):
        """Record the call and return normally."""
        recorded.append((args, kwargs))

    def error_callback(*args, **kwargs):
        """Record the call, then raise."""
        callback(*args, **kwargs)
        raise Exception('exception-in-callback')

    def get_callback_calls():
        """Summarise the recorded callback calls, then reset the record.

        Returns:
            {(task_identity, state), ...}

        """
        summary = set()
        for args, _kwargs in recorded:
            # positional args 1 and 3 hold the task and the output line
            itask, line = args[1], args[3]
            summary.add((itask.identity, line.split('|')[-1].strip()))
        recorded.clear()
        return summary

    # a workflow containing two tasks
    id_ = flow({
        'scheduling': {
            'graph': {'R1': 'one & two'}
        }
    })
    schd: Scheduler = scheduler(id_)

    def run(handler):
        """Feed the current ``ctx`` through the function under test."""
        schd.task_job_mgr._manip_task_jobs_callback(
            ctx, schd.workflow, schd.pool.get_tasks(), handler,
        )

    async with start(schd) as log:
        # move both tasks into the submitted state
        for task_name in ('one', 'two'):
            itask = schd.pool.get_task(IntegerPoint('1'), task_name)
            itask.state.reset('submitted')
            itask.submit_num = 1

        # -- Case 1
        # command output containing a mixture of valid / invalid /
        # irrelevant items
        output_lines = [
            # invalid command output:
            f'{OUT_PREFIX} elephant',
            # valid output for a task we were not expecting:
            f'{OUT_PREFIX} 20000101T00Z|1/no-such-task/01|running',
            # valid output for a task we were expecting:
            f'{OUT_PREFIX} 20000101T00Z|1/one/01|running',
        ]
        ctx.out = '\n'.join(output_lines)

        # the errors should be logged along with the exception summary
        run(callback)
        assert log_filter(
            log,
            contains=(
                "Unhandled my-key output: 20000101T00Z|1/no-such-task/01|running"
                "\nKeyError: ('1', 'no-such-task', '01')"
            ),
        )
        assert log_filter(
            log,
            contains=(
                'Unhandled my-key output: elephant'
                '\nIndexError: list index out of range'
            ),
        )
        log.clear()

        # two callback calls are expected:
        expected_calls = {
            # one for the task "one" with the "running" status received in
            # the message:
            ('1/one', 'running'),
            # one for the task "two" (which was expected to appear in the
            # output but did not) with the error status "1":
            ('1/two', '1'),
        }
        assert get_callback_calls() == expected_calls

        # -- Case 2
        # an exception in the callback for valid output
        output_lines = [
            # invalid command output:
            f'{OUT_PREFIX} elephant',
            # valid output for a task we were expecting:
            f'{OUT_PREFIX} 20000101T00Z|1/one/01|running',
        ]
        ctx.out = '\n'.join(output_lines)

        # the exception must propagate - it will kill the scheduler
        with pytest.raises(Exception, match='exception-in-callback'):
            run(error_callback)

        # errors which occurred before the failure should still be logged
        assert log_filter(log, contains='Unhandled my-key output: elephant')
        log.clear()

        # the callback should have been called for the one valid entry
        assert get_callback_calls() == {('1/one', 'running')}

        # -- Case 3
        # an exception in the callback for a "bad_task"
        output_lines = [
            # invalid command output:
            f'{OUT_PREFIX} elephant',
        ]
        ctx.out = '\n'.join(output_lines)

        # the exception must propagate - it will kill the scheduler
        with pytest.raises(Exception, match='exception-in-callback'):
            run(error_callback)

        # errors which occurred before the failure should still be logged
        assert log_filter(log, contains='Unhandled my-key output: elephant')
        log.clear()

        # the callback should have been called for a "bad_task" (status "1")
        assert get_callback_calls() == {('1/one', '1')}
Loading