Skip to content

Commit

Permalink
Fix runtime environment (scheduler, validation, etc.) (#5570)
Browse files Browse the repository at this point in the history
Fix workflow environment variables when in source directory
  • Loading branch information
hjoliver authored Jun 28, 2023
1 parent 9a0f5f8 commit b325d2d
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 20 deletions.
67 changes: 47 additions & 20 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@
from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults
from cylc.flow.parsec.util import replicate
from cylc.flow.pathutil import (
get_workflow_run_dir,
get_workflow_run_scheduler_log_dir,
get_workflow_run_share_dir,
get_workflow_run_work_dir,
get_workflow_name_from_id
get_workflow_name_from_id,
get_cylc_run_dir,
is_relative_to,
)
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.print_tree import print_tree
Expand Down Expand Up @@ -240,20 +238,27 @@ def __init__(
work_dir: Optional[str] = None,
share_dir: Optional[str] = None
) -> None:
"""
Initialize the workflow config object.
Args:
workflow: workflow ID
fpath: workflow config file path
options: CLI options
"""
check_deprecation(Path(fpath))
self.mem_log = mem_log_func
if self.mem_log is None:
self.mem_log = lambda x: None
self.mem_log("config.py:config.py: start init config")
self.workflow = workflow # workflow id
self.workflow = workflow
self.workflow_name = get_workflow_name_from_id(self.workflow)
self.fpath = str(fpath) # workflow definition
self.fdir = os.path.dirname(fpath)
self.run_dir = run_dir or get_workflow_run_dir(self.workflow)
self.log_dir = (log_dir or
get_workflow_run_scheduler_log_dir(self.workflow))
self.share_dir = share_dir or get_workflow_run_share_dir(self.workflow)
self.work_dir = work_dir or get_workflow_run_work_dir(self.workflow)
self.fpath: Path = Path(fpath)
self.fdir = str(self.fpath.parent)
self.run_dir = run_dir
self.log_dir = log_dir
self.share_dir = share_dir
self.work_dir = work_dir
self.options = options
self.implicit_tasks: Set[str] = set()
self.edges: Dict[
Expand Down Expand Up @@ -890,7 +895,7 @@ def _check_implicit_tasks(self) -> None:
)
# Allow implicit tasks in back-compat mode unless rose-suite.conf
# present (to maintain compat with Rose 2019)
elif not Path(self.run_dir, 'rose-suite.conf').is_file():
elif not (self.fpath.parent / "rose-suite.conf").is_file():
LOG.debug(msg)
return

Expand Down Expand Up @@ -1491,19 +1496,41 @@ def print_first_parent_tree(self, pretty=False, titles=False):
print_tree(tree, padding=padding, use_unicode=pretty)

def process_workflow_env(self):
"""Workflow context is exported to the local environment."""
"""Export Workflow context to the local environment.
A source workflow has only a name.
Once installed it also has an ID and a run directory.
And at scheduler start-up it has work, share, and log sub-dirs too.
"""
for key, value in {
**verbosity_to_env(cylc.flow.flags.verbosity),
'CYLC_WORKFLOW_ID': self.workflow,
'CYLC_WORKFLOW_NAME': self.workflow_name,
'CYLC_WORKFLOW_NAME_BASE': str(Path(self.workflow_name).name),
'CYLC_WORKFLOW_RUN_DIR': self.run_dir,
'CYLC_WORKFLOW_LOG_DIR': self.log_dir,
'CYLC_WORKFLOW_WORK_DIR': self.work_dir,
'CYLC_WORKFLOW_SHARE_DIR': self.share_dir,
}.items():
os.environ[key] = value

if is_relative_to(self.fdir, get_cylc_run_dir()):
# This is an installed workflow.
# - self.run_dir is only defined by the scheduler
# - but the run dir exists, created at installation
# - run sub-dirs may exist, if this installation was run already
# but if the scheduler is not running they shouldn't be used.
for key, value in {
'CYLC_WORKFLOW_ID': self.workflow,
'CYLC_WORKFLOW_RUN_DIR': str(self.fdir),
}.items():
os.environ[key] = value

if self.run_dir is not None:
# Run directory is only defined if the scheduler is running; in
# which case the following run sub-directories must exist.
for key, value in {
'CYLC_WORKFLOW_LOG_DIR': str(self.log_dir),
'CYLC_WORKFLOW_WORK_DIR': str(self.work_dir),
'CYLC_WORKFLOW_SHARE_DIR': str(self.share_dir),
}.items():
os.environ[key] = value

def process_config_env(self):
"""Set local config derived environment."""
os.environ['CYLC_UTC'] = str(get_utc_mode())
Expand Down
93 changes: 93 additions & 0 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
from optparse import Values
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
from pathlib import Path
import pytest
import logging
from types import SimpleNamespace
from unittest.mock import Mock
from contextlib import suppress

from cylc.flow import CYLC_LOG
from cylc.flow.config import WorkflowConfig
Expand Down Expand Up @@ -1565,3 +1567,94 @@ def test__warn_if_queues_have_implicit_tasks(caplog):
assert "'baz'" not in result
assert f"showing first {max_warning_lines}" in result


@pytest.mark.parametrize(
'installed, run_dir, cylc_vars',
[
pytest.param(
False, # not installed (parsing a source dir)
None, # no run directory passed to config object by scheduler
{
'CYLC_WORKFLOW_NAME': True, # expected environment variables
'CYLC_WORKFLOW_ID': False,
'CYLC_WORKFLOW_RUN_DIR': False,
'CYLC_WORKFLOW_WORK_DIR': False,
'CYLC_WORKFLOW_SHARE_DIR': False,
'CYLC_WORKFLOW_LOG_DIR': False,
},
id="source-dir"
),
pytest.param(
True,
None,
{
'CYLC_WORKFLOW_NAME': True,
'CYLC_WORKFLOW_ID': True,
'CYLC_WORKFLOW_RUN_DIR': True,
'CYLC_WORKFLOW_WORK_DIR': False,
'CYLC_WORKFLOW_SHARE_DIR': False,
'CYLC_WORKFLOW_LOG_DIR': False,
},
id="run-dir"
),
pytest.param(
True,
"/some/path",
{
'CYLC_WORKFLOW_NAME': True,
'CYLC_WORKFLOW_ID': True,
'CYLC_WORKFLOW_RUN_DIR': True,
'CYLC_WORKFLOW_WORK_DIR': True,
'CYLC_WORKFLOW_SHARE_DIR': True,
'CYLC_WORKFLOW_LOG_DIR': True,
},
id="run-dir-from-scheduler"
),
]
)
def test_cylc_env_at_parsing(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
installed,
run_dir,
cylc_vars
):
"""Check that CYLC_ environment vars exported during config file parsing
are appropriate to the workflow context (source, installed, or running).
"""

# Purge environment from previous tests.
for key in cylc_vars.keys():
with suppress(KeyError):
del os.environ[key]

flow_file = tmp_path / WorkflowFiles.FLOW_FILE
flow_config = """
[scheduler]
allow implicit tasks = True
[scheduling]
[[graph]]
R1 = 'foo'
"""

flow_file.write_text(flow_config)

# Make it look as if path is relative to cylc-run (i.e. installed).
monkeypatch.setattr(
'cylc.flow.config.is_relative_to',
lambda _a, _b: installed
)

# Parse the workflow config then check the environment.
WorkflowConfig(
workflow="name", fpath=flow_file, options=Mock(spec=[]),
run_dir=run_dir
)

cylc_env = [k for k in os.environ.keys() if k.startswith('CYLC_')]

for var, expected in cylc_vars.items():
if expected:
assert var in cylc_env
else:
assert var not in cylc_env

0 comments on commit b325d2d

Please sign in to comment.