Skip to content

Commit

Permalink
Merge pull request #6310 from oliver-sanders/6291
Browse files Browse the repository at this point in the history
play: fix spurious traceback observed on some systems
  • Loading branch information
hjoliver authored Aug 23, 2024
2 parents 14d4ec0 + 70aca62 commit 6a51b48
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 26 deletions.
1 change: 1 addition & 0 deletions changes.d/6310.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a spurious traceback that could occur when running the `cylc play` command on Mac OS.
67 changes: 53 additions & 14 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from pathlib import Path
from shlex import quote
import sys
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Tuple

from packaging.version import Version

Expand Down Expand Up @@ -351,12 +351,12 @@ def _open_logs(id_: str, no_detach: bool, restart_num: int) -> None:
)


async def scheduler_cli(
async def _scheduler_cli_1(
options: 'Values',
workflow_id_raw: str,
parse_workflow_id: bool = True
) -> None:
"""Run the workflow.
) -> Tuple[Scheduler, str]:
"""Run the workflow (part 1 - async).
This function should contain all of the command line facing
functionality of the Scheduler, exit codes, logging, etc.
Expand Down Expand Up @@ -408,13 +408,28 @@ async def scheduler_cli(
scheduler = Scheduler(workflow_id, options)
await _setup(scheduler)

return scheduler, workflow_id


def _scheduler_cli_2(
options: 'Values',
scheduler: Scheduler,
) -> None:
"""Run the workflow (part 2 - sync)."""
# daemonize if requested
# NOTE: asyncio event loops cannot persist across daemonization
# ensure you have tidied up all threads etc before daemonizing
if not options.no_detach:
from cylc.flow.daemonize import daemonize
daemonize(scheduler)


async def _scheduler_cli_3(
options: 'Values',
workflow_id: str,
scheduler: Scheduler,
) -> None:
"""Run the workflow (part 3 - async)."""
# setup loggers
_open_logs(
workflow_id,
Expand All @@ -423,14 +438,7 @@ async def scheduler_cli(
)

# run the workflow
if options.no_detach:
ret = await _run(scheduler)
else:
# Note: The daemonization messes with asyncio so we have to start a
# new event loop if detaching
ret = asyncio.run(
_run(scheduler)
)
ret = await _run(scheduler)

# exit
# NOTE: we must clean up all asyncio / threading stuff before exiting
Expand Down Expand Up @@ -658,5 +666,36 @@ async def _run(scheduler: Scheduler) -> int:

@cli_function(get_option_parser)
def play(parser: COP, options: 'Values', id_: str):
"""Implement cylc play."""
return asyncio.run(scheduler_cli(options, id_))
cylc_play(options, id_)


def cylc_play(options: 'Values', id_: str, parse_workflow_id=True) -> None:
"""Implement cylc play.
Raises:
CylcError:
If this function is called whilst an asyncio event loop is running.
Because the scheduler process can be daemonised, this must not be
called whilst an asyncio event loop is active as memory associated
with this event loop will also exist in the new fork leading to
potentially strange problems.
See https://github.com/cylc/cylc-flow/issues/6291
"""
try:
# try opening an event loop to make sure there isn't one already open
asyncio.get_running_loop()
except RuntimeError:
# start/restart/resume the workflow
scheduler, workflow_id = asyncio.run(
_scheduler_cli_1(options, id_, parse_workflow_id=parse_workflow_id)
)
_scheduler_cli_2(options, scheduler)
asyncio.run(_scheduler_cli_3(options, workflow_id, scheduler))
else:
# if this line every gets hit then there is a bug within Cylc
raise CylcError(

Check warning on line 699 in cylc/flow/scheduler_cli.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler_cli.py#L699

Added line #L699 was not covered by tests
'cylc_play called whilst asyncio event loop is running'
) from None
4 changes: 2 additions & 2 deletions cylc/flow/scripts/validate_install_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
cleanup_sysargv,
log_subcommand,
)
from cylc.flow.scheduler_cli import scheduler_cli as cylc_play
from cylc.flow.scheduler_cli import cylc_play
from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
run as cylc_validate,
Expand Down Expand Up @@ -120,4 +120,4 @@ def main(parser: COP, options: 'Values', workflow_id: Optional[str] = None):

set_timestamps(LOG, options.log_timestamp)
log_subcommand(*sys.argv[1:])
asyncio.run(cylc_play(options, workflow_id))
cylc_play(options, workflow_id)
42 changes: 32 additions & 10 deletions cylc/flow/scripts/validate_reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
in the installed workflow to ensure the change can be safely applied.
"""

import asyncio
import sys
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
from optparse import Values
Expand All @@ -59,7 +60,7 @@
log_subcommand,
cleanup_sysargv
)
from cylc.flow.scheduler_cli import PLAY_OPTIONS, scheduler_cli
from cylc.flow.scheduler_cli import PLAY_OPTIONS, cylc_play
from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
VALIDATE_AGAINST_SOURCE_OPTION,
Expand All @@ -76,8 +77,6 @@
from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import detect_old_contact_file

import asyncio

CYLC_ROSE_OPTIONS = COP.get_cylc_rose_options()
VR_OPTIONS = combine_options(
VALIDATE_OPTIONS,
Expand Down Expand Up @@ -127,11 +126,32 @@ def check_tvars_and_workflow_stopped(

@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', workflow_id: str):
sys.exit(asyncio.run(vr_cli(parser, options, workflow_id)))
ret = asyncio.run(vr_cli(parser, options, workflow_id))
if isinstance(ret, str):
# NOTE: cylc_play must be called from sync code (not async code)
cylc_play(options, ret, parse_workflow_id=False)
elif ret is False:
sys.exit(1)

Check warning on line 134 in cylc/flow/scripts/validate_reinstall.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scripts/validate_reinstall.py#L134

Added line #L134 was not covered by tests


async def vr_cli(
parser: COP, options: 'Values', workflow_id: str
) -> Union[bool, str]:
"""Validate and reinstall and optionally reload workflow.
Runs:
* Validate
* Reinstall
* Reload (if the workflow is already running)
Returns:
The workflow_id or a True/False outcome.
async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
"""Run Cylc (re)validate - reinstall - reload in sequence."""
workflow_id: If the workflow is stopped and requires restarting.
True: If workflow is running and does not require restarting.
False: If this command should "exit 1".
"""
# Attempt to work out whether the workflow is running.
# We are trying to avoid reinstalling then subsequently being
# unable to play or reload because we cannot identify workflow state.
Expand Down Expand Up @@ -164,7 +184,7 @@ async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
if not check_tvars_and_workflow_stopped(
workflow_running, options.templatevars, options.templatevars_file
):
return 1
return False

Check warning on line 187 in cylc/flow/scripts/validate_reinstall.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scripts/validate_reinstall.py#L187

Added line #L187 was not covered by tests

# Force on the against_source option:
options.against_source = True
Expand All @@ -188,12 +208,13 @@ async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
'No changes to source: No reinstall or'
f' {"reload" if workflow_running else "play"} required.'
)
return 1
return False

Check warning on line 211 in cylc/flow/scripts/validate_reinstall.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scripts/validate_reinstall.py#L211

Added line #L211 was not covered by tests

# Run reload if workflow is running or paused:
if workflow_running:
log_subcommand('reload', workflow_id)
await cylc_reload(options, workflow_id)
return True

# run play anyway, to play a stopped workflow:
else:
Expand All @@ -206,5 +227,6 @@ async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
script_opts=(*PLAY_OPTIONS, *parser.get_std_options()),
source='', # Intentionally blank
)

log_subcommand(*sys.argv[1:])
await scheduler_cli(options, workflow_id, parse_workflow_id=False)
return workflow_id

0 comments on commit 6a51b48

Please sign in to comment.