Skip to content

Commit

Permalink
cat-log: add timeout to processes
Browse files Browse the repository at this point in the history
* Partially addresses cylc#643
* Add a timeout to `cylc cat-log` processes:
  * Ensures we don't accumulate them by accident.
  * Improves efficiency by closing long-lived log streams
    (the user probably isn't looking at the log file any more and
    the log file has probably stopped growing anyway).
* At present, `cylc cat-log` processes may be left behind
  if the websocket is closed abruptly. This has been observed as
  the result of a proxy timeout.
  • Loading branch information
oliver-sanders committed Nov 11, 2024
1 parent 87ca49e commit c4ab54b
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 9 deletions.
21 changes: 21 additions & 0 deletions cylc/uiserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,26 @@ class CylcUIServer(ExtensionApp):
default_value=False,
)

log_timeout = Float(
# Configurable upper bound (in seconds) on how long one log stream is
# kept open.
# Note: This timeout is intended to clean up log streams that are no
# longer being actively monitored and prevent the associated "cat-log"
# processes from persisting in situations where they should not be
# (e.g. if the websocket connection unexpectedly closes)
config=True,
help='''
The maximum length of time Cylc will stream a log file for in
seconds.
The "Log" view in the Cylc GUI streams log files allowing you to
monitor the file while it grows.
After the configured timeout, the stream will close. The log
view in the GUI will display a "reconnect" button allowing you
to restart the stream if desired.
''',
default_value=(60 * 60 * 4), # 4 hours
)

@validate('ui_build_dir')
def _check_ui_build_dir_exists(self, proposed):
if proposed['value'].exists():
Expand Down Expand Up @@ -407,6 +427,7 @@ def __init__(self, *args, **kwargs):
# sub_status dictionary storing status of subscriptions
self.sub_statuses = {}
self.resolvers = Resolvers(
self,
self.data_store_mgr,
log=self.log,
executor=self.executor,
Expand Down
68 changes: 59 additions & 9 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
PIPE,
Popen,
)
from time import time
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -57,6 +58,7 @@
from cylc.flow.option_parsers import Options
from graphql import ResolveInfo

from cylc.uiserver.app import CylcUIServer
from cylc.uiserver.workflows_mgr import WorkflowsManager


Expand Down Expand Up @@ -351,7 +353,7 @@ async def enqueue(stream, queue):
await queue.put(line.decode())

@classmethod
async def cat_log(cls, id_: Tokens, log, info, file=None):
async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None):
"""Calls `cat log`.
Used for log subscriptions.
Expand All @@ -366,7 +368,7 @@ async def cat_log(cls, id_: Tokens, log, info, file=None):
]
if file:
cmd += ['-f', file]
log.info(f'$ {" ".join(cmd)}')
app.log.info(f'$ {" ".join(cmd)}')

# For info, below subprocess is safe (uses shell=false by default)
proc = await asyncio.subprocess.create_subprocess_exec(
Expand All @@ -380,26 +382,58 @@ async def cat_log(cls, id_: Tokens, log, info, file=None):
# This is to get around problem where stream is not EOF until
# subprocess ends
enqueue_task = asyncio.create_task(cls.enqueue(proc.stdout, queue))

# GraphQL operation ID
op_id = info.root_value

# track the number of lines received so far
line_count = 0

# the time we started the cylc cat-log process
start_time = time()

# set this to True when killing the process purposefully to
# prevent errors reaching the end user
killed = False

# configured cat-log process timeout
timeout = float(app.log_timeout)

try:
while info.context['sub_statuses'].get(op_id) != 'stop':
if time() - start_time > timeout:
# timeout exceeded -> kill the cat-log process
proc.kill()
killed = True

if queue.empty():
# there are *no* lines to read from the cat-log process
if buffer:
# yield everything in the buffer
yield {'lines': list(buffer)}
buffer.clear()

if proc.returncode is not None:
(_, stderr) = await proc.communicate()
# pass any error onto ui
msg = process_cat_log_stderr(stderr) or (
f"cylc cat-log exited {proc.returncode}"
)
yield {'error': msg}
# process exited
if not killed:
# process errored (excluding purposeful kill)
# -> pass any stderr text to the client
(_, stderr) = await proc.communicate()
msg = process_cat_log_stderr(stderr) or (
f"cylc cat-log exited {proc.returncode}"
)
yield {'error': msg}

# stop reading log lines
break

# sleep set at 1, which matches the `tail` default interval
await asyncio.sleep(1)

else:
# there *are* lines to read from the cat-log process
if line_count > MAX_LINES:
# we have read beyond the line count
yield {'lines': buffer}
yield {
'error': (
Expand All @@ -408,25 +442,39 @@ async def cat_log(cls, id_: Tokens, log, info, file=None):
)
}
break

elif line_count == 0:
# this is the first line
# (this is a special line that contains the file path)
line_count += 1
yield {
'connected': True,
'path': (await queue.get())[2:].strip(),
}
continue

# read in the log lines and add them to the buffer
line = await queue.get()
line_count += 1
buffer.append(line)
if len(buffer) >= 75:
yield {'lines': list(buffer)}
buffer.clear()
# there is more text to read so don't sleep (but
# still "sleep(0)" to yield control to other
# coroutines)
await asyncio.sleep(0)

finally:
# kill the cat-log process
kill_process_tree(proc.pid)

# terminate the queue
enqueue_task.cancel()
with suppress(asyncio.CancelledError):
await enqueue_task

# tell the client we have disconnected
yield {'connected': False}

@classmethod
Expand Down Expand Up @@ -467,13 +515,15 @@ class Resolvers(BaseResolvers):

def __init__(
self,
app: 'CylcUIServer',
data: 'DataStoreMgr',
log: 'Logger',
workflows_mgr: 'WorkflowsManager',
executor,
**kwargs
):
super().__init__(data)
self.app = app
self.log = log
self.workflows_mgr = workflows_mgr
self.executor = executor
Expand Down Expand Up @@ -561,7 +611,7 @@ async def subscription_service(
):
async for ret in Services.cat_log(
ids[0],
self.log,
self.app,
info,
file
):
Expand Down

0 comments on commit c4ab54b

Please sign in to comment.