diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py b/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py index 460854cb6a89f..410f1d8d705bf 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py @@ -136,7 +136,7 @@ def _fetch_checks( can_execute_individually = ( GrapheneAssetCheckCanExecuteIndividually.CAN_EXECUTE if len( - asset_graph.get_required_asset_and_check_keys(external_check.key) or [] + asset_graph.get_execution_set_asset_and_check_keys(external_check.key) ) <= 1 # NOTE: once we support multi checks, we'll need to add a case for diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index e1280c85d7ce2..37cd66d28f237 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -145,16 +145,17 @@ def get_additional_required_keys( asset_nodes_by_key = get_asset_nodes_by_asset_key(graphene_info) # the set of atomic execution ids that any of the input asset keys are a part of - required_atomic_execution_ids = { - asset_nodes_by_key[asset_key].external_asset_node.atomic_execution_unit_id + required_execution_set_identifiers = { + asset_nodes_by_key[asset_key].external_asset_node.execution_set_identifier for asset_key in asset_keys } - {None} - # the set of all asset keys that are part of the required atomic execution units + # the set of all asset keys that are part of the required execution sets required_asset_keys = { asset_node.external_asset_node.asset_key for asset_node in asset_nodes_by_key.values() - if asset_node.external_asset_node.atomic_execution_unit_id in required_atomic_execution_ids + if asset_node.external_asset_node.execution_set_identifier + in required_execution_set_identifiers } return list(required_asset_keys - asset_keys) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 464907b1d5589..4914a69c96576 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -269,8 +269,9 @@ def get_asset_condition_evaluations( # if we need to materialize any partitions of a non-subsettable multi-asset, we need to # materialize all of them - if num_requested > 0: - for neighbor_key in self.asset_graph.get_required_multi_asset_keys(asset_key): + execution_unit_keys = self.asset_graph.get_execution_set_asset_keys(asset_key) + if len(execution_unit_keys) > 1 and num_requested > 0: + for neighbor_key in execution_unit_keys: expected_data_time_mapping[neighbor_key] = expected_data_time # make sure that the true_subset of the neighbor is accurate -- when it was diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index fac7963117c79..4d9b0431b7bcd 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -514,14 +514,20 @@ def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]: visited.add(parent_key) @abstractmethod - def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: - """For a given asset_key, return the set of asset keys that must be materialized at the same time.""" + def get_execution_set_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: + """For a given asset_key, return the set of asset keys that must be + materialized at the same time. + """ ... @abstractmethod - def get_required_asset_and_check_keys( + def get_execution_set_asset_and_check_keys( self, asset_key_or_check_key: AssetKeyOrCheckKey - ) -> AbstractSet[AssetKeyOrCheckKey]: ... + ) -> AbstractSet[AssetKeyOrCheckKey]: + """For a given asset/check key, return the set of asset/check keys that must be + materialized/computed at the same time. + """ + ... @cached_method def get_downstream_freshness_policies( @@ -643,13 +649,13 @@ def bfs_filter_asset_partitions( Visits parents before children. - When asset partitions are part of the same non-subsettable multi-asset, they're provided all - at once to the condition_fn. + When asset partitions are part of the same execution set (non-subsettable multi-asset), + they're provided all at once to the condition_fn. """ all_nodes = set(initial_asset_partitions) # invariant: we never consider an asset partition before considering its ancestors - queue = ToposortedPriorityQueue(self, all_nodes, include_required_multi_assets=True) + queue = ToposortedPriorityQueue(self, all_nodes, include_full_execution_set=True) result: Set[AssetKeyPartitionKey] = set() @@ -744,10 +750,10 @@ def __init__( self, asset_graph: AssetGraph, items: Iterable[AssetKeyPartitionKey], - include_required_multi_assets: bool, + include_full_execution_set: bool, ): self._asset_graph = asset_graph - self._include_required_multi_assets = include_required_multi_assets + self._include_full_execution_set = include_full_execution_set self._toposort_level_by_asset_key = { asset_key: i @@ -761,8 +767,9 @@ def enqueue(self, asset_partition: AssetKeyPartitionKey) -> None: heappush(self._heap, self._queue_item(asset_partition)) def dequeue(self) -> Iterable[AssetKeyPartitionKey]: - # For multi-assets, will include all required multi-asset keys if include_required_multi_assets is set to - # True, or a list of size 1 with just the passed in asset key if it was not + # For multi-assets, will include all required multi-asset keys if + # include_full_execution_set is set to True, or a list of size 1 with just the passed in + # asset key if it was not. return heappop(self._heap).asset_partitions def _queue_item( @@ -770,25 +777,19 @@ def _queue_item( ) -> "ToposortedPriorityQueue.QueueItem": asset_key = asset_partition.asset_key - if self._include_required_multi_assets: - required_multi_asset_keys = self._asset_graph.get_required_multi_asset_keys( - asset_key - ) | {asset_key} + if self._include_full_execution_set: + execution_set_keys = self._asset_graph.get_execution_set_asset_keys(asset_key) else: - required_multi_asset_keys = {asset_key} + execution_set_keys = {asset_key} level = max( - self._toposort_level_by_asset_key[required_asset_key] - for required_asset_key in required_multi_asset_keys + self._toposort_level_by_asset_key[asset_key] for asset_key in execution_set_keys ) return ToposortedPriorityQueue.QueueItem( level, sort_key_for_asset_partition(self._asset_graph, asset_partition), - [ - AssetKeyPartitionKey(ak, asset_partition.partition_key) - for ak in required_multi_asset_keys - ], + [AssetKeyPartitionKey(ak, asset_partition.partition_key) for ak in execution_set_keys], ) def __len__(self) -> int: diff --git a/python_modules/dagster/dagster/_core/definitions/asset_selection.py b/python_modules/dagster/dagster/_core/definitions/asset_selection.py index c772064692f20..9234fd71ff407 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_selection.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_selection.py @@ -620,7 +620,7 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: selection = self.child.resolve_inner(asset_graph) output = set(selection) for asset_key in selection: - output.update(asset_graph.get_required_multi_asset_keys(asset_key)) + output.update(asset_graph.get_execution_set_asset_keys(asset_key)) return output def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection": diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index cce5f43f66257..a8184d50223a7 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -237,37 +237,37 @@ def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: def asset_keys_for_job(self, job_name: str) -> AbstractSet[AssetKey]: return {node.asset_key for node in self.asset_nodes if job_name in node.job_names} - def get_required_asset_and_check_keys( + def get_execution_set_asset_and_check_keys( self, asset_or_check_key: AssetKeyOrCheckKey ) -> AbstractSet[AssetKeyOrCheckKey]: if isinstance(asset_or_check_key, AssetKey): - atomic_execution_unit_id = self.get_asset_node( + execution_set_identifier = self.get_asset_node( asset_or_check_key - ).atomic_execution_unit_id + ).execution_set_identifier else: # AssetCheckKey - atomic_execution_unit_id = self.get_asset_check( + execution_set_identifier = self.get_asset_check( asset_or_check_key - ).atomic_execution_unit_id - if atomic_execution_unit_id is None: - return set() + ).execution_set_identifier + if execution_set_identifier is None: + return {asset_or_check_key} else: return { *( node.asset_key for node in self.asset_nodes - if node.atomic_execution_unit_id == atomic_execution_unit_id + if node.execution_set_identifier == execution_set_identifier ), *( node.key for node in self.asset_checks - if node.atomic_execution_unit_id == atomic_execution_unit_id + if node.execution_set_identifier == execution_set_identifier ), } - def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: + def get_execution_set_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: return { key - for key in self.get_required_asset_and_check_keys(asset_key) + for key in self.get_execution_set_asset_and_check_keys(asset_key) if isinstance(key, AssetKey) } diff --git a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py index 07e1c82303ad5..7fe6b4d69d839 100644 --- a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py @@ -136,11 +136,11 @@ def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: if ad.group_names_by_key[key] == group_name } - def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: + def get_execution_set_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: asset = self.get_assets_def(asset_key) - return set() if len(asset.keys) <= 1 or asset.can_subset else asset.keys + return {asset_key} if len(asset.keys) <= 1 or asset.can_subset else asset.keys - def get_required_asset_and_check_keys( + def get_execution_set_asset_and_check_keys( self, asset_or_check_key: AssetKeyOrCheckKey ) -> AbstractSet[AssetKeyOrCheckKey]: if isinstance(asset_or_check_key, AssetKey): @@ -148,14 +148,14 @@ def get_required_asset_and_check_keys( else: # AssetCheckKey # only checks emitted by AssetsDefinition have required keys if self.has_asset_check(asset_or_check_key): - return set() + return {asset_or_check_key} else: asset = self.get_assets_def_for_check(asset_or_check_key) if asset is None or asset_or_check_key not in asset.check_keys: - return set() + return {asset_or_check_key} has_checks = len(asset.check_keys) > 0 if asset.can_subset or len(asset.keys) <= 1 and not has_checks: - return set() + return {asset_or_check_key} else: return {*asset.keys, *asset.check_keys} diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 1482d5545b228..39441fb1dcfc0 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1127,7 +1127,7 @@ def __new__( ) -@whitelist_for_serdes +@whitelist_for_serdes(storage_field_names={"execution_set_identifier": "atomic_execution_unit_id"}) class ExternalAssetCheck( NamedTuple( "_ExternalAssetCheck", @@ -1135,7 +1135,7 @@ class ExternalAssetCheck( ("name", str), ("asset_key", AssetKey), ("description", Optional[str]), - ("atomic_execution_unit_id", Optional[str]), + ("execution_set_identifier", Optional[str]), ("job_names", Sequence[str]), ], ) @@ -1147,7 +1147,7 @@ def __new__( name: str, asset_key: AssetKey, description: Optional[str], - atomic_execution_unit_id: Optional[str] = None, + execution_set_identifier: Optional[str] = None, job_names: Optional[Sequence[str]] = None, ): return super(ExternalAssetCheck, cls).__new__( @@ -1155,8 +1155,8 @@ def __new__( name=check.str_param(name, "name"), asset_key=check.inst_param(asset_key, "asset_key", AssetKey), description=check.opt_str_param(description, "description"), - atomic_execution_unit_id=check.opt_str_param( - atomic_execution_unit_id, "automic_execution_unit_id" + execution_set_identifier=check.opt_str_param( + execution_set_identifier, "execution_set_identifier" ), job_names=check.opt_sequence_param(job_names, "job_names", of_type=str), ) @@ -1167,7 +1167,10 @@ def key(self) -> AssetCheckKey: @whitelist_for_serdes( - storage_field_names={"metadata": "metadata_entries"}, + storage_field_names={ + "metadata": "metadata_entries", + "execution_set_identifier": "atomic_execution_unit_id", + }, field_serializers={"metadata": MetadataFieldSerializer}, ) class ExternalAssetNode( @@ -1197,9 +1200,9 @@ class ExternalAssetNode( ("is_source", bool), ("is_observable", bool), # If a set of assets can't be materialized independently from each other, they will all - # have the same atomic_execution_unit_id. This ID should be stable across reloads and + # have the same execution_set_identifier. This ID should be stable across reloads and # unique deployment-wide. - ("atomic_execution_unit_id", Optional[str]), + ("execution_set_identifier", Optional[str]), ("required_top_level_resources", Optional[Sequence[str]]), ("auto_materialize_policy", Optional[AutoMaterializePolicy]), ("backfill_policy", Optional[BackfillPolicy]), @@ -1235,7 +1238,7 @@ def __new__( freshness_policy: Optional[FreshnessPolicy] = None, is_source: Optional[bool] = None, is_observable: bool = False, - atomic_execution_unit_id: Optional[str] = None, + execution_set_identifier: Optional[str] = None, required_top_level_resources: Optional[Sequence[str]] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, @@ -1316,8 +1319,8 @@ def __new__( ), is_source=check.bool_param(is_source, "is_source"), is_observable=check.bool_param(is_observable, "is_observable"), - atomic_execution_unit_id=check.opt_str_param( - atomic_execution_unit_id, "atomic_execution_unit_id" + execution_set_identifier=check.opt_str_param( + execution_set_identifier, "execution_set_identifier" ), required_top_level_resources=check.opt_sequence_param( required_top_level_resources, "required_top_level_resources", of_type=str @@ -1529,7 +1532,7 @@ def external_asset_checks_from_defs( of_type=AssetChecksDefinition, additional_message=f"Check {check_key} is redefined in an AssetsDefinition and an AssetChecksDefinition", ) - atomic_execution_unit_id = None + execution_set_identifier = None elif isinstance(first_node, AssetsDefinition): check.is_list( nodes, @@ -1539,9 +1542,9 @@ def external_asset_checks_from_defs( # Executing individual checks isn't supported in graph assets if isinstance(first_node.node_def, GraphDefinition): - atomic_execution_unit_id = first_node.unique_id + execution_set_identifier = first_node.unique_id else: - atomic_execution_unit_id = ( + execution_set_identifier = ( first_node.unique_id if not first_node.can_subset else None ) else: @@ -1553,7 +1556,7 @@ def external_asset_checks_from_defs( name=check_key.name, asset_key=check_key.asset_key, description=spec.description, - atomic_execution_unit_id=atomic_execution_unit_id, + execution_set_identifier=execution_set_identifier, job_names=job_names_by_check_key[check_key], ) ) @@ -1581,7 +1584,7 @@ def external_asset_nodes_from_defs( code_version_by_asset_key: Dict[AssetKey, Optional[str]] = dict() group_name_by_asset_key: Dict[AssetKey, str] = {} descriptions_by_asset_key: Dict[AssetKey, str] = {} - atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {} + execution_set_identifiers: Dict[Union[AssetKey, AssetCheckKey], str] = {} owners_by_asset_key: Dict[AssetKey, Sequence[AssetOwner]] = {} execution_types_by_asset_key: Dict[AssetKey, AssetExecutionType] = {} is_observable_by_key: Dict[AssetKey, bool] = {} @@ -1642,12 +1645,12 @@ def external_asset_nodes_from_defs( {key: assets_def.auto_observe_interval_minutes for key in assets_def.keys} ) if len(assets_def.keys) > 1 and not assets_def.can_subset: - atomic_execution_unit_id = assets_def.unique_id + execution_set_identifier = assets_def.unique_id for asset_key in assets_def.keys: - atomic_execution_unit_ids_by_key[asset_key] = atomic_execution_unit_id + execution_set_identifiers[asset_key] = execution_set_identifier if len(assets_def.keys) == 1 and assets_def.check_keys and not assets_def.can_subset: - atomic_execution_unit_ids_by_key[assets_def.key] = assets_def.unique_id + execution_set_identifiers[assets_def.key] = assets_def.unique_id group_name_by_asset_key.update(asset_layer.group_names_by_assets()) @@ -1720,7 +1723,7 @@ def external_asset_nodes_from_defs( freshness_policy=freshness_policy_by_asset_key.get(asset_key), auto_materialize_policy=auto_materialize_policy_by_asset_key.get(asset_key), backfill_policy=backfill_policy_by_asset_key.get(asset_key), - atomic_execution_unit_id=atomic_execution_unit_ids_by_key.get(asset_key), + execution_set_identifier=execution_set_identifiers.get(asset_key), required_top_level_resources=required_top_level_resources, owners=[ owner.email if isinstance(owner, UserAssetOwner) else owner.team diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index d3b520cb5a430..42ab021456c24 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -92,7 +92,7 @@ def asset3(asset1, asset2): ... assert asset_graph.get_children(asset0.key) == {asset1.key, asset2.key} assert asset_graph.get_parents(asset3.key) == {asset1.key, asset2.key} for asset_def in assets: - assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set() + assert asset_graph.get_execution_set_asset_keys(asset_def.key) == {asset_def.key} assert asset_graph.get_code_version(asset0.key) == "1" assert asset_graph.get_code_version(asset1.key) is None @@ -349,7 +349,7 @@ def non_subsettable_multi_asset(): ... asset_graph = asset_graph_from_assets([non_subsettable_multi_asset]) for asset_key in non_subsettable_multi_asset.keys: assert ( - asset_graph.get_required_multi_asset_keys(asset_key) == non_subsettable_multi_asset.keys + asset_graph.get_execution_set_asset_keys(asset_key) == non_subsettable_multi_asset.keys ) @@ -364,7 +364,7 @@ def subsettable_multi_asset(): ... asset_graph = asset_graph_from_assets([subsettable_multi_asset]) for asset_key in subsettable_multi_asset.keys: - assert asset_graph.get_required_multi_asset_keys(asset_key) == set() + assert asset_graph.get_execution_set_asset_keys(asset_key) == {asset_key} def test_required_multi_asset_sets_graph_backed_multi_asset( @@ -387,7 +387,7 @@ def graph1(): graph_backed_multi_asset = AssetsDefinition.from_graph(graph1) asset_graph = asset_graph_from_assets([graph_backed_multi_asset]) for asset_key in graph_backed_multi_asset.keys: - assert asset_graph.get_required_multi_asset_keys(asset_key) == graph_backed_multi_asset.keys + assert asset_graph.get_execution_set_asset_keys(asset_key) == graph_backed_multi_asset.keys def test_required_multi_asset_sets_same_op_in_different_assets( @@ -402,7 +402,7 @@ def op1(): ... asset_graph = asset_graph_from_assets(assets) for asset_def in assets: - assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set() + assert asset_graph.get_execution_set_asset_keys(asset_def.key) == {asset_def.key} def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., AssetGraph]): @@ -753,8 +753,8 @@ def asset0(): ... def check0(): ... asset_graph = asset_graph_from_assets([asset0], asset_checks=[check0]) - assert asset_graph.get_required_asset_and_check_keys(asset0.key) == set() - assert asset_graph.get_required_asset_and_check_keys(check0.spec.key) == set() + assert asset_graph.get_execution_set_asset_and_check_keys(asset0.key) == {asset0.key} + assert asset_graph.get_execution_set_asset_and_check_keys(check0.spec.key) == {check0.spec.key} def test_required_assets_and_checks_by_key_asset_decorator( @@ -773,9 +773,9 @@ def check0(): ... grouped_keys = [asset0.key, foo_check.key, bar_check.key] for key in grouped_keys: - assert asset_graph.get_required_asset_and_check_keys(key) == set(grouped_keys) + assert asset_graph.get_execution_set_asset_and_check_keys(key) == set(grouped_keys) - assert asset_graph.get_required_asset_and_check_keys(check0.spec.key) == set() + assert asset_graph.get_execution_set_asset_and_check_keys(check0.spec.key) == {check0.spec.key} def test_required_assets_and_checks_by_key_multi_asset( @@ -808,14 +808,14 @@ def subsettable_asset_fn(): ... bar_check.key, ] for key in grouped_keys: - assert asset_graph.get_required_asset_and_check_keys(key) == set(grouped_keys) + assert asset_graph.get_execution_set_asset_and_check_keys(key) == set(grouped_keys) for key in [ AssetKey(["subsettable_asset0"]), AssetKey(["subsettable_asset1"]), biz_check.key, ]: - assert asset_graph.get_required_asset_and_check_keys(key) == set() + assert asset_graph.get_execution_set_asset_and_check_keys(key) == {key} def test_required_assets_and_checks_by_key_multi_asset_single_asset( @@ -834,4 +834,4 @@ def asset_fn(): ... asset_graph = asset_graph_from_assets([asset_fn]) for key in [AssetKey(["asset0"]), foo_check.key, bar_check.key]: - assert asset_graph.get_required_asset_and_check_keys(key) == set() + assert asset_graph.get_execution_set_asset_and_check_keys(key) == {key} diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index 11f907be5bd33..92d7562fd5b5b 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -568,7 +568,7 @@ def assets(): Definitions(assets=[assets], jobs=[assets_job]) ) - atomic_execution_unit_id = assets.unique_id + execution_set_identifier = assets.unique_id assert external_asset_nodes == [ ExternalAssetNode( @@ -584,7 +584,7 @@ def assets(): job_names=["__ASSET_JOB", "assets_job"], output_name=f"out{i}", group_name=DEFAULT_GROUP_NAME, - atomic_execution_unit_id=atomic_execution_unit_id, + execution_set_identifier=execution_set_identifier, ) for i in range(10) ]