From e3d4c42354b7195af166b904897f70269e356cbb Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Fri, 1 Mar 2024 14:45:30 -0800 Subject: [PATCH] [auto-materialize] fix external asset + amp (#20204) ## Summary & Motivation ## How I Tested These Changes --- .../dagster/dagster/_core/definitions/data_time.py | 2 +- .../dagster/dagster/_utils/caching_instance_queryer.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/data_time.py b/python_modules/dagster/dagster/_core/definitions/data_time.py index 3530e4214b8f9..a0b3b21f7a11f 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_time.py +++ b/python_modules/dagster/dagster/_core/definitions/data_time.py @@ -539,7 +539,7 @@ def get_minutes_overdue( "Cannot calculate minutes late for asset without a FreshnessPolicy" ) - if self.asset_graph.is_external(asset_key): + if self.asset_graph.is_observable(asset_key): current_data_time = self._get_source_data_time(asset_key, current_time=evaluation_time) else: current_data_time = self.get_current_data_time(asset_key, current_time=evaluation_time) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index bfab4cd330364..1434868c1b79b 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -185,7 +185,7 @@ def get_asset_record(self, asset_key: AssetKey) -> Optional["AssetRecord"]: return self._asset_record_cache[asset_key] def _event_type_for_key(self, asset_key: AssetKey) -> DagsterEventType: - if self.asset_graph.is_external(asset_key): + if self.asset_graph.is_observable(asset_key): return DagsterEventType.ASSET_OBSERVATION else: return DagsterEventType.ASSET_MATERIALIZATION @@ -535,7 +535,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( """Finds asset partitions of the given child whose parents have been materialized since latest_storage_id. """ - if self.asset_graph.is_external(child_asset_key): + if not self.asset_graph.get_parents(child_asset_key): return set(), latest_storage_id child_partitions_def = self.asset_graph.get_partitions_def(child_asset_key) @@ -805,7 +805,7 @@ def get_asset_partitions_updated_after_cursor( if not updated_after_cursor: return set() if after_cursor is None or ( - not self.asset_graph.is_external(asset_key) + not self.asset_graph.is_observable(asset_key) and not respect_materialization_data_versions ): return updated_after_cursor