diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py
index dda6c4fcdbad4..800ade27b64e2 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py
@@ -254,6 +254,9 @@ def get_execution_set_asset_and_check_keys(
     def assets_defs(self) -> Sequence[AssetsDefinition]:
         return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes))

+    def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefinition]:
+        return list(dict.fromkeys([self.get(key).assets_def for key in keys]))
+
     @property
     def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
         return list(dict.fromkeys(self._asset_checks_defs_by_key.values()))
diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py
index a6ac727722166..a9fd12810d76c 100644
--- a/python_modules/dagster/dagster/_core/definitions/assets_job.py
+++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py
@@ -5,7 +5,6 @@
     Any,
     Dict,
     Iterable,
-    List,
     Mapping,
     Optional,
     Sequence,
@@ -18,6 +17,7 @@
 import dagster._check as check
 from dagster._core.definitions.hook_definition import HookDefinition
+from dagster._core.definitions.internal_asset_graph import InternalAssetGraph
 from dagster._core.definitions.policy import RetryPolicy
 from dagster._core.errors import DagsterInvalidDefinitionError
 from dagster._core.selector.subset_selector import AssetSelectionData
@@ -61,58 +61,37 @@ def is_base_asset_job_name(name: str) -> bool:
 def get_base_asset_jobs(
-    assets: Sequence[AssetsDefinition],
-    asset_checks: Sequence[AssetChecksDefinition],
+    asset_graph: InternalAssetGraph,
     resource_defs: Optional[Mapping[str, ResourceDefinition]],
     executor_def: Optional[ExecutorDefinition],
 ) -> Sequence[JobDefinition]:
-    executable_assets = [a for a in assets if a.is_executable]
-    unexecutable_assets = [a for a in assets if not a.is_executable]
-
-    executable_assets_by_partitions_def: Dict[
-        Optional[PartitionsDefinition], List[AssetsDefinition]
-    ] = defaultdict(list)
-    for asset in executable_assets:
-        executable_assets_by_partitions_def[asset.partitions_def].append(asset)
-    # sort to ensure some stability in the ordering
-    all_partitions_defs = sorted(
-        [p for p in executable_assets_by_partitions_def.keys() if p], key=repr
-    )
-
-    if len(all_partitions_defs) == 0:
+    if len(asset_graph.all_partitions_defs) == 0:
+        executable_asset_keys = asset_graph.executable_asset_keys
+        loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys
         return [
             build_assets_job(
                 name=ASSET_BASE_JOB_PREFIX,
-                executable_assets=executable_assets,
-                loadable_assets=unexecutable_assets,
-                asset_checks=asset_checks,
+                executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys),
+                loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys),
+                asset_checks=asset_graph.asset_checks_defs,
                 executor_def=executor_def,
                 resource_defs=resource_defs,
             )
         ]
     else:
-        unpartitioned_executable_assets = executable_assets_by_partitions_def.get(None, [])
         jobs = []
-
-        for i, partitions_def in enumerate(all_partitions_defs):
-            # all base jobs contain all unpartitioned assets
-            executable_assets_for_job = [
-                *executable_assets_by_partitions_def[partitions_def],
-                *unpartitioned_executable_assets,
-            ]
+        for i, partitions_def in enumerate(asset_graph.all_partitions_defs):
+            executable_asset_keys = asset_graph.executable_asset_keys & {
+                *asset_graph.asset_keys_for_partitions_def(partitions_def=partitions_def),
+                *asset_graph.unpartitioned_asset_keys,
+            }
+            loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys
             jobs.append(
                 build_assets_job(
                     f"{ASSET_BASE_JOB_PREFIX}_{i}",
-                    executable_assets=executable_assets_for_job,
-                    loadable_assets=[
-                        *(
-                            asset
-                            for asset in executable_assets
-                            if asset not in executable_assets_for_job
-                        ),
-                        *unexecutable_assets,
-                    ],
-                    asset_checks=asset_checks,
+                    executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys),
+                    loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys),
+                    asset_checks=asset_graph.asset_checks_defs,
                     resource_defs=resource_defs,
                     executor_def=executor_def,
                     partitions_def=partitions_def,
diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py
index 7262a3af9c1a9..83abc62821027 100644
--- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py
+++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py
@@ -215,9 +215,20 @@ def toposorted_asset_keys_by_level(self) -> Sequence[AbstractSet[AssetKey]]:
         """
         return [set(level) for level in toposort.toposort(self.asset_dep_graph["upstream"])]

+    @property
+    @cached_method
+    def unpartitioned_asset_keys(self) -> AbstractSet[AssetKey]:
+        return {node.key for node in self.asset_nodes if not node.is_partitioned}
+
     def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
         return {node.key for node in self.asset_nodes if node.group_name == group_name}

+    @cached_method
+    def asset_keys_for_partitions_def(
+        self, *, partitions_def: PartitionsDefinition
+    ) -> AbstractSet[AssetKey]:
+        return {node.key for node in self.asset_nodes if node.partitions_def == partitions_def}
+
     @cached_property
     def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]:
         """Materializable asset keys that have no materializable parents."""
@@ -236,6 +247,12 @@ def root_executable_asset_keys(self) -> AbstractSet[AssetKey]:
     def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
         return {key for asset in self.asset_nodes for key in asset.check_keys}

+    @cached_property
+    def all_partitions_defs(self) -> Sequence[PartitionsDefinition]:
+        return sorted(
+            set(node.partitions_def for node in self.asset_nodes if node.partitions_def), key=repr
+        )
+
     @cached_property
     def all_group_names(self) -> AbstractSet[str]:
         return {a.group_name for a in self.asset_nodes if a.group_name is not None}
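Note: in the refactored get_base_asset_jobs above, base jobs are derived from the asset graph itself: one job per PartitionsDefinition (plus all unpartitioned assets), with every other asset attached as loadable. A minimal sketch of the new asset-graph helpers this relies on; the assets, dates, and names below are illustrative only and not part of this diff (InternalAssetGraph.from_assets is defined in internal_asset_graph.py further down):

    from dagster import DailyPartitionsDefinition, HourlyPartitionsDefinition, asset
    from dagster._core.definitions.internal_asset_graph import InternalAssetGraph

    daily = DailyPartitionsDefinition(start_date="2023-01-01")
    hourly = HourlyPartitionsDefinition(start_date="2023-01-01-00:00")

    @asset(partitions_def=daily)
    def daily_asset(): ...

    @asset(partitions_def=hourly)
    def hourly_asset(): ...

    @asset
    def unpartitioned_asset(): ...

    asset_graph = InternalAssetGraph.from_assets([daily_asset, hourly_asset, unpartitioned_asset])

    # Two partitions definitions -> two base jobs; the unpartitioned asset is
    # executable in both, and everything else is loadable in the other job.
    assert len(asset_graph.all_partitions_defs) == 2
    assert asset_graph.unpartitioned_asset_keys == {unpartitioned_asset.key}
    assert asset_graph.asset_keys_for_partitions_def(partitions_def=daily) == {daily_asset.key}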
diff --git a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py
new file mode 100644
index 0000000000000..104c8e37c5fbd
--- /dev/null
+++ b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py
@@ -0,0 +1,258 @@
+from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union
+
+from dagster._core.definitions.asset_check_spec import AssetCheckKey
+from dagster._core.definitions.asset_checks import AssetChecksDefinition
+from dagster._core.definitions.asset_graph import AssetGraph, AssetKeyOrCheckKey, AssetNode
+from dagster._core.definitions.asset_spec import (
+    SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET,
+    AssetSpec,
+)
+from dagster._core.definitions.assets import AssetsDefinition
+from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
+from dagster._core.definitions.backfill_policy import BackfillPolicy
+from dagster._core.definitions.events import AssetKey
+from dagster._core.definitions.freshness_policy import FreshnessPolicy
+from dagster._core.definitions.metadata import ArbitraryMetadataMapping
+from dagster._core.definitions.partition import PartitionsDefinition
+from dagster._core.definitions.partition_mapping import PartitionMapping
+from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies
+from dagster._core.definitions.source_asset import SourceAsset
+from dagster._core.selector.subset_selector import generate_asset_dep_graph
+from dagster._utils.cached_method import cached_method
+
+
+class LocalAssetNode(AssetNode):
+    def __init__(
+        self, key: AssetKey, assets_def: AssetsDefinition, check_keys: AbstractSet[AssetCheckKey]
+    ):
+        self.key = key
+        self.assets_def = assets_def
+        self._check_keys = check_keys
+
+    @property
+    @cached_method
+    def group_name(self) -> Optional[str]:
+        return self.assets_def.group_names_by_key.get(self.key)
+
+    @property
+    def is_materializable(self) -> bool:
+        return self.assets_def.is_materializable
+
+    @property
+    def is_observable(self) -> bool:
+        return self.assets_def.is_observable
+
+    @property
+    def is_external(self) -> bool:
+        return self.assets_def.is_external
+
+    @property
+    def is_executable(self) -> bool:
+        return self.assets_def.is_executable
+
+    @property
+    def metadata(self) -> ArbitraryMetadataMapping:
+        return self.assets_def.metadata_by_key.get(self.key, {})
+
+    @property
+    def is_partitioned(self) -> bool:
+        return self.assets_def.partitions_def is not None
+
+    @property
+    def partitions_def(self) -> Optional[PartitionsDefinition]:
+        return self.assets_def.partitions_def
+
+    @property
+    def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:
+        return self.assets_def.partition_mappings
+
+    @property
+    def freshness_policy(self) -> Optional[FreshnessPolicy]:
+        return self.assets_def.freshness_policies_by_key.get(self.key)
+
+    @property
+    def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
+        return self.assets_def.auto_materialize_policies_by_key.get(self.key)
+
+    @property
+    def auto_observe_interval_minutes(self) -> Optional[float]:
+        return self.assets_def.auto_observe_interval_minutes
+
+    @property
+    def backfill_policy(self) -> Optional[BackfillPolicy]:
+        return self.assets_def.backfill_policy
+
+    @property
+    def code_version(self) -> Optional[str]:
+        return self.assets_def.code_versions_by_key.get(self.key)
+
+    @property
+    def check_keys(self) -> AbstractSet[AssetCheckKey]:
+        return self._check_keys
+
+    @property
+    def execution_set_asset_keys(self) -> AbstractSet[AssetKey]:
+        return (
+            {self.key}
+            if len(self.assets_def.keys) <= 1 or self.assets_def.can_subset
+            else self.assets_def.keys
+        )
+
+    @property
+    def execution_set_asset_and_check_keys(self) -> AbstractSet[AssetKeyOrCheckKey]:
+        if self.assets_def.can_subset or (
+            len(self.assets_def.keys) <= 1 and not len(self.assets_def.check_keys) > 0
+        ):
+            return {self.key}
+        else:
+            return {*self.assets_def.keys, *self.assets_def.check_keys}
+
+
+class InternalAssetGraph(AssetGraph[LocalAssetNode]):
+    _asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition]
+
+    def __init__(
+        self,
+        asset_nodes_by_key: Mapping[AssetKey, LocalAssetNode],
+        asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition],
+    ):
+        self.asset_nodes_by_key = asset_nodes_by_key
+        self._asset_checks_defs_by_key = asset_checks_defs_by_key
+        self._asset_nodes_by_check_key = {
+            **{
+                check_key: asset
+                for asset in asset_nodes_by_key.values()
+                for check_key in asset.check_keys
+            },
+            **{
+                key: asset_nodes_by_key[checks_def.asset_key]
+                for key, checks_def in asset_checks_defs_by_key.items()
+                if checks_def.asset_key in asset_nodes_by_key
+            },
+        }
+
+    @staticmethod
+    def normalize_assets(
+        assets: Iterable[Union[AssetsDefinition, SourceAsset]],
+        checks_defs: Optional[Iterable[AssetChecksDefinition]] = None,
+    ) -> Sequence[AssetsDefinition]:
+        """Normalize a mixed list of AssetsDefinition and SourceAsset to a list of AssetsDefinition.
+
+        Normalization includes:
+
+        - Converting any SourceAsset to an AssetsDefinition
+        - Resolving all relative asset keys (that sometimes specify dependencies) to absolute asset
+          keys
+        - Creating unexecutable external asset definitions for any keys referenced by asset checks
+          or as dependencies, but for which no definition was provided.
+        """
+        from dagster._core.definitions.external_asset import (
+            create_external_asset_from_source_asset,
+            external_asset_from_spec,
+        )
+
+        checks_defs = checks_defs or []
+
+        # Convert any source assets to external assets
+        assets_defs = [
+            create_external_asset_from_source_asset(a) if isinstance(a, SourceAsset) else a
+            for a in assets
+        ]
+        all_keys = {k for asset_def in assets_defs for k in asset_def.keys}
+
+        # Resolve all asset dependencies. An asset dependency is resolved when its key is an
+        # AssetKey not subject to any further manipulation.
+        resolved_deps = ResolvedAssetDependencies(assets_defs, [])
+        assets_defs = [
+            ad.with_attributes(
+                input_asset_key_replacements={
+                    raw_key: resolved_deps.get_resolved_asset_key_for_input(ad, input_name)
+                    for input_name, raw_key in ad.keys_by_input_name.items()
+                }
+            )
+            for ad in assets_defs
+        ]
+
+        # Create unexecutable external asset definitions for any referenced keys for which no
+        # definition was provided.
+        all_referenced_asset_keys = {
+            *(key for asset_def in assets_defs for key in asset_def.dependency_keys),
+            *(checks_def.asset_key for checks_def in checks_defs),
+        }
+        for key in all_referenced_asset_keys.difference(all_keys):
+            assets_defs.append(
+                external_asset_from_spec(
+                    AssetSpec(key=key, metadata={SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET: True})
+                )
+            )
+        return assets_defs
+
+    @classmethod
+    def from_assets(
+        cls,
+        assets: Iterable[Union[AssetsDefinition, SourceAsset]],
+        asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
+    ) -> "InternalAssetGraph":
+        asset_checks = asset_checks or []
+        assets_defs = cls.normalize_assets(assets, asset_checks)
+
+        asset_nodes_by_key = {
+            k: LocalAssetNode(
+                key=k,
+                assets_def=ad,
+                check_keys={
+                    *ad.check_keys,
+                    *(ck for cd in asset_checks if cd.asset_key == k for ck in cd.keys),
+                },
+            )
+            for ad in assets_defs
+            for k in ad.keys
+        }
+        dep_graph = generate_asset_dep_graph(assets_defs, [])
+        for key, node in asset_nodes_by_key.items():
+            node.set_children(
+                {asset_nodes_by_key[child_key] for child_key in dep_graph["downstream"][key]}
+            )
+            node.set_parents(
+                {asset_nodes_by_key[parent_key] for parent_key in dep_graph["upstream"][key]}
+            )
+
+        asset_checks_defs_by_key = {
+            key: check for check in (asset_checks or []) for key in check.keys
+        }
+
+        return InternalAssetGraph(
+            asset_nodes_by_key=asset_nodes_by_key,
+            asset_checks_defs_by_key=asset_checks_defs_by_key,
+        )
+
+    ##### COMMON ASSET GRAPH INTERFACE
+
+    def get_execution_set_asset_and_check_keys(
+        self, asset_or_check_key: AssetKeyOrCheckKey
+    ) -> AbstractSet[AssetKeyOrCheckKey]:
+        if isinstance(asset_or_check_key, AssetKey):
+            return self.get(asset_or_check_key).execution_set_asset_and_check_keys
+        # only checks emitted by AssetsDefinition have required keys
+        elif asset_or_check_key in self._asset_checks_defs_by_key:
+            return {asset_or_check_key}
+        else:
+            asset_node = self._asset_nodes_by_check_key[asset_or_check_key]
+            asset_unit_keys = asset_node.execution_set_asset_and_check_keys
+            return (
+                asset_unit_keys if asset_or_check_key in asset_unit_keys else {asset_or_check_key}
+            )
+
+    ##### INTERNAL-SPECIFIC INTERFACE
+
+    @property
+    @cached_method
+    def assets_defs(self) -> Sequence[AssetsDefinition]:
+        return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes))
+
+    def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefinition]:
+        return list(dict.fromkeys([self.get(key).assets_def for key in keys]))
+
+    @property
+    def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
+        return list(dict.fromkeys(self._asset_checks_defs_by_key.values()))
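Note: a minimal sketch of what InternalAssetGraph.from_assets / normalize_assets above does for SourceAssets and for keys that are referenced but never defined. The asset names here are hypothetical, and the exact metadata representation on the auto-created stub is an assumption:

    from dagster import AssetKey, SourceAsset, asset
    from dagster._core.definitions.asset_spec import SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET
    from dagster._core.definitions.internal_asset_graph import InternalAssetGraph

    raw_table = SourceAsset(key=AssetKey("raw_table"))

    @asset(deps=[AssetKey("raw_table"), AssetKey("never_defined")])
    def downstream(): ...

    graph = InternalAssetGraph.from_assets([raw_table, downstream])

    # The SourceAsset is converted to an external AssetsDefinition, and an unexecutable
    # stub external asset is auto-created for the undefined "never_defined" dependency.
    assert AssetKey("never_defined") in graph.all_asset_keys
    stub = graph.get(AssetKey("never_defined"))
    assert not stub.is_executable
    # assumption: the system metadata key survives unchanged on the stub node
    assert SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET in stub.metadata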
diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py
index 8a6ba28a135e3..4f301fb8230e8 100644
--- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py
+++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py
@@ -50,6 +50,7 @@
 from .valid_definitions import VALID_REPOSITORY_DATA_DICT_KEYS, RepositoryListDefinition

 if TYPE_CHECKING:
+    from dagster._core.definitions.asset_check_spec import AssetCheckKey
     from dagster._core.definitions.events import AssetKey


@@ -162,6 +163,7 @@ def build_caching_repository_data_from_list(
     sensors: Dict[str, SensorDefinition] = {}
     assets_defs: List[AssetsDefinition] = []
     asset_keys: Set[AssetKey] = set()
+    asset_check_keys: Set[AssetCheckKey] = set()
     source_assets: List[SourceAsset] = []
     asset_checks_defs: List[AssetChecksDefinition] = []
     for definition in repository_definitions:
@@ -228,6 +230,10 @@
             source_assets.append(definition)
             asset_keys.add(definition.key)
         elif isinstance(definition, AssetChecksDefinition):
+            for key in definition.keys:
+                if key in asset_check_keys:
+                    raise DagsterInvalidDefinitionError(f"Duplicate asset check key: {key}")
+            asset_check_keys.update(definition.keys)
             asset_checks_defs.append(definition)
         else:
             check.failed(f"Unexpected repository entry {definition}")
@@ -237,10 +243,9 @@ def build_caching_repository_data_from_list(
     )
     if assets_defs or source_assets or asset_checks_defs:
         for job_def in get_base_asset_jobs(
-            assets=asset_graph.assets_defs,
+            asset_graph=asset_graph,
             executor_def=default_executor_def,
             resource_defs=top_level_resources,
-            asset_checks=asset_checks_defs,
         ):
             jobs[job_def.name] = job_def
diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/valid_definitions.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/valid_definitions.py
index e767a695a2d2e..8258d87619d4d 100644
--- a/python_modules/dagster/dagster/_core/definitions/repository_definition/valid_definitions.py
+++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/valid_definitions.py
@@ -2,6 +2,7 @@

 from typing_extensions import TypeAlias

+from dagster._core.definitions.asset_checks import AssetChecksDefinition
 from dagster._core.definitions.graph_definition import GraphDefinition
 from dagster._core.definitions.job_definition import JobDefinition
 from dagster._core.definitions.schedule_definition import ScheduleDefinition
@@ -33,6 +34,7 @@

 RepositoryListDefinition: TypeAlias = Union[
     "AssetsDefinition",
+    AssetChecksDefinition,
     GraphDefinition,
     JobDefinition,
     ScheduleDefinition,
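Note: the new asset_check_keys bookkeeping above turns a duplicate check registration into an explicit DagsterInvalidDefinitionError, which is what the updated test at the bottom of this diff asserts. A minimal repro sketch (the asset and check names are illustrative):

    import pytest
    from dagster import AssetCheckResult, Definitions, asset, asset_check
    from dagster._core.errors import DagsterInvalidDefinitionError

    @asset
    def asset1(): ...

    def make_check():
        @asset_check(asset=asset1)
        def check1():
            return AssetCheckResult(passed=True)

        return check1

    # Registering the same check key twice now fails during repository construction
    # with "Duplicate asset check key" instead of a node-name collision error.
    with pytest.raises(DagsterInvalidDefinitionError, match="Duplicate asset check key"):
        Definitions(asset_checks=[make_check(), make_check()])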
diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py
index a7151544da66c..99a525d02042d 100644
--- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py
+++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py
@@ -1951,16 +1951,17 @@ def hourly_asset(): ...
     def unpartitioned_asset(): ...

     jobs = get_base_asset_jobs(
-        assets=[
-            daily_asset,
-            daily_asset2,
-            daily_asset_different_start_date,
-            hourly_asset,
-            unpartitioned_asset,
-        ],
+        asset_graph=AssetGraph.from_assets(
+            [
+                daily_asset,
+                daily_asset2,
+                daily_asset_different_start_date,
+                hourly_asset,
+                unpartitioned_asset,
+            ]
+        ),
         executor_def=None,
         resource_defs={},
-        asset_checks=[],
     )
     assert len(jobs) == 3
     assert {job_def.name for job_def in jobs} == {
@@ -1995,14 +1996,15 @@ def asset_b(): ...
     def asset_x(asset_b: B): ...

     jobs = get_base_asset_jobs(
-        assets=[
-            asset_x,
-            create_external_asset_from_source_asset(asset_a),
-            create_external_asset_from_source_asset(asset_b),
-        ],
+        asset_graph=AssetGraph.from_assets(
+            [
+                asset_x,
+                create_external_asset_from_source_asset(asset_a),
+                create_external_asset_from_source_asset(asset_b),
+            ]
+        ),
         executor_def=None,
         resource_defs={},
-        asset_checks=[],
     )
     assert len(jobs) == 2
     assert {job_def.name for job_def in jobs} == {
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py
index 35088614fb468..a533f07ed70a6 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py
@@ -380,7 +380,7 @@ def check1(context: AssetExecutionContext): ...

     with pytest.raises(
         DagsterInvalidDefinitionError,
-        match='Detected conflicting node definitions with the same name "asset1_check1"',
+        match="Duplicate asset check key.+asset1.+check1",
     ):
         Definitions(asset_checks=[make_check(), make_check()])
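Note: for reviewers, a short sketch of the assets_defs_for_keys helper the base-job construction relies on: it maps keys back to their definitions and de-duplicates multi-asset definitions while preserving order. The multi-asset below is illustrative and not part of this diff:

    from dagster import AssetKey, AssetOut, multi_asset
    from dagster._core.definitions.internal_asset_graph import InternalAssetGraph

    @multi_asset(outs={"a": AssetOut(), "b": AssetOut()})
    def ab(): ...

    graph = InternalAssetGraph.from_assets([ab])

    # Both keys resolve to the same AssetsDefinition; dict.fromkeys keeps a single copy.
    defs = graph.assets_defs_for_keys([AssetKey("a"), AssetKey("b")])
    assert len(defs) == 1
    assert defs[0].keys == {AssetKey("a"), AssetKey("b")}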