diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 5a63949c19e0a..961a84e51ceca 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -53,7 +53,7 @@ def get_implicit_auto_materialize_policy( asset_key: AssetKey, asset_graph: BaseAssetGraph ) -> Optional[AutoMaterializePolicy]: """For backcompat with pre-auto materialize policy graphs, assume a default scope of 1 day.""" - auto_materialize_policy = asset_graph.get_auto_materialize_policy(asset_key) + auto_materialize_policy = asset_graph.get(asset_key).auto_materialize_policy if auto_materialize_policy is None: time_partitions_def = get_time_partitions_def(asset_graph.get_partitions_def(asset_key)) if time_partitions_def is None: @@ -147,9 +147,7 @@ def auto_materialize_asset_keys_and_parents(self) -> AbstractSet[AssetKey]: @property def asset_records_to_prefetch(self) -> Sequence[AssetKey]: return [ - key - for key in self.auto_materialize_asset_keys_and_parents - if self.asset_graph.has_asset(key) + key for key in self.auto_materialize_asset_keys_and_parents if self.asset_graph.has(key) ] @property @@ -192,7 +190,7 @@ def evaluate_asset( """ # convert the legacy AutoMaterializePolicy to an Evaluator asset_condition = check.not_none( - self.asset_graph.get_auto_materialize_policy(asset_key) + self.asset_graph.get(asset_key).auto_materialize_policy ).to_asset_condition() asset_cursor = self.cursor.get_previous_evaluation_state(asset_key) @@ -434,7 +432,7 @@ def build_run_requests_with_backfill_policies( run_requests.append(RunRequest(asset_selection=list(asset_keys), tags=tags)) else: backfill_policies = { - check.not_none(asset_graph.get_backfill_policy(asset_key)) + check.not_none(asset_graph.get(asset_key).backfill_policy) for asset_key in asset_keys } if len(backfill_policies) == 1: @@ -453,7 +451,7 @@ def build_run_requests_with_backfill_policies( else: # if backfill policies are different, we need to backfill them separately for asset_key in asset_keys: - backfill_policy = asset_graph.get_backfill_policy(asset_key) + backfill_policy = asset_graph.get(asset_key).backfill_policy run_requests.extend( _build_run_requests_with_backfill_policy( [asset_key], @@ -567,7 +565,7 @@ def get_auto_observe_run_requests( assets_to_auto_observe: Set[AssetKey] = set() for asset_key in auto_observe_asset_keys: last_observe_request_timestamp = last_observe_request_timestamp_by_asset_key.get(asset_key) - auto_observe_interval_minutes = asset_graph.get_auto_observe_interval_minutes(asset_key) + auto_observe_interval_minutes = asset_graph.get(asset_key).auto_observe_interval_minutes if auto_observe_interval_minutes and ( last_observe_request_timestamp is None diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index a8bfddf3f6d9e..dda6c4fcdbad4 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -17,6 +17,7 @@ ) from dagster._core.definitions.events import AssetKey from dagster._core.definitions.freshness_policy import FreshnessPolicy +from dagster._core.definitions.metadata import ArbitraryMetadataMapping from dagster._core.definitions.partition import PartitionsDefinition from dagster._core.definitions.partition_mapping import PartitionMapping from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies @@ -61,6 +62,10 @@ def is_external(self) -> bool: def is_executable(self) -> bool: return self.assets_def.is_executable + @property + def metadata(self) -> ArbitraryMetadataMapping: + return self.assets_def.metadata_by_key.get(self.key, {}) + @property def is_partitioned(self) -> bool: return self.assets_def.partitions_def is not None @@ -245,9 +250,6 @@ def get_execution_set_asset_and_check_keys( asset_unit_keys if asset_or_check_key in asset_unit_keys else {asset_or_check_key} ) - def get_assets_def(self, asset_key: AssetKey) -> AssetsDefinition: - return self.get(asset_key).assets_def - @cached_property def assets_defs(self) -> Sequence[AssetsDefinition]: return list(dict.fromkeys(asset.assets_def for asset in self.asset_nodes)) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py index d3b1c5829b1f8..9334cc223f269 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py @@ -122,9 +122,10 @@ def _compare_base_and_branch_assets(self, asset_key: "AssetKey") -> Sequence[Cha return [ChangeReason.NEW] changes = [] - if self.branch_asset_graph.get_code_version( - asset_key - ) != self.base_asset_graph.get_code_version(asset_key): + if ( + self.branch_asset_graph.get(asset_key).code_version + != self.base_asset_graph.get(asset_key).code_version + ): changes.append(ChangeReason.CODE_VERSION) if self.branch_asset_graph.get_parents(asset_key) != self.base_asset_graph.get_parents( diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 87dc55049da70..c620ad2008be3 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -769,8 +769,8 @@ def evaluate_for_asset( # ignore missing or unexecutable assets, which will never have a materialization or # observation if not ( - context.asset_graph.has_asset(parent.asset_key) - and context.asset_graph.is_executable(parent.asset_key) + context.asset_graph.has(parent.asset_key) + and context.asset_graph.get(parent.asset_key).is_executable ): continue if not context.instance_queryer.asset_partition_has_materialization_or_observation( diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py index 7e2d36a88cc2b..3832eb1de04d8 100644 --- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py @@ -168,10 +168,6 @@ def asset_nodes(self) -> Iterable[T_AssetNode]: def has(self, asset_key: AssetKey) -> bool: return asset_key in self._asset_nodes_by_key - # To be removed in upstack PR and callsites replaced with `has` - def has_asset(self, asset_key: AssetKey) -> bool: - return self.has(asset_key) - def get(self, asset_key: AssetKey) -> T_AssetNode: return self._asset_nodes_by_key[asset_key] @@ -193,30 +189,18 @@ def all_asset_keys(self) -> AbstractSet[AssetKey]: def materializable_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_materializable} - def is_materializable(self, key: AssetKey) -> bool: - return self.get(key).is_materializable - @cached_property def observable_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_observable} - def is_observable(self, key: AssetKey) -> bool: - return self.get(key).is_observable - @cached_property def external_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_external} - def is_external(self, key: AssetKey) -> bool: - return self.get(key).is_external - @cached_property def executable_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_executable} - def is_executable(self, key: AssetKey) -> bool: - return self.get(key).is_executable - @cached_property def toposorted_asset_keys(self) -> Sequence[AssetKey]: """Return topologically sorted asset keys in graph. Keys with the same topological level are @@ -287,24 +271,6 @@ def get_partitions_in_range( def is_partitioned(self, asset_key: AssetKey) -> bool: return self.get_partitions_def(asset_key) is not None - def get_group_name(self, asset_key: AssetKey) -> Optional[str]: - return self.get(asset_key).group_name - - def get_freshness_policy(self, asset_key: AssetKey) -> Optional[FreshnessPolicy]: - return self.get(asset_key).freshness_policy - - def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]: - return self.get(asset_key).auto_materialize_policy - - def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]: - return self.get(asset_key).auto_observe_interval_minutes - - def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]: - return self.get(asset_key).backfill_policy - - def get_code_version(self, asset_key: AssetKey) -> Optional[str]: - return self.get(asset_key).code_version - def have_same_partitioning(self, asset_key1: AssetKey, asset_key2: AssetKey) -> bool: """Returns whether the given assets have the same partitions definition.""" return self.get(asset_key1).partitions_def == self.get(asset_key2).partitions_def @@ -336,7 +302,7 @@ def get_parents(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: """Returns all asset keys that are direct dependencies on the given asset key.""" return self.get(asset_key).parent_keys - def get_ancestors( + def get_ancestor_asset_keys( self, asset_key: AssetKey, include_self: bool = False ) -> AbstractSet[AssetKey]: """Returns all nth-order dependencies of an asset.""" @@ -643,7 +609,7 @@ def get_downstream_freshness_policies( downstream_policies = set().union( *( self.get_downstream_freshness_policies(asset_key=child_key) - for child_key in self.get_children(asset_key) + for child_key in self.get(asset_key).child_keys if child_key != asset_key ) ) diff --git a/python_modules/dagster/dagster/_core/definitions/data_time.py b/python_modules/dagster/dagster/_core/definitions/data_time.py index 76b0b2d3ecf7e..ef441f233b250 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_time.py +++ b/python_modules/dagster/dagster/_core/definitions/data_time.py @@ -177,8 +177,7 @@ def _upstream_records_by_key( for parent_key in self.asset_graph.get_parents(asset_key): if not ( - self.asset_graph.has_asset(parent_key) - and self.asset_graph.is_executable(parent_key) + self.asset_graph.has(parent_key) and self.asset_graph.get(parent_key).is_executable ): continue @@ -319,7 +318,7 @@ def _calculate_data_time_by_key( cursor=record_id, partitions_def=partitions_def, ) - elif self.asset_graph.is_observable(asset_key): + elif self.asset_graph.get(asset_key).is_observable: return self._calculate_data_time_by_key_observable_source( asset_key=asset_key, record_id=record_id, @@ -533,18 +532,18 @@ def get_minutes_overdue( asset_key: AssetKey, evaluation_time: datetime.datetime, ) -> Optional[FreshnessMinutes]: - freshness_policy = self.asset_graph.get_freshness_policy(asset_key) - if freshness_policy is None: + asset = self.asset_graph.get(asset_key) + if asset.freshness_policy is None: raise DagsterInvariantViolationError( "Cannot calculate minutes late for asset without a FreshnessPolicy" ) - if self.asset_graph.is_observable(asset_key): + if asset.is_observable: current_data_time = self._get_source_data_time(asset_key, current_time=evaluation_time) else: current_data_time = self.get_current_data_time(asset_key, current_time=evaluation_time) - return freshness_policy.minutes_overdue( + return asset.freshness_policy.minutes_overdue( data_time=current_data_time, evaluation_time=evaluation_time, ) diff --git a/python_modules/dagster/dagster/_core/definitions/data_version.py b/python_modules/dagster/dagster/_core/definitions/data_version.py index a3a5105bb089a..64c587b676166 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_version.py +++ b/python_modules/dagster/dagster/_core/definitions/data_version.py @@ -423,13 +423,14 @@ def get_current_data_version( def _get_status(self, key: "AssetKeyPartitionKey") -> StaleStatus: # The status loader does not support querying for the stale status of a # partitioned asset without specifying a partition, so we return here. - if self.asset_graph.is_partitioned(key.asset_key) and not key.partition_key: + asset = self.asset_graph.get(key.asset_key) + if asset.is_partitioned and not key.partition_key: return StaleStatus.FRESH else: current_version = self._get_current_data_version(key=key) if current_version == NULL_DATA_VERSION: return StaleStatus.MISSING - elif self.asset_graph.is_external(key.asset_key): + elif asset.is_external: return StaleStatus.FRESH else: causes = self._get_stale_causes(key=key) @@ -440,9 +441,10 @@ def _get_stale_causes(self, key: "AssetKeyPartitionKey") -> Sequence[StaleCause] # Querying for the stale status of a partitioned asset without specifying a partition key # is strictly speaking undefined, but we return an empty list here (from which FRESH status # is inferred) for backcompat. - if self.asset_graph.is_partitioned(key.asset_key) and not key.partition_key: + asset = self.asset_graph.get(key.asset_key) + if asset.is_partitioned and not key.partition_key: return [] - elif self.asset_graph.is_external(key.asset_key): + elif asset.is_external: return [] else: current_version = self._get_current_data_version(key=key) @@ -454,6 +456,7 @@ def _get_stale_causes(self, key: "AssetKeyPartitionKey") -> Sequence[StaleCause] ) def _is_dep_updated(self, provenance: DataProvenance, dep_key: "AssetKeyPartitionKey") -> bool: + dep_asset = self.asset_graph.get(dep_key.asset_key) if dep_key.partition_key is None: current_data_version = self._get_current_data_version(key=dep_key) return provenance.input_data_versions[dep_key.asset_key] != current_data_version @@ -461,14 +464,14 @@ def _is_dep_updated(self, provenance: DataProvenance, dep_key: "AssetKeyPartitio cursor = provenance.input_storage_ids[dep_key.asset_key] updated_record = self._instance.get_latest_data_version_record( dep_key.asset_key, - self.asset_graph.is_external(dep_key.asset_key), + dep_asset.is_external, dep_key.partition_key, after_cursor=cursor, ) if updated_record: previous_record = self._instance.get_latest_data_version_record( dep_key.asset_key, - self.asset_graph.is_external(dep_key.asset_key), + dep_asset.is_external, dep_key.partition_key, before_cursor=cursor + 1 if cursor else None, ) @@ -485,7 +488,7 @@ def _is_dep_updated(self, provenance: DataProvenance, dep_key: "AssetKeyPartitio def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterator[StaleCause]: from dagster._core.definitions.events import AssetKeyPartitionKey - code_version = self.asset_graph.get_code_version(key.asset_key) + code_version = self.asset_graph.get(key.asset_key).code_version provenance = self._get_current_data_provenance(key=key) asset_deps = self.asset_graph.get_parents(key.asset_key) @@ -513,6 +516,7 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato # partition counts. partition_deps = self._get_partition_dependencies(key=key) for dep_key in sorted(partition_deps): + dep_asset = self.asset_graph.get(dep_key.asset_key) if self._get_status(key=dep_key) == StaleStatus.STALE: yield StaleCause( key, @@ -532,9 +536,10 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato # Currently we exclude assets downstream of AllPartitionMappings from stale # status logic due to potentially huge numbers of dependencies. elif self._is_dep_updated(provenance, dep_key): - report_data_version = self.asset_graph.get_code_version( - dep_key.asset_key - ) is not None or self._is_current_data_version_user_provided(key=dep_key) + report_data_version = ( + dep_asset.code_version is not None + or self._is_current_data_version_user_provided(key=dep_key) + ) yield StaleCause( key, StaleCauseCategory.DATA, @@ -563,7 +568,7 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato # timestamps instead of versions this should be removable eventually since # provenance is on all newer materializations. If dep is a source, then we'll never # provide a stale reason here. - elif not self.asset_graph.is_external(dep_key.asset_key): + elif not dep_asset.is_external: dep_materialization = self._get_latest_data_version_record(key=dep_key) if dep_materialization is None: # The input must be new if it has no materialization @@ -622,7 +627,7 @@ def _get_current_data_version(self, *, key: "AssetKeyPartitionKey") -> DataVersi # Currently we can only use asset records, which are fetched in one shot, for non-source # assets. This is because the most recent AssetObservation is not stored on the AssetRecord. record = self._get_latest_data_version_record(key=key) - if self.asset_graph.is_external(key.asset_key) and record is None: + if self.asset_graph.get(key.asset_key).is_external and record is None: return DEFAULT_DATA_VERSION elif record is None: return NULL_DATA_VERSION @@ -632,7 +637,7 @@ def _get_current_data_version(self, *, key: "AssetKeyPartitionKey") -> DataVersi @cached_method def _is_current_data_version_user_provided(self, *, key: "AssetKeyPartitionKey") -> bool: - if self.asset_graph.is_external(key.asset_key): + if self.asset_graph.get(key.asset_key).is_external: return True else: provenance = self._get_current_data_provenance(key=key) @@ -654,10 +659,11 @@ def _get_current_data_provenance( # are at the root of the graph (have no dependencies) or are downstream of a volatile asset. @cached_method def _is_volatile(self, *, key: "AssetKey") -> bool: - if self.asset_graph.is_external(key): - return self.asset_graph.is_observable(key) + asset = self.asset_graph.get(key) + if asset.is_external: + return asset.is_observable else: - deps = self.asset_graph.get_parents(key) + deps = asset.get_parents(key) return len(deps) == 0 or any(self._is_volatile(key=dep_key) for dep_key in deps) @cached_method @@ -678,10 +684,10 @@ def _get_latest_data_version_record( # If an asset record is cached, all of its ancestors have already been cached. if ( key.partition_key is None - and not self.asset_graph.is_external(key.asset_key) + and not self.asset_graph.get(key.asset_key).is_external and not self.instance_queryer.has_cached_asset_record(key.asset_key) ): - ancestors = self.asset_graph.get_ancestors(key.asset_key, include_self=True) + ancestors = self.asset_graph.get_ancestor_asset_keys(key.asset_key, include_self=True) self.instance_queryer.prefetch_asset_records(ancestors) return self.instance_queryer.get_latest_materialization_or_observation_record( asset_partition=key diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py index d52b1a3f82b1e..f81085159b1aa 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py @@ -208,7 +208,7 @@ def freshness_evaluation_results_for_asset_key( execution_period, evaluation_data, ) = get_execution_period_and_evaluation_data_for_policies( - local_policy=context.asset_graph.get_freshness_policy(asset_key), + local_policy=context.asset_graph.get(asset_key).freshness_policy, policies=context.asset_graph.get_downstream_freshness_policies(asset_key=asset_key), effective_data_time=effective_data_time, current_time=current_time, diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_policy_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/freshness_policy_sensor_definition.py index dce9d9ecb50f0..167b0275337ca 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_policy_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_policy_sensor_definition.py @@ -258,7 +258,7 @@ def _wrapped_fn(context: SensorEvaluationContext): minutes_late_by_key: Dict[AssetKey, Optional[float]] = {} for asset_key in monitored_keys: - freshness_policy = asset_graph.get_freshness_policy(asset_key) + freshness_policy = asset_graph.get(asset_key).freshness_policy if freshness_policy is None: continue diff --git a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py index 44903b410232c..3f13ea01caa61 100644 --- a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py @@ -259,7 +259,7 @@ def __init__( asset_graph = self._repository_def.asset_graph for asset_key in self._monitored_asset_keys: assets_def = ( - asset_graph.get_assets_def(asset_key) if asset_graph.has_asset(asset_key) else None + asset_graph.get(asset_key).assets_def if asset_graph.has(asset_key) else None ) self._assets_by_key[asset_key] = assets_def @@ -683,8 +683,8 @@ def _get_asset(self, asset_key: AssetKey, fn_name: str) -> AssetsDefinition: ) else: return asset_def - elif repo_def.asset_graph.has_asset(asset_key): - return repo_def.asset_graph.get_assets_def(asset_key) + elif repo_def.asset_graph.has(asset_key): + return repo_def.asset_graph.get(asset_key).assets_def else: raise DagsterInvalidInvocationError( f"Asset key {asset_key} not monitored in sensor and does not exist in target jobs" diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index b6407a9c10ce6..89e787e6bd3d3 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1255,7 +1255,7 @@ def execute_asset_backfill_iteration_inner( # check if all assets have backfill policies if any of them do, otherwise, raise error asset_backfill_policies = [ - asset_graph.get_backfill_policy(asset_key) + asset_graph.get(asset_key).backfill_policy for asset_key in { asset_partition.asset_key for asset_partition in asset_partitions_to_request } @@ -1318,9 +1318,9 @@ def can_run_with_parent( this tick. """ parent_target_subset = target_subset.get_asset_subset(parent.asset_key, asset_graph) - parent_backfill_policy = asset_graph.get_backfill_policy(parent.asset_key) + parent_backfill_policy = asset_graph.get(parent.asset_key).backfill_policy candidate_target_subset = target_subset.get_asset_subset(candidate.asset_key, asset_graph) - candidate_backfill_policy = asset_graph.get_backfill_policy(candidate.asset_key) + candidate_backfill_policy = asset_graph.get(candidate.asset_key).backfill_policy partition_mapping = asset_graph.get_partition_mapping( candidate.asset_key, in_asset_key=parent.asset_key ) diff --git a/python_modules/dagster/dagster/_core/host_representation/external.py b/python_modules/dagster/dagster/_core/host_representation/external.py index a6418b4ef704d..be41ef0208b7c 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external.py +++ b/python_modules/dagster/dagster/_core/host_representation/external.py @@ -212,15 +212,14 @@ def _external_sensors(self) -> Dict[str, "ExternalSensor"]: default_sensor_asset_keys = set() for asset_key in asset_graph.materializable_asset_keys: - policy = asset_graph.get_auto_materialize_policy(asset_key) - if not policy: + if not asset_graph.get(asset_key).auto_materialize_policy: continue if asset_key not in covered_asset_keys: default_sensor_asset_keys.add(asset_key) for asset_key in asset_graph.observable_asset_keys: - if asset_graph.get_auto_observe_interval_minutes(asset_key) is None: + if asset_graph.get(asset_key).auto_observe_interval_minutes is None: continue has_any_auto_observe_source_assets = True diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index 521fa2ad0c78d..f501d379f4f6b 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -683,14 +683,14 @@ def _process_auto_materialize_tick_generator( auto_materialize_asset_keys = { target_key for target_key in eligible_keys - if asset_graph.get_auto_materialize_policy(target_key) is not None + if asset_graph.get(target_key).auto_materialize_policy is not None } num_target_assets = len(auto_materialize_asset_keys) auto_observe_asset_keys = { key for key in eligible_keys - if asset_graph.get_auto_observe_interval_minutes(key) is not None + if asset_graph.get(key).auto_observe_interval_minutes is not None } num_auto_observe_assets = len(auto_observe_asset_keys) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index cc5b4def5293f..bca552deab7f0 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -185,7 +185,7 @@ def get_asset_record(self, asset_key: AssetKey) -> Optional["AssetRecord"]: return self._asset_record_cache[asset_key] def _event_type_for_key(self, asset_key: AssetKey) -> DagsterEventType: - if self.asset_graph.is_observable(asset_key): + if self.asset_graph.get(asset_key).is_observable: return DagsterEventType.ASSET_OBSERVATION else: return DagsterEventType.ASSET_MATERIALIZATION @@ -203,8 +203,8 @@ def _get_latest_materialization_or_observation_record( before_cursor is None and asset_partition.partition_key is None and not ( - self.asset_graph.has_asset(asset_partition.asset_key) - and self.asset_graph.is_observable(asset_partition.asset_key) + self.asset_graph.has(asset_partition.asset_key) + and self.asset_graph.get(asset_partition.asset_key).is_observable ) ): asset_record = self.get_asset_record(asset_partition.asset_key) @@ -286,7 +286,7 @@ def asset_partition_has_materialization_or_observation( greater than this value will be considered. """ asset_key = asset_partition.asset_key - if self.asset_graph.has_asset(asset_key) and self.asset_graph.is_materializable(asset_key): + if self.asset_graph.has(asset_key) and self.asset_graph.get(asset_key).is_materializable: asset_record = self.get_asset_record(asset_key) if ( asset_record is None @@ -540,11 +540,11 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( """Finds asset partitions of the given child whose parents have been materialized since latest_storage_id. """ - if not self.asset_graph.get_parents(child_asset_key): + child_asset = self.asset_graph.get(child_asset_key) + if not child_asset.parent_keys: return set(), latest_storage_id - child_partitions_def = self.asset_graph.get_partitions_def(child_asset_key) - child_time_partitions_def = get_time_partitions_def(child_partitions_def) + child_time_partitions_def = get_time_partitions_def(child_asset.partitions_def) child_asset_partitions_with_updated_parents = set() @@ -555,7 +555,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( ] for parent_asset_key in self.asset_graph.get_parents(child_asset_key): # ignore non-existent parents - if not self.asset_graph.has_asset(parent_asset_key): + if not self.asset_graph.has(parent_asset_key): continue # if the parent has not been updated at all since the latest_storage_id, then skip @@ -589,7 +589,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( child_asset_key=child_asset_key, current_time=self.evaluation_time, ) - if child_partitions_def + if child_asset.partitions_def else [None] ): if not ( @@ -597,7 +597,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( # historical time partitions not map_old_time_partitions and child_time_partitions_def is not None - and get_time_partition_key(child_partitions_def, child_partition_key) + and get_time_partition_key(child_asset.partitions_def, child_partition_key) != child_time_partitions_def.get_last_partition_key( current_time=self.evaluation_time ) @@ -610,7 +610,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( else: # we know a parent updated, and because the parent has a partitions def and the # child does not, the child could not have been materialized in the same run - if child_partitions_def is None: + if child_asset.partitions_def is None: child_asset_partitions_with_updated_parents = { AssetKeyPartitionKey(child_asset_key) } @@ -629,7 +629,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( partition_mapping.get_downstream_partitions_for_partitions( parent_partitions_subset, upstream_partitions_def=parent_partitions_def, - downstream_partitions_def=child_partitions_def, + downstream_partitions_def=child_asset.partitions_def, dynamic_partitions_store=self, current_time=self.evaluation_time, ) @@ -647,7 +647,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( if ( # if child has a different partitions def than the parent, then it must # have been executed in a different run, so it's a valid candidate - child_partitions_def != parent_partitions_def + child_asset.partitions_def != parent_partitions_def # if child partition key is not the same as any newly materialized # parent key, then it could not have been executed in the same run as # its parent @@ -807,7 +807,7 @@ def get_asset_partitions_updated_after_cursor( if not updated_after_cursor: return set() if after_cursor is None or ( - not self.asset_graph.is_observable(asset_key) + not self.asset_graph.get(asset_key).is_observable and not respect_materialization_data_versions ): return updated_after_cursor @@ -902,7 +902,7 @@ def get_parent_asset_partitions_updated_after_child( continue # ignore non-existent parents - if not self.asset_graph.has_asset(parent_key): + if not self.asset_graph.has(parent_key): continue # when mapping from unpartitioned assets to time partitioned assets, we ignore @@ -949,7 +949,7 @@ def get_outdated_ancestors( asset_key = asset_partition.asset_key partition_key = asset_partition.partition_key if not ( - self.asset_graph.has_asset(asset_key) and self.asset_graph.is_materializable(asset_key) + self.asset_graph.has(asset_key) and self.asset_graph.get(asset_key).is_materializable ): return set() diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index c28744a1175ab..60ae61e411659 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -93,8 +93,8 @@ def asset3(asset1, asset2): ... assert asset_graph.get_parents(asset3.key) == {asset1.key, asset2.key} for asset_def in assets: assert asset_graph.get_execution_set_asset_keys(asset_def.key) == {asset_def.key} - assert asset_graph.get_code_version(asset0.key) == "1" - assert asset_graph.get_code_version(asset1.key) is None + assert asset_graph.get(asset0.key).code_version == "1" + assert asset_graph.get(asset1.key).code_version is None def test_get_children_partitions_unpartitioned_parent_partitioned_child( diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py index 71d67f3455947..445172ef9aedb 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py @@ -189,8 +189,8 @@ def test_cross_repo_dep_with_source_asset(instance): _make_context(instance, ["defs1", "downstream_defs"]) ) assert len(asset_graph.external_asset_keys) == 0 - assert asset_graph.get_parents(AssetKey("downstream")) == {AssetKey("asset1")} - assert asset_graph.get_children(AssetKey("asset1")) == {AssetKey("downstream")} + assert asset_graph.get(AssetKey("downstream")).parent_keys == {AssetKey("asset1")} + assert asset_graph.get(AssetKey("asset1")).child_keys == {AssetKey("downstream")} assert ( asset_graph.get_repository_handle( AssetKey("asset1") @@ -212,8 +212,8 @@ def test_cross_repo_dep_no_source_asset(instance): _make_context(instance, ["defs1", "downstream_defs_no_source"]) ) assert len(asset_graph.external_asset_keys) == 0 - assert asset_graph.get_parents(AssetKey("downstream_non_arg_dep")) == {AssetKey("asset1")} - assert asset_graph.get_children(AssetKey("asset1")) == {AssetKey("downstream_non_arg_dep")} + assert asset_graph.get(AssetKey("downstream_non_arg_dep")).parent_keys == {AssetKey("asset1")} + assert asset_graph.get(AssetKey("asset1")).child_keys == {AssetKey("downstream_non_arg_dep")} assert ( asset_graph.get_repository_handle( AssetKey("asset1") @@ -324,9 +324,9 @@ def test_get_implicit_job_name_for_assets(instance): def test_auto_materialize_policy(instance): asset_graph = RemoteAssetGraph.from_workspace(_make_context(instance, ["partitioned_defs"])) - assert asset_graph.get_auto_materialize_policy( + assert asset_graph.get( AssetKey("downstream_of_partitioned_source") - ) == AutoMaterializePolicy.eager( + ).auto_materialize_policy == AutoMaterializePolicy.eager( max_materializations_per_minute=75, ) @@ -400,16 +400,16 @@ def static_partitioned_multi_run_backfill_asset(): def test_assets_with_backfill_policies(instance): asset_graph = RemoteAssetGraph.from_workspace(_make_context(instance, ["backfill_assets_defs"])) assert ( - asset_graph.get_backfill_policy(AssetKey("static_partitioned_single_run_backfill_asset")) + asset_graph.get(AssetKey("static_partitioned_single_run_backfill_asset")).backfill_policy == BackfillPolicy.single_run() ) assert ( - asset_graph.get_backfill_policy(AssetKey("non_partitioned_single_run_backfill_asset")) + asset_graph.get(AssetKey("non_partitioned_single_run_backfill_asset")).backfill_policy == BackfillPolicy.single_run() ) - assert asset_graph.get_backfill_policy( + assert asset_graph.get( AssetKey("static_partitioned_multi_run_backfill_asset") - ) == BackfillPolicy.multi_run(5) + ).backfill_policy == BackfillPolicy.multi_run(5) @asset(deps=[SourceAsset("b")]) diff --git a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py index 198fecc2b303c..190832ec08efb 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py @@ -333,7 +333,7 @@ def observe_sources(*args): def observe_sources_fn(*, instance, times_by_key, **kwargs): for arg in args: key = AssetKey(arg) - observe(assets=[versioned_repo.asset_graph.get_assets_def(key)], instance=instance) + observe(assets=[versioned_repo.asset_graph.get(key).assets_def], instance=instance) latest_record = instance.get_latest_data_version_record(key, is_source=True) latest_timestamp = latest_record.timestamp times_by_key[key].append( @@ -345,7 +345,7 @@ def observe_sources_fn(*, instance, times_by_key, **kwargs): def run_assets(*args): def run_assets_fn(*, instance, **kwargs): - assets = [versioned_repo.asset_graph.get_assets_def(AssetKey(arg)) for arg in args] + assets = [versioned_repo.asset_graph.get(AssetKey(arg)).assets_def for arg in args] materialize_to_memory(assets=assets, instance=instance) return run_assets_fn diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index abe0fd48ab5e6..15b2c73a0aa89 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -410,7 +410,7 @@ def test_time_fn(): auto_observe_asset_keys={ key for key in asset_graph.observable_asset_keys - if asset_graph.get_auto_observe_interval_minutes(key) is not None + if asset_graph.get(key).auto_observe_interval_minutes is not None }, respect_materialization_data_versions=respect_materialization_data_versions, logger=logging.getLogger("dagster.amp"), diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/asset_daemon_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/asset_daemon_scenario.py index 53edf69c7b5e3..6acc474dc1e09 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/asset_daemon_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/asset_daemon_scenario.py @@ -156,7 +156,7 @@ def _evaluate_tick_fast( auto_materialize_asset_keys={ key for key in self.asset_graph.materializable_asset_keys - if self.asset_graph.get_auto_materialize_policy(key) is not None + if self.asset_graph.get(key).auto_materialize_policy is not None }, instance=self.instance, materialize_run_tags={}, @@ -165,7 +165,7 @@ def _evaluate_tick_fast( auto_observe_asset_keys={ key for key in self.asset_graph.external_asset_keys - if self.asset_graph.get_auto_observe_interval_minutes(key) is not None + if self.asset_graph.get(key).auto_observe_interval_minutes is not None }, respect_materialization_data_versions=False, logger=self.logger,