[external assets] source assets -> external assets
smackesey committed Feb 7, 2024
1 parent b1e59a1 commit db14c71
Showing 27 changed files with 557 additions and 217 deletions.
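For context: this commit moves toward modeling assets that Dagster does not materialize as "external assets" rather than SourceAssets. A minimal, illustrative sketch of that conversion, assuming Dagster's public AssetSpec / external_assets_from_specs API (asset names are hypothetical):

from dagster import AssetSpec, Definitions, SourceAsset, asset, external_assets_from_specs

# Before: a table produced outside Dagster, modeled as a SourceAsset.
raw_events_source = SourceAsset(key="raw_events")

# After: the same table modeled as an external asset -- an AssetSpec with no
# materialization function, converted into an unexecutable AssetsDefinition.
raw_events_external = external_assets_from_specs([AssetSpec("raw_events")])

@asset(deps=["raw_events"])
def cleaned_events() -> None:
    """A regular, materializable asset downstream of the external asset."""

defs = Definitions(assets=[*raw_events_external, cleaned_events])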
@@ -174,6 +174,7 @@ export const LaunchAssetExecutionButton = ({
onClick: () => void;
}[];
}) => {
console.log('LaunchAssetExecutionButton', scope);
const {onClick, loading, launchpadElement} = useMaterializationAction(preferredJobName);
const [isOpen, setIsOpen] = React.useState(false);

@@ -5,6 +5,7 @@
AssetKey,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.data_version import (
@@ -259,6 +260,7 @@ class GrapheneAssetNode(graphene.ObjectType):
groupName = graphene.String()
id = graphene.NonNull(graphene.ID)
isExecutable = graphene.NonNull(graphene.Boolean)
isExternal = graphene.NonNull(graphene.Boolean)
isObservable = graphene.NonNull(graphene.Boolean)
isPartitioned = graphene.NonNull(graphene.Boolean)
isSource = graphene.NonNull(graphene.Boolean)
@@ -501,7 +503,8 @@ def is_graph_backed_asset(self) -> bool:
return self.graphName is not None

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
node = self._external_asset_node
return node.is_source or (node.is_external and len(node.dependencies) == 0)

def resolve_hasMaterializePermission(
self,
@@ -962,7 +965,10 @@ def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.partitions_def_data is not None

def resolve_isObservable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_observable
return self._external_asset_node.execution_type == AssetExecutionType.OBSERVATION

def resolve_isExternal(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_external

def resolve_isExecutable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_executable
58 changes: 48 additions & 10 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
@@ -22,6 +22,7 @@
import toposort

import dagster._check as check
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DynamicPartitionsStore
@@ -87,6 +88,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
self._asset_dep_graph = asset_dep_graph
self._source_asset_keys = source_asset_keys
@@ -100,10 +102,8 @@ def __init__(
self._is_observable_by_key = is_observable_by_key
self._auto_observe_interval_minutes_by_key = auto_observe_interval_minutes_by_key
# source asset keys can sometimes appear in the upstream dict
self._materializable_asset_keys = (
self._asset_dep_graph["upstream"].keys() - self.source_asset_keys
)
self._required_assets_and_checks_by_key = required_assets_and_checks_by_key
self._execution_types_by_key = execution_types_by_key

@property
def asset_dep_graph(self) -> DependencyGraph[AssetKey]:
@@ -117,6 +117,10 @@ def group_names_by_key(self) -> Mapping[AssetKey, Optional[str]]:
def source_asset_keys(self) -> AbstractSet[AssetKey]:
return self._source_asset_keys

@property
def external_asset_keys(self) -> AbstractSet[AssetKey]:
return self.all_asset_keys - self.materializable_asset_keys

@functools.cached_property
def root_asset_keys(self) -> AbstractSet[AssetKey]:
"""Non-source asset keys that have no non-source parents."""
@@ -150,6 +154,10 @@ def auto_materialize_policies_by_key(
def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy]]:
return self._backfill_policies_by_key

@property
def execution_types_by_key(self) -> Mapping[AssetKey, AssetExecutionType]:
return self._execution_types_by_key

def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]:
return self._auto_observe_interval_minutes_by_key.get(asset_key)

@@ -174,6 +182,7 @@ def from_assets(
required_assets_and_checks_by_key: Dict[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
] = {}
execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {}

for asset in all_assets:
if isinstance(asset, SourceAsset):
@@ -184,6 +193,7 @@
auto_observe_interval_minutes_by_key[
asset.key
] = asset.auto_observe_interval_minutes
execution_types_by_key[asset.key] = AssetExecutionType.UNEXECUTABLE
else: # AssetsDefinition
assets_defs.append(asset)
partition_mappings_by_key.update(
@@ -195,6 +205,8 @@
auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key)
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)
for key in asset.keys:
execution_types_by_key[key] = asset.asset_execution_type_for_asset(key)

if not asset.can_subset:
all_required_keys = {*asset.check_keys, *asset.keys}
@@ -218,15 +230,34 @@
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)

@property
def executable_asset_keys(self) -> AbstractSet[AssetKey]:
return {
k
for k, v in self._execution_types_by_key.items()
if AssetExecutionType.is_executable(v)
}

def is_executable(self, asset_key: AssetKey) -> bool:
return AssetExecutionType.is_executable(self._execution_types_by_key[asset_key])

@property
def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return self._materializable_asset_keys
return {
k
for k, v in self._execution_types_by_key.items()
if v == AssetExecutionType.MATERIALIZATION
}

def is_materializable(self, asset_key: AssetKey) -> bool:
return self._execution_types_by_key[asset_key] == AssetExecutionType.MATERIALIZATION

@property
def all_asset_keys(self) -> AbstractSet[AssetKey]:
return self._materializable_asset_keys | self.source_asset_keys
return self._execution_types_by_key.keys()

def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
return self._partitions_defs_by_key.get(asset_key)
@@ -275,7 +306,10 @@ def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool:
)

def is_observable(self, asset_key: AssetKey) -> bool:
return self._is_observable_by_key.get(asset_key, False)
return (
self._is_observable_by_key.get(asset_key, False)
or self._execution_types_by_key[asset_key] == AssetExecutionType.OBSERVATION
)

def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""Returns all assets that depend on the given asset."""
@@ -718,6 +752,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
super().__init__(
asset_dep_graph=asset_dep_graph,
@@ -732,6 +767,7 @@
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)
self._assets = assets
self._source_assets = source_assets
@@ -760,13 +796,15 @@ def source_assets(self) -> Sequence[SourceAsset]:
def asset_checks(self) -> Sequence[AssetChecksDefinition]:
return self._asset_checks

def includes_materializable_and_source_assets(self, asset_keys: AbstractSet[AssetKey]) -> bool:
def includes_materializable_and_external_assets(
self, asset_keys: AbstractSet[AssetKey]
) -> bool:
"""Returns true if the given asset keys contains at least one materializable asset and
at least one source asset.
"""
selected_source_assets = self.source_asset_keys & asset_keys
selected_regular_assets = asset_keys - self.source_asset_keys
return len(selected_source_assets) > 0 and len(selected_regular_assets) > 0
selected_external_assets = self.external_asset_keys & asset_keys
selected_regular_assets = asset_keys - self.external_asset_keys
return len(selected_external_assets) > 0 and len(selected_regular_assets) > 0


def sort_key_for_asset_partition(
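The asset_graph.py changes above derive the executable, materializable, and external key sets from execution_types_by_key rather than from the source-asset set. A hedged, self-contained sketch of how those properties relate (illustrative names; expected behavior, not verified against this exact revision):

from dagster import AssetKey, AssetSpec, asset, external_assets_from_specs
from dagster._core.definitions.asset_graph import AssetGraph

# An external (spec-only, unexecutable) asset plus a materializable downstream asset.
raw_events = external_assets_from_specs([AssetSpec("raw_events")])

@asset(deps=["raw_events"])
def cleaned_events() -> None: ...

graph = AssetGraph.from_assets([*raw_events, cleaned_events])

# materializable_asset_keys and external_asset_keys partition all_asset_keys by construction.
assert graph.all_asset_keys == graph.materializable_asset_keys | graph.external_asset_keys
assert graph.is_materializable(AssetKey("cleaned_events"))
assert not graph.is_executable(AssetKey("raw_events"))  # expected: UNEXECUTABLE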
45 changes: 25 additions & 20 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
@@ -397,7 +397,7 @@ def from_graph_and_assets_node_mapping(
graph_def: GraphDefinition,
assets_defs_by_outer_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"],
observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"],
observable_external_assets_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
source_assets: Sequence["SourceAsset"],
resolved_asset_deps: "ResolvedAssetDependencies",
) -> "AssetLayer":
@@ -541,30 +541,29 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:

assets_defs_by_key = {
key: assets_def
for assets_def in assets_defs_by_outer_node_handle.values()
for assets_def in [*assets_defs_by_outer_node_handle.values(), *observable_external_assets_by_node_handle.values()]
for key in assets_def.keys
}

source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets}
for node_handle, source_asset in observable_source_assets_by_node_handle.items():
node_def = cast(NodeDefinition, source_asset.node_def)
check.invariant(len(node_def.output_defs) == 1)
output_name = node_def.output_defs[0].name
# resolve graph output to the op output it comes from
inner_output_def, inner_node_handle = node_def.resolve_output_to_origin(
output_name, handle=node_handle
)
node_output_handle = NodeOutputHandle(
check.not_none(inner_node_handle), inner_output_def.name
)
for node_handle, external_asset in observable_external_assets_by_node_handle.items():
node_def = cast(NodeDefinition, external_asset.node_def)

asset_info_by_output[node_output_handle] = AssetOutputInfo(
source_asset.key,
partitions_fn=None,
partitions_def=source_asset.partitions_def,
is_required=True,
code_version=inner_output_def.code_version,
)
for output_name, asset_key in external_asset.node_keys_by_output_name.items():
inner_output_def, inner_node_handle = node_def.resolve_output_to_origin(
output_name, handle=node_handle
)
node_output_handle = NodeOutputHandle(
check.not_none(inner_node_handle), inner_output_def.name
)

asset_info_by_output[node_output_handle] = AssetOutputInfo(
asset_key,
partitions_fn=None,
partitions_def=external_asset.partitions_def,
is_required=True,
code_version=inner_output_def.code_version,
)

assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = {
# nodes for assets
@@ -894,6 +893,12 @@ def build_asset_selection_job(
final_asset_checks = []
final_source_assets = included_source_assets

print("ASSETS", assets)
print("INCLUDED ASSETS", included_assets)
print("INCLUDED SOURCE ASSETS", included_source_assets)
print("BUILDING")
print("FINAL ASSETS", final_assets)
print("FINAL SOURCE ASSETS", final_source_assets)
return build_assets_job(
name=name,
assets=final_assets,
19 changes: 16 additions & 3 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
@@ -1,5 +1,5 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr
@@ -23,15 +23,28 @@
# for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"

# SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES lives on the metadata of
# external assets resulting from a source asset conversion. It contains the
# `auto_observe_interval_minutes` value from the source asset and is consulted
# in the auto-materialize daemon. It should eventually be eliminated in favor
# of an implementation of `auto_observe_interval_minutes` in terms of
# `AutoMaterializeRule`.
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes"


class AssetExecutionType(Enum):
OBSERVATION = "OBSERVATION"
UNEXECUTABLE = "UNEXECUTABLE"
MATERIALIZATION = "MATERIALIZATION"

@staticmethod
def is_executable(varietal_str: Optional[str]) -> bool:
return AssetExecutionType.str_to_enum(varietal_str) in {
def is_executable(execution_type: Optional[Union[str, "AssetExecutionType"]]) -> bool:
enum_value = (
AssetExecutionType.str_to_enum(execution_type)
if not isinstance(execution_type, AssetExecutionType)
else execution_type
)
return enum_value in {
AssetExecutionType.MATERIALIZATION,
AssetExecutionType.OBSERVATION,
}
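A small illustrative check of the widened is_executable signature above, which now accepts either the enum member or its string form:

from dagster._core.definitions.asset_spec import AssetExecutionType

assert AssetExecutionType.is_executable(AssetExecutionType.MATERIALIZATION)
assert AssetExecutionType.is_executable("OBSERVATION")  # string form goes through str_to_enum
assert not AssetExecutionType.is_executable(AssetExecutionType.UNEXECUTABLE)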