diff --git a/src/sirocco/__init__.py b/src/sirocco/__init__.py
index 672f045..820ed0c 100644
--- a/src/sirocco/__init__.py
+++ b/src/sirocco/__init__.py
@@ -1,5 +1,5 @@
-from . import parsing
+from . import core, parsing
 
-__all__ = ["parsing"]
+__all__ = ["parsing", "core"]
 
 __version__ = "0.0.0-dev0"
diff --git a/src/sirocco/core/__init__.py b/src/sirocco/core/__init__.py
new file mode 100644
index 0000000..e2c15e4
--- /dev/null
+++ b/src/sirocco/core/__init__.py
@@ -0,0 +1,4 @@
+from .graph_items import Cycle, Data, GraphItem, Task
+from .workflow import Workflow
+
+__all__ = ["Workflow", "GraphItem", "Data", "Task", "Cycle"]
diff --git a/src/sirocco/core/_tasks/__init__.py b/src/sirocco/core/_tasks/__init__.py
new file mode 100644
index 0000000..ee39844
--- /dev/null
+++ b/src/sirocco/core/_tasks/__init__.py
@@ -0,0 +1,3 @@
+from . import icon_task, shell_task
+
+__all__ = ["icon_task", "shell_task"]
diff --git a/src/sirocco/core/_tasks/icon_task.py b/src/sirocco/core/_tasks/icon_task.py
new file mode 100644
index 0000000..87b3f89
--- /dev/null
+++ b/src/sirocco/core/_tasks/icon_task.py
@@ -0,0 +1,13 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import ClassVar
+
+from sirocco.core.graph_items import Task
+
+
+@dataclass
+class IconTask(Task):
+    plugin: ClassVar[str] = "icon"
+
+    namelists: dict = field(default_factory=dict)
diff --git a/src/sirocco/core/_tasks/shell_task.py b/src/sirocco/core/_tasks/shell_task.py
new file mode 100644
index 0000000..8e49ec9
--- /dev/null
+++ b/src/sirocco/core/_tasks/shell_task.py
@@ -0,0 +1,16 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import ClassVar
+
+from sirocco.core.graph_items import Task
+
+
+@dataclass
+class ShellTask(Task):
+    plugin: ClassVar[str] = "shell"
+
+    command: str | None = None
+    command_option: str | None = None
+    input_arg_options: dict[str, str] | None = None
+    src: str | None = None
diff --git a/src/sirocco/core.py b/src/sirocco/core/graph_items.py
similarity index 51%
rename from src/sirocco/core.py
rename to src/sirocco/core/graph_items.py
index 18483e1..9b00faf 100644
--- a/src/sirocco/core.py
+++ b/src/sirocco/core/graph_items.py
@@ -1,135 +1,120 @@
 from __future__ import annotations
 
-import logging
 from dataclasses import dataclass, field
 from itertools import chain, product
-from typing import TYPE_CHECKING, Any, Self
-
-from sirocco.parsing._yaml_data_models import (
-    ConfigCycleTask,
-    ConfigTask,
-    ConfigWorkflow,
-    load_workflow_config,
-)
+from typing import TYPE_CHECKING, Any, ClassVar, Self
 
 if TYPE_CHECKING:
     from collections.abc import Iterator
-    from datetime import datetime
-
-    from sirocco.parsing._yaml_data_models import ConfigCycle, DataBaseModel, TargetNodesBaseModel
 
-logging.basicConfig()
-logger = logging.getLogger(__name__)
+    from sirocco.parsing._yaml_data_models import ConfigCycleTask, ConfigTask, DataBaseModel, TargetNodesBaseModel
 
 
 @dataclass
 class GraphItem:
     """base class for Data Tasks and Cycles"""
 
+    color: ClassVar[str]
+
     name: str
-    color: str
     coordinates: dict = field(default_factory=dict)
 
-    @classmethod
-    def iter_coordinates(cls, param_refs: list, parameters: dict, date: datetime) -> Iterator[dict]:
-        space = ({} if date is None else {"date": [date]}) | {k: parameters[k] for k in param_refs}
-        yield from (dict(zip(space.keys(), x)) for x in product(*space.values()))
+
+class Plugin(type):
+    """Metaclass for plugin tasks inheriting from Task
+
+    Used to register all plugin task classes"""
+
+    classes: dict[str, type] | None = None
+
+    def __new__(cls, name, bases, dct):
+        if cls.classes is None:
+            cls.classes = {}
+        plugin = dct["plugin"]
+        if plugin in cls.classes:
+            msg = f"Task for plugin {plugin} already set"
+            raise ValueError(msg)
+        return_cls = super().__new__(cls, name, bases, dct)
+        cls.classes[plugin] = return_cls
+        return return_cls
 
 
 @dataclass
-class Task(GraphItem):
+class Task(GraphItem, metaclass=Plugin):
     """Internal representation of a task node"""
 
-    color: str = "light_red"
-    workflow: Workflow | None = None
-    outputs: list[Data] = field(default_factory=list)
+    plugin: ClassVar[str] = "_BASE_TASK"
+    color: ClassVar[str] = "light_red"
+
+    inputs: list[Data] = field(default_factory=list)
+    outputs: list[Data] = field(default_factory=list)
     wait_on: list[Task] = field(default_factory=list)
-    # TODO: This list is too long. We should start with the set of supported
-    # keywords and extend it as we support more
-    command: str | None = None
-    command_option: str | None = None
-    input_arg_options: dict[str, str] | None = None
     host: str | None = None
     account: str | None = None
-    plugin: str | None = None
-    config: str | None = None
     uenv: dict | None = None
     nodes: int | None = None
     walltime: str | None = None
-    src: str | None = None
-    conda_env: str | None = None
 
-    # use classmethod instead of custom init
     @classmethod
     def from_config(
         cls,
         config: ConfigTask,
-        workflow_parameters: dict[str, list],
+        coordinates: dict[str, Any],
+        datastore: Store,
         graph_spec: ConfigCycleTask,
-        workflow: Workflow,
-        *,
-        date: datetime | None = None,
-    ) -> Iterator[Self]:
-        for coordinates in cls.iter_coordinates(config.parameters, workflow_parameters, date):
-            inputs = list(
-                chain(
-                    *(workflow.data.iter_from_cycle_spec(input_spec, coordinates) for input_spec in graph_spec.inputs)
+    ) -> Task:
+        inputs = list(
+            chain(*(datastore.iter_from_cycle_spec(input_spec, coordinates) for input_spec in graph_spec.inputs))
+        )
+        outputs = [datastore[output_spec.name, coordinates] for output_spec in graph_spec.outputs]
+        # use the fact that pydantic models can be turned into dicts easily
+        cls_config = dict(config)
+        del cls_config["parameters"]
+        del cls_config["plugin"]
+        new = Plugin.classes[config.plugin](
+            coordinates=coordinates,
+            inputs=inputs,
+            outputs=outputs,
+            **cls_config,
+        )  # this works because dataclass has generated this init for us
+
+        # Store for actual linking in link_wait_on_tasks() once all tasks are created
+        new._wait_on_specs = graph_spec.wait_on  # noqa: SLF001 we don't have access to self in a dataclass
+        #                                                and setting an underscored attribute from
+        #                                                the class itself raises SLF001
+
+        return new
+
+    def link_wait_on_tasks(self, taskstore: Store):
+        self.wait_on = list(
+            chain(
+                *(
+                    taskstore.iter_from_cycle_spec(wait_on_spec, self.coordinates)
+                    for wait_on_spec in self._wait_on_specs
                 )
             )
-
-            outputs = [workflow.data[output_spec.name, coordinates] for output_spec in graph_spec.outputs]
-
-            # use the fact that pydantic models can be turned into dicts easily
-            cls_config = dict(config)
-            del cls_config["parameters"]
-
-            new = cls(
-                coordinates=coordinates,
-                inputs=inputs,
-                outputs=outputs,
-                workflow=workflow,
-                **cls_config,
-            )  # this works because dataclass has generated this init for us
-
-            # Store for actual linking in link_wait_on_tasks() once all tasks are created
-            new._wait_on_specs = graph_spec.wait_on  # noqa: SLF001 we don't have access to self in a dataclass
-            #                                                and setting an underscored attribute from
-            #                                                the class itself raises SLF001
-
-            yield new
-
-    def link_wait_on_tasks(self):
-        self.wait_on: list[Task] = []
-        for wait_on_spec in self._wait_on_specs:
-            self.wait_on.extend(
-                task
-                for task in self.workflow.tasks.iter_from_cycle_spec(wait_on_spec, self.coordinates)
-                if task is not None
-            )
+        )
 
 
 @dataclass(kw_only=True)
 class Data(GraphItem):
     """Internal representation of a data node"""
 
-    color: str = "light_blue"
+    color: ClassVar[str] = "light_blue"
+
+    type: str
     src: str
     available: bool
 
     @classmethod
-    def from_config(
-        cls, config: DataBaseModel, workflow_parameters: dict[str, list], *, date: datetime | None = None
-    ) -> Iterator[Self]:
-        for coordinates in cls.iter_coordinates(config.parameters, workflow_parameters, date):
-            yield cls(
-                name=config.name,
-                type=config.type,
-                src=config.src,
-                available=config.available,
-                coordinates=coordinates,
-            )
+    def from_config(cls, config: DataBaseModel, coordinates: dict) -> Self:
+        return cls(
+            name=config.name,
+            type=config.type,
+            src=config.src,
+            available=config.available,
+            coordinates=coordinates,
+        )
 
 
 @dataclass(kw_only=True)
@@ -186,7 +171,7 @@ def iter_from_cycle_spec(self, spec: TargetNodesBaseModel, reference: dict) -> I
         if "date" not in self._dims and (spec.lag or spec.date):
             msg = f"Array {self._name} has no date dimension, cannot be referenced by dates"
             raise ValueError(msg)
-        if "date" in self._dims and reference.get("date") is None and spec.date is []:
+        if "date" in self._dims and reference.get("date") is None and len(spec.date) == 0:
             msg = f"Array {self._name} has a date dimension, must be referenced by dates"
             raise ValueError(msg)
 
@@ -256,59 +241,3 @@ def __iter__(self) -> Iterator[GraphItem]:
                 yield from item
             else:
                 yield item
-
-
-class Workflow:
-    """Internal reprensentation of a workflow"""
-
-    def __init__(self, workflow_config: ConfigWorkflow) -> None:
-        self.name = workflow_config.name
-        self.tasks = Store()
-        self.data = Store()
-        self.cycles = Store()
-
-        # 1 - create availalbe data nodes
-        for data_config in workflow_config.data.available:
-            for data in Data.from_config(data_config, workflow_config.parameters, date=None):
-                self.data.add(data)
-
-        # 2 - create output data nodes
-        for cycle_config in workflow_config.cycles:
-            for date in self.cycle_dates(cycle_config):
-                for task_ref in cycle_config.tasks:
-                    for data_ref in task_ref.outputs:
-                        data_name = data_ref.name
-                        data_config = workflow_config.data_dict[data_name]
-                        for data in Data.from_config(data_config, workflow_config.parameters, date=date):
-                            self.data.add(data)
-
-        # 3 - create cycles and tasks
-        for cycle_config in workflow_config.cycles:
-            cycle_name = cycle_config.name
-            for date in self.cycle_dates(cycle_config):
-                cycle_tasks = []
-                for task_graph_spec in cycle_config.tasks:
-                    task_name = task_graph_spec.name
-                    task_config = workflow_config.task_dict[task_name]
-                    for task in Task.from_config(
-                        task_config, workflow_config.parameters, task_graph_spec, workflow=self, date=date
-                    ):
-                        self.tasks.add(task)
-                        cycle_tasks.append(task)
-                coordinates = {} if date is None else {"date": date}
-                self.cycles.add(Cycle(name=cycle_name, tasks=cycle_tasks, coordinates=coordinates))
-
-        # 4 - Link wait on tasks
-        for task in self.tasks:
-            task.link_wait_on_tasks()
-
-    @staticmethod
-    def cycle_dates(cycle_config: ConfigCycle) -> Iterator[datetime]:
-        yield (date := cycle_config.start_date)
-        if cycle_config.period is not None:
-            while (date := date + cycle_config.period) < cycle_config.end_date:
-                yield date
-
-    @classmethod
-    def from_yaml(cls, config_path: str):
-        return cls(load_workflow_config(config_path))
diff --git a/src/sirocco/core/workflow.py b/src/sirocco/core/workflow.py
new file mode 100644
index 0000000..00fa1f6
--- /dev/null
+++ b/src/sirocco/core/workflow.py
@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+from itertools import product
+from typing import TYPE_CHECKING
+
+from sirocco.core import _tasks  # noqa: F401
+from sirocco.core.graph_items import Cycle, Data, Store, Task
+from sirocco.parsing._yaml_data_models import (
+    ConfigWorkflow,
+    load_workflow_config,
+)
+
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+    from datetime import datetime
+
+    from sirocco.parsing._yaml_data_models import ConfigCycle
+
+
+class Workflow:
+    """Internal representation of a workflow"""
+
+    def __init__(self, workflow_config: ConfigWorkflow) -> None:
+        self.name = workflow_config.name
+        self.tasks = Store()
+        self.data = Store()
+        self.cycles = Store()
+
+        # Function to iterate over date and parameter combinations
+        def iter_coordinates(param_refs: list, date: datetime | None = None) -> Iterator[dict]:
+            space = ({} if date is None else {"date": [date]}) | {k: workflow_config.parameters[k] for k in param_refs}
+            yield from (dict(zip(space.keys(), x)) for x in product(*space.values()))
+
+        # 1 - create available data nodes
+        for data_config in workflow_config.data.available:
+            for coordinates in iter_coordinates(param_refs=data_config.parameters, date=None):
+                self.data.add(Data.from_config(config=data_config, coordinates=coordinates))
+
+        # 2 - create output data nodes
+        for cycle_config in workflow_config.cycles:
+            for date in self.cycle_dates(cycle_config):
+                for task_ref in cycle_config.tasks:
+                    for data_ref in task_ref.outputs:
+                        data_name = data_ref.name
+                        data_config = workflow_config.data_dict[data_name]
+                        for coordinates in iter_coordinates(param_refs=data_config.parameters, date=date):
+                            self.data.add(Data.from_config(config=data_config, coordinates=coordinates))
+
+        # 3 - create cycles and tasks
+        for cycle_config in workflow_config.cycles:
+            cycle_name = cycle_config.name
+            for date in self.cycle_dates(cycle_config):
+                cycle_tasks = []
+                for task_graph_spec in cycle_config.tasks:
+                    task_name = task_graph_spec.name
+                    task_config = workflow_config.task_dict[task_name]
+
+                    for coordinates in iter_coordinates(param_refs=task_config.parameters, date=date):
+                        task = Task.from_config(
+                            config=task_config, coordinates=coordinates, datastore=self.data, graph_spec=task_graph_spec
+                        )
+                        self.tasks.add(task)
+                        cycle_tasks.append(task)
+                self.cycles.add(
+                    Cycle(name=cycle_name, tasks=cycle_tasks, coordinates={} if date is None else {"date": date})
+                )
+
+        # 4 - Link wait on tasks
+        for task in self.tasks:
+            task.link_wait_on_tasks(self.tasks)
+
+    @staticmethod
+    def cycle_dates(cycle_config: ConfigCycle) -> Iterator[datetime]:
+        yield (date := cycle_config.start_date)
+        if cycle_config.period is not None:
+            while (date := date + cycle_config.period) < cycle_config.end_date:
+                yield date
+
+    @classmethod
+    def from_yaml(cls, config_path: str):
+        return cls(load_workflow_config(config_path))
diff --git a/src/sirocco/parsing/_yaml_data_models.py b/src/sirocco/parsing/_yaml_data_models.py
index 67c00ce..eb2b706 100644
--- a/src/sirocco/parsing/_yaml_data_models.py
+++ b/src/sirocco/parsing/_yaml_data_models.py
@@ -2,7 +2,6 @@
 
 import time
 from datetime import datetime
-from os.path import expandvars
 from pathlib import Path
 from typing import Any
 
@@ -235,21 +234,14 @@ class ConfigTask(_NamedBaseModel):
     To create an instance of a task defined in a workflow file
     """
 
-    # TODO: This list is too large. We should start with the set of supported
-    # keywords and extend it as we support more
-    command: str
-    command_option: str | None = None
-    input_arg_options: dict[str, str] | None = None
+    # config for generic task, no plugin specifics
     parameters: list[str] = []
     host: str | None = None
     account: str | None = None
     plugin: str | None = None
-    config: str | None = None
     uenv: dict | None = None
     nodes: int | None = None
     walltime: str | None = None
-    src: str | None = None
-    conda_env: str | None = None
 
     def __init__(self, /, **data):
         # We have to treat root special as it does not typically define a command
@@ -257,12 +249,6 @@ def __init__(self, /, **data):
             data["ROOT"]["command"] = "ROOT_PLACEHOLDER"
         super().__init__(**data)
 
-    @field_validator("command")
-    @classmethod
-    def expand_env_vars(cls, value: str) -> str:
-        """Expands any environment variables in the value"""
-        return expandvars(value)
-
     @field_validator("walltime")
     @classmethod
     def convert_to_struct_time(cls, value: str | None) -> time.struct_time | None:
@@ -270,6 +256,9 @@ def convert_to_struct_time(cls, value: str | None) -> time.struct_time | None:
         return None if value is None else time.strptime(value, "%H:%M:%S")
 
 
+# TODO(maybe): ConfigTaskIcon(ConfigTask) and ConfigTaskShell(ConfigTask)
+
+
 class DataBaseModel(_NamedBaseModel):
     """
     To create an instance of a data defined in a workflow file.
diff --git a/tests/files/configs/test_config_large.yml b/tests/files/configs/test_config_large.yml
index 81b5db7..2326e53 100644
--- a/tests/files/configs/test_config_large.yml
+++ b/tests/files/configs/test_config_large.yml
@@ -61,19 +61,18 @@ tasks:
       host: santis
       account: g110
   - extpar:
-      plugin: extpar
+      plugin: shell # no extpar plugin available yet
       command: $PWD/examples/files/scripts/extpar
       command_option: '--verbose' # todo implement support
       input_arg_options:
         obs_data: '--input'
-      config: path/to/namelists/dir
       uenv:
         squashfs: path/to/squashfs
         mount_point: runtime/mount/point
       nodes: 1
       walltime: 00:02:00
   - preproc:
-      plugin: AiiDA Shell
+      plugin: shell
       command: $PWD/examples/files/scripts/cleanup.sh
       input_arg_options:
         grid_file: '-g'
@@ -81,7 +80,6 @@ tasks:
         ERA5: '-e'
       nodes: 4
       walltime: 00:02:00
-      config: path/to/config/dir
       uenv:
         squashfs: path/to/squashfs
         mount_point: runtime/mount/point
@@ -93,35 +91,35 @@ tasks:
         icon_input: '--input'
       nodes: 40
       walltime: 23:59:59
-      config: path/to/namelists/dir
+      namelists:
+        master: path/to/master_nml
+        model: path/to/model_nml
       uenv:
         squashfs: path/to/squashfs
         mount_point: runtime/mount/point
   - postproc_1:
-      plugin: AiiDA Shell
+      plugin: shell
       command: $PWD/examples/files/scripts/main_script_ocn.sh
       input_arg_options:
         stream_1: '--input'
       nodes: 2
       walltime: 00:05:00
-      conda_env: path/to/yaml/env/file
       uenv:
         squashfs: path/to/squashfs
         mount_point: runtime/mount/point
   - postproc_2:
-      plugin: AiiDA Shell
+      plugin: shell
      command: $PWD/examples/files/scripts/main_script_atm.sh
       input_arg_options:
         stream_2: '--input'
       nodes: 2
       walltime: 00:05:00
       src: path/to/src/dir
-      conda_env: path/to/yaml/env/file
       uenv:
         squashfs: path/to/squashfs
         mount_point: runtime/mount/point
   - store_and_clean_1:
-      plugin: AiiDA Shell
+      plugin: shell
       command: $PWD/examples/files/scripts/post_clean.sh
       input_arg_options:
         postout_1: '--input'
@@ -130,7 +128,7 @@ tasks:
       nodes: 1
       walltime: 00:01:00
   - store_and_clean_2:
-      plugin: AiiDA Shell
+      plugin: shell
       command: $PWD/examples/files/scripts/post_clean.sh
       input_arg_options:
         postout_2: '--input'
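
Reviewer note, not part of the patch: the Plugin metaclass in graph_items.py implements registry-based dispatch. A new task type only needs to subclass Task and declare a unique `plugin` ClassVar; Plugin.__new__ records the class in Plugin.classes, and Task.from_config uses that mapping to pick the concrete class for the `plugin` string found in the YAML config. A minimal sketch under the package layout of this diff (DummyTask and the "dummy" plugin name are hypothetical):

from dataclasses import dataclass
from typing import ClassVar

from sirocco.core.graph_items import Plugin, Task


@dataclass
class DummyTask(Task):
    # Declaring the ClassVar is all that is needed: Plugin.__new__ sees it
    # in the class dict and registers the class under that key.
    plugin: ClassVar[str] = "dummy"


# The registry now maps the YAML plugin string to the concrete class,
# which is exactly the lookup Task.from_config performs.
assert Plugin.classes["dummy"] is DummyTask
# Declaring a second Task subclass with plugin = "dummy" would raise ValueError.

This is also why workflow.py imports sirocco.core._tasks purely for its side effect: importing the module defines IconTask and ShellTask, which registers the "icon" and "shell" plugins.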
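
The iter_coordinates closure in Workflow.__init__ expands each config entry into one graph node per point of its parameter space: the cartesian product of the optional date axis with every parameter axis the entry references. A standalone illustration of that expansion (parameter names and values are invented):

from datetime import datetime
from itertools import product

parameters = {"member": [1, 2], "domain": ["atm"]}  # invented values
date = datetime(2026, 1, 1)

# Same expression as in Workflow.__init__, inlined for illustration:
space = ({} if date is None else {"date": [date]}) | {k: parameters[k] for k in ["member", "domain"]}
coords = [dict(zip(space.keys(), x)) for x in product(*space.values())]
print(coords)
# [{'date': datetime.datetime(2026, 1, 1, 0, 0), 'member': 1, 'domain': 'atm'},
#  {'date': datetime.datetime(2026, 1, 1, 0, 0), 'member': 2, 'domain': 'atm'}]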
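
For callers nothing changes: Workflow.from_yaml still parses the config and builds the full graph, and the Store containers remain iterable. A usage sketch (the path is illustrative; any workflow YAML accepted by load_workflow_config works):

from sirocco.core import Workflow

wf = Workflow.from_yaml("tests/files/configs/test_config_large.yml")
for task in wf.tasks:  # Store.__iter__ yields the individual graph items
    print(task.name, task.coordinates)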