Skip to content

Commit

Permalink
[blueprints] add capability to load list of blueprints from file (#22274
Browse files Browse the repository at this point in the history
)

## Summary

Allows `load_blueprints_from_yaml` to take a type which is a
`Sequence[Blueprint]`, meaning it will load many blueprints from the top
level of a file.

## Test Plan

New unit tests.
  • Loading branch information
benpankow authored Jun 4, 2024
1 parent ef16763 commit 9d831d9
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 19 deletions.
50 changes: 40 additions & 10 deletions python_modules/dagster/dagster/_core/blueprints/load_from_yaml.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
from pathlib import Path
from typing import Any, Dict, NamedTuple, Optional, Type, Union
from typing import Any, Dict, NamedTuple, Optional, Sequence, Type, Union, cast

from typing_extensions import get_args, get_origin

from dagster import (
Definitions,
_check as check,
)
from dagster._config.pythonic_config.type_check_utils import safe_is_subclass
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.metadata.source_code import (
CodeReferencesMetadataSet,
CodeReferencesMetadataValue,
LocalFileCodeReference,
)
from dagster._model.pydantic_compat_layer import json_schema_from_type
from dagster._utils.pydantic_yaml import parse_yaml_file_to_pydantic
from dagster._utils.pydantic_yaml import (
parse_yaml_file_to_pydantic,
parse_yaml_file_to_pydantic_sequence,
)

from .blueprint import Blueprint, BlueprintDefinitions

Expand Down Expand Up @@ -82,16 +88,18 @@ def _attach_code_references_to_definitions(
def load_defs_from_yaml(
*,
path: Union[Path, str],
per_file_blueprint_type: Type[Blueprint],
per_file_blueprint_type: Union[Type[Blueprint], Type[Sequence[Blueprint]]],
resources: Optional[Dict[str, Any]] = None,
) -> Definitions:
"""Load Dagster definitions from a YAML file of blueprints.
Args:
path (Path | str): The path to the YAML file or directory of YAML files containing the
blueprints for Dagster definitions.
per_file_blueprint_type (type[Blueprint]): The type of blueprint that each of the YAML
files are expected to conform to.
per_file_blueprint_type (Union[Type[Blueprint], Sequence[Type[Blueprint]]]): The type
of blueprint that each of the YAML files are expected to conform to. If a sequence
type is provided, the function will expect each YAML file to contain a list of
blueprints.
resources (Dict[str, Any], optional): A dictionary of resources to be bound to the
definitions. Defaults to None.
Expand All @@ -106,10 +114,32 @@ def load_defs_from_yaml(
else:
file_paths = list(resolved_path.rglob("*.yaml")) + list(resolved_path.rglob("*.yml"))

blueprints = [
parse_yaml_file_to_pydantic(per_file_blueprint_type, file_path.read_text(), str(file_path))
for file_path in file_paths
]
origin = get_origin(per_file_blueprint_type)
if safe_is_subclass(origin, Sequence):
args = get_args(per_file_blueprint_type)
check.invariant(
args and len(args) == 1,
"Sequence type annotation must have a single Blueprint type argument",
)

# flatten the list of blueprints from all files
blueprints = [
blueprint
for file_path in file_paths
for blueprint in parse_yaml_file_to_pydantic_sequence(
args[0], file_path.read_text(), str(file_path)
)
]

else:
blueprints = [
parse_yaml_file_to_pydantic(
cast(Type[Blueprint], per_file_blueprint_type),
file_path.read_text(),
str(file_path),
)
for file_path in file_paths
]

def_sets_with_code_references = [
_attach_code_references_to_definitions(
Expand All @@ -130,7 +160,7 @@ class YamlBlueprintsLoader(NamedTuple):
"""

path: Path
per_file_blueprint_type: Type[Blueprint]
per_file_blueprint_type: Union[Type[Blueprint], Type[Sequence[Blueprint]]]

def load_defs(self) -> Definitions:
return load_defs_from_yaml(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Type
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Sequence, Type, Union

import pydantic
from pydantic import BaseModel, ValidationError
Expand Down Expand Up @@ -154,7 +154,7 @@ def build_validation_error(
)


def json_schema_from_type(model_type: Type[BaseModel]):
def json_schema_from_type(model_type: Union[Type[BaseModel], Type[Sequence[BaseModel]]]):
"""Pydantic version stable way to get the JSON schema for a Pydantic model."""
# This nicely handles the case where the per_file_blueprint_type is actually
# a union type etc.
Expand Down
3 changes: 2 additions & 1 deletion python_modules/dagster/dagster/_utils/pydantic_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pydantic import BaseModel, ValidationError, parse_obj_as

from dagster._core.errors import DagsterInvariantViolationError
from dagster._model.pydantic_compat_layer import USING_PYDANTIC_1

from .source_position import (
Expand Down Expand Up @@ -116,7 +117,7 @@ def parse_yaml_file_to_pydantic_sequence(
parsed = parse_yaml_with_source_positions(src, filename)

if not isinstance(parsed.value, list):
raise ValueError(
raise DagsterInvariantViolationError(
f"Error parsing YAML file {filename}: Expected a list of objects at document root, but got {type(parsed.value)}"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
'description': '''
A blob of user-provided, structured metadata that specifies a set of Dagster definitions,
like assets, jobs, schedules, sensors, resources, or asset checks.

Base class for user-provided types. Users override and provide:
- The set of fields
- A build_defs implementation that generates Dagster Definitions from field values
Expand Down Expand Up @@ -45,6 +45,63 @@
'type': 'object',
})
# ---
# name: test_loader_schema_sequence[1]
dict({
'definitions': dict({
'SimpleAssetBlueprint': dict({
'additionalProperties': False,
'description': '''
A blob of user-provided, structured metadata that specifies a set of Dagster definitions,
like assets, jobs, schedules, sensors, resources, or asset checks.

Base class for user-provided types. Users override and provide:
- The set of fields
- A build_defs implementation that generates Dagster Definitions from field values
''',
'properties': dict({
'key': dict({
'title': 'Key',
'type': 'string',
}),
}),
'required': list([
'key',
]),
'title': 'SimpleAssetBlueprint',
'type': 'object',
}),
}),
'items': dict({
'$ref': '#/definitions/SimpleAssetBlueprint',
}),
'title': 'ParsingModel[Sequence[test_load_defs_from_yaml.test_loader_schema_sequence.<locals>.SimpleAssetBlueprint]]',
'type': 'array',
})
# ---
# name: test_loader_schema_sequence[2]
dict({
'$defs': dict({
'SimpleAssetBlueprint': dict({
'additionalProperties': False,
'properties': dict({
'key': dict({
'title': 'Key',
'type': 'string',
}),
}),
'required': list([
'key',
]),
'title': 'SimpleAssetBlueprint',
'type': 'object',
}),
}),
'items': dict({
'$ref': '#/$defs/SimpleAssetBlueprint',
}),
'type': 'array',
})
# ---
# name: test_loader_schema_union[1]
dict({
'anyOf': list([
Expand All @@ -61,7 +118,7 @@
'description': '''
A blob of user-provided, structured metadata that specifies a set of Dagster definitions,
like assets, jobs, schedules, sensors, resources, or asset checks.

Base class for user-provided types. Users override and provide:
- The set of fields
- A build_defs implementation that generates Dagster Definitions from field values
Expand Down Expand Up @@ -91,7 +148,7 @@
'description': '''
A blob of user-provided, structured metadata that specifies a set of Dagster definitions,
like assets, jobs, schedules, sensors, resources, or asset checks.

Base class for user-provided types. Users override and provide:
- The set of fields
- A build_defs implementation that generates Dagster Definitions from field values
Expand Down Expand Up @@ -133,7 +190,11 @@
'type': dict({
'const': 'bar',
'default': 'bar',
'enum': list([
'bar',
]),
'title': 'Type',
'type': 'string',
}),
}),
'required': list([
Expand All @@ -152,7 +213,11 @@
'type': dict({
'const': 'foo',
'default': 'foo',
'enum': list([
'foo',
]),
'title': 'Type',
'type': 'string',
}),
}),
'required': list([
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
import sys
from pathlib import Path
from typing import Literal, Union
from typing import List, Literal, Sequence, Union

import pytest
from dagster import AssetKey, asset, job
from dagster._check import CheckError
from dagster._core.blueprints.blueprint import (
Blueprint,
BlueprintDefinitions,
Expand All @@ -14,7 +16,7 @@
CodeReferencesMetadataSet,
LocalFileCodeReference,
)
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._model.pydantic_compat_layer import USING_PYDANTIC_1, USING_PYDANTIC_2
from pydantic import ValidationError

Expand Down Expand Up @@ -252,7 +254,7 @@ class SimpleAssetBlueprint(Blueprint):
# Pydantic 1 JSON schema has the blueprint as a definition rather than a top-level object
# Pydantic 2 JSON schema has the blueprint as a top-level object
if model_schema["title"] == "ParsingModel[SimpleAssetBlueprint]":
assert model_schema["#ref"] == "#/definitions/SimpleAssetBlueprint"
assert model_schema["$ref"] == "#/definitions/SimpleAssetBlueprint"
model_schema = model_schema["definitions"]["SimpleAssetBlueprint"]

assert model_schema["title"] == "SimpleAssetBlueprint"
Expand All @@ -261,6 +263,21 @@ class SimpleAssetBlueprint(Blueprint):
assert set(model_keys) == {"key"}


@pytest.mark.parametrize("pydantic_version", [2 if USING_PYDANTIC_2 else 1])
def test_loader_schema_sequence(snapshot, pydantic_version: int) -> None:
class SimpleAssetBlueprint(Blueprint):
key: str

loader = YamlBlueprintsLoader(
path=Path(__file__), per_file_blueprint_type=Sequence[SimpleAssetBlueprint]
)

model_schema = loader.model_json_schema()
snapshot.assert_match(model_schema)

assert model_schema["type"] == "array"


@pytest.mark.parametrize("pydantic_version", [2 if USING_PYDANTIC_2 else 1])
def test_loader_schema_union(snapshot, pydantic_version: int) -> None:
class FooAssetBlueprint(Blueprint):
Expand All @@ -285,3 +302,80 @@ class BarAssetBlueprint(Blueprint):
item.get("#ref", item.get("$ref")).split("/")[-1] for item in model_schema["anyOf"]
]
assert set(any_of_refs) == {"FooAssetBlueprint", "BarAssetBlueprint"}


def test_single_file_many_blueprints() -> None:
defs = load_defs_from_yaml(
path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml",
per_file_blueprint_type=List[SimpleAssetBlueprint],
)
assert set(defs.get_asset_graph().all_asset_keys) == {
AssetKey("asset1"),
AssetKey("asset2"),
AssetKey("asset3"),
}

defs = load_defs_from_yaml(
path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml",
per_file_blueprint_type=Sequence[SimpleAssetBlueprint],
)
assert set(defs.get_asset_graph().all_asset_keys) == {
AssetKey("asset1"),
AssetKey("asset2"),
AssetKey("asset3"),
}


# Disabled for Python versions <3.9 as builtin types do not support generics
# until Python 3.9, https://peps.python.org/pep-0585/
@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires python3.9")
def test_single_file_many_blueprints_builtin_list() -> None:
defs = load_defs_from_yaml(
path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml",
per_file_blueprint_type=list[SimpleAssetBlueprint], # type: ignore
)
assert set(defs.get_asset_graph().all_asset_keys) == {
AssetKey("asset1"),
AssetKey("asset2"),
AssetKey("asset3"),
}


def test_single_file_no_bp_type() -> None:
with pytest.raises(
CheckError, match="Sequence type annotation must have a single Blueprint type argument"
):
load_defs_from_yaml(
path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml",
per_file_blueprint_type=List,
)
with pytest.raises(
CheckError, match="Sequence type annotation must have a single Blueprint type argument"
):
load_defs_from_yaml(
path=Path(__file__).parent / "yaml_files" / "list_of_blueprints.yaml",
per_file_blueprint_type=Sequence,
)


def test_expect_list_no_list() -> None:
with pytest.raises(
DagsterInvariantViolationError, match="Expected a list of objects at document root, but got"
):
load_defs_from_yaml(
path=Path(__file__).parent / "yaml_files" / "single_blueprint.yaml",
per_file_blueprint_type=List[SimpleAssetBlueprint],
)


def test_dir_of_many_blueprints() -> None:
defs = load_defs_from_yaml(
path=Path(__file__).parent / "yaml_files" / "dir_of_lists_of_blueprints",
per_file_blueprint_type=List[SimpleAssetBlueprint],
)
assert set(defs.get_asset_graph().all_asset_keys) == {
AssetKey("asset1"),
AssetKey("asset2"),
AssetKey("asset3"),
AssetKey("asset4"),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- key: asset1
- key: asset2
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- key: asset3
- key: asset4
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- key: asset1
- key: asset2
- key: asset3

0 comments on commit 9d831d9

Please sign in to comment.