Skip to content

Commit

Permalink
[external-assets] Make execution_type an AssetsDefinition property
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 8, 2024
1 parent a1490ca commit cfd2894
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
DagsterInstance,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.data_version import CachingStaleStatusResolver
from dagster._core.definitions.events import AssetKey
from dagster._core.events.log import EventLogEntry
Expand Down Expand Up @@ -315,6 +316,7 @@ def _build_cross_repo_deps(
)
],
depended_by=[],
execution_type=AssetExecutionType.UNEXECUTABLE,
)

return sink_assets, external_asset_deps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,5 @@ def blocking_asset(**kwargs):
auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key),
backfill_policy=asset_def.backfill_policy,
config=None, # gets config from asset_def.op
_execution_type=asset_def.execution_type,
)
11 changes: 11 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_spec import (
AssetExecutionType,
)
from dagster._core.definitions.hook_definition import HookDefinition
from dagster._core.definitions.metadata import (
ArbitraryMetadataMapping,
Expand Down Expand Up @@ -741,6 +744,14 @@ def input_for_asset_key(self, node_handle: NodeHandle, key: AssetKey) -> Optiona
def io_manager_key_for_asset(self, asset_key: AssetKey) -> str:
return self.io_manager_keys_by_asset_key.get(asset_key, "io_manager")

def execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType:
if asset_key in self.assets_defs_by_key:
return self.assets_defs_by_key[asset_key].execution_type
elif asset_key in self.source_assets_by_key:
return self.source_assets_by_key[asset_key].execution_type
else:
check.failed(f"Couldn't find key {asset_key}")

def is_observable_for_asset(self, asset_key: AssetKey) -> bool:
return (
asset_key in self.source_assets_by_key
Expand Down
10 changes: 2 additions & 8 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._serdes.serdes import whitelist_for_serdes

from .auto_materialize_policy import AutoMaterializePolicy
from .events import (
Expand All @@ -15,15 +16,8 @@
if TYPE_CHECKING:
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep

# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset
# (which currently ends up on the Output associated with the asset key)
# whih encodes the execution type the of asset. "Unexecutable" assets are assets
# that cannot be materialized in Dagster, but can have events in the event
# log keyed off of them, making Dagster usable as a observability and lineage tool
# for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"


@whitelist_for_serdes
class AssetExecutionType(Enum):
OBSERVATION = "OBSERVATION"
UNEXECUTABLE = "UNEXECUTABLE"
Expand Down
53 changes: 16 additions & 37 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit):
_descriptions_by_key: Mapping[AssetKey, str]
_selected_asset_check_keys: AbstractSet[AssetCheckKey]
_is_subset: bool
_execution_type: AssetExecutionType

def __init__(
self,
Expand All @@ -116,6 +117,7 @@ def __init__(
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None,
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
is_subset: bool = False,
_execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
# if adding new fields, make sure to handle them in the with_attributes, from_graph, and
# get_attributes_dict methods
):
Expand Down Expand Up @@ -315,6 +317,9 @@ def __init__(
)

self._is_subset = check.bool_param(is_subset, "is_subset")
self._execution_type = check.inst_param(
_execution_type, "_execution_type", AssetExecutionType
)

@staticmethod
def dagster_internal_init(
Expand All @@ -337,6 +342,7 @@ def dagster_internal_init(
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
is_subset: bool,
_execution_type: AssetExecutionType,
) -> "AssetsDefinition":
return AssetsDefinition(
keys_by_input_name=keys_by_input_name,
Expand All @@ -357,6 +363,7 @@ def dagster_internal_init(
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=selected_asset_check_keys,
is_subset=is_subset,
_execution_type=_execution_type,
)

def __call__(self, *args: object, **kwargs: object) -> object:
Expand Down Expand Up @@ -394,6 +401,7 @@ def from_graph(
backfill_policy: Optional[BackfillPolicy] = None,
can_subset: bool = False,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
_execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
) -> "AssetsDefinition":
"""Constructs an AssetsDefinition from a GraphDefinition.
Expand Down Expand Up @@ -466,6 +474,7 @@ def from_graph(
backfill_policy=backfill_policy,
can_subset=can_subset,
check_specs=check_specs,
_execution_type=_execution_type,
)

@public
Expand All @@ -489,6 +498,7 @@ def from_op(
] = None,
backfill_policy: Optional[BackfillPolicy] = None,
can_subset: bool = False,
_execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
) -> "AssetsDefinition":
"""Constructs an AssetsDefinition from an OpDefinition.
Expand Down Expand Up @@ -554,6 +564,7 @@ def from_op(
auto_materialize_policies_by_output_name=auto_materialize_policies_by_output_name,
backfill_policy=backfill_policy,
can_subset=can_subset,
_execution_type=_execution_type,
)

@staticmethod
Expand All @@ -578,6 +589,7 @@ def _from_node(
backfill_policy: Optional[BackfillPolicy] = None,
can_subset: bool = False,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
_execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
) -> "AssetsDefinition":
from dagster._core.definitions.decorators.asset_decorator import (
_assign_output_names_to_check_specs,
Expand Down Expand Up @@ -715,6 +727,7 @@ def _from_node(
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=None,
is_subset=False,
_execution_type=_execution_type,
)

@public
Expand Down Expand Up @@ -910,15 +923,7 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]:

@property
def execution_type(self) -> AssetExecutionType:
key = next(iter(self.keys), None)

# This occurs when a multi-asset has been subsetted to have only checks-- for now supported
# only on materializable assets.
if key is None:
return AssetExecutionType.MATERIALIZATION

# All assets in an AssetsDefinition currently must have the same execution type
return self.asset_execution_type_for_asset(key)
return self._execution_type

@property
def is_observable(self) -> bool:
Expand All @@ -932,34 +937,6 @@ def is_materializable(self) -> bool:
def is_executable(self) -> bool:
return self.execution_type != AssetExecutionType.UNEXECUTABLE

def is_asset_executable(self, asset_key: AssetKey) -> bool:
"""Returns True if the asset key is materializable by this AssetsDefinition.
Args:
asset_key (AssetKey): The asset key to check.
Returns:
bool: True if the asset key is materializable by this AssetsDefinition.
"""
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)

return AssetExecutionType.is_executable(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType:
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)

return AssetExecutionType.str_to_enum(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])

Expand Down Expand Up @@ -1157,6 +1134,7 @@ def with_attributes(
selected_asset_check_keys=selected_asset_check_keys
if selected_asset_check_keys
else self._selected_asset_check_keys,
_execution_type=self.execution_type,
)

return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))
Expand Down Expand Up @@ -1402,6 +1380,7 @@ def get_attributes_dict(self) -> Dict[str, Any]:
descriptions_by_key=self._descriptions_by_key,
check_specs_by_output_name=self._check_specs_by_output_name,
selected_asset_check_keys=self._selected_asset_check_keys,
_execution_type=self._execution_type,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from ..asset_check_spec import AssetCheckSpec
from ..asset_in import AssetIn
from ..asset_out import AssetOut
from ..asset_spec import AssetSpec
from ..asset_spec import AssetExecutionType, AssetSpec
from ..assets import ASSET_SUBSET_INPUT_PREFIX, AssetsDefinition
from ..backfill_policy import BackfillPolicy, BackfillPolicyType
from ..decorators.graph_decorator import graph
Expand Down Expand Up @@ -89,6 +89,7 @@ def asset(
key: Optional[CoercibleToAssetKey] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
check_specs: Optional[Sequence[AssetCheckSpec]] = ...,
_execution_type: AssetExecutionType = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...

Expand Down Expand Up @@ -128,6 +129,7 @@ def asset(
key: Optional[CoercibleToAssetKey] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
_execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -241,6 +243,7 @@ def create_asset():
code_version=code_version,
check_specs=check_specs,
key=key,
execution_type=_execution_type,
)

if compute_fn is not None:
Expand Down Expand Up @@ -313,6 +316,7 @@ def __init__(
code_version: Optional[str] = None,
key: Optional[CoercibleToAssetKey] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
):
self.name = name
self.key_prefix = key_prefix
Expand Down Expand Up @@ -340,6 +344,7 @@ def __init__(
self.code_version = code_version
self.check_specs = check_specs
self.key = key
self.execution_type = execution_type

def __call__(self, fn: Callable) -> AssetsDefinition:
from dagster._config.pythonic_config import (
Expand Down Expand Up @@ -484,6 +489,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=None, # no subselection in decorator
is_subset=False,
_execution_type=self.execution_type,
)


Expand Down Expand Up @@ -512,6 +518,7 @@ def multi_asset(
code_version: Optional[str] = None,
specs: Optional[Sequence[AssetSpec]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
_execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
# deprecated
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
Expand Down Expand Up @@ -882,6 +889,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=None, # no subselection in decorator
is_subset=False,
_execution_type=_execution_type,
)

return inner
Expand Down Expand Up @@ -1019,6 +1027,7 @@ def graph_asset(
resource_defs: Optional[Mapping[str, ResourceDefinition]] = ...,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
key: Optional[CoercibleToAssetKey] = None,
_execution_type: AssetExecutionType = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...

Expand All @@ -1040,6 +1049,7 @@ def graph_asset(
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
key: Optional[CoercibleToAssetKey] = None,
_execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Creates a software-defined asset that's computed using a graph of ops.
Expand Down Expand Up @@ -1117,6 +1127,7 @@ def slack_files_table():
resource_defs=resource_defs,
check_specs=check_specs,
key=key,
_execution_type=_execution_type,
)
else:
return graph_asset_no_defaults(
Expand All @@ -1135,6 +1146,7 @@ def slack_files_table():
resource_defs=resource_defs,
check_specs=check_specs,
key=key,
_execution_type=_execution_type,
)


Expand All @@ -1155,6 +1167,7 @@ def graph_asset_no_defaults(
resource_defs: Optional[Mapping[str, ResourceDefinition]],
check_specs: Optional[Sequence[AssetCheckSpec]],
key: Optional[CoercibleToAssetKey],
_execution_type: AssetExecutionType,
) -> AssetsDefinition:
ins = ins or {}
asset_ins = build_asset_ins(compose_fn, ins or {}, set())
Expand Down Expand Up @@ -1210,6 +1223,7 @@ def graph_asset_no_defaults(
descriptions_by_output_name={"result": description} if description else None,
resource_defs=resource_defs,
check_specs=check_specs,
_execution_type=_execution_type,
)


Expand All @@ -1225,6 +1239,7 @@ def graph_multi_asset(
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
config: Optional[Union[ConfigMapping, Mapping[str, Any]]] = None,
_execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a combined definition of multiple assets that are computed using the same graph of
ops, and the same upstream assets.
Expand Down Expand Up @@ -1337,6 +1352,7 @@ def inner(fn: Callable) -> AssetsDefinition:
descriptions_by_output_name=descriptions_by_output_name,
resource_defs=resource_defs,
check_specs=check_specs,
_execution_type=_execution_type,
)

return inner
Expand Down
Loading

0 comments on commit cfd2894

Please sign in to comment.