diff --git a/cylc/uiserver/app.py b/cylc/uiserver/app.py index 4039cda2..46abeab4 100644 --- a/cylc/uiserver/app.py +++ b/cylc/uiserver/app.py @@ -340,6 +340,26 @@ class CylcUIServer(ExtensionApp): default_value=False, ) + log_timeout = Float( + # Note: This timeout is intended to clean up log streams that are no + # longer being actively monitored and prevent the associated "cat-log" + # processes from persisting in situations where they should not be + # (e.g. if the websocket connection unexpectedly closes) + config=True, + help=''' + The maximum length of time Cylc will stream a log file for in + seconds. + + The "Log" view in the Cylc GUI streams log files allowing you to + monitor the file while is grows. + + After the configured timeout, the stream will close. The log + view in the GUI will display a "reconnect" button allowing you + to restart the stream if desired. + ''', + default_value=(60 * 60 * 4), # 4 hours + ) + @validate('ui_build_dir') def _check_ui_build_dir_exists(self, proposed): if proposed['value'].exists(): @@ -408,6 +428,7 @@ def __init__(self, *args, **kwargs): # sub_status dictionary storing status of subscriptions self.sub_statuses = {} self.resolvers = Resolvers( + self, self.data_store_mgr, log=self.log, executor=self.executor, diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index 721ff3a4..00d97e49 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -27,6 +27,7 @@ PIPE, Popen, ) +from time import time from typing import ( TYPE_CHECKING, Any, @@ -57,6 +58,7 @@ from cylc.flow.option_parsers import Options from graphql import ResolveInfo + from cylc.uiserver.app import CylcUIServer from cylc.uiserver.workflows_mgr import WorkflowsManager @@ -351,7 +353,7 @@ async def enqueue(stream, queue): await queue.put(line.decode()) @classmethod - async def cat_log(cls, id_: Tokens, log, info, file=None): + async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None): """Calls `cat log`. 
Used for log subscriptions. @@ -366,7 +368,7 @@ async def cat_log(cls, id_: Tokens, log, info, file=None): ] if file: cmd += ['-f', file] - log.info(f'$ {" ".join(cmd)}') + app.log.info(f'$ {" ".join(cmd)}') # For info, below subprocess is safe (uses shell=false by default) proc = await asyncio.subprocess.create_subprocess_exec( @@ -380,26 +382,58 @@ async def cat_log(cls, id_: Tokens, log, info, file=None): # This is to get around problem where stream is not EOF until # subprocess ends enqueue_task = asyncio.create_task(cls.enqueue(proc.stdout, queue)) + + # GraphQL operation ID op_id = info.root_value + + # track the number of lines received so far line_count = 0 + + # the time we started the cylc cat-log process + start_time = time() + + # set this to True when killing the process purposefully to + # prevent errors reaching the end user + killed = False + + # configured cat-log process timeout + timeout = float(app.log_timeout) + try: while info.context['sub_statuses'].get(op_id) != 'stop': + if time() - start_time > timeout: + # timeout exceeded -> kill the cat-log process + proc.kill() + killed = True + if queue.empty(): + # there are *no* lines to read from the cat-log process if buffer: + # yield everything in the buffer yield {'lines': list(buffer)} buffer.clear() + if proc.returncode is not None: - (_, stderr) = await proc.communicate() - # pass any error onto ui - msg = process_cat_log_stderr(stderr) or ( - f"cylc cat-log exited {proc.returncode}" - ) - yield {'error': msg} + # process exited + if not killed: + # process errored (excluding purposeful kill) + # -> pass any stderr text to the client + (_, stderr) = await proc.communicate() + msg = process_cat_log_stderr(stderr) or ( + f"cylc cat-log exited {proc.returncode}" + ) + yield {'error': msg} + + # stop reading log lines break + # sleep set at 1, which matches the `tail` default interval await asyncio.sleep(1) + else: + # there *are* lines to read from the cat-log process if line_count > MAX_LINES: 
+ # we have read beyond the line count yield {'lines': buffer} yield { 'error': ( @@ -408,25 +442,39 @@ async def cat_log(cls, id_: Tokens, log, info, file=None): ) } break + elif line_count == 0: + # this is the first line + # (this is a special line that contains the file path) line_count += 1 yield { 'connected': True, 'path': (await queue.get())[2:].strip(), } continue + + # read in the log lines and add them to the buffer line = await queue.get() line_count += 1 buffer.append(line) if len(buffer) >= 75: yield {'lines': list(buffer)} buffer.clear() + # there is more text to read so don't sleep (but + # still "sleep(0)" to yield control to other + # coroutines) await asyncio.sleep(0) + finally: + # kill the cat-log process kill_process_tree(proc.pid) + + # terminate the queue enqueue_task.cancel() with suppress(asyncio.CancelledError): await enqueue_task + + # tell the client we have disconnected yield {'connected': False} @classmethod @@ -467,6 +515,7 @@ class Resolvers(BaseResolvers): def __init__( self, + app: 'CylcUIServer', data: 'DataStoreMgr', log: 'Logger', workflows_mgr: 'WorkflowsManager', @@ -474,6 +523,7 @@ def __init__( **kwargs ): super().__init__(data) + self.app = app self.log = log self.workflows_mgr = workflows_mgr self.executor = executor @@ -561,7 +611,7 @@ async def subscription_service( ): async for ret in Services.cat_log( ids[0], - self.log, + self.app, info, file ):