Skip to content

Commit

Permalink
[external-assets] Standardize terminology on "execution unit"
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 29, 2024
1 parent ac8a9c6 commit 59f3d5f
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _fetch_checks(
can_execute_individually = (
GrapheneAssetCheckCanExecuteIndividually.CAN_EXECUTE
if len(
asset_graph.get_required_asset_and_check_keys(external_check.key) or []
asset_graph.get_execution_unit_asset_and_check_keys(external_check.key)
)
<= 1
# NOTE: once we support multi checks, we'll need to add a case for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ def get_additional_required_keys(

# the set of atomic execution ids that any of the input asset keys are a part of
required_atomic_execution_ids = {
asset_nodes_by_key[asset_key].external_asset_node.atomic_execution_unit_id
asset_nodes_by_key[asset_key].external_asset_node.execution_unit_id
for asset_key in asset_keys
} - {None}

# the set of all asset keys that are part of the required atomic execution units
required_asset_keys = {
asset_node.external_asset_node.asset_key
for asset_node in asset_nodes_by_key.values()
if asset_node.external_asset_node.atomic_execution_unit_id in required_atomic_execution_ids
if asset_node.external_asset_node.execution_unit_id in required_atomic_execution_ids
}

return list(required_asset_keys - asset_keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ def get_asset_condition_evaluations(

# if we need to materialize any partitions of a non-subsettable multi-asset, we need to
# materialize all of them
if num_requested > 0:
for neighbor_key in self.asset_graph.get_required_multi_asset_keys(asset_key):
execution_unit_keys = self.asset_graph.get_execution_unit_asset_keys(asset_key)
if len(execution_unit_keys) > 1 and num_requested > 0:
for neighbor_key in execution_unit_keys:
expected_data_time_mapping[neighbor_key] = expected_data_time

# make sure that the true_subset of the neighbor is accurate -- when it was
Expand Down
42 changes: 21 additions & 21 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,14 +521,19 @@ def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]:
visited.add(parent_key)

@abstractmethod
def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""For a given asset_key, return the set of asset keys that must be materialized at the same time."""
def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""For a given asset_key, return the set of asset keys that must be
materialized at the same time.
"""
...

@abstractmethod
def get_required_asset_and_check_keys(
def get_execution_unit_asset_and_check_keys(
self, asset_key_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]:
"""For a given asset/check key, return the set of asset/check keys that must be
materialized/computed at the same time.
"""
...

@cached_method
Expand Down Expand Up @@ -651,13 +656,13 @@ def bfs_filter_asset_partitions(
Visits parents before children.
When asset partitions are part of the same non-subsettable multi-asset, they're provided all
at once to the condition_fn.
When asset partitions are part of the same execution unit (non-subsettable multi-asset),
they're provided all at once to the condition_fn.
"""
all_nodes = set(initial_asset_partitions)

# invariant: we never consider an asset partition before considering its ancestors
queue = ToposortedPriorityQueue(self, all_nodes, include_required_multi_assets=True)
queue = ToposortedPriorityQueue(self, all_nodes, include_full_execution_unit=True)

result: Set[AssetKeyPartitionKey] = set()

Expand Down Expand Up @@ -752,10 +757,10 @@ def __init__(
self,
asset_graph: AssetGraph,
items: Iterable[AssetKeyPartitionKey],
include_required_multi_assets: bool,
include_full_execution_unit: bool,
):
self._asset_graph = asset_graph
self._include_required_multi_assets = include_required_multi_assets
self._include_full_execution_unit = include_full_execution_unit

asset_keys_by_level = toposort.toposort(asset_graph.asset_dep_graph["upstream"])
self._toposort_level_by_asset_key = {
Expand All @@ -770,34 +775,29 @@ def enqueue(self, asset_partition: AssetKeyPartitionKey) -> None:
heappush(self._heap, self._queue_item(asset_partition))

def dequeue(self) -> Iterable[AssetKeyPartitionKey]:
# For multi-assets, will include all required multi-asset keys if include_required_multi_assets is set to
# True, or a list of size 1 with just the passed in asset key if it was not
# For multi-assets, will include all required multi-asset keys if
# include_full_execution_unit is set to True, or a list of size 1 with just the passed in
# asset key if it was not.
return heappop(self._heap).asset_partitions

def _queue_item(
self, asset_partition: AssetKeyPartitionKey
) -> "ToposortedPriorityQueue.QueueItem":
asset_key = asset_partition.asset_key

if self._include_required_multi_assets:
required_multi_asset_keys = self._asset_graph.get_required_multi_asset_keys(
asset_key
) | {asset_key}
if self._include_full_execution_unit:
execution_unit_keys = self._asset_graph.get_execution_unit_asset_keys(asset_key)
else:
required_multi_asset_keys = {asset_key}
execution_unit_keys = {asset_key}

level = max(
self._toposort_level_by_asset_key[required_asset_key]
for required_asset_key in required_multi_asset_keys
self._toposort_level_by_asset_key[asset_key] for asset_key in execution_unit_keys
)

return ToposortedPriorityQueue.QueueItem(
level,
sort_key_for_asset_partition(self._asset_graph, asset_partition),
[
AssetKeyPartitionKey(ak, asset_partition.partition_key)
for ak in required_multi_asset_keys
],
[AssetKeyPartitionKey(ak, asset_partition.partition_key) for ak in execution_unit_keys],
)

def __len__(self) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
selection = self.child.resolve_inner(asset_graph)
output = set(selection)
for asset_key in selection:
output.update(asset_graph.get_required_multi_asset_keys(asset_key))
output.update(asset_graph.get_execution_unit_asset_keys(asset_key))
return output

def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,37 +237,33 @@ def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
def asset_keys_for_job(self, job_name: str) -> AbstractSet[AssetKey]:
return {node.asset_key for node in self.asset_nodes if job_name in node.job_names}

def get_required_asset_and_check_keys(
def get_execution_unit_asset_and_check_keys(
self, asset_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]:
if isinstance(asset_or_check_key, AssetKey):
atomic_execution_unit_id = self.get_asset_node(
asset_or_check_key
).atomic_execution_unit_id
execution_unit_id = self.get_asset_node(asset_or_check_key).execution_unit_id
else: # AssetCheckKey
atomic_execution_unit_id = self.get_asset_check(
asset_or_check_key
).atomic_execution_unit_id
if atomic_execution_unit_id is None:
return set()
execution_unit_id = self.get_asset_check(asset_or_check_key).execution_unit_id
if execution_unit_id is None:
return {asset_or_check_key}
else:
return {
*(
node.asset_key
for node in self.asset_nodes
if node.atomic_execution_unit_id == atomic_execution_unit_id
if node.execution_unit_id == execution_unit_id
),
*(
node.key
for node in self.asset_checks
if node.atomic_execution_unit_id == atomic_execution_unit_id
if node.execution_unit_id == execution_unit_id
),
}

def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
return {
key
for key in self.get_required_asset_and_check_keys(asset_key)
for key in self.get_execution_unit_asset_and_check_keys(asset_key)
if isinstance(key, AssetKey)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,26 @@ def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
if ad.group_names_by_key[key] == group_name
}

def get_required_multi_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
def get_execution_unit_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
asset = self.get_assets_def(asset_key)
return set() if len(asset.keys) <= 1 or asset.can_subset else asset.keys
return {asset_key} if len(asset.keys) <= 1 or asset.can_subset else asset.keys

def get_required_asset_and_check_keys(
def get_execution_unit_asset_and_check_keys(
self, asset_or_check_key: AssetKeyOrCheckKey
) -> AbstractSet[AssetKeyOrCheckKey]:
if isinstance(asset_or_check_key, AssetKey):
asset = self.get_assets_def(asset_or_check_key)
else: # AssetCheckKey
# only checks emitted by AssetsDefinition have required keys
if self.has_asset_check(asset_or_check_key):
return set()
return {asset_or_check_key}
else:
asset = self.get_assets_def_for_check(asset_or_check_key)
if asset is None or asset_or_check_key not in asset.check_keys:
return set()
return {asset_or_check_key}
has_checks = len(asset.check_keys) > 0
if asset.can_subset or len(asset.keys) <= 1 and not has_checks:
return set()
return {asset_or_check_key}
else:
return {*asset.keys, *asset.check_keys}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,15 +1126,15 @@ def __new__(
)


@whitelist_for_serdes
@whitelist_for_serdes(storage_field_names={"execution_unit_id": "atomic_execution_unit_id"})
class ExternalAssetCheck(
NamedTuple(
"_ExternalAssetCheck",
[
("name", str),
("asset_key", AssetKey),
("description", Optional[str]),
("atomic_execution_unit_id", Optional[str]),
("execution_unit_id", Optional[str]),
("job_names", Sequence[str]),
],
)
Expand All @@ -1146,17 +1146,15 @@ def __new__(
name: str,
asset_key: AssetKey,
description: Optional[str],
atomic_execution_unit_id: Optional[str] = None,
execution_unit_id: Optional[str] = None,
job_names: Optional[Sequence[str]] = None,
):
return super(ExternalAssetCheck, cls).__new__(
cls,
name=check.str_param(name, "name"),
asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
description=check.opt_str_param(description, "description"),
atomic_execution_unit_id=check.opt_str_param(
atomic_execution_unit_id, "automic_execution_unit_id"
),
execution_unit_id=check.opt_str_param(execution_unit_id, "automic_execution_unit_id"),
job_names=check.opt_sequence_param(job_names, "job_names", of_type=str),
)

Expand All @@ -1166,7 +1164,10 @@ def key(self) -> AssetCheckKey:


@whitelist_for_serdes(
storage_field_names={"metadata": "metadata_entries"},
storage_field_names={
"metadata": "metadata_entries",
"execution_unit_id": "atomic_execution_unit_id",
},
field_serializers={"metadata": MetadataFieldSerializer},
)
class ExternalAssetNode(
Expand Down Expand Up @@ -1196,9 +1197,9 @@ class ExternalAssetNode(
("is_source", bool),
("is_observable", bool),
# If a set of assets can't be materialized independently from each other, they will all
# have the same atomic_execution_unit_id. This ID should be stable across reloads and
# have the same execution_unit_id. This ID should be stable across reloads and
# unique deployment-wide.
("atomic_execution_unit_id", Optional[str]),
("execution_unit_id", Optional[str]),
("required_top_level_resources", Optional[Sequence[str]]),
("auto_materialize_policy", Optional[AutoMaterializePolicy]),
("backfill_policy", Optional[BackfillPolicy]),
Expand Down Expand Up @@ -1234,7 +1235,7 @@ def __new__(
freshness_policy: Optional[FreshnessPolicy] = None,
is_source: Optional[bool] = None,
is_observable: bool = False,
atomic_execution_unit_id: Optional[str] = None,
execution_unit_id: Optional[str] = None,
required_top_level_resources: Optional[Sequence[str]] = None,
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
backfill_policy: Optional[BackfillPolicy] = None,
Expand Down Expand Up @@ -1315,9 +1316,7 @@ def __new__(
),
is_source=check.bool_param(is_source, "is_source"),
is_observable=check.bool_param(is_observable, "is_observable"),
atomic_execution_unit_id=check.opt_str_param(
atomic_execution_unit_id, "atomic_execution_unit_id"
),
execution_unit_id=check.opt_str_param(execution_unit_id, "execution_unit_id"),
required_top_level_resources=check.opt_sequence_param(
required_top_level_resources, "required_top_level_resources", of_type=str
),
Expand Down Expand Up @@ -1528,14 +1527,14 @@ def external_asset_checks_from_defs(
of_type=AssetChecksDefinition,
additional_message=f"Check {check_key} is redefined in an AssetsDefinition and an AssetChecksDefinition",
)
atomic_execution_unit_id = None
execution_unit_id = None
elif isinstance(first_node, AssetsDefinition):
check.is_list(
nodes,
of_type=AssetsDefinition,
additional_message=f"Check {check_key} is redefined in an AssetsDefinition and an AssetChecksDefinition",
)
atomic_execution_unit_id = first_node.unique_id if not first_node.can_subset else None
execution_unit_id = first_node.unique_id if not first_node.can_subset else None
else:
check.failed(f"Unexpected node type {first_node}")

Expand All @@ -1545,7 +1544,7 @@ def external_asset_checks_from_defs(
name=check_key.name,
asset_key=check_key.asset_key,
description=spec.description,
atomic_execution_unit_id=atomic_execution_unit_id,
execution_unit_id=execution_unit_id,
job_names=job_names_by_check_key[check_key],
)
)
Expand Down Expand Up @@ -1573,7 +1572,7 @@ def external_asset_nodes_from_defs(
code_version_by_asset_key: Dict[AssetKey, Optional[str]] = dict()
group_name_by_asset_key: Dict[AssetKey, str] = {}
descriptions_by_asset_key: Dict[AssetKey, str] = {}
atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {}
execution_unit_ids: Dict[Union[AssetKey, AssetCheckKey], str] = {}
owners_by_asset_key: Dict[AssetKey, Sequence[AssetOwner]] = {}
execution_types_by_asset_key: Dict[AssetKey, AssetExecutionType] = {}
is_observable_by_key: Dict[AssetKey, bool] = {}
Expand Down Expand Up @@ -1634,12 +1633,12 @@ def external_asset_nodes_from_defs(
{key: assets_def.auto_observe_interval_minutes for key in assets_def.keys}
)
if len(assets_def.keys) > 1 and not assets_def.can_subset:
atomic_execution_unit_id = assets_def.unique_id
execution_unit_id = assets_def.unique_id

for asset_key in assets_def.keys:
atomic_execution_unit_ids_by_key[asset_key] = atomic_execution_unit_id
execution_unit_ids[asset_key] = execution_unit_id
if len(assets_def.keys) == 1 and assets_def.check_keys and not assets_def.can_subset:
atomic_execution_unit_ids_by_key[assets_def.key] = assets_def.unique_id
execution_unit_ids[assets_def.key] = assets_def.unique_id

group_name_by_asset_key.update(asset_layer.group_names_by_assets())

Expand Down Expand Up @@ -1725,7 +1724,7 @@ def external_asset_nodes_from_defs(
freshness_policy=freshness_policy_by_asset_key.get(asset_key),
auto_materialize_policy=auto_materialize_policy_by_asset_key.get(asset_key),
backfill_policy=backfill_policy_by_asset_key.get(asset_key),
atomic_execution_unit_id=atomic_execution_unit_ids_by_key.get(asset_key),
execution_unit_id=execution_unit_ids.get(asset_key),
required_top_level_resources=required_top_level_resources,
owners=[
owner.email if isinstance(owner, UserAssetOwner) else owner.team
Expand Down
Loading

0 comments on commit 59f3d5f

Please sign in to comment.