
Commit

[external-assets] External asset renames
smackesey committed Feb 12, 2024
1 parent 08255d1 commit da95774
Showing 15 changed files with 58 additions and 55 deletions.
@@ -542,7 +542,7 @@ def resolve_assetMaterializationUsedData(
if not event_records:
return []

if not asset_graph.has_non_source_parents(asset_key):
if not asset_graph.has_materializable_parents(asset_key):
return []

used_data_times = data_time_resolver.get_data_time_by_key_for_record(

@@ -150,7 +150,7 @@ def asset_records_to_prefetch(self) -> Sequence[AssetKey]:
return [
key
for key in self.auto_materialize_asset_keys_and_parents
if not self.asset_graph.is_source(key)
if not self.asset_graph.is_external(key)
]

@property
38 changes: 19 additions & 19 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
@@ -29,7 +29,7 @@
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.selector.subset_selector import (
DependencyGraph,
fetch_sources,
fetch_roots,
generate_asset_dep_graph,
)
from dagster._utils.cached_method import cached_method
@@ -149,18 +149,18 @@ def source_asset_keys(self) -> AbstractSet[AssetKey]:
return self._source_asset_keys

@functools.cached_property
def root_asset_keys(self) -> AbstractSet[AssetKey]:
"""Non-source asset keys that have no non-source parents."""
def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]:
"""Materializable asset keys that have no materializable parents."""
from .asset_selection import AssetSelection

return AssetSelection.keys(*self.materializable_asset_keys).roots().resolve(self)

@functools.cached_property
def root_executable_asset_keys(self) -> AbstractSet[AssetKey]:
"""Materializable or observable source asset keys that have no parents which are
"""Materializable or observable asset keys that have no parents which are
materializable or observable.
"""
return fetch_sources(self._asset_dep_graph, self.executable_asset_keys)
return fetch_roots(self._asset_dep_graph, self.executable_asset_keys)

@property
def freshness_policies_by_key(self) -> Mapping[AssetKey, Optional[FreshnessPolicy]]:
@@ -587,30 +587,30 @@ def get_parent_partition_keys_for_child(
current_time=current_time,
)

def is_source(self, asset_key: AssetKey) -> bool:
return (
asset_key in self.source_asset_keys or asset_key not in self.materializable_asset_keys
def is_external(self, asset_key: AssetKey) -> bool:
return AssetExecutionType.MATERIALIZATION not in self._execution_types_by_key.get(
asset_key, []
)

def has_non_source_parents(self, asset_key: AssetKey) -> bool:
"""Determines if an asset has any parents which are not source assets."""
if self.is_source(asset_key):
def has_materializable_parents(self, asset_key: AssetKey) -> bool:
"""Determines if an asset has any materializable parents assets."""
if self.is_external(asset_key):
return False
return any(
not self.is_source(parent_key)
not self.is_external(parent_key)
for parent_key in self.get_parents(asset_key) - {asset_key}
)

def get_non_source_roots(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""Returns all assets upstream of the given asset which do not consume any other
AssetsDefinitions (but may consume SourceAssets).
def get_materializable_roots(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""Returns all materializable assets upstream of the given asset which do not consume any
other materializable assets.
"""
if not self.has_non_source_parents(asset_key):
if not self.has_materializable_parents(asset_key):
return {asset_key}
return {
key
for key in self.upstream_key_iterator(asset_key)
if not self.is_source(key) and not self.has_non_source_parents(key)
if not self.is_external(key) and not self.has_materializable_parents(key)
}

def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]:
@@ -619,7 +619,7 @@ def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]:
queue = deque([asset_key])
while queue:
current_key = queue.popleft()
if self.is_source(current_key):
if self.is_external(current_key):
continue
for parent_key in self.get_parents(current_key):
if parent_key not in visited:
@@ -882,7 +882,7 @@ def includes_materializable_and_external_assets(
self, asset_keys: AbstractSet[AssetKey]
) -> bool:
"""Returns true if the given asset keys contains at least one materializable asset and
at least one source asset.
at least one external asset.
"""
selected_external_assets = self.external_asset_keys & asset_keys
selected_regular_assets = asset_keys - self.external_asset_keys
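
As a quick illustration of the renamed AssetGraph predicates above, the following standalone sketch models is_external, has_materializable_parents, and get_materializable_roots over a toy dependency graph. ToyAssetGraph and its fields are invented for this example and are not Dagster's implementation, which (as the hunk above shows) derives is_external from the execution types recorded per asset key.

# Standalone sketch of the renamed predicates; illustrative only.
from typing import AbstractSet, Dict, Set


class ToyAssetGraph:
    def __init__(
        self,
        parents_by_key: Dict[str, Set[str]],
        materializable_keys: AbstractSet[str],
    ) -> None:
        self.parents_by_key = parents_by_key
        self.materializable_keys = materializable_keys

    def is_external(self, key: str) -> bool:
        # An asset is "external" when it cannot be materialized (e.g. an
        # observable source), however it was defined.
        return key not in self.materializable_keys

    def has_materializable_parents(self, key: str) -> bool:
        # Formerly has_non_source_parents.
        if self.is_external(key):
            return False
        return any(
            not self.is_external(parent)
            for parent in self.parents_by_key.get(key, set()) - {key}
        )

    def get_materializable_roots(self, key: str) -> Set[str]:
        # Formerly get_non_source_roots: upstream materializable assets that
        # have no materializable parents of their own.
        if not self.has_materializable_parents(key):
            return {key}
        roots: Set[str] = set()
        seen: Set[str] = set()
        stack = [key]
        while stack:
            current = stack.pop()
            if current in seen or self.is_external(current):
                continue
            seen.add(current)
            if not self.has_materializable_parents(current):
                roots.add(current)
            stack.extend(self.parents_by_key.get(current, set()))
        return roots


# source_asset -> foo -> bar, mirroring the updated test later in this diff.
graph = ToyAssetGraph(
    parents_by_key={"bar": {"foo"}, "foo": {"source_asset"}, "source_asset": set()},
    materializable_keys={"foo", "bar"},
)
assert graph.is_external("source_asset")
assert graph.has_materializable_parents("bar")
assert graph.get_materializable_roots("bar") == {"foo"}

The assertions mirror the updated test near the end of this diff, where get_materializable_roots(AssetKey("bar")) resolves to {AssetKey("foo")}.
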
@@ -14,8 +14,8 @@
from dagster._core.errors import DagsterInvalidSubsetError
from dagster._core.selector.subset_selector import (
fetch_connected,
fetch_roots,
fetch_sinks,
fetch_sources,
parse_clause,
)
from dagster._serdes.serdes import whitelist_for_serdes
@@ -634,7 +634,7 @@ class RootsAssetSelection(

def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
selection = self.child.resolve_inner(asset_graph)
return fetch_sources(asset_graph.asset_dep_graph, selection)
return fetch_roots(asset_graph.asset_dep_graph, selection)

def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection":
return self.replace(child=self.child.to_serializable_asset_selection(asset_graph))

@@ -710,7 +710,7 @@ def evaluate_for_asset(
asset_partition=candidate
):
# ignore non-observable sources, which will never have a materialization or observation
if context.asset_graph.is_source(
if context.asset_graph.is_external(
parent.asset_key
) and not context.asset_graph.is_observable(parent.asset_key):
continue
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/_core/definitions/data_time.py
@@ -175,7 +175,7 @@ def _upstream_records_by_key(

for parent_key in self.asset_graph.get_parents(asset_key):
if (
parent_key in self.asset_graph.source_asset_keys
parent_key in self.asset_graph.external_asset_keys
and not self.asset_graph.is_observable(parent_key)
):
continue
@@ -218,7 +218,7 @@ def _calculate_data_time_by_key_unpartitioned(
asset_key, record_id, record_tags_dict
)
if not upstream_records_by_key:
if not self.asset_graph.has_non_source_parents(asset_key):
if not self.asset_graph.has_materializable_parents(asset_key):
return {
asset_key: datetime.datetime.fromtimestamp(
record_timestamp, tz=datetime.timezone.utc
@@ -307,7 +307,7 @@ def _calculate_data_time_by_key(
current_time: datetime.datetime,
) -> Mapping[AssetKey, Optional[datetime.datetime]]:
if record_id is None:
return {key: None for key in self.asset_graph.get_non_source_roots(asset_key)}
return {key: None for key in self.asset_graph.get_materializable_roots(asset_key)}
record_timestamp = check.not_none(record_timestamp)

partitions_def = self.asset_graph.get_partitions_def(asset_key)
@@ -372,7 +372,7 @@ def _get_in_progress_data_time_in_run(

# if you're here, then this asset is planned, but not materialized. in the worst case, this
# asset's data time will be equal to the current time once it finishes materializing
if not self.asset_graph.has_non_source_parents(asset_key):
if not self.asset_graph.has_materializable_parents(asset_key):
return current_time

data_time = current_time
18 changes: 9 additions & 9 deletions python_modules/dagster/dagster/_core/definitions/data_version.py
@@ -429,7 +429,7 @@ def _get_status(self, key: "AssetKeyPartitionKey") -> StaleStatus:
current_version = self._get_current_data_version(key=key)
if current_version == NULL_DATA_VERSION:
return StaleStatus.MISSING
elif self.asset_graph.is_source(key.asset_key):
elif self.asset_graph.is_external(key.asset_key):
return StaleStatus.FRESH
else:
causes = self._get_stale_causes(key=key)
@@ -442,7 +442,7 @@ def _get_stale_causes(self, key: "AssetKeyPartitionKey") -> Sequence[StaleCause]
# is inferred) for backcompat.
if self.asset_graph.is_partitioned(key.asset_key) and not key.partition_key:
return []
elif self.asset_graph.is_source(key.asset_key):
elif self.asset_graph.is_external(key.asset_key):
return []
else:
current_version = self._get_current_data_version(key=key)
@@ -461,14 +461,14 @@ def _is_dep_updated(self, provenance: DataProvenance, dep_key: "AssetKeyPartitio
cursor = provenance.input_storage_ids[dep_key.asset_key]
updated_record = self._instance.get_latest_data_version_record(
dep_key.asset_key,
self.asset_graph.is_source(dep_key.asset_key),
self.asset_graph.is_external(dep_key.asset_key),
dep_key.partition_key,
after_cursor=cursor,
)
if updated_record:
previous_record = self._instance.get_latest_data_version_record(
dep_key.asset_key,
self.asset_graph.is_source(dep_key.asset_key),
self.asset_graph.is_external(dep_key.asset_key),
dep_key.partition_key,
before_cursor=cursor + 1 if cursor else None,
)
@@ -563,7 +563,7 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato
# timestamps instead of versions this should be removable eventually since
# provenance is on all newer materializations. If dep is a source, then we'll never
# provide a stale reason here.
elif not self.asset_graph.is_source(dep_key.asset_key):
elif not self.asset_graph.is_external(dep_key.asset_key):
dep_materialization = self._get_latest_data_version_record(key=dep_key)
if dep_materialization is None:
# The input must be new if it has no materialization
@@ -622,7 +622,7 @@ def _get_current_data_version(self, *, key: "AssetKeyPartitionKey") -> DataVersi
# Currently we can only use asset records, which are fetched in one shot, for non-source
# assets. This is because the most recent AssetObservation is not stored on the AssetRecord.
record = self._get_latest_data_version_record(key=key)
if self.asset_graph.is_source(key.asset_key) and record is None:
if self.asset_graph.is_external(key.asset_key) and record is None:
return DEFAULT_DATA_VERSION
elif record is None:
return NULL_DATA_VERSION
@@ -632,7 +632,7 @@ def _get_current_data_versi

@cached_method
def _is_current_data_version_user_provided(self, *, key: "AssetKeyPartitionKey") -> bool:
if self.asset_graph.is_source(key.asset_key):
if self.asset_graph.is_external(key.asset_key):
return True
else:
provenance = self._get_current_data_provenance(key=key)
@@ -654,7 +654,7 @@ def _get_current_data_provenance(
# are at the root of the graph (have no dependencies) or are downstream of a volatile asset.
@cached_method
def _is_volatile(self, *, key: "AssetKey") -> bool:
if self.asset_graph.is_source(key):
if self.asset_graph.is_external(key):
return self.asset_graph.is_observable(key)
else:
deps = self.asset_graph.get_parents(key)
@@ -678,7 +678,7 @@ def _get_latest_data_version_record(
# If an asset record is cached, all of its ancestors have already been cached.
if (
key.partition_key is None
and not self.asset_graph.is_source(key.asset_key)
and not self.asset_graph.is_external(key.asset_key)
and not self.instance_queryer.has_cached_asset_record(key.asset_key)
):
ancestors = self.asset_graph.get_ancestors(key.asset_key, include_self=True)
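
The data_version.py hunks above only swap is_source for is_external at each call site; the stale-status decision itself is unchanged. Below is a rough standalone sketch of that decision with invented names — it is not Dagster's resolver class, and it uses None where the real code compares against NULL_DATA_VERSION.

# Illustrative sketch of the stale-status branching shown in the hunks above.
from enum import Enum
from typing import Optional, Sequence


class StaleStatus(Enum):
    MISSING = "missing"
    FRESH = "fresh"
    STALE = "stale"


def resolve_status(
    current_version: Optional[str],
    is_external: bool,
    stale_causes: Sequence[str],
) -> StaleStatus:
    if current_version is None:
        # No data version recorded at all: the asset is missing.
        return StaleStatus.MISSING
    if is_external:
        # External (non-materializable) assets are never reported stale.
        return StaleStatus.FRESH
    # Materializable assets are stale exactly when stale causes were found.
    return StaleStatus.STALE if stale_causes else StaleStatus.FRESH


assert resolve_status("v1", is_external=True, stale_causes=["old input"]) is StaleStatus.FRESH
assert resolve_status("v1", is_external=False, stale_causes=["old input"]) is StaleStatus.STALE
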
@@ -129,7 +129,7 @@ def get_expected_data_time_for_asset_key(
# if asset will not be materialized, just return the current time
elif not will_materialize:
return context.data_time_resolver.get_current_data_time(asset_key, current_time)
elif asset_graph.has_non_source_parents(asset_key):
elif asset_graph.has_materializable_parents(asset_key):
expected_data_time = None
for parent_key in asset_graph.get_parents(asset_key):
# if the parent will be materialized on this tick, and it's not in the same repo, then

@@ -254,10 +254,10 @@ def fetch_sinks(
return sinks


def fetch_sources(
def fetch_roots(
graph: DependencyGraph[T_Hashable], within_selection: AbstractSet[T_Hashable]
) -> AbstractSet[T_Hashable]:
"""A source is a node that has no upstream dependencies within the provided selection.
"""A root is a node that has no upstream dependencies within the provided selection.
It can have other dependencies outside of the selection.
"""
dp: Dict[T_Hashable, bool] = {}
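
Only the first line of the renamed helper's body survives the truncation above. As a minimal sketch of what its new docstring describes — a root has no upstream dependencies inside the selection, transitively — here is a simplified standalone version. It takes a plain upstream-adjacency mapping rather than Dagster's DependencyGraph type, and it recurses directly instead of filling the dp dictionary the real implementation starts to build above.

# Simplified, illustrative version of the fetch_roots contract; not the real code.
from typing import AbstractSet, Mapping, Set, TypeVar

T = TypeVar("T")


def fetch_roots_sketch(
    upstream: Mapping[T, AbstractSet[T]],
    within_selection: AbstractSet[T],
) -> Set[T]:
    """Return selection members with no other selection member anywhere upstream."""

    def reaches_selection(node: T, seen: Set[T]) -> bool:
        # True if any transitive upstream dependency of `node` is in the selection.
        for dep in upstream.get(node, set()):
            if dep in seen:
                continue
            seen.add(dep)
            if dep in within_selection or reaches_selection(dep, seen):
                return True
        return False

    return {node for node in within_selection if not reaches_selection(node, set())}


# "a" feeds "b" feeds "c"; only "b" and "c" are selected, so "b" is the root.
assert fetch_roots_sketch({"c": {"b"}, "b": {"a"}, "a": set()}, {"b", "c"}) == {"b"}

In the asset_selection.py hunk earlier in this diff, RootsAssetSelection.resolve_inner now delegates to fetch_roots, so AssetSelection.roots() selects exactly these within-selection roots.
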
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_daemon/asset_daemon.py
@@ -661,7 +661,7 @@ def _process_auto_materialize_tick_generator(
else:
eligible_keys = {
*asset_graph.materializable_asset_keys,
*asset_graph.source_asset_keys,
*asset_graph.external_asset_keys,
}

auto_materialize_asset_keys = {

@@ -283,7 +283,7 @@ def asset_partition_has_materialization_or_observation(
after_cursor (Optional[int]): Filter parameter such that only records with a storage_id
greater than this value will be considered.
"""
if not self.asset_graph.is_source(asset_partition.asset_key):
if not self.asset_graph.is_external(asset_partition.asset_key):
asset_record = self.get_asset_record(asset_partition.asset_key)
if (
asset_record is None
@@ -534,7 +534,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor(
"""Finds asset partitions of the given child whose parents have been materialized since
latest_storage_id.
"""
if self.asset_graph.is_source(child_asset_key):
if self.asset_graph.is_external(child_asset_key):
return set(), latest_storage_id

child_partitions_def = self.asset_graph.get_partitions_def(child_asset_key)
@@ -549,9 +549,9 @@
]
for parent_asset_key in self.asset_graph.get_parents(child_asset_key):
# ignore non-observable sources
if self.asset_graph.is_source(parent_asset_key) and not self.asset_graph.is_observable(
if self.asset_graph.is_external(
parent_asset_key
):
) and not self.asset_graph.is_observable(parent_asset_key):
continue

# if the parent has not been updated at all since the latest_storage_id, then skip
@@ -803,7 +803,8 @@ def get_asset_partitions_updated_after_cursor(
if not updated_after_cursor:
return set()
if after_cursor is None or (
not self.asset_graph.is_source(asset_key) and not respect_materialization_data_versions
not self.asset_graph.is_external(asset_key)
and not respect_materialization_data_versions
):
return updated_after_cursor

@@ -863,7 +864,7 @@ def get_parent_asset_partitions_updated_after_child(
continue

# ignore non-observable source parents
if self.asset_graph.is_source(parent_key) and not self.asset_graph.is_observable(
if self.asset_graph.is_external(parent_key) and not self.asset_graph.is_observable(
parent_key
):
continue
@@ -909,7 +910,7 @@ def have_ignorable_partition_mapping_for_outdated(
def get_outdated_ancestors(
self, *, asset_partition: AssetKeyPartitionKey
) -> AbstractSet[AssetKey]:
if self.asset_graph.is_source(asset_partition.asset_key):
if self.asset_graph.is_external(asset_partition.asset_key):
return set()

parent_asset_partitions = self.asset_graph.get_parents_partitions(

@@ -435,7 +435,7 @@ def bar(foo):
source_asset = SourceAsset("source_asset")

asset_graph = asset_graph_from_assets([foo, bar, source_asset])
assert asset_graph.get_non_source_roots(AssetKey("bar")) == {AssetKey("foo")}
assert asset_graph.get_materializable_roots(AssetKey("bar")) == {AssetKey("foo")}


def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., AssetGraph]):

@@ -190,7 +190,7 @@ def test_cross_repo_dep_with_source_asset(instance):
asset_graph = ExternalAssetGraph.from_workspace(
_make_context(instance, ["defs1", "downstream_defs"])
)
assert len(asset_graph.source_asset_keys) == 0
assert len(asset_graph.external_asset_keys) == 0
assert asset_graph.get_parents(AssetKey("downstream")) == {AssetKey("asset1")}
assert asset_graph.get_children(AssetKey("asset1")) == {AssetKey("downstream")}
assert (
@@ -213,7 +213,7 @@ def test_cross_repo_dep_no_source_asset(instance):
asset_graph = ExternalAssetGraph.from_workspace(
_make_context(instance, ["defs1", "downstream_defs_no_source"])
)
assert len(asset_graph.source_asset_keys) == 0
assert len(asset_graph.external_asset_keys) == 0
assert asset_graph.get_parents(AssetKey("downstream_non_arg_dep")) == {AssetKey("asset1")}
assert asset_graph.get_children(AssetKey("asset1")) == {AssetKey("downstream_non_arg_dep")}
assert (

@@ -323,7 +323,9 @@ def test_scenario_to_completion(scenario: AssetBackfillScenario, failures: str,
elif failures == "root_failures":
fail_asset_partitions = set(
(
backfill_data.target_subset.filter_asset_keys(asset_graph.root_asset_keys)
backfill_data.target_subset.filter_asset_keys(
asset_graph.root_materializable_asset_keys
)
).iterate_asset_partitions()
)
elif failures == "random_half_failures":
@@ -463,7 +465,7 @@ def make_random_subset(
) -> AssetGraphSubset:
# all partitions downstream of half of the partitions in each partitioned root asset
root_asset_partitions: Set[AssetKeyPartitionKey] = set()
for i, root_asset_key in enumerate(sorted(asset_graph.root_asset_keys)):
for i, root_asset_key in enumerate(sorted(asset_graph.root_materializable_asset_keys)):
partitions_def = asset_graph.get_partitions_def(root_asset_key)

if partitions_def is not None:
@@ -496,7 +498,7 @@ def make_subset_from_partition_keys(
evaluation_time: datetime.datetime,
) -> AssetGraphSubset:
root_asset_partitions: Set[AssetKeyPartitionKey] = set()
for i, root_asset_key in enumerate(sorted(asset_graph.root_asset_keys)):
for i, root_asset_key in enumerate(sorted(asset_graph.root_materializable_asset_keys)):
if asset_graph.get_partitions_def(root_asset_key) is not None:
root_asset_partitions.update(
AssetKeyPartitionKey(root_asset_key, partition_key)