Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[external assets] source assets -> external assets #18217

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
DagsterInstance,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.data_version import CachingStaleStatusResolver
from dagster._core.definitions.events import AssetKey
from dagster._core.events.log import EventLogEntry
Expand Down Expand Up @@ -315,6 +316,7 @@ def _build_cross_repo_deps(
)
],
depended_by=[],
execution_type=AssetExecutionType.UNEXECUTABLE,
)

return sink_assets, external_asset_deps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AssetKey,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.data_version import (
Expand Down Expand Up @@ -259,6 +260,7 @@ class GrapheneAssetNode(graphene.ObjectType):
groupName = graphene.String()
id = graphene.NonNull(graphene.ID)
isExecutable = graphene.NonNull(graphene.Boolean)
isExternal = graphene.NonNull(graphene.Boolean)
isObservable = graphene.NonNull(graphene.Boolean)
isPartitioned = graphene.NonNull(graphene.Boolean)
isSource = graphene.NonNull(graphene.Boolean)
Expand Down Expand Up @@ -501,7 +503,8 @@ def is_graph_backed_asset(self) -> bool:
return self.graphName is not None

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
node = self._external_asset_node
return node.is_source or node.is_external and len(node.dependencies) == 0

def resolve_hasMaterializePermission(
self,
Expand Down Expand Up @@ -962,7 +965,10 @@ def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.partitions_def_data is not None

def resolve_isObservable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_observable
return self._external_asset_node.execution_type == AssetExecutionType.OBSERVATION

def resolve_isExternal(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_external

def resolve_isExecutable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_executable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1335,8 +1335,8 @@ def never_runs_asset(

hanging_job = build_assets_job(
name="hanging_job",
source_assets=[dummy_source_asset],
assets=[first_asset, hanging_asset, never_runs_asset],
other_assets=[dummy_source_asset],
assets_to_execute=[first_asset, hanging_asset, never_runs_asset],
resource_defs={
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
"hanging_asset_resource": hanging_asset_resource,
Expand Down Expand Up @@ -1383,7 +1383,7 @@ def downstream_asset(hanging_graph):

hanging_graph_asset_job = build_assets_job(
name="hanging_graph_asset_job",
assets=[hanging_graph_asset, downstream_asset],
assets_to_execute=[hanging_graph_asset, downstream_asset],
resource_defs={
"hanging_asset_resource": hanging_asset_resource,
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
Expand All @@ -1401,7 +1401,7 @@ def asset_two(asset_one):
return asset_one + 1


two_assets_job = build_assets_job(name="two_assets_job", assets=[asset_one, asset_two])
two_assets_job = build_assets_job(name="two_assets_job", assets_to_execute=[asset_one, asset_two])


@asset
Expand All @@ -1412,7 +1412,7 @@ def executable_asset() -> None:
unexecutable_asset = next(iter(external_assets_from_specs([AssetSpec("unexecutable_asset")])))

executable_test_job = build_assets_job(
name="executable_test_job", assets=[executable_asset, unexecutable_asset]
name="executable_test_job", assets_to_execute=[executable_asset, unexecutable_asset]
)

static_partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"])
Expand Down Expand Up @@ -1455,7 +1455,7 @@ def downstream_dynamic_partitioned_asset(

dynamic_partitioned_assets_job = build_assets_job(
"dynamic_partitioned_assets_job",
assets=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset],
assets_to_execute=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset],
)


Expand Down Expand Up @@ -1529,7 +1529,7 @@ def yield_partition_materialization():

partition_materialization_job = build_assets_job(
"partition_materialization_job",
assets=[yield_partition_materialization],
assets_to_execute=[yield_partition_materialization],
executor_def=in_process_executor,
)

Expand All @@ -1543,7 +1543,7 @@ def fail_partition_materialization(context):

fail_partition_materialization_job = build_assets_job(
"fail_partition_materialization_job",
assets=[fail_partition_materialization],
assets_to_execute=[fail_partition_materialization],
executor_def=in_process_executor,
)

Expand All @@ -1562,7 +1562,7 @@ def hanging_partition_asset(context):

hanging_partition_asset_job = build_assets_job(
"hanging_partition_asset_job",
assets=[hanging_partition_asset],
assets_to_execute=[hanging_partition_asset],
executor_def=in_process_executor,
resource_defs={
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
Expand All @@ -1580,7 +1580,7 @@ def asset_yields_observation():

observation_job = build_assets_job(
"observation_job",
assets=[asset_yields_observation],
assets_to_execute=[asset_yields_observation],
executor_def=in_process_executor,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,5 @@ def blocking_asset(**kwargs):
auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key),
backfill_policy=asset_def.backfill_policy,
config=None, # gets config from asset_def.op
_execution_type=asset_def.execution_type,
)
71 changes: 61 additions & 10 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import toposort

import dagster._check as check
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DynamicPartitionsStore
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
self._asset_dep_graph = asset_dep_graph
self._source_asset_keys = source_asset_keys
Expand All @@ -100,10 +102,8 @@ def __init__(
self._is_observable_by_key = is_observable_by_key
self._auto_observe_interval_minutes_by_key = auto_observe_interval_minutes_by_key
# source assets keys can sometimes appear in the upstream dict
self._materializable_asset_keys = (
self._asset_dep_graph["upstream"].keys() - self.source_asset_keys
)
self._required_assets_and_checks_by_key = required_assets_and_checks_by_key
self._execution_types_by_key = execution_types_by_key

@property
def asset_dep_graph(self) -> DependencyGraph[AssetKey]:
Expand All @@ -117,6 +117,10 @@ def group_names_by_key(self) -> Mapping[AssetKey, Optional[str]]:
def source_asset_keys(self) -> AbstractSet[AssetKey]:
return self._source_asset_keys

@property
def external_asset_keys(self) -> AbstractSet[AssetKey]:
return self.all_asset_keys - self.materializable_asset_keys

@functools.cached_property
def root_asset_keys(self) -> AbstractSet[AssetKey]:
"""Non-source asset keys that have no non-source parents."""
Expand All @@ -140,6 +144,10 @@ def root_materializable_or_observable_asset_keys(self) -> AbstractSet[AssetKey]:
def freshness_policies_by_key(self) -> Mapping[AssetKey, Optional[FreshnessPolicy]]:
return self._freshness_policies_by_key

@property
def observable_keys(self) -> AbstractSet[AssetKey]:
return {key for key, is_observable in self._is_observable_by_key.items() if is_observable}

@property
def auto_materialize_policies_by_key(
self,
Expand All @@ -150,6 +158,10 @@ def auto_materialize_policies_by_key(
def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy]]:
return self._backfill_policies_by_key

@property
def execution_types_by_key(self) -> Mapping[AssetKey, AssetExecutionType]:
return self._execution_types_by_key

def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]:
return self._auto_observe_interval_minutes_by_key.get(asset_key)

Expand All @@ -158,13 +170,18 @@ def from_assets(
all_assets: Iterable[Union[AssetsDefinition, SourceAsset]],
asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
) -> "InternalAssetGraph":
from dagster._core.definitions.external_asset import (
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES,
)

assets_defs: List[AssetsDefinition] = []
source_assets: List[SourceAsset] = []
partitions_defs_by_key: Dict[AssetKey, Optional[PartitionsDefinition]] = {}
partition_mappings_by_key: Dict[
AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]
] = {}
group_names_by_key: Dict[AssetKey, Optional[str]] = {}
execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {}
freshness_policies_by_key: Dict[AssetKey, Optional[FreshnessPolicy]] = {}
auto_materialize_policies_by_key: Dict[AssetKey, Optional[AutoMaterializePolicy]] = {}
backfill_policies_by_key: Dict[AssetKey, Optional[BackfillPolicy]] = {}
Expand All @@ -184,6 +201,7 @@ def from_assets(
auto_observe_interval_minutes_by_key[
asset.key
] = asset.auto_observe_interval_minutes
execution_types_by_key[asset.key] = AssetExecutionType.UNEXECUTABLE
else: # AssetsDefinition
assets_defs.append(asset)
partition_mappings_by_key.update(
Expand All @@ -195,6 +213,27 @@ def from_assets(
auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key)
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)
for key in asset.keys:
execution_types_by_key[key] = asset.execution_type

is_observable = asset.execution_type == AssetExecutionType.OBSERVATION
is_observable_by_key.update({key: is_observable for key in asset.keys})

# Set auto_observe_interval_minutes for external observable assets
# This can be removed when/if we have a a solution for mapping
# `auto_observe_interval_minutes` to an AutoMaterialzePolicy
first_key = next(iter(asset.keys), None)
if (
first_key
and SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES
in asset.metadata_by_key[first_key]
):
interval = asset.metadata_by_key[first_key][
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES
]
auto_observe_interval_minutes_by_key.update(
{key: interval for key in asset.keys}
)

if not asset.can_subset:
all_required_keys = {*asset.check_keys, *asset.keys}
Expand All @@ -218,15 +257,20 @@ def from_assets(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)

@property
def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return self._materializable_asset_keys
return {
k
for k, v in self._execution_types_by_key.items()
if v == AssetExecutionType.MATERIALIZATION
}

@property
def all_asset_keys(self) -> AbstractSet[AssetKey]:
return self._materializable_asset_keys | self.source_asset_keys
return self._execution_types_by_key.keys()

def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
return self._partitions_defs_by_key.get(asset_key)
Expand Down Expand Up @@ -275,7 +319,10 @@ def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool:
)

def is_observable(self, asset_key: AssetKey) -> bool:
return self._is_observable_by_key.get(asset_key, False)
return (
self._is_observable_by_key.get(asset_key, False)
or self._execution_types_by_key.get(asset_key) == AssetExecutionType.OBSERVATION
)

def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""Returns all assets that depend on the given asset."""
Expand Down Expand Up @@ -718,6 +765,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
super().__init__(
asset_dep_graph=asset_dep_graph,
Expand All @@ -732,6 +780,7 @@ def __init__(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)
self._assets = assets
self._source_assets = source_assets
Expand Down Expand Up @@ -760,13 +809,15 @@ def source_assets(self) -> Sequence[SourceAsset]:
def asset_checks(self) -> Sequence[AssetChecksDefinition]:
return self._asset_checks

def includes_materializable_and_source_assets(self, asset_keys: AbstractSet[AssetKey]) -> bool:
def includes_materializable_and_external_assets(
self, asset_keys: AbstractSet[AssetKey]
) -> bool:
"""Returns true if the given asset keys contains at least one materializable asset and
at least one source asset.
"""
selected_source_assets = self.source_asset_keys & asset_keys
selected_regular_assets = asset_keys - self.source_asset_keys
return len(selected_source_assets) > 0 and len(selected_regular_assets) > 0
selected_external_assets = self.external_asset_keys & asset_keys
selected_regular_assets = asset_keys - self.external_asset_keys
return len(selected_external_assets) > 0 and len(selected_regular_assets) > 0


def sort_key_for_asset_partition(
Expand Down
Loading