From 991eef71e42258ded07c98bfd06190d2ff7539ad Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Wed, 28 Feb 2024 12:40:11 -0500 Subject: [PATCH] [external-assets] Update AssetGraph accessor callsites to fetch nodes [INTERNAL_BRANCH=sean/external-assets-asset-graph-nodes] --- .../dagster/_core/definitions/asset_graph.py | 34 +--------------- .../_core/definitions/asset_graph_differ.py | 7 ++-- .../definitions/auto_materialize_rule.py | 4 +- .../dagster/_core/definitions/data_time.py | 13 +++--- .../dagster/_core/definitions/data_version.py | 40 +++++++++++-------- .../freshness_based_auto_materialize.py | 2 +- .../freshness_policy_sensor_definition.py | 2 +- .../dagster/_core/execution/asset_backfill.py | 2 +- .../_core/host_representation/external.py | 5 +-- .../dagster/dagster/_daemon/asset_daemon.py | 4 +- .../_utils/caching_instance_queryer.py | 33 ++++++++------- .../asset_defs_tests/test_asset_graph.py | 4 +- .../test_external_asset_graph.py | 12 +++--- .../asset_daemon_scenario.py | 4 +- .../auto_materialize_tests/base_scenario.py | 2 +- 15 files changed, 71 insertions(+), 97 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 829af7eca94d3..ddb768bdd1203 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -248,33 +248,21 @@ def all_asset_keys(self) -> AbstractSet[AssetKey]: def materializable_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_materializable} - def is_materializable(self, key: AssetKey) -> bool: - return self.get(key).is_materializable - @property @cached_method def observable_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_observable} - def is_observable(self, key: AssetKey) -> bool: - return self.get(key).is_observable - @property @cached_method def external_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_external} - def is_external(self, key: AssetKey) -> bool: - return self.get(key).is_external - @property @cached_method def executable_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_executable} - def is_executable(self, key: AssetKey) -> bool: - return self.get(key).is_executable - @property @cached_method def toposorted_asset_keys(self) -> Sequence[AssetKey]: @@ -353,24 +341,6 @@ def get_partitions_in_range( def is_partitioned(self, asset_key: AssetKey) -> bool: return self.get_partitions_def(asset_key) is not None - def get_group_name(self, asset_key: AssetKey) -> Optional[str]: - return self.get(asset_key).group_name - - def get_freshness_policy(self, asset_key: AssetKey) -> Optional[FreshnessPolicy]: - return self.get(asset_key).freshness_policy - - def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]: - return self.get(asset_key).auto_materialize_policy - - def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]: - return self.get(asset_key).auto_observe_interval_minutes - - def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]: - return self.get(asset_key).backfill_policy - - def get_code_version(self, asset_key: AssetKey) -> Optional[str]: - return self.get(asset_key).code_version - def have_same_partitioning(self, asset_key1: AssetKey, asset_key2: AssetKey) -> bool: """Returns whether the given assets have the same partitions definition.""" return self.get(asset_key1).partitions_def == self.get(asset_key2).partitions_def @@ -653,7 +623,7 @@ def has_materializable_parents(self, asset_key: AssetKey) -> bool: ) def get_materializable_roots(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: - """Returns all assets upstream of the given asset which do not consume any other + """Returns all assets upstream of the giget which do not consume any other materializable assets. """ if not self.has_materializable_parents(asset_key): @@ -667,7 +637,7 @@ def get_materializable_roots(self, asset_key: AssetKey) -> AbstractSet[AssetKey] } def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]: - """Iterates through all asset keys which are upstream of the given key.""" + """Iterates tgetl asset keys which are upstream of the given key.""" visited: Set[AssetKey] = set() queue = deque([asset_key]) while queue: diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py index 290df4dff5144..12171c3c60e3f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py @@ -122,9 +122,10 @@ def _compare_base_and_branch_assets(self, asset_key: "AssetKey") -> Sequence[Cha return [ChangeReason.NEW] changes = [] - if self.branch_asset_graph.get_code_version( - asset_key - ) != self.base_asset_graph.get_code_version(asset_key): + if ( + self.branch_asset_graph.get(asset_key).code_version + != self.base_asset_graph.get(asset_key).code_version + ): changes.append(ChangeReason.CODE_VERSION) if self.branch_asset_graph.get_parents(asset_key) != self.base_asset_graph.get_parents( diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 3aec46e13fd38..57864eb9f0245 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -759,8 +759,8 @@ def evaluate_for_asset( # ignore missing or unexecutable assets, which will never have a materialization or # observation if not ( - context.asset_graph.has_asset(parent.asset_key) - and context.asset_graph.is_executable(parent.asset_key) + context.asset_graph.has(parent.asset_key) + and context.asset_graph.get(parent.asset_key).is_executable ): continue if not context.instance_queryer.asset_partition_has_materialization_or_observation( diff --git a/python_modules/dagster/dagster/_core/definitions/data_time.py b/python_modules/dagster/dagster/_core/definitions/data_time.py index 3530e4214b8f9..ee82b07cfa93a 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_time.py +++ b/python_modules/dagster/dagster/_core/definitions/data_time.py @@ -177,8 +177,7 @@ def _upstream_records_by_key( for parent_key in self.asset_graph.get_parents(asset_key): if not ( - self.asset_graph.has_asset(parent_key) - and self.asset_graph.is_executable(parent_key) + self.asset_graph.has(parent_key) and self.asset_graph.get(parent_key).is_executable ): continue @@ -319,7 +318,7 @@ def _calculate_data_time_by_key( cursor=record_id, partitions_def=partitions_def, ) - elif self.asset_graph.is_observable(asset_key): + elif self.asset_graph.get(asset_key).is_observable: return self._calculate_data_time_by_key_observable_source( asset_key=asset_key, record_id=record_id, @@ -533,18 +532,18 @@ def get_minutes_overdue( asset_key: AssetKey, evaluation_time: datetime.datetime, ) -> Optional[FreshnessMinutes]: - freshness_policy = self.asset_graph.get_freshness_policy(asset_key) - if freshness_policy is None: + asset = self.asset_graph.get(asset_key) + if asset.freshness_policy is None: raise DagsterInvariantViolationError( "Cannot calculate minutes late for asset without a FreshnessPolicy" ) - if self.asset_graph.is_external(asset_key): + if asset.is_external: current_data_time = self._get_source_data_time(asset_key, current_time=evaluation_time) else: current_data_time = self.get_current_data_time(asset_key, current_time=evaluation_time) - return freshness_policy.minutes_overdue( + return asset.freshness_policy.minutes_overdue( data_time=current_data_time, evaluation_time=evaluation_time, ) diff --git a/python_modules/dagster/dagster/_core/definitions/data_version.py b/python_modules/dagster/dagster/_core/definitions/data_version.py index 9ed11a4d87a3c..e4e905ff529a6 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_version.py +++ b/python_modules/dagster/dagster/_core/definitions/data_version.py @@ -423,13 +423,14 @@ def get_current_data_version( def _get_status(self, key: "AssetKeyPartitionKey") -> StaleStatus: # The status loader does not support querying for the stale status of a # partitioned asset without specifying a partition, so we return here. - if self.asset_graph.is_partitioned(key.asset_key) and not key.partition_key: + asset = self.asset_graph.get(key.asset_key) + if asset.is_partitioned and not key.partition_key: return StaleStatus.FRESH else: current_version = self._get_current_data_version(key=key) if current_version == NULL_DATA_VERSION: return StaleStatus.MISSING - elif self.asset_graph.is_external(key.asset_key): + elif asset.is_external: return StaleStatus.FRESH else: causes = self._get_stale_causes(key=key) @@ -440,9 +441,10 @@ def _get_stale_causes(self, key: "AssetKeyPartitionKey") -> Sequence[StaleCause] # Querying for the stale status of a partitioned asset without specifying a partition key # is strictly speaking undefined, but we return an empty list here (from which FRESH status # is inferred) for backcompat. - if self.asset_graph.is_partitioned(key.asset_key) and not key.partition_key: + asset = self.asset_graph.get(key.asset_key) + if asset.is_partitioned and not key.partition_key: return [] - elif self.asset_graph.is_external(key.asset_key): + elif asset.is_external: return [] else: current_version = self._get_current_data_version(key=key) @@ -454,6 +456,7 @@ def _get_stale_causes(self, key: "AssetKeyPartitionKey") -> Sequence[StaleCause] ) def _is_dep_updated(self, provenance: DataProvenance, dep_key: "AssetKeyPartitionKey") -> bool: + dep_asset = self.asset_graph.get(dep_key.asset_key) if dep_key.partition_key is None: current_data_version = self._get_current_data_version(key=dep_key) return provenance.input_data_versions[dep_key.asset_key] != current_data_version @@ -461,14 +464,14 @@ def _is_dep_updated(self, provenance: DataProvenance, dep_key: "AssetKeyPartitio cursor = provenance.input_storage_ids[dep_key.asset_key] updated_record = self._instance.get_latest_data_version_record( dep_key.asset_key, - self.asset_graph.is_external(dep_key.asset_key), + dep_asset.is_external, dep_key.partition_key, after_cursor=cursor, ) if updated_record: previous_record = self._instance.get_latest_data_version_record( dep_key.asset_key, - self.asset_graph.is_external(dep_key.asset_key), + dep_asset.is_external, dep_key.partition_key, before_cursor=cursor + 1 if cursor else None, ) @@ -485,7 +488,7 @@ def _is_dep_updated(self, provenance: DataProvenance, dep_key: "AssetKeyPartitio def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterator[StaleCause]: from dagster._core.definitions.events import AssetKeyPartitionKey - code_version = self.asset_graph.get_code_version(key.asset_key) + code_version = self.asset_graph.get(key.asset_key).code_version provenance = self._get_current_data_provenance(key=key) asset_deps = self.asset_graph.get_parents(key.asset_key) @@ -513,6 +516,7 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato # partition counts. partition_deps = self._get_partition_dependencies(key=key) for dep_key in sorted(partition_deps): + dep_asset = self.asset_graph.get(dep_key.asset_key) if self._get_status(key=dep_key) == StaleStatus.STALE: yield StaleCause( key, @@ -532,9 +536,10 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato # Currently we exclude assets downstream of AllPartitionMappings from stale # status logic due to potentially huge numbers of dependencies. elif self._is_dep_updated(provenance, dep_key): - report_data_version = self.asset_graph.get_code_version( - dep_key.asset_key - ) is not None or self._is_current_data_version_user_provided(key=dep_key) + report_data_version = ( + dep_asset.code_version is not None + or self._is_current_data_version_user_provided(key=dep_key) + ) yield StaleCause( key, StaleCauseCategory.DATA, @@ -563,7 +568,7 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato # timestamps instead of versions this should be removable eventually since # provenance is on all newer materializations. If dep is a source, then we'll never # provide a stale reason here. - elif not self.asset_graph.is_external(dep_key.asset_key): + elif not dep_asset.is_external: dep_materialization = self._get_latest_data_version_record(key=dep_key) if dep_materialization is None: # The input must be new if it has no materialization @@ -622,7 +627,7 @@ def _get_current_data_version(self, *, key: "AssetKeyPartitionKey") -> DataVersi # Currently we can only use asset records, which are fetched in one shot, for non-source # assets. This is because the most recent AssetObservation is not stored on the AssetRecord. record = self._get_latest_data_version_record(key=key) - if self.asset_graph.is_external(key.asset_key) and record is None: + if self.asset_graph.get(key.asset_key).is_external and record is None: return DEFAULT_DATA_VERSION elif record is None: return NULL_DATA_VERSION @@ -632,7 +637,7 @@ def _get_current_data_version(self, *, key: "AssetKeyPartitionKey") -> DataVersi @cached_method def _is_current_data_version_user_provided(self, *, key: "AssetKeyPartitionKey") -> bool: - if self.asset_graph.is_external(key.asset_key): + if self.asset_graph.get(key.asset_key).is_external: return True else: provenance = self._get_current_data_provenance(key=key) @@ -654,10 +659,11 @@ def _get_current_data_provenance( # are at the root of the graph (have no dependencies) or are downstream of a volatile asset. @cached_method def _is_volatile(self, *, key: "AssetKey") -> bool: - if self.asset_graph.is_external(key): - return self.asset_graph.is_observable(key) + asset = self.asset_graph.get(key) + if asset.is_external: + return asset.is_observable else: - deps = self.asset_graph.get_parents(key) + deps = asset.get_parents(key) return len(deps) == 0 or any(self._is_volatile(key=dep_key) for dep_key in deps) @cached_method @@ -678,7 +684,7 @@ def _get_latest_data_version_record( # If an asset record is cached, all of its ancestors have already been cached. if ( key.partition_key is None - and not self.asset_graph.is_external(key.asset_key) + and not self.asset_graph.get(key.asset_key).is_external and not self.instance_queryer.has_cached_asset_record(key.asset_key) ): ancestors = self.asset_graph.get_ancestors(key.asset_key, include_self=True) diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py index bdfb23296e0e7..f961b7659d821 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py @@ -207,7 +207,7 @@ def freshness_evaluation_results_for_asset_key( execution_period, evaluation_data, ) = get_execution_period_and_evaluation_data_for_policies( - local_policy=context.asset_graph.get_freshness_policy(asset_key), + local_policy=context.asset_graph.get(asset_key).freshness_policy, policies=context.asset_graph.get_downstream_freshness_policies(asset_key=asset_key), effective_data_time=effective_data_time, current_time=current_time, diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_policy_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/freshness_policy_sensor_definition.py index dce9d9ecb50f0..167b0275337ca 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_policy_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_policy_sensor_definition.py @@ -258,7 +258,7 @@ def _wrapped_fn(context: SensorEvaluationContext): minutes_late_by_key: Dict[AssetKey, Optional[float]] = {} for asset_key in monitored_keys: - freshness_policy = asset_graph.get_freshness_policy(asset_key) + freshness_policy = asset_graph.get(asset_key).freshness_policy if freshness_policy is None: continue diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 810286c35adf3..580e186f6360f 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1255,7 +1255,7 @@ def execute_asset_backfill_iteration_inner( # check if all assets have backfill policies if any of them do, otherwise, raise error asset_backfill_policies = [ - asset_graph.get_backfill_policy(asset_key) + asset_graph.get_asset(asset_key).backfill_policy for asset_key in { asset_partition.asset_key for asset_partition in asset_partitions_to_request } diff --git a/python_modules/dagster/dagster/_core/host_representation/external.py b/python_modules/dagster/dagster/_core/host_representation/external.py index 16b93a8e49822..45e1a39d219d2 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external.py +++ b/python_modules/dagster/dagster/_core/host_representation/external.py @@ -212,15 +212,14 @@ def _external_sensors(self) -> Dict[str, "ExternalSensor"]: default_sensor_asset_keys = set() for asset_key in asset_graph.materializable_asset_keys: - policy = asset_graph.get_auto_materialize_policy(asset_key) - if not policy: + if not asset_graph.get(asset_key).auto_materialize_policy: continue if asset_key not in covered_asset_keys: default_sensor_asset_keys.add(asset_key) for asset_key in asset_graph.observable_asset_keys: - if asset_graph.get_auto_observe_interval_minutes(asset_key) is None: + if asset_graph.get(asset_key).auto_observe_interval_minutes is None: continue has_any_auto_observe_source_assets = True diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index ef9fccd5ff4d6..d9b95d6ee8d22 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -682,14 +682,14 @@ def _process_auto_materialize_tick_generator( auto_materialize_asset_keys = { target_key for target_key in eligible_keys - if asset_graph.get_auto_materialize_policy(target_key) is not None + if asset_graph.get(target_key).auto_materialize_policy is not None } num_target_assets = len(auto_materialize_asset_keys) auto_observe_asset_keys = { key for key in eligible_keys - if asset_graph.get_auto_observe_interval_minutes(key) is not None + if asset_graph.get(key).auto_observe_interval_minutes is not None } num_auto_observe_assets = len(auto_observe_asset_keys) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index f45654cc1ec09..c2106b05844c0 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -185,7 +185,7 @@ def get_asset_record(self, asset_key: AssetKey) -> Optional["AssetRecord"]: return self._asset_record_cache[asset_key] def _event_type_for_key(self, asset_key: AssetKey) -> DagsterEventType: - if self.asset_graph.is_external(asset_key): + if self.asset_graph.get(asset_key).is_external: return DagsterEventType.ASSET_OBSERVATION else: return DagsterEventType.ASSET_MATERIALIZATION @@ -204,7 +204,7 @@ def _get_latest_materialization_or_observation_record( and asset_partition.partition_key is None and not ( self.asset_graph.has(asset_partition.asset_key) - and self.asset_graph.is_observable(asset_partition.asset_key) + and self.asset_graph.get(asset_partition.asset_key).is_observable ) ): asset_record = self.get_asset_record(asset_partition.asset_key) @@ -286,7 +286,7 @@ def asset_partition_has_materialization_or_observation( greater than this value will be considered. """ asset_key = asset_partition.asset_key - if self.asset_graph.has_asset(asset_key) and self.asset_graph.is_materializable(asset_key): + if self.asset_graph.has(asset_key) and self.asset_graph.get(asset_key).is_materializable: asset_record = self.get_asset_record(asset_key) if ( asset_record is None @@ -535,11 +535,11 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( """Finds asset partitions of the given child whose parents have been materialized since latest_storage_id. """ - if self.asset_graph.is_external(child_asset_key): + child_asset = self.asset_graph.get(child_asset_key) + if child_asset.is_external: return set(), latest_storage_id - child_partitions_def = self.asset_graph.get_partitions_def(child_asset_key) - child_time_partitions_def = get_time_partitions_def(child_partitions_def) + child_time_partitions_def = get_time_partitions_def(child_asset.partitions_def) child_asset_partitions_with_updated_parents = set() @@ -551,8 +551,8 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( for parent_asset_key in self.asset_graph.get_parents(child_asset_key): # ignore missing and non-executable parents if not ( - self.asset_graph.has_asset(parent_asset_key) - and self.asset_graph.is_executable(parent_asset_key) + self.asset_graph.has(parent_asset_key) + and self.asset_graph.get(parent_asset_key).is_executable ): continue @@ -587,7 +587,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( child_asset_key=child_asset_key, current_time=self.evaluation_time, ) - if child_partitions_def + if child_asset.partitions_def else [None] ): if not ( @@ -595,7 +595,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( # historical time partitions not map_old_time_partitions and child_time_partitions_def is not None - and get_time_partition_key(child_partitions_def, child_partition_key) + and get_time_partition_key(child_asset.partitions_def, child_partition_key) != child_time_partitions_def.get_last_partition_key( current_time=self.evaluation_time ) @@ -608,7 +608,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( else: # we know a parent updated, and because the parent has a partitions def and the # child does not, the child could not have been materialized in the same run - if child_partitions_def is None: + if child_asset.partitions_def is None: child_asset_partitions_with_updated_parents = { AssetKeyPartitionKey(child_asset_key) } @@ -627,7 +627,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( partition_mapping.get_downstream_partitions_for_partitions( parent_partitions_subset, upstream_partitions_def=parent_partitions_def, - downstream_partitions_def=child_partitions_def, + downstream_partitions_def=child_asset.partitions_def, dynamic_partitions_store=self, current_time=self.evaluation_time, ) @@ -645,7 +645,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( if ( # if child has a different partitions def than the parent, then it must # have been executed in a different run, so it's a valid candidate - child_partitions_def != parent_partitions_def + child_asset.partitions_def != parent_partitions_def # if child partition key is not the same as any newly materialized # parent key, then it could not have been executed in the same run as # its parent @@ -805,7 +805,7 @@ def get_asset_partitions_updated_after_cursor( if not updated_after_cursor: return set() if after_cursor is None or ( - not self.asset_graph.is_external(asset_key) + not self.asset_graph.get(asset_key).is_external and not respect_materialization_data_versions ): return updated_after_cursor @@ -901,8 +901,7 @@ def get_parent_asset_partitions_updated_after_child( # ignore non-existent or unexecutable parents if not ( - self.asset_graph.has_asset(parent_key) - and self.asset_graph.is_executable(parent_key) + self.asset_graph.has(parent_key) and self.asset_graph.get(parent_key).is_executable ): continue @@ -950,7 +949,7 @@ def get_outdated_ancestors( asset_key = asset_partition.asset_key partition_key = asset_partition.partition_key if not ( - self.asset_graph.has_asset(asset_key) and self.asset_graph.is_materializable(asset_key) + self.asset_graph.has(asset_key) and self.asset_graph.get(asset_key).is_materializable ): return set() diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index c68a2b9d20b31..85ef20fc71336 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -97,8 +97,8 @@ def asset3(asset1, asset2): assert asset_graph.get_parents(asset3.key) == {asset1.key, asset2.key} for asset_def in assets: assert asset_graph.get_execution_unit_asset_keys(asset_def.key) == {asset_def.key} - assert asset_graph.get_code_version(asset0.key) == "1" - assert asset_graph.get_code_version(asset1.key) is None + assert asset_graph.get_asset(asset0.key).code_version == "1" + assert asset_graph.get_asset(asset1.key).code_version is None def test_get_children_partitions_unpartitioned_parent_partitioned_child( diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py index c8e5bbea4a6a0..12d2741ec026a 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py @@ -327,9 +327,9 @@ def test_get_implicit_job_name_for_assets(instance): def test_auto_materialize_policy(instance): asset_graph = ExternalAssetGraph.from_workspace(_make_context(instance, ["partitioned_defs"])) - assert asset_graph.get_auto_materialize_policy( + assert asset_graph.get( AssetKey("downstream_of_partitioned_source") - ) == AutoMaterializePolicy.eager( + ).auto_materialize_policy == AutoMaterializePolicy.eager( max_materializations_per_minute=75, ) @@ -405,16 +405,16 @@ def test_assets_with_backfill_policies(instance): _make_context(instance, ["backfill_assets_defs"]) ) assert ( - asset_graph.get_backfill_policy(AssetKey("static_partitioned_single_run_backfill_asset")) + asset_graph.get(AssetKey("static_partitioned_single_run_backfill_asset")).backfill_policy == BackfillPolicy.single_run() ) assert ( - asset_graph.get_backfill_policy(AssetKey("non_partitioned_single_run_backfill_asset")) + asset_graph.get(AssetKey("non_partitioned_single_run_backfill_asset")).backfill_policy == BackfillPolicy.single_run() ) - assert asset_graph.get_backfill_policy( + assert asset_graph.get( AssetKey("static_partitioned_multi_run_backfill_asset") - ) == BackfillPolicy.multi_run(5) + ).backfill_policy == BackfillPolicy.multi_run(5) @asset(deps=[SourceAsset("b")]) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py index 8d5cf5bb029ac..3eb2df14cbc5f 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py @@ -438,7 +438,7 @@ def _evaluate_tick_fast( auto_materialize_asset_keys={ key for key in self.asset_graph.materializable_asset_keys - if self.asset_graph.get_auto_materialize_policy(key) is not None + if self.asset_graph.get(key).auto_materialize_policy is not None }, instance=self.instance, materialize_run_tags={}, @@ -447,7 +447,7 @@ def _evaluate_tick_fast( auto_observe_asset_keys={ key for key in self.asset_graph.external_asset_keys - if self.asset_graph.get_auto_observe_interval_minutes(key) is not None + if self.asset_graph.get(key).auto_observe_interval_minutes is not None }, respect_materialization_data_versions=False, logger=self.logger, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index 4d0e8f5c787f8..940eb30914c17 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -410,7 +410,7 @@ def test_time_fn(): auto_observe_asset_keys={ key for key in asset_graph.observable_asset_keys - if asset_graph.get_auto_observe_interval_minutes(key) is not None + if asset_graph.get(key).auto_observe_interval_minutes is not None }, respect_materialization_data_versions=respect_materialization_data_versions, logger=logging.getLogger("dagster.amp"),