Skip to content

Commit

Permalink
[external-assets] Build base asset jobs using AssetGraph
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Mar 8, 2024
1 parent 06e3dba commit c89e7a3
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ def get_execution_set_asset_and_check_keys(
def assets_defs(self) -> Sequence[AssetsDefinition]:
    """Return the distinct ``AssetsDefinition`` objects behind the graph's nodes.

    A definition shared by several nodes is listed once, at the position of the first
    node in ``self.asset_nodes`` that refers to it.
    """
    unique_defs = {}
    for node in self.asset_nodes:
        # A dict is used as an insertion-ordered set: repeats keep their first position.
        unique_defs.setdefault(node.assets_def, None)
    return list(unique_defs)

def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefinition]:
    """Return the distinct ``AssetsDefinition`` objects for the nodes looked up by ``keys``.

    Each key is resolved with ``self.get(key)``. The result follows the iteration order
    of ``keys`` and lists a definition only once even when several keys map to it.
    """
    defs_for_keys = [self.get(key).assets_def for key in keys]
    unique_defs = {}
    for assets_def in defs_for_keys:
        unique_defs.setdefault(assets_def, None)
    return list(unique_defs)

@property
def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
    """Distinct ``AssetChecksDefinition`` objects stored in ``_asset_checks_defs_by_key``.

    A definition registered under several check keys is listed once, in the order it
    first appears among the mapping's values.
    """
    ordered_unique = dict.fromkeys(self._asset_checks_defs_by_key.values())
    return [*ordered_unique]
55 changes: 17 additions & 38 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
Any,
Dict,
Iterable,
List,
Mapping,
Optional,
Sequence,
Expand All @@ -18,6 +17,7 @@

import dagster._check as check
from dagster._core.definitions.hook_definition import HookDefinition
from dagster._core.definitions.internal_asset_graph import InternalAssetGraph
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.selector.subset_selector import AssetSelectionData
Expand Down Expand Up @@ -61,58 +61,37 @@ def is_base_asset_job_name(name: str) -> bool:


def get_base_asset_jobs(
assets: Sequence[AssetsDefinition],
asset_checks: Sequence[AssetChecksDefinition],
asset_graph: InternalAssetGraph,
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
executable_assets = [a for a in assets if a.is_executable]
unexecutable_assets = [a for a in assets if not a.is_executable]

executable_assets_by_partitions_def: Dict[
Optional[PartitionsDefinition], List[AssetsDefinition]
] = defaultdict(list)
for asset in executable_assets:
executable_assets_by_partitions_def[asset.partitions_def].append(asset)
# sort to ensure some stability in the ordering
all_partitions_defs = sorted(
[p for p in executable_assets_by_partitions_def.keys() if p], key=repr
)

if len(all_partitions_defs) == 0:
if len(asset_graph.all_partitions_defs) == 0:
executable_asset_keys = asset_graph.executable_asset_keys
loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
executable_assets=executable_assets,
loadable_assets=unexecutable_assets,
asset_checks=asset_checks,
executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys),
loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys),
asset_checks=asset_graph.asset_checks_defs,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_executable_assets = executable_assets_by_partitions_def.get(None, [])
jobs = []

for i, partitions_def in enumerate(all_partitions_defs):
# all base jobs contain all unpartitioned assets
executable_assets_for_job = [
*executable_assets_by_partitions_def[partitions_def],
*unpartitioned_executable_assets,
]
for i, partitions_def in enumerate(asset_graph.all_partitions_defs):
executable_asset_keys = asset_graph.executable_asset_keys & {
*asset_graph.asset_keys_for_partitions_def(partitions_def=partitions_def),
*asset_graph.unpartitioned_asset_keys,
}
loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys
jobs.append(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
executable_assets=executable_assets_for_job,
loadable_assets=[
*(
asset
for asset in executable_assets
if asset not in executable_assets_for_job
),
*unexecutable_assets,
],
asset_checks=asset_checks,
executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys),
loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys),
asset_checks=asset_graph.asset_checks_defs,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,20 @@ def toposorted_asset_keys_by_level(self) -> Sequence[AbstractSet[AssetKey]]:
"""
return [set(level) for level in toposort.toposort(self.asset_dep_graph["upstream"])]

@property
@cached_method
def unpartitioned_asset_keys(self) -> AbstractSet[AssetKey]:
    """Keys of every node in ``self.asset_nodes`` whose ``is_partitioned`` is falsy."""
    keys = set()
    for node in self.asset_nodes:
        if node.is_partitioned:
            continue
        keys.add(node.key)
    return keys

def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
    """Return the keys of all nodes whose ``group_name`` equals ``group_name``.

    The result is an empty set when no node belongs to the requested group.
    """
    matching_keys = set()
    for node in self.asset_nodes:
        if node.group_name == group_name:
            matching_keys.add(node.key)
    return matching_keys

@cached_method
def asset_keys_for_partitions_def(
    self, *, partitions_def: PartitionsDefinition
) -> AbstractSet[AssetKey]:
    """Return the keys of all nodes whose ``partitions_def`` equals ``partitions_def``.

    ``partitions_def`` is keyword-only. Nodes are compared with ``==``, not identity.
    """
    matching_keys = set()
    for node in self.asset_nodes:
        if node.partitions_def == partitions_def:
            matching_keys.add(node.key)
    return matching_keys

@cached_property
def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]:
"""Materializable asset keys that have no materializable parents."""
Expand All @@ -236,6 +247,12 @@ def root_executable_asset_keys(self) -> AbstractSet[AssetKey]:
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
    """Return the union of ``check_keys`` over every node in ``self.asset_nodes``."""
    collected = set()
    for node in self.asset_nodes:
        collected.update(node.check_keys)
    return collected

@cached_property
def all_partitions_defs(self) -> Sequence[PartitionsDefinition]:
    """Distinct truthy ``partitions_def`` values found on the graph's nodes.

    Nodes whose ``partitions_def`` is falsy (e.g. ``None``) contribute nothing. The
    result is a list ordered by each definition's ``repr``.
    """
    distinct_defs = {node.partitions_def for node in self.asset_nodes if node.partitions_def}
    # Sets have no useful order, so sort on repr to get a repeatable sequence.
    return sorted(distinct_defs, key=repr)

@cached_property
def all_group_names(self) -> AbstractSet[str]:
    """Set of every non-``None`` ``group_name`` found on the graph's nodes."""
    names = set()
    for node in self.asset_nodes:
        if node.group_name is not None:
            names.add(node.group_name)
    return names
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union

from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_graph import AssetGraph, AssetKeyOrCheckKey, AssetNode
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET,
AssetSpec,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import ArbitraryMetadataMapping
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.selector.subset_selector import generate_asset_dep_graph
from dagster._utils.cached_method import cached_method


class LocalAssetNode(AssetNode):
    """Asset graph node backed by an in-process ``AssetsDefinition``.

    Nearly every property delegates to the wrapped definition. Where the definition
    stores per-key values, the entry for this node's ``key`` is selected.
    """

    def __init__(
        self, key: AssetKey, assets_def: AssetsDefinition, check_keys: AbstractSet[AssetCheckKey]
    ):
        """Record the node's key, its backing definition, and its check keys.

        Args:
            key: The asset key this node represents.
            assets_def: The definition that this node delegates to.
            check_keys: Check keys to report from the ``check_keys`` property.
        """
        self.key = key
        self.assets_def = assets_def
        self._check_keys = check_keys

    @property
    @cached_method
    def group_name(self) -> Optional[str]:
        """Group name the definition records for this key, or ``None`` if it has none."""
        names_by_key = self.assets_def.group_names_by_key
        return names_by_key.get(self.key)

    @property
    def is_materializable(self) -> bool:
        """``is_materializable`` of the backing definition."""
        definition = self.assets_def
        return definition.is_materializable

    @property
    def is_observable(self) -> bool:
        """``is_observable`` of the backing definition."""
        definition = self.assets_def
        return definition.is_observable

    @property
    def is_external(self) -> bool:
        """``is_external`` of the backing definition."""
        definition = self.assets_def
        return definition.is_external

    @property
    def is_executable(self) -> bool:
        """``is_executable`` of the backing definition."""
        definition = self.assets_def
        return definition.is_executable

    @property
    def metadata(self) -> ArbitraryMetadataMapping:
        """Metadata the definition records for this key; an empty dict when absent."""
        metadata_by_key = self.assets_def.metadata_by_key
        return metadata_by_key.get(self.key, {})

    @property
    def is_partitioned(self) -> bool:
        """Whether the backing definition has a ``partitions_def`` (i.e. it is not ``None``)."""
        partitions_def = self.assets_def.partitions_def
        return partitions_def is not None

    @property
    def partitions_def(self) -> Optional[PartitionsDefinition]:
        """``partitions_def`` of the backing definition, possibly ``None``."""
        definition = self.assets_def
        return definition.partitions_def

    @property
    def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:
        """``partition_mappings`` of the backing definition."""
        definition = self.assets_def
        return definition.partition_mappings

    @property
    def freshness_policy(self) -> Optional[FreshnessPolicy]:
        """Freshness policy the definition records for this key, or ``None``."""
        policies_by_key = self.assets_def.freshness_policies_by_key
        return policies_by_key.get(self.key)

    @property
    def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
        """Auto-materialize policy the definition records for this key, or ``None``."""
        policies_by_key = self.assets_def.auto_materialize_policies_by_key
        return policies_by_key.get(self.key)

    @property
    def auto_observe_interval_minutes(self) -> Optional[float]:
        """``auto_observe_interval_minutes`` of the backing definition."""
        definition = self.assets_def
        return definition.auto_observe_interval_minutes

    @property
    def backfill_policy(self) -> Optional[BackfillPolicy]:
        """``backfill_policy`` of the backing definition."""
        definition = self.assets_def
        return definition.backfill_policy

    @property
    def code_version(self) -> Optional[str]:
        """Code version the definition records for this key, or ``None``."""
        versions_by_key = self.assets_def.code_versions_by_key
        return versions_by_key.get(self.key)

    @property
    def check_keys(self) -> AbstractSet[AssetCheckKey]:
        """The check keys supplied to the constructor."""
        return self._check_keys

    @property
    def execution_set_asset_keys(self) -> AbstractSet[AssetKey]:
        """Asset keys associated with this node's execution set.

        Just this node's key when the definition has at most one key or can be subset;
        otherwise every key of the definition.
        """
        definition = self.assets_def
        if len(definition.keys) <= 1 or definition.can_subset:
            return {self.key}
        return definition.keys

    @property
    def execution_set_asset_and_check_keys(self) -> AbstractSet[AssetKeyOrCheckKey]:
        """Asset and check keys associated with this node's execution set.

        When the definition cannot be subset and has more than one asset key or at least
        one check key, the result is all of the definition's asset keys and check keys.
        Otherwise it is just this node's key.
        """
        definition = self.assets_def
        if not definition.can_subset and (
            len(definition.keys) > 1 or len(definition.check_keys) > 0
        ):
            return {*definition.keys, *definition.check_keys}
        return {self.key}


class InternalAssetGraph(AssetGraph[LocalAssetNode]):
    """Asset graph whose nodes wrap in-process ``AssetsDefinition`` objects.

    Besides the node mapping, the graph keeps the standalone ``AssetChecksDefinition``
    objects keyed by check key, plus an index from check key to the node of the asset
    the check is attached to.
    """

    _asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition]

    def __init__(
        self,
        asset_nodes_by_key: Mapping[AssetKey, LocalAssetNode],
        asset_checks_defs_by_key: Mapping[AssetCheckKey, AssetChecksDefinition],
    ):
        """Store the supplied mappings and build the check-key-to-node index.

        Args:
            asset_nodes_by_key: Every node of the graph, keyed by asset key.
            asset_checks_defs_by_key: Standalone checks definitions, keyed by check key.
        """
        self.asset_nodes_by_key = asset_nodes_by_key
        self._asset_checks_defs_by_key = asset_checks_defs_by_key
        self._asset_nodes_by_check_key = {
            # Checks that each node lists in its own ``check_keys``.
            **{
                check_key: asset
                for asset in asset_nodes_by_key.values()
                for check_key in asset.check_keys
            },
            # Checks from standalone definitions whose target asset is in the graph.
            # Merged last, so these entries win if a check key appears in both sources.
            **{
                key: asset_nodes_by_key[checks_def.asset_key]
                for key, checks_def in asset_checks_defs_by_key.items()
                if checks_def.asset_key in asset_nodes_by_key
            },
        }

    @staticmethod
    def normalize_assets(
        assets: Iterable[Union[AssetsDefinition, SourceAsset]],
        checks_defs: Optional[Iterable[AssetChecksDefinition]] = None,
    ) -> Sequence[AssetsDefinition]:
        """Normalize a mixed list of AssetsDefinition and SourceAsset to a list of AssetsDefinition.

        Normalization includes:
        - Converting any SourceAsset to an AssetsDefinition
        - Resolving all relative asset keys (that sometimes specify dependencies) to absolute asset
          keys
        - Creating unexecutable external asset definitions for any keys referenced by asset checks
          or as dependencies, but for which no definition was provided.

        Args:
            assets: The definitions and source assets to normalize.
            checks_defs: Checks definitions whose target asset keys must be present in the
                result. ``None`` is treated as no checks.

        Returns:
            The normalized definitions in input order, followed by any auto-created stub
            definitions ordered by the ``repr`` of their key.
        """
        from dagster._core.definitions.external_asset import (
            create_external_asset_from_source_asset,
            external_asset_from_spec,
        )

        checks_defs = checks_defs or []

        # Convert any source assets to external assets
        assets_defs = [
            create_external_asset_from_source_asset(a) if isinstance(a, SourceAsset) else a
            for a in assets
        ]
        all_keys = {k for asset_def in assets_defs for k in asset_def.keys}

        # Resolve all asset dependencies. An asset dependency is resolved when its key is an
        # AssetKey not subject to any further manipulation.
        resolved_deps = ResolvedAssetDependencies(assets_defs, [])
        assets_defs = [
            ad.with_attributes(
                input_asset_key_replacements={
                    raw_key: resolved_deps.get_resolved_asset_key_for_input(ad, input_name)
                    for input_name, raw_key in ad.keys_by_input_name.items()
                }
            )
            for ad in assets_defs
        ]

        # Create unexecutable external assets definitions for any referenced keys for which no
        # definition was provided.
        all_referenced_asset_keys = {
            *(key for asset_def in assets_defs for key in asset_def.dependency_keys),
            *(checks_def.asset_key for checks_def in checks_defs),
        }
        # Sort the missing keys so stubs are appended in a repeatable order: raw set
        # iteration order is not guaranteed to be the same from one process to the next.
        for key in sorted(all_referenced_asset_keys.difference(all_keys), key=repr):
            assets_defs.append(
                external_asset_from_spec(
                    AssetSpec(key=key, metadata={SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET: True})
                )
            )
        return assets_defs

    @classmethod
    def from_assets(
        cls,
        assets: Iterable[Union[AssetsDefinition, SourceAsset]],
        asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
    ) -> "InternalAssetGraph":
        """Build a graph from asset definitions and optional standalone checks.

        The assets are first passed through ``normalize_assets``. One ``LocalAssetNode``
        is created per asset key, carrying the check keys of its definition plus the keys
        of any standalone checks definition that targets that asset. Parent and child
        links are then wired up from the generated dependency graph.

        Args:
            assets: Asset definitions and source assets to include.
            asset_checks: Standalone checks definitions. ``None`` is treated as empty.

        Returns:
            The constructed ``InternalAssetGraph``.
        """
        asset_checks = asset_checks or []
        assets_defs = cls.normalize_assets(assets, asset_checks)

        # Group standalone check keys by target asset once, rather than rescanning every
        # checks definition for each asset key.
        standalone_check_keys_by_asset_key = {}
        for checks_def in asset_checks:
            standalone_check_keys_by_asset_key.setdefault(checks_def.asset_key, set()).update(
                checks_def.keys
            )

        asset_nodes_by_key = {
            k: LocalAssetNode(
                key=k,
                assets_def=ad,
                check_keys={
                    *ad.check_keys,
                    *standalone_check_keys_by_asset_key.get(k, ()),
                },
            )
            for ad in assets_defs
            for k in ad.keys
        }
        dep_graph = generate_asset_dep_graph(assets_defs, [])
        for key, node in asset_nodes_by_key.items():
            node.set_children(
                {asset_nodes_by_key[child_key] for child_key in dep_graph["downstream"][key]}
            )
            node.set_parents(
                {asset_nodes_by_key[parent_key] for parent_key in dep_graph["upstream"][key]}
            )

        asset_checks_defs_by_key = {key: check for check in asset_checks for key in check.keys}

        return InternalAssetGraph(
            asset_nodes_by_key=asset_nodes_by_key,
            asset_checks_defs_by_key=asset_checks_defs_by_key,
        )

    ##### COMMON ASSET GRAPH INTERFACE

    def get_execution_set_asset_and_check_keys(
        self, asset_or_check_key: AssetKeyOrCheckKey
    ) -> AbstractSet[AssetKeyOrCheckKey]:
        """Return the asset and check keys in the same execution set as the given key.

        An asset key defers to its node's ``execution_set_asset_and_check_keys``. A check
        key that belongs to a standalone checks definition yields only itself. Any other
        check key yields its asset node's execution set if the check is part of that set,
        and only itself otherwise.

        Raises:
            KeyError: If a check key is neither a standalone check nor known to any node.
        """
        if isinstance(asset_or_check_key, AssetKey):
            return self.get(asset_or_check_key).execution_set_asset_and_check_keys
        # only checks emitted by AssetsDefinition have required keys
        elif asset_or_check_key in self._asset_checks_defs_by_key:
            return {asset_or_check_key}
        else:
            asset_node = self._asset_nodes_by_check_key[asset_or_check_key]
            asset_unit_keys = asset_node.execution_set_asset_and_check_keys
            return (
                asset_unit_keys if asset_or_check_key in asset_unit_keys else {asset_or_check_key}
            )

    ##### INTERNAL-SPECIFIC INTERFACE

    @property
    @cached_method
    def assets_defs(self) -> Sequence[AssetsDefinition]:
        """Distinct definitions behind the graph's nodes, in first-seen order."""
        return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes))

    def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefinition]:
        """Return the distinct definitions for the given keys, in first-seen order.

        A definition is listed once even when several of ``keys`` map to it.
        """
        return list(dict.fromkeys([self.get(key).assets_def for key in keys]))

    @property
    def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
        """Distinct standalone checks definitions, in first-seen order."""
        return list(dict.fromkeys(self._asset_checks_defs_by_key.values()))
Loading

0 comments on commit c89e7a3

Please sign in to comment.