Skip to content

Commit

Permalink
play: fix spurious traceback observed on some systems
Browse files Browse the repository at this point in the history
* Closes #6291
* The `cylc play` command would sometimes produce a traceback
  when detaching workflows (the default unless `--no-detach` is used).
* This traceback does not appear to have had any ill effects, but may
  have suppressed the normal Python session teardown logic.
* It was only reported on Mac OS, but may potentially occur on other
  systems.
* This PR mitigates the circumstances under which the traceback
  occurred by separating the asyncio event loops that are run before and
  after daemonization.
  • Loading branch information
oliver-sanders committed Aug 20, 2024
1 parent 524b44e commit c2f64f6
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 26 deletions.
67 changes: 53 additions & 14 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from pathlib import Path
from shlex import quote
import sys
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Tuple

from packaging.version import Version

Expand Down Expand Up @@ -351,12 +351,12 @@ def _open_logs(id_: str, no_detach: bool, restart_num: int) -> None:
)


async def scheduler_cli(
async def _scheduler_cli_1(
options: 'Values',
workflow_id_raw: str,
parse_workflow_id: bool = True
) -> None:
"""Run the workflow.
) -> Tuple[Scheduler, str]:
"""Run the workflow (part 1 - async).
This function should contain all of the command line facing
functionality of the Scheduler, exit codes, logging, etc.
Expand Down Expand Up @@ -408,13 +408,28 @@ async def scheduler_cli(
scheduler = Scheduler(workflow_id, options)
await _setup(scheduler)

return scheduler, workflow_id


def _scheduler_cli_2(
    options: 'Values',
    scheduler: Scheduler,
) -> None:
    """Run the workflow (part 2 - sync).

    Daemonize the scheduler unless ``options.no_detach`` is set.

    This stage is synchronous so that it can be run between two separate
    asyncio event loops rather than inside one.

    Args:
        options: Parsed command line options; only ``no_detach`` is read.
        scheduler: The scheduler object handed to ``daemonize``.
    """
    if options.no_detach:
        # staying attached: there is nothing to do at this stage
        return

    # daemonize as requested
    # NOTE: asyncio event loops cannot persist across daemonization
    # ensure you have tidied up all threads etc before daemonizing
    from cylc.flow.daemonize import daemonize as _daemonize
    _daemonize(scheduler)


async def _scheduler_cli_3(
options: 'Values',
workflow_id: str,
scheduler: Scheduler,
) -> None:
"""Run the workflow (part 3 - async)."""
# setup loggers
_open_logs(
workflow_id,
Expand All @@ -423,14 +438,7 @@ async def scheduler_cli(
)

# run the workflow
if options.no_detach:
ret = await _run(scheduler)
else:
# Note: The daemonization messes with asyncio so we have to start a
# new event loop if detaching
ret = asyncio.run(
_run(scheduler)
)
ret = await _run(scheduler)

# exit
# NOTE: we must clean up all asyncio / threading stuff before exiting
Expand Down Expand Up @@ -658,5 +666,36 @@ async def _run(scheduler: Scheduler) -> int:

@cli_function(get_option_parser)
def play(parser: COP, options: 'Values', id_: str):
"""Implement cylc play."""
return asyncio.run(scheduler_cli(options, id_))
cylc_play(options, id_)


def cylc_play(options: 'Values', id_: str, parse_workflow_id=True) -> None:
    """Implement cylc play.

    Runs the three start-up stages in order:

    1. ``_scheduler_cli_1`` (async, run in its own event loop).
    2. ``_scheduler_cli_2`` (sync, daemonizes unless ``options.no_detach``).
    3. ``_scheduler_cli_3`` (async, run in a second, fresh event loop).

    Args:
        options: Parsed command line options.
        id_: The workflow ID as provided by the caller.
        parse_workflow_id: Forwarded unchanged to ``_scheduler_cli_1``.

    Raises:
        CylcError:
            If this function is called whilst an asyncio event loop is running.

            Because the scheduler process can be daemonised, this must not be
            called whilst an asyncio event loop is active as memory associated
            with this event loop will also exist in the new fork leading to
            potentially strange problems.

            See https://github.com/cylc/cylc-flow/issues/6291

    """
    try:
        # probe for a running event loop to make sure there isn't one
        # already open (RuntimeError is raised when there is none)
        asyncio.get_running_loop()
    except RuntimeError:
        # no event loop is running: safe to proceed
        pass
    else:
        # if this line ever gets hit then there is a bug within Cylc
        raise CylcError(
            'cylc_play called whilst asyncio event loop is running'
        ) from None

    # start/restart/resume the workflow
    # NOTE: this is done outside of the "except" block above so that any
    # error raised whilst running the workflow is not implicitly chained
    # onto the (expected) RuntimeError from the event loop probe
    scheduler, workflow_id = asyncio.run(
        _scheduler_cli_1(options, id_, parse_workflow_id=parse_workflow_id)
    )
    _scheduler_cli_2(options, scheduler)
    asyncio.run(_scheduler_cli_3(options, workflow_id, scheduler))
4 changes: 2 additions & 2 deletions cylc/flow/scripts/validate_install_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
cleanup_sysargv,
log_subcommand,
)
from cylc.flow.scheduler_cli import scheduler_cli as cylc_play
from cylc.flow.scheduler_cli import cylc_play
from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
run as cylc_validate,
Expand Down Expand Up @@ -120,4 +120,4 @@ def main(parser: COP, options: 'Values', workflow_id: Optional[str] = None):

set_timestamps(LOG, options.log_timestamp)
log_subcommand(*sys.argv[1:])
asyncio.run(cylc_play(options, workflow_id))
cylc_play(options, workflow_id)
42 changes: 32 additions & 10 deletions cylc/flow/scripts/validate_reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
in the installed workflow to ensure the change can be safely applied.
"""

import asyncio
import sys
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
from optparse import Values
Expand All @@ -59,7 +60,7 @@
log_subcommand,
cleanup_sysargv
)
from cylc.flow.scheduler_cli import PLAY_OPTIONS, scheduler_cli
from cylc.flow.scheduler_cli import PLAY_OPTIONS, cylc_play
from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
VALIDATE_AGAINST_SOURCE_OPTION,
Expand All @@ -76,8 +77,6 @@
from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import detect_old_contact_file

import asyncio

CYLC_ROSE_OPTIONS = COP.get_cylc_rose_options()
VR_OPTIONS = combine_options(
VALIDATE_OPTIONS,
Expand Down Expand Up @@ -127,11 +126,32 @@ def check_tvars_and_workflow_stopped(

@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', workflow_id: str):
sys.exit(asyncio.run(vr_cli(parser, options, workflow_id)))
ret = asyncio.run(vr_cli(parser, options, workflow_id))
if isinstance(ret, str):
# NOTE: cylc_play must be called from sync code (not async code)
cylc_play(options, ret, parse_workflow_id=False)
elif ret is False:
sys.exit(1)


async def vr_cli(
parser: COP, options: 'Values', workflow_id: str
) -> Union[bool, str]:
"""Validate and reinstall and optionally reload workflow.
Runs:
* Validate
* Reinstall
* Reload (if the workflow is already running)
Returns:
The workflow_id or a True/False outcome.
async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
"""Run Cylc (re)validate - reinstall - reload in sequence."""
workflow_id: If the workflow is stopped and requires restarting.
True: If workflow is running and does not require restarting.
False: If this command should "exit 1".
"""
# Attempt to work out whether the workflow is running.
# We are trying to avoid reinstalling then subsequently being
# unable to play or reload because we cannot identify workflow state.
Expand Down Expand Up @@ -164,7 +184,7 @@ async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
if not check_tvars_and_workflow_stopped(
workflow_running, options.templatevars, options.templatevars_file
):
return 1
return False

# Force on the against_source option:
options.against_source = True
Expand All @@ -188,12 +208,13 @@ async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
'No changes to source: No reinstall or'
f' {"reload" if workflow_running else "play"} required.'
)
return 1
return False

# Run reload if workflow is running or paused:
if workflow_running:
log_subcommand('reload', workflow_id)
await cylc_reload(options, workflow_id)
return True

# run play anyway, to play a stopped workflow:
else:
Expand All @@ -206,5 +227,6 @@ async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
script_opts=(*PLAY_OPTIONS, *parser.get_std_options()),
source='', # Intentionally blank
)

log_subcommand(*sys.argv[1:])
await scheduler_cli(options, workflow_id, parse_workflow_id=False)
return workflow_id

0 comments on commit c2f64f6

Please sign in to comment.