Skip to content

Commit

Permalink
Merge branch 'feature.plus_s_list_of_strings' of github.com:wxtim/cyl…
Browse files Browse the repository at this point in the history
…c into feature.plus_s_list_of_strings

* 'feature.plus_s_list_of_strings' of github.com:wxtim/cylc:
  Update tests/unit/test_templatevars.py
  Fix runtime environment (scheduler, validation, etc.) (#5570)
  Bump pypa/gh-action-pypi-publish from 1.8.6 to 1.8.7
  Fix simulation mode TypeError
  prevent tests symlinking
  subprocctx: fix xtrigger __str__
  xtriggers: improve parser
  • Loading branch information
wxtim committed Jun 29, 2023
2 parents 1fa1f4f + d978e85 commit 9242324
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/2_auto_publish_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
uses: cylc/release-actions/build-python-package@v1

- name: Publish distribution to PyPI
uses: pypa/[email protected].6
uses: pypa/[email protected].7
with:
user: __token__ # uses the API token feature of PyPI - least permissions possible
password: ${{ secrets.PYPI_TOKEN }}
Expand Down
67 changes: 47 additions & 20 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@
from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults
from cylc.flow.parsec.util import replicate
from cylc.flow.pathutil import (
get_workflow_run_dir,
get_workflow_run_scheduler_log_dir,
get_workflow_run_share_dir,
get_workflow_run_work_dir,
get_workflow_name_from_id
get_workflow_name_from_id,
get_cylc_run_dir,
is_relative_to,
)
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.print_tree import print_tree
Expand Down Expand Up @@ -240,20 +238,27 @@ def __init__(
work_dir: Optional[str] = None,
share_dir: Optional[str] = None
) -> None:
"""
Initialize the workflow config object.
Args:
workflow: workflow ID
fpath: workflow config file path
options: CLI options
"""
check_deprecation(Path(fpath))
self.mem_log = mem_log_func
if self.mem_log is None:
self.mem_log = lambda x: None
self.mem_log("config.py:config.py: start init config")
self.workflow = workflow # workflow id
self.workflow = workflow
self.workflow_name = get_workflow_name_from_id(self.workflow)
self.fpath = str(fpath) # workflow definition
self.fdir = os.path.dirname(fpath)
self.run_dir = run_dir or get_workflow_run_dir(self.workflow)
self.log_dir = (log_dir or
get_workflow_run_scheduler_log_dir(self.workflow))
self.share_dir = share_dir or get_workflow_run_share_dir(self.workflow)
self.work_dir = work_dir or get_workflow_run_work_dir(self.workflow)
self.fpath: Path = Path(fpath)
self.fdir = str(self.fpath.parent)
self.run_dir = run_dir
self.log_dir = log_dir
self.share_dir = share_dir
self.work_dir = work_dir
self.options = options
self.implicit_tasks: Set[str] = set()
self.edges: Dict[
Expand Down Expand Up @@ -890,7 +895,7 @@ def _check_implicit_tasks(self) -> None:
)
# Allow implicit tasks in back-compat mode unless rose-suite.conf
# present (to maintain compat with Rose 2019)
elif not Path(self.run_dir, 'rose-suite.conf').is_file():
elif not (self.fpath.parent / "rose-suite.conf").is_file():
LOG.debug(msg)
return

Expand Down Expand Up @@ -1491,19 +1496,41 @@ def print_first_parent_tree(self, pretty=False, titles=False):
print_tree(tree, padding=padding, use_unicode=pretty)

def process_workflow_env(self):
"""Workflow context is exported to the local environment."""
"""Export Workflow context to the local environment.
A source workflow has only a name.
Once installed it also has an ID and a run directory.
And at scheduler start-up it has work, share, and log sub-dirs too.
"""
for key, value in {
**verbosity_to_env(cylc.flow.flags.verbosity),
'CYLC_WORKFLOW_ID': self.workflow,
'CYLC_WORKFLOW_NAME': self.workflow_name,
'CYLC_WORKFLOW_NAME_BASE': str(Path(self.workflow_name).name),
'CYLC_WORKFLOW_RUN_DIR': self.run_dir,
'CYLC_WORKFLOW_LOG_DIR': self.log_dir,
'CYLC_WORKFLOW_WORK_DIR': self.work_dir,
'CYLC_WORKFLOW_SHARE_DIR': self.share_dir,
}.items():
os.environ[key] = value

if is_relative_to(self.fdir, get_cylc_run_dir()):
# This is an installed workflow.
# - self.run_dir is only defined by the scheduler
# - but the run dir exists, created at installation
# - run sub-dirs may exist, if this installation was run already
# but if the scheduler is not running they shouldn't be used.
for key, value in {
'CYLC_WORKFLOW_ID': self.workflow,
'CYLC_WORKFLOW_RUN_DIR': str(self.fdir),
}.items():
os.environ[key] = value

if self.run_dir is not None:
# Run directory is only defined if the scheduler is running; in
# which case the following run sub-directories must exist.
for key, value in {
'CYLC_WORKFLOW_LOG_DIR': str(self.log_dir),
'CYLC_WORKFLOW_WORK_DIR': str(self.work_dir),
'CYLC_WORKFLOW_SHARE_DIR': str(self.share_dir),
}.items():
os.environ[key] = value

def process_config_env(self):
"""Set local config derived environment."""
os.environ['CYLC_UTC'] = str(get_utc_mode())
Expand Down
99 changes: 86 additions & 13 deletions cylc/flow/parsec/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import shlex
from collections import deque
from textwrap import dedent
from typing import List, Dict, Any, Tuple

from metomi.isodatetime.data import Duration, TimePoint
from metomi.isodatetime.dumpers import TimePointDumper
Expand Down Expand Up @@ -994,6 +995,66 @@ def coerce_parameter_list(cls, value, keys):
except ValueError:
return items

@staticmethod
def parse_xtrig_arglist(value: str) -> Tuple[List[Any], Dict[str, Any]]:
"""Parse Pythonic-like arg/kwarg signatures.
A stateful parser treats all args/kwargs as strings with
implicit quoting.
Examples:
>>> parse = CylcConfigValidator.parse_xtrig_arglist
# Parse pythonic syntax
>>> parse('a, b, c, d=1, e=2,')
(['a', 'b', 'c'], {'d': '1', 'e': '2'})
>>> parse('a, "1,2,3", b=",=",')
(['a', '"1,2,3"'], {'b': '",="'})
>>> parse('a, "b c", '"'d=e '")
(['a', '"b c"', "'d=e '"], {})
# Parse implicit (i.e. unquoted) strings
>>> parse('%(cycle)s, %(task)s, output=succeeded')
(['%(cycle)s', '%(task)s'], {'output': 'succeeded'})
"""
# results
args = []
kwargs = {}
# state
in_str = False # are we inside a quoted string
in_kwarg = False # are we after the = sign of a kwarg
buffer = '' # the current argument being parsed
kwarg_buffer = '' # the key of a kwarg if in_kwarg == True
# parser
for char in value:
if char in {'"', "'"}:
in_str = not in_str
buffer += char
elif not in_str and char == ',':
if in_kwarg:
kwargs[kwarg_buffer.strip()] = buffer.strip()
in_kwarg = False
kwarg_buffer = ''
else:
args.append(buffer.strip())
buffer = ''
elif char == '=' and not in_str and not in_kwarg:
in_kwarg = True
kwarg_buffer = buffer
buffer = ''
else:
buffer += char

# reached the end of the string
if buffer:
if in_kwarg:
kwargs[kwarg_buffer.strip()] = buffer.strip()
else:
args.append(buffer.strip())

return args, kwargs

@classmethod
def coerce_xtrigger(cls, value, keys):
"""Coerce a string into an xtrigger function context object.
Expand All @@ -1002,33 +1063,45 @@ def coerce_xtrigger(cls, value, keys):
Checks for legal string templates in arg values too.
Examples:
>>> CylcConfigValidator.coerce_xtrigger('a(b, c):PT1M', [None])
>>> xtrig = CylcConfigValidator.coerce_xtrigger
# parse xtrig function signatures
>>> xtrig('a(b, c):PT1M', [None])
a(b, c):60.0
>>> xtrig('a(x, "1,2,3", y):PT1S', [None])
a(x, 1,2,3, y):1.0
"""
# cast types
>>> x = xtrig('a(1, 1.1, True, abc, x=True, y=1.1)', [None])
>>> x.func_args
[1, 1.1, True, 'abc']
>>> x.func_kwargs
{'x': True, 'y': 1.1}
"""
label = keys[-1]
value = cls.strip_and_unquote(keys, value)
if not value:
raise IllegalValueError("xtrigger", keys, value)
args = []
kwargs = {}
match = cls._REC_TRIG_FUNC.match(value)
if match is None:
raise IllegalValueError("xtrigger", keys, value)
fname, fargs, intvl = match.groups()
if intvl:
intvl = cls.coerce_interval(intvl, keys)

if fargs:
# Extract function args and kwargs.
for farg in fargs.split(r','):
try:
key, val = farg.strip().split(r'=', 1)
except ValueError:
args.append(cls._coerce_type(farg.strip()))
else:
kwargs[key.strip()] = cls._coerce_type(val.strip())
# parse args
args, kwargs = CylcConfigValidator.parse_xtrig_arglist(fargs or '')

# cast types
args = [
CylcConfigValidator._coerce_type(arg)
for arg in args
]
kwargs = {
key: CylcConfigValidator._coerce_type(value)
for key, value in kwargs.items()
}

return SubFuncContext(label, fname, args, kwargs, intvl)

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/subprocctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def get_signature(self):
def __str__(self):
return (
f'{self.func_name}('
f'{", ".join(self.func_args + list(self.func_kwargs))}'
f'{", ".join(map(str, self.func_args + list(self.func_kwargs)))}'
f'):{self.intvl}'
)

Expand Down
9 changes: 6 additions & 3 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1739,9 +1739,12 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
conf = itask.tdef.rtconfig['simulation']
job_d = itask.tokens.duplicate(job=str(itask.submit_num))
now_str = get_current_time_string()
if (itask.point in conf['fail cycle points'] and
(itask.get_try_num() == 1 or
not conf['fail try 1 only'])):
if (
conf['fail cycle points'] is None # i.e. "all"
or itask.point in conf['fail cycle points']
) and (
itask.get_try_num() == 1 or not conf['fail try 1 only']
):
message_queue.put(
TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)
)
Expand Down
8 changes: 8 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,11 @@ def xtrigger_mgr() -> XtriggerManager:
create_autospec(Scheduler, workflow=workflow_name, owner=user)
)
)


@pytest.fixture()
def prevent_symlinking(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(
'cylc.flow.pathutil.make_symlink_dir',
lambda *_, **__: {}
)
93 changes: 93 additions & 0 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
from optparse import Values
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
from pathlib import Path
import pytest
import logging
from types import SimpleNamespace
from unittest.mock import Mock
from contextlib import suppress

from cylc.flow import CYLC_LOG
from cylc.flow.config import WorkflowConfig
Expand Down Expand Up @@ -1565,3 +1567,94 @@ def test__warn_if_queues_have_implicit_tasks(caplog):
assert "'baz'" not in result
assert f"showing first {max_warning_lines}" in result


@pytest.mark.parametrize(
'installed, run_dir, cylc_vars',
[
pytest.param(
False, # not installed (parsing a source dir)
None, # no run directory passed to config object by scheduler
{
'CYLC_WORKFLOW_NAME': True, # expected environment variables
'CYLC_WORKFLOW_ID': False,
'CYLC_WORKFLOW_RUN_DIR': False,
'CYLC_WORKFLOW_WORK_DIR': False,
'CYLC_WORKFLOW_SHARE_DIR': False,
'CYLC_WORKFLOW_LOG_DIR': False,
},
id="source-dir"
),
pytest.param(
True,
None,
{
'CYLC_WORKFLOW_NAME': True,
'CYLC_WORKFLOW_ID': True,
'CYLC_WORKFLOW_RUN_DIR': True,
'CYLC_WORKFLOW_WORK_DIR': False,
'CYLC_WORKFLOW_SHARE_DIR': False,
'CYLC_WORKFLOW_LOG_DIR': False,
},
id="run-dir"
),
pytest.param(
True,
"/some/path",
{
'CYLC_WORKFLOW_NAME': True,
'CYLC_WORKFLOW_ID': True,
'CYLC_WORKFLOW_RUN_DIR': True,
'CYLC_WORKFLOW_WORK_DIR': True,
'CYLC_WORKFLOW_SHARE_DIR': True,
'CYLC_WORKFLOW_LOG_DIR': True,
},
id="run-dir-from-scheduler"
),
]
)
def test_cylc_env_at_parsing(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
installed,
run_dir,
cylc_vars
):
"""Check that CYLC_ environment vars exported during config file parsing
are appropriate to the workflow context (source, installed, or running).
"""

# Purge environment from previous tests.
for key in cylc_vars.keys():
with suppress(KeyError):
del os.environ[key]

flow_file = tmp_path / WorkflowFiles.FLOW_FILE
flow_config = """
[scheduler]
allow implicit tasks = True
[scheduling]
[[graph]]
R1 = 'foo'
"""

flow_file.write_text(flow_config)

# Make it look as if path is relative to cylc-run (i.e. installed).
monkeypatch.setattr(
'cylc.flow.config.is_relative_to',
lambda _a, _b: installed
)

# Parse the workflow config then check the environment.
WorkflowConfig(
workflow="name", fpath=flow_file, options=Mock(spec=[]),
run_dir=run_dir
)

cylc_env = [k for k in os.environ.keys() if k.startswith('CYLC_')]

for var, expected in cylc_vars.items():
if expected:
assert var in cylc_env
else:
assert var not in cylc_env
Loading

0 comments on commit 9242324

Please sign in to comment.