Skip to content

Commit

Permalink
[external-assets] Implement AssetGraph with LocalAssetNode and Global…
Browse files Browse the repository at this point in the history
…Assetode
  • Loading branch information
smackesey committed Feb 27, 2024
1 parent c06c7cc commit 38dcce2
Show file tree
Hide file tree
Showing 18 changed files with 421 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ def get_implicit_auto_materialize_policy(
asset_key: AssetKey, asset_graph: AssetGraph
) -> Optional[AutoMaterializePolicy]:
"""For backcompat with pre-auto materialize policy graphs, assume a default scope of 1 day."""
auto_materialize_policy = asset_graph.get_auto_materialize_policy(asset_key)
if auto_materialize_policy is None:
time_partitions_def = get_time_partitions_def(asset_graph.get_partitions_def(asset_key))
asset = asset_graph.get_asset(asset_key)
if asset.auto_materialize_policy is None:
time_partitions_def = get_time_partitions_def(asset.partitions_def)
if time_partitions_def is None:
max_materializations_per_minute = None
elif time_partitions_def.schedule_type == ScheduleType.HOURLY:
Expand All @@ -77,7 +77,7 @@ def get_implicit_auto_materialize_policy(
rules=rules,
max_materializations_per_minute=max_materializations_per_minute,
)
return auto_materialize_policy
return asset.auto_materialize_policy


class AssetDaemonContext:
Expand Down Expand Up @@ -150,7 +150,10 @@ def asset_records_to_prefetch(self) -> Sequence[AssetKey]:
return [
key
for key in self.auto_materialize_asset_keys_and_parents
if (self.asset_graph.has_asset(key) and self.asset_graph.is_materializable(key))
if (
self.asset_graph.has_asset(key)
and self.asset_graph.get_asset(key).is_materializable
)
]

@property
Expand Down Expand Up @@ -193,7 +196,7 @@ def evaluate_asset(
"""
# convert the legacy AutoMaterializePolicy to an Evaluator
asset_condition = check.not_none(
self.asset_graph.get_auto_materialize_policy(asset_key)
self.asset_graph.get_asset(asset_key).auto_materialize_policy
).to_asset_condition()

asset_cursor = self.cursor.get_previous_evaluation_state(asset_key)
Expand Down Expand Up @@ -434,7 +437,7 @@ def build_run_requests_with_backfill_policies(
run_requests.append(RunRequest(asset_selection=list(asset_keys), tags=tags))
else:
backfill_policies = {
check.not_none(asset_graph.get_backfill_policy(asset_key))
check.not_none(asset_graph.get_asset(asset_key).backfill_policy)
for asset_key in asset_keys
}
if len(backfill_policies) == 1:
Expand All @@ -453,7 +456,7 @@ def build_run_requests_with_backfill_policies(
else:
# if backfill policies are different, we need to backfill them separately
for asset_key in asset_keys:
backfill_policy = asset_graph.get_backfill_policy(asset_key)
backfill_policy = asset_graph.get_asset(asset_key).backfill_policy
run_requests.extend(
_build_run_requests_with_backfill_policy(
[asset_key],
Expand Down Expand Up @@ -567,7 +570,9 @@ def get_auto_observe_run_requests(
assets_to_auto_observe: Set[AssetKey] = set()
for asset_key in auto_observe_asset_keys:
last_observe_request_timestamp = last_observe_request_timestamp_by_asset_key.get(asset_key)
auto_observe_interval_minutes = asset_graph.get_auto_observe_interval_minutes(asset_key)
auto_observe_interval_minutes = asset_graph.get_asset(
asset_key
).auto_observe_interval_minutes

if auto_observe_interval_minutes and (
last_observe_request_timestamp is None
Expand Down
154 changes: 99 additions & 55 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@
AbstractSet,
Callable,
Dict,
Generic,
Iterable,
Iterator,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
TypeVar,
Union,
cast,
)

import toposort
from typing_extensions import Self

import dagster._check as check
from dagster._core.definitions.asset_subset import ValidAssetSubset
Expand Down Expand Up @@ -67,61 +70,135 @@ class ParentsPartitionsResult(NamedTuple):
required_but_nonexistent_parents_partitions: AbstractSet[AssetKeyPartitionKey]


class AssetGraph(ABC):
class AssetNode(ABC):
@property
@abstractmethod
def asset_dep_graph(self) -> DependencyGraph[AssetKey]:
def key(self) -> AssetKey:
...

@property
@abstractmethod
def has_asset(self, asset_key: AssetKey) -> bool:
def group_name(self) -> Optional[str]:
...

@property
@abstractmethod
def all_asset_keys(self) -> AbstractSet[AssetKey]:
def is_materializable(self) -> bool:
...

@property
@abstractmethod
def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
def is_observable(self) -> bool:
...

@property
@abstractmethod
def is_materializable(self, asset_key: AssetKey) -> bool:
def is_external(self) -> bool:
...

@property
@abstractmethod
def observable_asset_keys(self) -> AbstractSet[AssetKey]:
def is_executable(self) -> bool:
...

@property
@abstractmethod
def is_observable(self, asset_key: AssetKey) -> bool:
def is_partitioned(self) -> bool:
...

@property
@abstractmethod
def external_asset_keys(self) -> AbstractSet[AssetKey]:
def partitions_def(self) -> Optional[PartitionsDefinition]:
...

@property
@abstractmethod
def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:
...

def get_partition_mapping(self, parent_asset: Self) -> PartitionMapping:
return infer_partition_mapping(
self.partition_mappings.get(parent_asset.key),
self.partitions_def,
parent_asset.partitions_def,
)

@property
@abstractmethod
def is_external(self, asset_key: AssetKey) -> bool:
def freshness_policy(self) -> Optional[FreshnessPolicy]:
...

@property
@abstractmethod
def executable_asset_keys(self) -> AbstractSet[AssetKey]:
def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
...

@property
@abstractmethod
def is_executable(self, asset_key: AssetKey) -> bool:
def auto_observe_interval_minutes(self) -> Optional[float]:
...

@property
@abstractmethod
def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
def backfill_policy(self) -> Optional[BackfillPolicy]:
...

@property
@abstractmethod
def code_version(self) -> Optional[str]:
...


T_AssetNode = TypeVar("T_AssetNode", bound=AssetNode)


class AssetGraph(ABC, Generic[T_AssetNode]):
_asset_nodes: Sequence[T_AssetNode]

@property
def asset_nodes(self) -> Iterable[T_AssetNode]:
return self._asset_nodes

def has_asset(self, asset_key: AssetKey) -> bool:
return asset_key in self.all_asset_keys

@cached_method
def get_asset(self, asset_key: AssetKey) -> T_AssetNode:
return next(node for node in self.asset_nodes if node.key == asset_key)

@property
@abstractmethod
def asset_dep_graph(self) -> DependencyGraph[AssetKey]:
...

@property
@cached_method
def all_asset_keys(self) -> AbstractSet[AssetKey]:
return {node.key for node in self.asset_nodes}

@property
@cached_method
def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return {node.key for node in self.asset_nodes if node.is_materializable}

@property
@cached_method
def observable_asset_keys(self) -> AbstractSet[AssetKey]:
return {node.key for node in self.asset_nodes if node.is_observable}

@property
@cached_method
def external_asset_keys(self) -> AbstractSet[AssetKey]:
return {node.key for node in self.asset_nodes if node.is_external}

@property
@cached_method
def executable_asset_keys(self) -> AbstractSet[AssetKey]:
return {node.key for node in self.asset_nodes if node.is_executable}

def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
return {node.key for node in self.asset_nodes if node.group_name == group_name}

@functools.cached_property
def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]:
"""Materializable asset keys that have no materializable parents."""
Expand All @@ -141,23 +218,14 @@ def root_executable_asset_keys(self) -> AbstractSet[AssetKey]:
def all_group_names(self) -> AbstractSet[str]:
...

@abstractmethod
def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
...

@abstractmethod
def get_partition_mappings(self, asset_key: AssetKey) -> Mapping[AssetKey, PartitionMapping]:
...
# Performing an existence check temporarily until we change callsites
return self.get_asset(asset_key).partitions_def if self.has_asset(asset_key) else None

def get_partition_mapping(
self, asset_key: AssetKey, in_asset_key: AssetKey
) -> PartitionMapping:
partition_mappings = self.get_partition_mappings(asset_key)
return infer_partition_mapping(
partition_mappings.get(in_asset_key),
self.get_partitions_def(asset_key),
self.get_partitions_def(in_asset_key),
)
return self.get_asset(asset_key).get_partition_mapping(self.get_asset(in_asset_key))

def get_partitions_in_range(
self,
Expand All @@ -179,30 +247,6 @@ def is_partitioned(self, asset_key: AssetKey) -> bool:
# changed to verify this first.
return asset_key in self.all_asset_keys and self.get_partitions_def(asset_key) is not None

@abstractmethod
def get_group_name(self, asset_key: AssetKey) -> Optional[str]:
...

@abstractmethod
def get_freshness_policy(self, asset_key: AssetKey) -> Optional[FreshnessPolicy]:
...

@abstractmethod
def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]:
...

@abstractmethod
def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]:
...

@abstractmethod
def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]:
...

@abstractmethod
def get_code_version(self, asset_key: AssetKey) -> Optional[str]:
...

def have_same_partitioning(self, asset_key1: AssetKey, asset_key2: AssetKey) -> bool:
"""Returns whether the given assets have the same partitions definition."""
return self.get_partitions_def(asset_key1) == self.get_partitions_def(asset_key2)
Expand Down Expand Up @@ -477,10 +521,10 @@ def get_parent_partition_keys_for_child(

def has_materializable_parents(self, asset_key: AssetKey) -> bool:
"""Determines if an asset has any parents which are materializable."""
if self.is_external(asset_key):
if self.get_asset(asset_key).is_external:
return False
return any(
self.has_asset(parent_key) and self.is_materializable(parent_key)
self.has_asset(parent_key) and self.get_asset(parent_key).is_materializable
for parent_key in self.get_parents(asset_key) - {asset_key}
)

Expand All @@ -494,7 +538,7 @@ def get_materializable_roots(self, asset_key: AssetKey) -> AbstractSet[AssetKey]
key
for key in self.upstream_key_iterator(asset_key)
if self.has_asset(key)
and self.is_materializable(key)
and self.get_asset(key).is_materializable
and not self.has_materializable_parents(key)
}

Expand Down Expand Up @@ -531,16 +575,16 @@ def toposort_asset_keys(self) -> Sequence[AbstractSet[AssetKey]]:
def get_downstream_freshness_policies(
self, *, asset_key: AssetKey
) -> AbstractSet[FreshnessPolicy]:
asset = self.get_asset(asset_key)
downstream_policies = set().union(
*(
self.get_downstream_freshness_policies(asset_key=child_key)
for child_key in self.get_children(asset_key)
if child_key != asset_key
)
)
current_policy = self.get_freshness_policy(asset_key)
if self.get_partitions_def(asset_key) is None and current_policy is not None:
downstream_policies.add(current_policy)
if asset.partitions_def is None and asset.freshness_policy is not None:
downstream_policies.add(asset.freshness_policy)

return downstream_policies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ def _compare_base_and_branch_assets(self, asset_key: "AssetKey") -> Sequence[Cha
return [ChangeReason.NEW]

changes = []
if self.branch_asset_graph.get_code_version(
asset_key
) != self.base_asset_graph.get_code_version(asset_key):
if (
self.branch_asset_graph.get_asset(asset_key).code_version
!= self.base_asset_graph.get_asset(asset_key).code_version
):
changes.append(ChangeReason.CODE_VERSION)

if self.branch_asset_graph.get_parents(asset_key) != self.base_asset_graph.get_parents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ def evaluate_for_asset(
# observation
if not (
context.asset_graph.has_asset(parent.asset_key)
and context.asset_graph.is_executable(parent.asset_key)
and context.asset_graph.get_asset(parent.asset_key).is_executable
):
continue
if not context.instance_queryer.asset_partition_has_materialization_or_observation(
Expand Down
Loading

0 comments on commit 38dcce2

Please sign in to comment.