Skip to content

Commit

Permalink
[external-assets] Make execution_type an ExternalAssetNode property (#…
Browse files Browse the repository at this point in the history
…19684)

## Summary & Motivation

This adds `execution_type` as a property on `ExternalAssetsNode` and
defines execution-type related helper properties on `AssetsDefinition`.

Logic is in place on `ExternalAssetNode` to ensure that `execution_type`
specified as a param agrees with metadata.

## How I Tested These Changes

Existing test suite.
  • Loading branch information
smackesey authored and PedramNavid committed Mar 28, 2024
1 parent 786fb25 commit 66268de
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
DagsterInstance,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.data_version import CachingStaleStatusResolver
from dagster._core.definitions.events import AssetKey
from dagster._core.events.log import EventLogEntry
Expand Down Expand Up @@ -315,6 +316,7 @@ def _build_cross_repo_deps(
)
],
depended_by=[],
execution_type=AssetExecutionType.UNEXECUTABLE,
)

return sink_assets, external_asset_deps
Expand Down
11 changes: 11 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_spec import (
AssetExecutionType,
)
from dagster._core.definitions.hook_definition import HookDefinition
from dagster._core.definitions.metadata import (
ArbitraryMetadataMapping,
Expand Down Expand Up @@ -706,6 +709,14 @@ def input_for_asset_key(self, node_handle: NodeHandle, key: AssetKey) -> Optiona
def io_manager_key_for_asset(self, asset_key: AssetKey) -> str:
return self.io_manager_keys_by_asset_key.get(asset_key, "io_manager")

def execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType:
if asset_key in self.assets_defs_by_key:
return self.assets_defs_by_key[asset_key].execution_type
elif asset_key in self.source_assets_by_key:
return self.source_assets_by_key[asset_key].execution_type
else:
check.failed(f"Couldn't find key {asset_key}")

def is_observable_for_asset(self, asset_key: AssetKey) -> bool:
return (
asset_key in self.source_assets_by_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param
from dagster._serdes.serdes import whitelist_for_serdes

from .auto_materialize_policy import AutoMaterializePolicy
from .events import (
Expand All @@ -24,6 +25,7 @@
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"


@whitelist_for_serdes
class AssetExecutionType(Enum):
OBSERVATION = "OBSERVATION"
UNEXECUTABLE = "UNEXECUTABLE"
Expand Down
52 changes: 29 additions & 23 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from dagster._annotations import experimental_param, public
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
from dagster._core.definitions.freshness_policy import FreshnessPolicy
Expand Down Expand Up @@ -953,33 +956,36 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]:
"""
return self._selected_asset_check_keys

def is_asset_executable(self, asset_key: AssetKey) -> bool:
"""Returns True if the asset key is materializable by this AssetsDefinition.
@property
def execution_type(self) -> AssetExecutionType:
first_key = next(iter(self.keys), None)
# Currently all assets in an AssetsDefinition have the same execution type
if first_key:
return AssetExecutionType.str_to_enum(
self.metadata_by_key.get(first_key, {}).get(
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE
)
)
else:
return AssetExecutionType.UNEXECUTABLE

Args:
asset_key (AssetKey): The asset key to check.
return self._execution_type

Returns:
bool: True if the asset key is materializable by this AssetsDefinition.
"""
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
@property
def is_external(self) -> bool:
return self.execution_type != AssetExecutionType.MATERIALIZATION

return AssetExecutionType.is_executable(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)
@property
def is_observable(self) -> bool:
return self.execution_type == AssetExecutionType.OBSERVATION

def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType:
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
@property
def is_materializable(self) -> bool:
return self.execution_type == AssetExecutionType.MATERIALIZATION

return AssetExecutionType.str_to_enum(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)
@property
def is_executable(self) -> bool:
return self.execution_type != AssetExecutionType.UNEXECUTABLE

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])
Expand Down
14 changes: 14 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/source_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.data_version import (
DATA_VERSION_TAG,
DataVersion,
Expand Down Expand Up @@ -269,6 +270,19 @@ def op(self) -> OpDefinition:
)
return cast(OpDefinition, self.node_def)

@property
def execution_type(self) -> AssetExecutionType:
return (
AssetExecutionType.OBSERVATION
if self.is_observable
else AssetExecutionType.UNEXECUTABLE
)

@property
def is_executable(self) -> bool:
"""bool: Whether the asset is observable."""
return self.is_observable

@public
@property
def is_observable(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ def _log_asset_materialization_events_for_asset(
if asset_key:
asset_layer = step_context.job_def.asset_layer
execution_type = (
asset_layer.assets_def_for_asset(asset_key).asset_execution_type_for_asset(asset_key)
asset_layer.assets_def_for_asset(asset_key).execution_type
if asset_layer.has_assets_def_for_asset(asset_key)
else AssetExecutionType.MATERIALIZATION
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@
from dagster._core.snap.mode import ResourceDefSnap, build_resource_def_snap
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import is_whitelisted_for_serdes_object
from dagster._serdes.serdes import (
is_whitelisted_for_serdes_object,
)
from dagster._utils.error import SerializableErrorInfo

if TYPE_CHECKING:
Expand Down Expand Up @@ -1175,6 +1177,7 @@ class ExternalAssetNode(
("asset_key", AssetKey),
("dependencies", Sequence[ExternalAssetDependency]),
("depended_by", Sequence[ExternalAssetDependedBy]),
("execution_type", AssetExecutionType),
("compute_kind", Optional[str]),
("op_name", Optional[str]),
("op_names", Sequence[str]),
Expand Down Expand Up @@ -1215,6 +1218,7 @@ def __new__(
asset_key: AssetKey,
dependencies: Sequence[ExternalAssetDependency],
depended_by: Sequence[ExternalAssetDependedBy],
execution_type: Optional[AssetExecutionType] = None,
compute_kind: Optional[str] = None,
op_name: Optional[str] = None,
op_names: Optional[Sequence[str]] = None,
Expand All @@ -1238,6 +1242,32 @@ def __new__(
auto_observe_interval_minutes: Optional[float] = None,
owners: Optional[Sequence[str]] = None,
):
metadata = normalize_metadata(check.opt_mapping_param(metadata, "metadata", key_type=str))

# backcompat logic for execution type specified via metadata
if SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE in metadata:
val = metadata[SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE]
if not isinstance(val, TextMetadataValue):
check.failed(
f"Expected metadata value for key {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE} to be a TextMetadataValue, got {val}"
)
metadata_execution_type = AssetExecutionType.str_to_enum(val.value)
if execution_type is not None:
check.invariant(
execution_type == metadata_execution_type,
f"Execution type {execution_type} in metadata does not match type inferred from metadata {metadata_execution_type}",
)
execution_type = metadata_execution_type
else:
execution_type = (
check.opt_inst_param(
execution_type,
"execution_type",
AssetExecutionType,
)
or AssetExecutionType.MATERIALIZATION
)

# backcompat logic to handle ExternalAssetNodes serialized without op_names/graph_name
if not op_names:
op_names = list(filter(None, [op_name]))
Expand Down Expand Up @@ -1272,9 +1302,7 @@ def __new__(
),
output_name=check.opt_str_param(output_name, "output_name"),
output_description=check.opt_str_param(output_description, "output_description"),
metadata=normalize_metadata(
check.opt_mapping_param(metadata, "metadata", key_type=str)
),
metadata=metadata,
group_name=check.opt_str_param(group_name, "group_name"),
freshness_policy=check.opt_inst_param(
freshness_policy, "freshness_policy", FreshnessPolicy
Expand All @@ -1299,19 +1327,12 @@ def __new__(
auto_observe_interval_minutes, "auto_observe_interval_minutes"
),
owners=check.opt_sequence_param(owners, "owners", of_type=str),
execution_type=check.inst_param(execution_type, "execution_type", AssetExecutionType),
)

@property
def is_executable(self) -> bool:
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
if not metadata_value:
varietal_text = None
else:
check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error
assert isinstance(metadata_value, TextMetadataValue) # for type checker
varietal_text = metadata_value.value

return AssetExecutionType.is_executable(varietal_text)
return self.execution_type != AssetExecutionType.UNEXECUTABLE


ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]]
Expand Down Expand Up @@ -1554,6 +1575,7 @@ def external_asset_nodes_from_defs(
descriptions_by_asset_key: Dict[AssetKey, str] = {}
atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {}
owners_by_asset_key: Dict[AssetKey, Sequence[AssetOwner]] = {}
execution_types_by_asset_key: Dict[AssetKey, AssetExecutionType] = {}

for job_def in job_defs:
asset_layer = job_def.asset_layer
Expand All @@ -1577,6 +1599,9 @@ def external_asset_nodes_from_defs(
all_upstream_asset_keys.update(upstream_asset_keys)
node_defs_by_asset_key[output_key].append((node_output_handle, job_def))
asset_info_by_asset_key[output_key] = asset_info
execution_types_by_asset_key[output_key] = asset_layer.execution_type_for_asset(
output_key
)

for upstream_key in upstream_asset_keys:
partition_mapping = asset_layer.partition_mapping_for_node_input(
Expand Down Expand Up @@ -1623,6 +1648,7 @@ def external_asset_nodes_from_defs(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
execution_type=AssetExecutionType.UNEXECUTABLE,
job_names=[],
group_name=group_name_by_asset_key.get(asset_key),
code_version=code_version_by_asset_key.get(asset_key),
Expand Down Expand Up @@ -1656,6 +1682,7 @@ def external_asset_nodes_from_defs(
asset_key=source_asset.key,
dependencies=list(deps[source_asset.key].values()),
depended_by=list(dep_by[source_asset.key].values()),
execution_type=source_asset.execution_type,
job_names=job_names,
op_description=source_asset.description,
metadata=source_asset.metadata,
Expand Down Expand Up @@ -1712,6 +1739,7 @@ def external_asset_nodes_from_defs(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
execution_type=execution_types_by_asset_key[asset_key],
compute_kind=node_def.tags.get("kind"),
# backcompat
op_name=graph_name
Expand Down
Loading

0 comments on commit 66268de

Please sign in to comment.