diff --git a/python_modules/dagster/dagster/_core/blueprints/load_from_yaml.py b/python_modules/dagster/dagster/_core/blueprints/load_from_yaml.py index 15081386ffe87..307244980f9ca 100644 --- a/python_modules/dagster/dagster/_core/blueprints/load_from_yaml.py +++ b/python_modules/dagster/dagster/_core/blueprints/load_from_yaml.py @@ -1,10 +1,13 @@ from pathlib import Path -from typing import Any, Dict, NamedTuple, Optional, Type, Union +from typing import Any, Dict, NamedTuple, Optional, Sequence, Type, Union, cast + +from typing_extensions import get_args, get_origin from dagster import ( Definitions, _check as check, ) +from dagster._config.pythonic_config.type_check_utils import safe_is_subclass from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.metadata.source_code import ( CodeReferencesMetadataSet, @@ -12,7 +15,10 @@ LocalFileCodeReference, ) from dagster._model.pydantic_compat_layer import json_schema_from_type -from dagster._utils.pydantic_yaml import parse_yaml_file_to_pydantic +from dagster._utils.pydantic_yaml import ( + parse_yaml_file_to_pydantic, + parse_yaml_file_to_pydantic_sequence, +) from .blueprint import Blueprint, BlueprintDefinitions @@ -82,7 +88,7 @@ def _attach_code_references_to_definitions( def load_defs_from_yaml( *, path: Union[Path, str], - per_file_blueprint_type: Type[Blueprint], + per_file_blueprint_type: Union[Type[Blueprint], Type[Sequence[Blueprint]]], resources: Optional[Dict[str, Any]] = None, ) -> Definitions: """Load Dagster definitions from a YAML file of blueprints. @@ -90,8 +96,10 @@ def load_defs_from_yaml( Args: path (Path | str): The path to the YAML file or directory of YAML files containing the blueprints for Dagster definitions. - per_file_blueprint_type (type[Blueprint]): The type of blueprint that each of the YAML - files are expected to conform to. + per_file_blueprint_type (Union[Type[Blueprint], Sequence[Type[Blueprint]]]): The type + of blueprint that each of the YAML files are expected to conform to. If a sequence + type is provided, the function will expect each YAML file to contain a list of + blueprints. resources (Dict[str, Any], optional): A dictionary of resources to be bound to the definitions. Defaults to None. @@ -106,10 +114,32 @@ def load_defs_from_yaml( else: file_paths = list(resolved_path.rglob("*.yaml")) + list(resolved_path.rglob("*.yml")) - blueprints = [ - parse_yaml_file_to_pydantic(per_file_blueprint_type, file_path.read_text(), str(file_path)) - for file_path in file_paths - ] + origin = get_origin(per_file_blueprint_type) + if safe_is_subclass(origin, Sequence): + args = get_args(per_file_blueprint_type) + check.invariant( + args and len(args) == 1, + "Sequence type annotation must have a single Blueprint type argument", + ) + + # flatten the list of blueprints from all files + blueprints = [ + blueprint + for file_path in file_paths + for blueprint in parse_yaml_file_to_pydantic_sequence( + args[0], file_path.read_text(), str(file_path) + ) + ] + + else: + blueprints = [ + parse_yaml_file_to_pydantic( + cast(Type[Blueprint], per_file_blueprint_type), + file_path.read_text(), + str(file_path), + ) + for file_path in file_paths + ] def_sets_with_code_references = [ _attach_code_references_to_definitions( @@ -130,7 +160,7 @@ class YamlBlueprintsLoader(NamedTuple): """ path: Path - per_file_blueprint_type: Type[Blueprint] + per_file_blueprint_type: Union[Type[Blueprint], Type[Sequence[Blueprint]]] def load_defs(self) -> Definitions: return load_defs_from_yaml( diff --git a/python_modules/dagster/dagster/_model/pydantic_compat_layer.py b/python_modules/dagster/dagster/_model/pydantic_compat_layer.py index e5e508394636d..f3d931809dc85 100644 --- a/python_modules/dagster/dagster/_model/pydantic_compat_layer.py +++ b/python_modules/dagster/dagster/_model/pydantic_compat_layer.py @@ -1,5 +1,5 @@ import json -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Type +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Sequence, Type, Union import pydantic from pydantic import BaseModel, ValidationError @@ -154,7 +154,7 @@ def build_validation_error( ) -def json_schema_from_type(model_type: Type[BaseModel]): +def json_schema_from_type(model_type: Union[Type[BaseModel], Type[Sequence[BaseModel]]]): """Pydantic version stable way to get the JSON schema for a Pydantic model.""" # This nicely handles the case where the per_file_blueprint_type is actually # a union type etc. diff --git a/python_modules/dagster/dagster/_utils/pydantic_yaml.py b/python_modules/dagster/dagster/_utils/pydantic_yaml.py index cdc4c80963b51..fda8363f09bf6 100644 --- a/python_modules/dagster/dagster/_utils/pydantic_yaml.py +++ b/python_modules/dagster/dagster/_utils/pydantic_yaml.py @@ -2,6 +2,7 @@ from pydantic import BaseModel, ValidationError, parse_obj_as +from dagster._core.errors import DagsterInvariantViolationError from dagster._model.pydantic_compat_layer import USING_PYDANTIC_1 from .source_position import ( @@ -116,7 +117,7 @@ def parse_yaml_file_to_pydantic_sequence( parsed = parse_yaml_with_source_positions(src, filename) if not isinstance(parsed.value, list): - raise ValueError( + raise DagsterInvariantViolationError( f"Error parsing YAML file {filename}: Expected a list of objects at document root, but got {type(parsed.value)}" ) diff --git a/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/__snapshots__/test_load_defs_from_yaml.ambr b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/__snapshots__/test_load_defs_from_yaml.ambr index 7aec6d9037072..999fb4f1e633c 100644 --- a/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/__snapshots__/test_load_defs_from_yaml.ambr +++ b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/__snapshots__/test_load_defs_from_yaml.ambr @@ -8,7 +8,7 @@ 'description': ''' A blob of user-provided, structured metadata that specifies a set of Dagster definitions, like assets, jobs, schedules, sensors, resources, or asset checks. - + Base class for user-provided types. Users override and provide: - The set of fields - A build_defs implementation that generates Dagster Definitions from field values @@ -45,6 +45,63 @@ 'type': 'object', }) # --- +# name: test_loader_schema_sequence[1] + dict({ + 'definitions': dict({ + 'SimpleAssetBlueprint': dict({ + 'additionalProperties': False, + 'description': ''' + A blob of user-provided, structured metadata that specifies a set of Dagster definitions, + like assets, jobs, schedules, sensors, resources, or asset checks. + + Base class for user-provided types. Users override and provide: + - The set of fields + - A build_defs implementation that generates Dagster Definitions from field values + ''', + 'properties': dict({ + 'key': dict({ + 'title': 'Key', + 'type': 'string', + }), + }), + 'required': list([ + 'key', + ]), + 'title': 'SimpleAssetBlueprint', + 'type': 'object', + }), + }), + 'items': dict({ + '$ref': '#/definitions/SimpleAssetBlueprint', + }), + 'title': 'ParsingModel[Sequence[test_load_defs_from_yaml.test_loader_schema_sequence..SimpleAssetBlueprint]]', + 'type': 'array', + }) +# --- +# name: test_loader_schema_sequence[2] + dict({ + '$defs': dict({ + 'SimpleAssetBlueprint': dict({ + 'additionalProperties': False, + 'properties': dict({ + 'key': dict({ + 'title': 'Key', + 'type': 'string', + }), + }), + 'required': list([ + 'key', + ]), + 'title': 'SimpleAssetBlueprint', + 'type': 'object', + }), + }), + 'items': dict({ + '$ref': '#/$defs/SimpleAssetBlueprint', + }), + 'type': 'array', + }) +# --- # name: test_loader_schema_union[1] dict({ 'anyOf': list([ @@ -61,7 +118,7 @@ 'description': ''' A blob of user-provided, structured metadata that specifies a set of Dagster definitions, like assets, jobs, schedules, sensors, resources, or asset checks. - + Base class for user-provided types. Users override and provide: - The set of fields - A build_defs implementation that generates Dagster Definitions from field values @@ -91,7 +148,7 @@ 'description': ''' A blob of user-provided, structured metadata that specifies a set of Dagster definitions, like assets, jobs, schedules, sensors, resources, or asset checks. - + Base class for user-provided types. Users override and provide: - The set of fields - A build_defs implementation that generates Dagster Definitions from field values @@ -133,7 +190,11 @@ 'type': dict({ 'const': 'bar', 'default': 'bar', + 'enum': list([ + 'bar', + ]), 'title': 'Type', + 'type': 'string', }), }), 'required': list([ @@ -152,7 +213,11 @@ 'type': dict({ 'const': 'foo', 'default': 'foo', + 'enum': list([ + 'foo', + ]), 'title': 'Type', + 'type': 'string', }), }), 'required': list([ diff --git a/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/test_load_defs_from_yaml.py b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/test_load_defs_from_yaml.py index afe461935f6fc..02a5878f744b3 100644 --- a/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/test_load_defs_from_yaml.py +++ b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/test_load_defs_from_yaml.py @@ -1,9 +1,11 @@ import os +import sys from pathlib import Path -from typing import Literal, Union +from typing import List, Literal, Sequence, Union import pytest from dagster import AssetKey, asset, job +from dagster._check import CheckError from dagster._core.blueprints.blueprint import ( Blueprint, BlueprintDefinitions, @@ -14,7 +16,7 @@ CodeReferencesMetadataSet, LocalFileCodeReference, ) -from dagster._core.errors import DagsterInvalidDefinitionError +from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster._model.pydantic_compat_layer import USING_PYDANTIC_1, USING_PYDANTIC_2 from pydantic import ValidationError @@ -252,7 +254,7 @@ class SimpleAssetBlueprint(Blueprint): # Pydantic 1 JSON schema has the blueprint as a definition rather than a top-level object # Pydantic 2 JSON schema has the blueprint as a top-level object if model_schema["title"] == "ParsingModel[SimpleAssetBlueprint]": - assert model_schema["#ref"] == "#/definitions/SimpleAssetBlueprint" + assert model_schema["$ref"] == "#/definitions/SimpleAssetBlueprint" model_schema = model_schema["definitions"]["SimpleAssetBlueprint"] assert model_schema["title"] == "SimpleAssetBlueprint" @@ -261,6 +263,21 @@ class SimpleAssetBlueprint(Blueprint): assert set(model_keys) == {"key"} +@pytest.mark.parametrize("pydantic_version", [2 if USING_PYDANTIC_2 else 1]) +def test_loader_schema_sequence(snapshot, pydantic_version: int) -> None: + class SimpleAssetBlueprint(Blueprint): + key: str + + loader = YamlBlueprintsLoader( + path=Path(__file__), per_file_blueprint_type=Sequence[SimpleAssetBlueprint] + ) + + model_schema = loader.model_json_schema() + snapshot.assert_match(model_schema) + + assert model_schema["type"] == "array" + + @pytest.mark.parametrize("pydantic_version", [2 if USING_PYDANTIC_2 else 1]) def test_loader_schema_union(snapshot, pydantic_version: int) -> None: class FooAssetBlueprint(Blueprint): @@ -285,3 +302,80 @@ class BarAssetBlueprint(Blueprint): item.get("#ref", item.get("$ref")).split("/")[-1] for item in model_schema["anyOf"] ] assert set(any_of_refs) == {"FooAssetBlueprint", "BarAssetBlueprint"} + + +def test_single_file_many_blueprints() -> None: + defs = load_defs_from_yaml( + path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml", + per_file_blueprint_type=List[SimpleAssetBlueprint], + ) + assert set(defs.get_asset_graph().all_asset_keys) == { + AssetKey("asset1"), + AssetKey("asset2"), + AssetKey("asset3"), + } + + defs = load_defs_from_yaml( + path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml", + per_file_blueprint_type=Sequence[SimpleAssetBlueprint], + ) + assert set(defs.get_asset_graph().all_asset_keys) == { + AssetKey("asset1"), + AssetKey("asset2"), + AssetKey("asset3"), + } + + +# Disabled for Python versions <3.9 as builtin types do not support generics +# until Python 3.9, https://peps.python.org/pep-0585/ +@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires python3.9") +def test_single_file_many_blueprints_builtin_list() -> None: + defs = load_defs_from_yaml( + path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml", + per_file_blueprint_type=list[SimpleAssetBlueprint], # type: ignore + ) + assert set(defs.get_asset_graph().all_asset_keys) == { + AssetKey("asset1"), + AssetKey("asset2"), + AssetKey("asset3"), + } + + +def test_single_file_no_bp_type() -> None: + with pytest.raises( + CheckError, match="Sequence type annotation must have a single Blueprint type argument" + ): + load_defs_from_yaml( + path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml", + per_file_blueprint_type=List, + ) + with pytest.raises( + CheckError, match="Sequence type annotation must have a single Blueprint type argument" + ): + load_defs_from_yaml( + path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml", + per_file_blueprint_type=Sequence, + ) + + +def test_expect_list_no_list() -> None: + with pytest.raises( + DagsterInvariantViolationError, match="Expected a list of objects at document root, but got" + ): + load_defs_from_yaml( + path=Path(__file__).parent / "yaml_files" / "single_blueprint.yaml", + per_file_blueprint_type=List[SimpleAssetBlueprint], + ) + + +def test_dir_of_many_blueprints() -> None: + defs = load_defs_from_yaml( + path=Path(__file__).parent / "yaml_files" / "dir_of_lists_of_blueprints", + per_file_blueprint_type=List[SimpleAssetBlueprint], + ) + assert set(defs.get_asset_graph().all_asset_keys) == { + AssetKey("asset1"), + AssetKey("asset2"), + AssetKey("asset3"), + AssetKey("asset4"), + } diff --git a/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/dir_of_lists_of_blueprints/list_of_blueprints1.yaml b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/dir_of_lists_of_blueprints/list_of_blueprints1.yaml new file mode 100644 index 0000000000000..3caaab74955ac --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/dir_of_lists_of_blueprints/list_of_blueprints1.yaml @@ -0,0 +1,2 @@ +- key: asset1 +- key: asset2 diff --git a/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/dir_of_lists_of_blueprints/list_of_blueprints2.yaml b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/dir_of_lists_of_blueprints/list_of_blueprints2.yaml new file mode 100644 index 0000000000000..f1d4b0970c45e --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/dir_of_lists_of_blueprints/list_of_blueprints2.yaml @@ -0,0 +1,2 @@ +- key: asset3 +- key: asset4 diff --git a/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/list_of_blueprints.yaml b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/list_of_blueprints.yaml new file mode 100644 index 0000000000000..12e50c2233245 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/blueprint_tests/yaml_files/list_of_blueprints.yaml @@ -0,0 +1,3 @@ +- key: asset1 +- key: asset2 +- key: asset3