Skip to content

Commit

Permalink
[external-assets] Standardize terminology on "execution set" (#20140)
Browse files Browse the repository at this point in the history
## Summary & Motivation

The terminology around groups of assets/checks that must be executed
together is currently a little mixed:

- `AssetGraph` has `get_required_multi_asset_keys` and
`get_required_asset_and_check_keys`
- `ExternalAssetNode` and `ExternalAssetCheck` have a field
`atomic_execution_unit_id`

It's not obvious that these are related to one another. This PR attempts
to tighten up and standardize the terminology around the phrase
"execution set":

- `AssetGraph.get_required_multi_asset_keys` ->
`get_execution_set_asset_keys`
- `AssetGraph.get_required_asset_and_check_keys` ->
`get_execution_set_asset_and_check_keys`
- `ExternalAsset{Node,Check}.atomic_execution_unit_id` ->
`execution_set_identifier`

It also tweaks the return value of the `AssetGraph` functions. The
current behavior is to return an empty set for keys that can be executed
on their own (e.g. single-asset definitions and subsettable multi-assets),
which does not make sense for "execution sets". This PR changes them to
return a set with a single element (the passed asset or check key).

## How I Tested These Changes

Existing test suite.
  • Loading branch information
smackesey authored Mar 5, 2024
1 parent d8f37aa commit ffb82f1
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _fetch_checks(
can_execute_individually = (
GrapheneAssetCheckCanExecuteIndividually.CAN_EXECUTE
if len(
asset_graph.get_required_asset_and_check_keys(external_check.key) or []
asset_graph.get_execution_set_asset_and_check_keys(external_check.key)
)
<= 1
# NOTE: once we support multi checks, we'll need to add a case for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,17 @@ def get_additional_required_keys(
asset_nodes_by_key = get_asset_nodes_by_asset_key(graphene_info)

# the set of execution set identifiers that any of the input asset keys are a part of
required_atomic_execution_ids = {
asset_nodes_by_key[asset_key].external_asset_node.atomic_execution_unit_id
required_execution_set_identifiers = {
asset_nodes_by_key[asset_key].external_asset_node.execution_set_identifier
for asset_key in asset_keys
} - {None}

# the set of all asset keys that are part of the required atomic execution units
# the set of all asset keys that are part of the required execution sets
required_asset_keys = {
asset_node.external_asset_node.asset_key
for asset_node in asset_nodes_by_key.values()
if asset_node.external_asset_node.atomic_execution_unit_id in required_atomic_execution_ids
if asset_node.external_asset_node.execution_set_identifier
in required_execution_set_identifiers
}

return list(required_asset_keys - asset_keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ def get_asset_condition_evaluations(

# if we need to materialize any partitions of a non-subsettable multi-asset, we need to
# materialize all of them
if num_requested > 0:
for neighbor_key in self.asset_graph.get_required_multi_asset_keys(asset_key):
execution_unit_keys = self.asset_graph.get_execution_set_asset_keys(asset_key)
if len(execution_unit_keys) > 1 and num_requested > 0:
for neighbor_key in execution_unit_keys:
expected_data_time_mapping[neighbor_key] = expected_data_time

# make sure that the true_subset of the neighbor is accurate -- when it was
Expand Down
45 changes: 23 additions & 22 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,14 +514,20 @@ def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]:
visited.add(parent_key)

@abstractmethod
def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""For a given asset_key, return the set of asset keys that must be materialized at the same time."""
def get_execution_set_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""For a given asset_key, return the set of asset keys that must be
materialized at the same time.
"""
...

@abstractmethod
def get_required_asset_and_check_keys(
def get_execution_set_asset_and_check_keys(
self, asset_key_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]: ...
) -> AbstractSet[AssetKeyOrCheckKey]:
"""For a given asset/check key, return the set of asset/check keys that must be
materialized/computed at the same time.
"""
...

@cached_method
def get_downstream_freshness_policies(
Expand Down Expand Up @@ -643,13 +649,13 @@ def bfs_filter_asset_partitions(
Visits parents before children.
When asset partitions are part of the same non-subsettable multi-asset, they're provided all
at once to the condition_fn.
When asset partitions are part of the same execution set (non-subsettable multi-asset),
they're provided all at once to the condition_fn.
"""
all_nodes = set(initial_asset_partitions)

# invariant: we never consider an asset partition before considering its ancestors
queue = ToposortedPriorityQueue(self, all_nodes, include_required_multi_assets=True)
queue = ToposortedPriorityQueue(self, all_nodes, include_full_execution_set=True)

result: Set[AssetKeyPartitionKey] = set()

Expand Down Expand Up @@ -744,10 +750,10 @@ def __init__(
self,
asset_graph: AssetGraph,
items: Iterable[AssetKeyPartitionKey],
include_required_multi_assets: bool,
include_full_execution_set: bool,
):
self._asset_graph = asset_graph
self._include_required_multi_assets = include_required_multi_assets
self._include_full_execution_set = include_full_execution_set

self._toposort_level_by_asset_key = {
asset_key: i
Expand All @@ -761,34 +767,29 @@ def enqueue(self, asset_partition: AssetKeyPartitionKey) -> None:
heappush(self._heap, self._queue_item(asset_partition))

def dequeue(self) -> Iterable[AssetKeyPartitionKey]:
# For multi-assets, will include all required multi-asset keys if include_required_multi_assets is set to
# True, or a list of size 1 with just the passed in asset key if it was not
# For multi-assets, will include all asset keys in the execution set if
# include_full_execution_set is set to True, or a list of size 1 with just the passed in
# asset key if it was not.
return heappop(self._heap).asset_partitions

def _queue_item(
self, asset_partition: AssetKeyPartitionKey
) -> "ToposortedPriorityQueue.QueueItem":
asset_key = asset_partition.asset_key

if self._include_required_multi_assets:
required_multi_asset_keys = self._asset_graph.get_required_multi_asset_keys(
asset_key
) | {asset_key}
if self._include_full_execution_set:
execution_set_keys = self._asset_graph.get_execution_set_asset_keys(asset_key)
else:
required_multi_asset_keys = {asset_key}
execution_set_keys = {asset_key}

level = max(
self._toposort_level_by_asset_key[required_asset_key]
for required_asset_key in required_multi_asset_keys
self._toposort_level_by_asset_key[asset_key] for asset_key in execution_set_keys
)

return ToposortedPriorityQueue.QueueItem(
level,
sort_key_for_asset_partition(self._asset_graph, asset_partition),
[
AssetKeyPartitionKey(ak, asset_partition.partition_key)
for ak in required_multi_asset_keys
],
[AssetKeyPartitionKey(ak, asset_partition.partition_key) for ak in execution_set_keys],
)

def __len__(self) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
selection = self.child.resolve_inner(asset_graph)
output = set(selection)
for asset_key in selection:
output.update(asset_graph.get_required_multi_asset_keys(asset_key))
output.update(asset_graph.get_execution_set_asset_keys(asset_key))
return output

def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,37 +237,37 @@ def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
def asset_keys_for_job(self, job_name: str) -> AbstractSet[AssetKey]:
return {node.asset_key for node in self.asset_nodes if job_name in node.job_names}

def get_required_asset_and_check_keys(
def get_execution_set_asset_and_check_keys(
self, asset_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]:
if isinstance(asset_or_check_key, AssetKey):
atomic_execution_unit_id = self.get_asset_node(
execution_set_identifier = self.get_asset_node(
asset_or_check_key
).atomic_execution_unit_id
).execution_set_identifier
else: # AssetCheckKey
atomic_execution_unit_id = self.get_asset_check(
execution_set_identifier = self.get_asset_check(
asset_or_check_key
).atomic_execution_unit_id
if atomic_execution_unit_id is None:
return set()
).execution_set_identifier
if execution_set_identifier is None:
return {asset_or_check_key}
else:
return {
*(
node.asset_key
for node in self.asset_nodes
if node.atomic_execution_unit_id == atomic_execution_unit_id
if node.execution_set_identifier == execution_set_identifier
),
*(
node.key
for node in self.asset_checks
if node.atomic_execution_unit_id == atomic_execution_unit_id
if node.execution_set_identifier == execution_set_identifier
),
}

def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
def get_execution_set_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
return {
key
for key in self.get_required_asset_and_check_keys(asset_key)
for key in self.get_execution_set_asset_and_check_keys(asset_key)
if isinstance(key, AssetKey)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,26 @@ def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
if ad.group_names_by_key[key] == group_name
}

def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
def get_execution_set_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
asset = self.get_assets_def(asset_key)
return set() if len(asset.keys) <= 1 or asset.can_subset else asset.keys
return {asset_key} if len(asset.keys) <= 1 or asset.can_subset else asset.keys

def get_required_asset_and_check_keys(
def get_execution_set_asset_and_check_keys(
self, asset_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]:
if isinstance(asset_or_check_key, AssetKey):
asset = self.get_assets_def(asset_or_check_key)
else: # AssetCheckKey
# only checks emitted by AssetsDefinition have required keys
if self.has_asset_check(asset_or_check_key):
return set()
return {asset_or_check_key}
else:
asset = self.get_assets_def_for_check(asset_or_check_key)
if asset is None or asset_or_check_key not in asset.check_keys:
return set()
return {asset_or_check_key}
has_checks = len(asset.check_keys) > 0
if asset.can_subset or len(asset.keys) <= 1 and not has_checks:
return set()
return {asset_or_check_key}
else:
return {*asset.keys, *asset.check_keys}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1127,15 +1127,15 @@ def __new__(
)


@whitelist_for_serdes
@whitelist_for_serdes(storage_field_names={"execution_set_identifier": "atomic_execution_unit_id"})
class ExternalAssetCheck(
NamedTuple(
"_ExternalAssetCheck",
[
("name", str),
("asset_key", AssetKey),
("description", Optional[str]),
("atomic_execution_unit_id", Optional[str]),
("execution_set_identifier", Optional[str]),
("job_names", Sequence[str]),
],
)
Expand All @@ -1147,16 +1147,16 @@ def __new__(
name: str,
asset_key: AssetKey,
description: Optional[str],
atomic_execution_unit_id: Optional[str] = None,
execution_set_identifier: Optional[str] = None,
job_names: Optional[Sequence[str]] = None,
):
return super(ExternalAssetCheck, cls).__new__(
cls,
name=check.str_param(name, "name"),
asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
description=check.opt_str_param(description, "description"),
atomic_execution_unit_id=check.opt_str_param(
atomic_execution_unit_id, "automic_execution_unit_id"
execution_set_identifier=check.opt_str_param(
execution_set_identifier, "execution_set_identifier"
),
job_names=check.opt_sequence_param(job_names, "job_names", of_type=str),
)
Expand All @@ -1167,7 +1167,10 @@ def key(self) -> AssetCheckKey:


@whitelist_for_serdes(
storage_field_names={"metadata": "metadata_entries"},
storage_field_names={
"metadata": "metadata_entries",
"execution_set_identifier": "atomic_execution_unit_id",
},
field_serializers={"metadata": MetadataFieldSerializer},
)
class ExternalAssetNode(
Expand Down Expand Up @@ -1197,9 +1200,9 @@ class ExternalAssetNode(
("is_source", bool),
("is_observable", bool),
# If a set of assets can't be materialized independently from each other, they will all
# have the same atomic_execution_unit_id. This ID should be stable across reloads and
# have the same execution_set_identifier. This ID should be stable across reloads and
# unique deployment-wide.
("atomic_execution_unit_id", Optional[str]),
("execution_set_identifier", Optional[str]),
("required_top_level_resources", Optional[Sequence[str]]),
("auto_materialize_policy", Optional[AutoMaterializePolicy]),
("backfill_policy", Optional[BackfillPolicy]),
Expand Down Expand Up @@ -1235,7 +1238,7 @@ def __new__(
freshness_policy: Optional[FreshnessPolicy] = None,
is_source: Optional[bool] = None,
is_observable: bool = False,
atomic_execution_unit_id: Optional[str] = None,
execution_set_identifier: Optional[str] = None,
required_top_level_resources: Optional[Sequence[str]] = None,
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
backfill_policy: Optional[BackfillPolicy] = None,
Expand Down Expand Up @@ -1316,8 +1319,8 @@ def __new__(
),
is_source=check.bool_param(is_source, "is_source"),
is_observable=check.bool_param(is_observable, "is_observable"),
atomic_execution_unit_id=check.opt_str_param(
atomic_execution_unit_id, "atomic_execution_unit_id"
execution_set_identifier=check.opt_str_param(
execution_set_identifier, "execution_set_identifier"
),
required_top_level_resources=check.opt_sequence_param(
required_top_level_resources, "required_top_level_resources", of_type=str
Expand Down Expand Up @@ -1529,7 +1532,7 @@ def external_asset_checks_from_defs(
of_type=AssetChecksDefinition,
additional_message=f"Check {check_key} is redefined in an AssetsDefinition and an AssetChecksDefinition",
)
atomic_execution_unit_id = None
execution_set_identifier = None
elif isinstance(first_node, AssetsDefinition):
check.is_list(
nodes,
Expand All @@ -1539,9 +1542,9 @@ def external_asset_checks_from_defs(

# Executing individual checks isn't supported in graph assets
if isinstance(first_node.node_def, GraphDefinition):
atomic_execution_unit_id = first_node.unique_id
execution_set_identifier = first_node.unique_id
else:
atomic_execution_unit_id = (
execution_set_identifier = (
first_node.unique_id if not first_node.can_subset else None
)
else:
Expand All @@ -1553,7 +1556,7 @@ def external_asset_checks_from_defs(
name=check_key.name,
asset_key=check_key.asset_key,
description=spec.description,
atomic_execution_unit_id=atomic_execution_unit_id,
execution_set_identifier=execution_set_identifier,
job_names=job_names_by_check_key[check_key],
)
)
Expand Down Expand Up @@ -1581,7 +1584,7 @@ def external_asset_nodes_from_defs(
code_version_by_asset_key: Dict[AssetKey, Optional[str]] = dict()
group_name_by_asset_key: Dict[AssetKey, str] = {}
descriptions_by_asset_key: Dict[AssetKey, str] = {}
atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {}
execution_set_identifiers: Dict[Union[AssetKey, AssetCheckKey], str] = {}
owners_by_asset_key: Dict[AssetKey, Sequence[AssetOwner]] = {}
execution_types_by_asset_key: Dict[AssetKey, AssetExecutionType] = {}
is_observable_by_key: Dict[AssetKey, bool] = {}
Expand Down Expand Up @@ -1642,12 +1645,12 @@ def external_asset_nodes_from_defs(
{key: assets_def.auto_observe_interval_minutes for key in assets_def.keys}
)
if len(assets_def.keys) > 1 and not assets_def.can_subset:
atomic_execution_unit_id = assets_def.unique_id
execution_set_identifier = assets_def.unique_id

for asset_key in assets_def.keys:
atomic_execution_unit_ids_by_key[asset_key] = atomic_execution_unit_id
execution_set_identifiers[asset_key] = execution_set_identifier
if len(assets_def.keys) == 1 and assets_def.check_keys and not assets_def.can_subset:
atomic_execution_unit_ids_by_key[assets_def.key] = assets_def.unique_id
execution_set_identifiers[assets_def.key] = assets_def.unique_id

group_name_by_asset_key.update(asset_layer.group_names_by_assets())

Expand Down Expand Up @@ -1720,7 +1723,7 @@ def external_asset_nodes_from_defs(
freshness_policy=freshness_policy_by_asset_key.get(asset_key),
auto_materialize_policy=auto_materialize_policy_by_asset_key.get(asset_key),
backfill_policy=backfill_policy_by_asset_key.get(asset_key),
atomic_execution_unit_id=atomic_execution_unit_ids_by_key.get(asset_key),
execution_set_identifier=execution_set_identifiers.get(asset_key),
required_top_level_resources=required_top_level_resources,
owners=[
owner.email if isinstance(owner, UserAssetOwner) else owner.team
Expand Down
Loading

0 comments on commit ffb82f1

Please sign in to comment.