diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py b/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py index 460854cb6a89f..f89c2c944cfd9 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py @@ -136,7 +136,7 @@ def _fetch_checks( can_execute_individually = ( GrapheneAssetCheckCanExecuteIndividually.CAN_EXECUTE if len( - asset_graph.get_required_asset_and_check_keys(external_check.key) or [] + asset_graph.get_execution_unit_asset_and_check_keys(external_check.key) ) <= 1 # NOTE: once we support multi checks, we'll need to add a case for diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index e1280c85d7ce2..3f8526270b254 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -146,7 +146,7 @@ def get_additional_required_keys( # the set of atomic execution ids that any of the input asset keys are a part of required_atomic_execution_ids = { - asset_nodes_by_key[asset_key].external_asset_node.atomic_execution_unit_id + asset_nodes_by_key[asset_key].external_asset_node.execution_unit_id for asset_key in asset_keys } - {None} @@ -154,7 +154,7 @@ def get_additional_required_keys( required_asset_keys = { asset_node.external_asset_node.asset_key for asset_node in asset_nodes_by_key.values() - if asset_node.external_asset_node.atomic_execution_unit_id in required_atomic_execution_ids + if asset_node.external_asset_node.execution_unit_id in required_atomic_execution_ids } return list(required_asset_keys - asset_keys) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index d404200bbaa47..0ac2a15656291 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -269,8 +269,9 @@ def get_asset_condition_evaluations( # if we need to materialize any partitions of a non-subsettable multi-asset, we need to # materialize all of them - if num_requested > 0: - for neighbor_key in self.asset_graph.get_required_multi_asset_keys(asset_key): + execution_unit_keys = self.asset_graph.get_execution_unit_asset_keys(asset_key) + if len(execution_unit_keys) > 1 and num_requested > 0: + for neighbor_key in execution_unit_keys: expected_data_time_mapping[neighbor_key] = expected_data_time # make sure that the true_subset of the neighbor is accurate -- when it was diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 31d453c76902c..c48482dcb2196 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -521,14 +521,19 @@ def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]: visited.add(parent_key) @abstractmethod - def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: - """For a given asset_key, return the set of asset keys that must be materialized at the same time.""" + def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: + """For a given asset_key, return the set of asset keys that must be + materialized at the same time. + """ ... @abstractmethod - def get_required_asset_and_check_keys( + def get_execution_unit_asset_and_check_keys( self, asset_key_or_check_key: AssetKeyOrCheckKey ) -> AbstractSet[AssetKeyOrCheckKey]: + """For a given asset/check key, return the set of asset/check keys that must be + materialized/computed at the same time. + """ ... @cached_method @@ -651,13 +656,13 @@ def bfs_filter_asset_partitions( Visits parents before children. - When asset partitions are part of the same non-subsettable multi-asset, they're provided all - at once to the condition_fn. + When asset partitions are part of the same execution unit (non-subsettable multi-asset), + they're provided all at once to the condition_fn. """ all_nodes = set(initial_asset_partitions) # invariant: we never consider an asset partition before considering its ancestors - queue = ToposortedPriorityQueue(self, all_nodes, include_required_multi_assets=True) + queue = ToposortedPriorityQueue(self, all_nodes, include_full_execution_unit=True) result: Set[AssetKeyPartitionKey] = set() @@ -752,10 +757,10 @@ def __init__( self, asset_graph: AssetGraph, items: Iterable[AssetKeyPartitionKey], - include_required_multi_assets: bool, + include_full_execution_unit: bool, ): self._asset_graph = asset_graph - self._include_required_multi_assets = include_required_multi_assets + self._include_full_execution_unit = include_full_execution_unit asset_keys_by_level = toposort.toposort(asset_graph.asset_dep_graph["upstream"]) self._toposort_level_by_asset_key = { @@ -770,8 +775,9 @@ def enqueue(self, asset_partition: AssetKeyPartitionKey) -> None: heappush(self._heap, self._queue_item(asset_partition)) def dequeue(self) -> Iterable[AssetKeyPartitionKey]: - # For multi-assets, will include all required multi-asset keys if include_required_multi_assets is set to - # True, or a list of size 1 with just the passed in asset key if it was not + # For multi-assets, will include all required multi-asset keys if + # include_full_execution_unit is set to True, or a list of size 1 with just the passed in + # asset key if it was not. return heappop(self._heap).asset_partitions def _queue_item( @@ -779,25 +785,19 @@ def _queue_item( ) -> "ToposortedPriorityQueue.QueueItem": asset_key = asset_partition.asset_key - if self._include_required_multi_assets: - required_multi_asset_keys = self._asset_graph.get_required_multi_asset_keys( - asset_key - ) | {asset_key} + if self._include_full_execution_unit: + execution_unit_keys = self._asset_graph.get_execution_unit_asset_keys(asset_key) else: - required_multi_asset_keys = {asset_key} + execution_unit_keys = {asset_key} level = max( - self._toposort_level_by_asset_key[required_asset_key] - for required_asset_key in required_multi_asset_keys + self._toposort_level_by_asset_key[asset_key] for asset_key in execution_unit_keys ) return ToposortedPriorityQueue.QueueItem( level, sort_key_for_asset_partition(self._asset_graph, asset_partition), - [ - AssetKeyPartitionKey(ak, asset_partition.partition_key) - for ak in required_multi_asset_keys - ], + [AssetKeyPartitionKey(ak, asset_partition.partition_key) for ak in execution_unit_keys], ) def __len__(self) -> int: diff --git a/python_modules/dagster/dagster/_core/definitions/asset_selection.py b/python_modules/dagster/dagster/_core/definitions/asset_selection.py index cc16ca9aeae3a..5e2be9e2f82d3 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_selection.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_selection.py @@ -619,7 +619,7 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: selection = self.child.resolve_inner(asset_graph) output = set(selection) for asset_key in selection: - output.update(asset_graph.get_required_multi_asset_keys(asset_key)) + output.update(asset_graph.get_execution_unit_asset_keys(asset_key)) return output def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection": diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index cce5f43f66257..899fcff62c049 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -237,37 +237,33 @@ def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: def asset_keys_for_job(self, job_name: str) -> AbstractSet[AssetKey]: return {node.asset_key for node in self.asset_nodes if job_name in node.job_names} - def get_required_asset_and_check_keys( + def get_execution_unit_asset_and_check_keys( self, asset_or_check_key: AssetKeyOrCheckKey ) -> AbstractSet[AssetKeyOrCheckKey]: if isinstance(asset_or_check_key, AssetKey): - atomic_execution_unit_id = self.get_asset_node( - asset_or_check_key - ).atomic_execution_unit_id + execution_unit_id = self.get_asset_node(asset_or_check_key).execution_unit_id else: # AssetCheckKey - atomic_execution_unit_id = self.get_asset_check( - asset_or_check_key - ).atomic_execution_unit_id - if atomic_execution_unit_id is None: - return set() + execution_unit_id = self.get_asset_check(asset_or_check_key).execution_unit_id + if execution_unit_id is None: + return {asset_or_check_key} else: return { *( node.asset_key for node in self.asset_nodes - if node.atomic_execution_unit_id == atomic_execution_unit_id + if node.execution_unit_id == execution_unit_id ), *( node.key for node in self.asset_checks - if node.atomic_execution_unit_id == atomic_execution_unit_id + if node.execution_unit_id == execution_unit_id ), } - def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: + def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: return { key - for key in self.get_required_asset_and_check_keys(asset_key) + for key in self.get_execution_unit_asset_and_check_keys(asset_key) if isinstance(key, AssetKey) } diff --git a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py index 07e1c82303ad5..163d406445d72 100644 --- a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py @@ -136,11 +136,11 @@ def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: if ad.group_names_by_key[key] == group_name } - def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: + def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: asset = self.get_assets_def(asset_key) - return set() if len(asset.keys) <= 1 or asset.can_subset else asset.keys + return {asset_key} if len(asset.keys) <= 1 or asset.can_subset else asset.keys - def get_required_asset_and_check_keys( + def get_execution_unit_asset_and_check_keys( self, asset_or_check_key: AssetKeyOrCheckKey ) -> AbstractSet[AssetKeyOrCheckKey]: if isinstance(asset_or_check_key, AssetKey): @@ -148,14 +148,14 @@ def get_required_asset_and_check_keys( else: # AssetCheckKey # only checks emitted by AssetsDefinition have required keys if self.has_asset_check(asset_or_check_key): - return set() + return {asset_or_check_key} else: asset = self.get_assets_def_for_check(asset_or_check_key) if asset is None or asset_or_check_key not in asset.check_keys: - return set() + return {asset_or_check_key} has_checks = len(asset.check_keys) > 0 if asset.can_subset or len(asset.keys) <= 1 and not has_checks: - return set() + return {asset_or_check_key} else: return {*asset.keys, *asset.check_keys} diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 1888fb0739cde..1b5ec14bd6a9e 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1126,7 +1126,7 @@ def __new__( ) -@whitelist_for_serdes +@whitelist_for_serdes(storage_field_names={"execution_unit_id": "atomic_execution_unit_id"}) class ExternalAssetCheck( NamedTuple( "_ExternalAssetCheck", @@ -1134,7 +1134,7 @@ class ExternalAssetCheck( ("name", str), ("asset_key", AssetKey), ("description", Optional[str]), - ("atomic_execution_unit_id", Optional[str]), + ("execution_unit_id", Optional[str]), ("job_names", Sequence[str]), ], ) @@ -1146,7 +1146,7 @@ def __new__( name: str, asset_key: AssetKey, description: Optional[str], - atomic_execution_unit_id: Optional[str] = None, + execution_unit_id: Optional[str] = None, job_names: Optional[Sequence[str]] = None, ): return super(ExternalAssetCheck, cls).__new__( @@ -1154,9 +1154,7 @@ def __new__( name=check.str_param(name, "name"), asset_key=check.inst_param(asset_key, "asset_key", AssetKey), description=check.opt_str_param(description, "description"), - atomic_execution_unit_id=check.opt_str_param( - atomic_execution_unit_id, "automic_execution_unit_id" - ), + execution_unit_id=check.opt_str_param(execution_unit_id, "automic_execution_unit_id"), job_names=check.opt_sequence_param(job_names, "job_names", of_type=str), ) @@ -1166,7 +1164,10 @@ def key(self) -> AssetCheckKey: @whitelist_for_serdes( - storage_field_names={"metadata": "metadata_entries"}, + storage_field_names={ + "metadata": "metadata_entries", + "execution_unit_id": "atomic_execution_unit_id", + }, field_serializers={"metadata": MetadataFieldSerializer}, ) class ExternalAssetNode( @@ -1196,9 +1197,9 @@ class ExternalAssetNode( ("is_source", bool), ("is_observable", bool), # If a set of assets can't be materialized independently from each other, they will all - # have the same atomic_execution_unit_id. This ID should be stable across reloads and + # have the same execution_unit_id. This ID should be stable across reloads and # unique deployment-wide. - ("atomic_execution_unit_id", Optional[str]), + ("execution_unit_id", Optional[str]), ("required_top_level_resources", Optional[Sequence[str]]), ("auto_materialize_policy", Optional[AutoMaterializePolicy]), ("backfill_policy", Optional[BackfillPolicy]), @@ -1234,7 +1235,7 @@ def __new__( freshness_policy: Optional[FreshnessPolicy] = None, is_source: Optional[bool] = None, is_observable: bool = False, - atomic_execution_unit_id: Optional[str] = None, + execution_unit_id: Optional[str] = None, required_top_level_resources: Optional[Sequence[str]] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, @@ -1315,9 +1316,7 @@ def __new__( ), is_source=check.bool_param(is_source, "is_source"), is_observable=check.bool_param(is_observable, "is_observable"), - atomic_execution_unit_id=check.opt_str_param( - atomic_execution_unit_id, "atomic_execution_unit_id" - ), + execution_unit_id=check.opt_str_param(execution_unit_id, "execution_unit_id"), required_top_level_resources=check.opt_sequence_param( required_top_level_resources, "required_top_level_resources", of_type=str ), @@ -1528,14 +1527,14 @@ def external_asset_checks_from_defs( of_type=AssetChecksDefinition, additional_message=f"Check {check_key} is redefined in an AssetsDefinition and an AssetChecksDefinition", ) - atomic_execution_unit_id = None + execution_unit_id = None elif isinstance(first_node, AssetsDefinition): check.is_list( nodes, of_type=AssetsDefinition, additional_message=f"Check {check_key} is redefined in an AssetsDefinition and an AssetChecksDefinition", ) - atomic_execution_unit_id = first_node.unique_id if not first_node.can_subset else None + execution_unit_id = first_node.unique_id if not first_node.can_subset else None else: check.failed(f"Unexpected node type {first_node}") @@ -1545,7 +1544,7 @@ def external_asset_checks_from_defs( name=check_key.name, asset_key=check_key.asset_key, description=spec.description, - atomic_execution_unit_id=atomic_execution_unit_id, + execution_unit_id=execution_unit_id, job_names=job_names_by_check_key[check_key], ) ) @@ -1573,7 +1572,7 @@ def external_asset_nodes_from_defs( code_version_by_asset_key: Dict[AssetKey, Optional[str]] = dict() group_name_by_asset_key: Dict[AssetKey, str] = {} descriptions_by_asset_key: Dict[AssetKey, str] = {} - atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {} + execution_unit_ids: Dict[Union[AssetKey, AssetCheckKey], str] = {} owners_by_asset_key: Dict[AssetKey, Sequence[AssetOwner]] = {} execution_types_by_asset_key: Dict[AssetKey, AssetExecutionType] = {} is_observable_by_key: Dict[AssetKey, bool] = {} @@ -1634,12 +1633,12 @@ def external_asset_nodes_from_defs( {key: assets_def.auto_observe_interval_minutes for key in assets_def.keys} ) if len(assets_def.keys) > 1 and not assets_def.can_subset: - atomic_execution_unit_id = assets_def.unique_id + execution_unit_id = assets_def.unique_id for asset_key in assets_def.keys: - atomic_execution_unit_ids_by_key[asset_key] = atomic_execution_unit_id + execution_unit_ids[asset_key] = execution_unit_id if len(assets_def.keys) == 1 and assets_def.check_keys and not assets_def.can_subset: - atomic_execution_unit_ids_by_key[assets_def.key] = assets_def.unique_id + execution_unit_ids[assets_def.key] = assets_def.unique_id group_name_by_asset_key.update(asset_layer.group_names_by_assets()) @@ -1725,7 +1724,7 @@ def external_asset_nodes_from_defs( freshness_policy=freshness_policy_by_asset_key.get(asset_key), auto_materialize_policy=auto_materialize_policy_by_asset_key.get(asset_key), backfill_policy=backfill_policy_by_asset_key.get(asset_key), - atomic_execution_unit_id=atomic_execution_unit_ids_by_key.get(asset_key), + execution_unit_id=execution_unit_ids.get(asset_key), required_top_level_resources=required_top_level_resources, owners=[ owner.email if isinstance(owner, UserAssetOwner) else owner.team diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index d56994451c91e..e9bf80d073763 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -96,7 +96,7 @@ def asset3(asset1, asset2): assert asset_graph.get_children(asset0.key) == {asset1.key, asset2.key} assert asset_graph.get_parents(asset3.key) == {asset1.key, asset2.key} for asset_def in assets: - assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set() + assert asset_graph.get_execution_unit_asset_keys(asset_def.key) == {asset_def.key} assert asset_graph.get_code_version(asset0.key) == "1" assert asset_graph.get_code_version(asset1.key) is None @@ -366,7 +366,7 @@ def non_subsettable_multi_asset(): asset_graph = asset_graph_from_assets([non_subsettable_multi_asset]) for asset_key in non_subsettable_multi_asset.keys: assert ( - asset_graph.get_required_multi_asset_keys(asset_key) == non_subsettable_multi_asset.keys + asset_graph.get_execution_unit_asset_keys(asset_key) == non_subsettable_multi_asset.keys ) @@ -382,7 +382,7 @@ def subsettable_multi_asset(): asset_graph = asset_graph_from_assets([subsettable_multi_asset]) for asset_key in subsettable_multi_asset.keys: - assert asset_graph.get_required_multi_asset_keys(asset_key) == set() + assert asset_graph.get_execution_unit_asset_keys(asset_key) == {asset_key} def test_required_multi_asset_sets_graph_backed_multi_asset( @@ -405,7 +405,7 @@ def graph1(): graph_backed_multi_asset = AssetsDefinition.from_graph(graph1) asset_graph = asset_graph_from_assets([graph_backed_multi_asset]) for asset_key in graph_backed_multi_asset.keys: - assert asset_graph.get_required_multi_asset_keys(asset_key) == graph_backed_multi_asset.keys + assert asset_graph.get_execution_unit_asset_keys(asset_key) == graph_backed_multi_asset.keys def test_required_multi_asset_sets_same_op_in_different_assets( @@ -421,7 +421,7 @@ def op1(): asset_graph = asset_graph_from_assets(assets) for asset_def in assets: - assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set() + assert asset_graph.get_execution_unit_asset_keys(asset_def.key) == {asset_def.key} def test_get_non_source_roots_missing_source(asset_graph_from_assets: Callable[..., AssetGraph]): @@ -814,8 +814,8 @@ def check0(): ... asset_graph = asset_graph_from_assets([asset0], asset_checks=[check0]) - assert asset_graph.get_required_asset_and_check_keys(asset0.key) == set() - assert asset_graph.get_required_asset_and_check_keys(check0.spec.key) == set() + assert asset_graph.get_execution_unit_asset_and_check_keys(asset0.key) == {asset0.key} + assert asset_graph.get_execution_unit_asset_and_check_keys(check0.spec.key) == {check0.spec.key} def test_required_assets_and_checks_by_key_asset_decorator( @@ -836,9 +836,9 @@ def check0(): grouped_keys = [asset0.key, foo_check.key, bar_check.key] for key in grouped_keys: - assert asset_graph.get_required_asset_and_check_keys(key) == set(grouped_keys) + assert asset_graph.get_execution_unit_asset_and_check_keys(key) == set(grouped_keys) - assert asset_graph.get_required_asset_and_check_keys(check0.spec.key) == set() + assert asset_graph.get_execution_unit_asset_and_check_keys(check0.spec.key) == {check0.spec.key} def test_required_assets_and_checks_by_key_multi_asset( @@ -873,14 +873,14 @@ def subsettable_asset_fn(): bar_check.key, ] for key in grouped_keys: - assert asset_graph.get_required_asset_and_check_keys(key) == set(grouped_keys) + assert asset_graph.get_execution_unit_asset_and_check_keys(key) == set(grouped_keys) for key in [ AssetKey(["subsettable_asset0"]), AssetKey(["subsettable_asset1"]), biz_check.key, ]: - assert asset_graph.get_required_asset_and_check_keys(key) == set() + assert asset_graph.get_execution_unit_asset_and_check_keys(key) == {key} def test_required_assets_and_checks_by_key_multi_asset_single_asset( @@ -900,4 +900,4 @@ def asset_fn(): asset_graph = asset_graph_from_assets([asset_fn]) for key in [AssetKey(["asset0"]), foo_check.key, bar_check.key]: - assert asset_graph.get_required_asset_and_check_keys(key) == set() + assert asset_graph.get_execution_unit_asset_and_check_keys(key) == {key} diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index ccf8dc718059d..a6c815ad67a9f 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -568,7 +568,7 @@ def assets(): Definitions(assets=[assets], jobs=[assets_job]) ) - atomic_execution_unit_id = assets.unique_id + execution_unit_id = assets.unique_id assert external_asset_nodes == [ ExternalAssetNode( @@ -584,7 +584,7 @@ def assets(): job_names=["__ASSET_JOB", "assets_job"], output_name=f"out{i}", group_name=DEFAULT_GROUP_NAME, - atomic_execution_unit_id=atomic_execution_unit_id, + execution_unit_id=execution_unit_id, ) for i in range(10) ]