diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 65a15743049ea..7b581459b3c86 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -751,12 +751,15 @@ def _submit_runs_and_update_backfill_in_chunks( yield backfill_data_with_submitted_runs -def _check_no_partitions_def_changes_to_asset( +def _check_target_partitions_subset_is_valid( asset_key: AssetKey, asset_graph: AssetGraph, target_partitions_subset: Optional[PartitionsSubset], instance_queryer: CachingInstanceQueryer, ) -> None: + """Checks for any partitions definition changes since backfill launch that should mark + the backfill as failed. + """ if asset_key not in asset_graph.all_asset_keys: raise DagsterDefinitionChangedDeserializationError( f"Asset {asset_key} existed at storage-time, but no longer does" @@ -771,6 +774,8 @@ def _check_no_partitions_def_changes_to_asset( " does" ) + # If the asset was time-partitioned at storage time but the time partitions def + # has changed, mark the backfill as failed if isinstance( target_partitions_subset, TimeWindowPartitionsSubset ) and target_partitions_subset.partitions_def.get_serializable_unique_identifier( @@ -783,6 +788,7 @@ def _check_no_partitions_def_changes_to_asset( ) else: + # Check that all target partitions still exist. If so, the backfill can continue. for target_key in target_partitions_subset.get_partition_keys(): if not partitions_def.has_partition_key( target_key, instance_queryer.evaluation_time, instance_queryer @@ -799,37 +805,29 @@ def _check_no_partitions_def_changes_to_asset( ) -def _check_and_deserialize_asset_backfill_data( +def _check_validity_and_deserialize_asset_backfill_data( workspace_context: BaseWorkspaceRequestContext, backfill: "PartitionBackfill", asset_graph: AssetGraph, instance_queryer: CachingInstanceQueryer, logger: logging.Logger, ) -> Optional[AssetBackfillData]: + """Attempts to deserialize asset backfill data. If the asset backfill data is valid, + returns the deserialized data, else returns None. + """ unloadable_locations = _get_unloadable_location_names(workspace_context, logger) try: - if backfill.serialized_asset_backfill_data: - asset_backfill_data = AssetBackfillData.from_serialized( - backfill.serialized_asset_backfill_data, + asset_backfill_data = backfill.get_asset_backfill_data(asset_graph) + for asset_key in asset_backfill_data.target_subset.asset_keys: + _check_target_partitions_subset_is_valid( + asset_key, asset_graph, - backfill.backfill_timestamp, + asset_backfill_data.target_subset.get_partitions_subset(asset_key) + if asset_key in asset_backfill_data.target_subset.partitions_subsets_by_asset_key + else None, + instance_queryer, ) - elif backfill.asset_backfill_data: - asset_backfill_data = backfill.asset_backfill_data - for asset_key in asset_backfill_data.target_subset.asset_keys: - _check_no_partitions_def_changes_to_asset( - asset_key, - asset_graph, - asset_backfill_data.target_subset.get_partitions_subset(asset_key) - if asset_key - in asset_backfill_data.target_subset.partitions_subsets_by_asset_key - else None, - instance_queryer, - ) - else: - check.failed("Backfill missing asset_backfill_data") - except DagsterDefinitionChangedDeserializationError as ex: unloadable_locations_error = ( "This could be because it's inside a code location that's failing to load:" @@ -876,7 +874,7 @@ def execute_asset_backfill_iteration( instance=instance, asset_graph=asset_graph, evaluation_time=backfill_start_time ) - previous_asset_backfill_data = _check_and_deserialize_asset_backfill_data( + previous_asset_backfill_data = _check_validity_and_deserialize_asset_backfill_data( workspace_context, backfill, asset_graph, instance_queryer, logger ) if previous_asset_backfill_data is None: diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index e41d4c5374ddd..8d91f73615425 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -117,14 +117,11 @@ def is_asset_backfill(self) -> bool: self.serialized_asset_backfill_data is not None or self.asset_backfill_data is not None ) - def get_asset_backfill_data(self, asset_graph: AssetGraph) -> Optional[AssetBackfillData]: + def get_asset_backfill_data(self, asset_graph: AssetGraph) -> AssetBackfillData: if self.serialized_asset_backfill_data: - try: - asset_backfill_data = AssetBackfillData.from_serialized( - self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp - ) - except DagsterDefinitionChangedDeserializationError: - return None + asset_backfill_data = AssetBackfillData.from_serialized( + self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp + ) elif self.asset_backfill_data: asset_backfill_data = self.asset_backfill_data else: @@ -175,12 +172,12 @@ def get_backfill_status_per_asset_key( if self.is_asset_backfill: asset_graph = ExternalAssetGraph.from_workspace(workspace) - asset_backfill_data = self.get_asset_backfill_data(asset_graph) - return ( - asset_backfill_data.get_backfill_status_per_asset_key(asset_graph) - if asset_backfill_data - else [] - ) + try: + asset_backfill_data = self.get_asset_backfill_data(asset_graph) + except DagsterDefinitionChangedDeserializationError: + return [] + + return asset_backfill_data.get_backfill_status_per_asset_key(asset_graph) else: return [] @@ -192,12 +189,12 @@ def get_target_partitions_subset( if self.is_asset_backfill: asset_graph = ExternalAssetGraph.from_workspace(workspace) - asset_backfill_data = self.get_asset_backfill_data(asset_graph) - return ( - asset_backfill_data.get_target_partitions_subset(asset_key) - if asset_backfill_data - else None - ) + try: + asset_backfill_data = self.get_asset_backfill_data(asset_graph) + except DagsterDefinitionChangedDeserializationError: + return None + + return asset_backfill_data.get_target_partitions_subset(asset_key) else: return None @@ -209,12 +206,12 @@ def get_target_root_partitions_subset( if self.is_asset_backfill: asset_graph = ExternalAssetGraph.from_workspace(workspace) - asset_backfill_data = self.get_asset_backfill_data(asset_graph) - return ( - asset_backfill_data.get_target_root_partitions_subset(asset_graph) - if asset_backfill_data - else None - ) + try: + asset_backfill_data = self.get_asset_backfill_data(asset_graph) + except DagsterDefinitionChangedDeserializationError: + return None + + return asset_backfill_data.get_target_root_partitions_subset(asset_graph) else: return None @@ -224,8 +221,12 @@ def get_num_partitions(self, workspace: IWorkspace) -> Optional[int]: if self.is_asset_backfill: asset_graph = ExternalAssetGraph.from_workspace(workspace) - asset_backfill_data = self.get_asset_backfill_data(asset_graph) - return asset_backfill_data.get_num_partitions() if asset_backfill_data else 0 + try: + asset_backfill_data = self.get_asset_backfill_data(asset_graph) + except DagsterDefinitionChangedDeserializationError: + return 0 + + return asset_backfill_data.get_num_partitions() else: if self.partition_names is None: check.failed("Non-asset backfills should have a non-null partition_names field") @@ -238,8 +239,12 @@ def get_partition_names(self, workspace: IWorkspace) -> Optional[Sequence[str]]: if self.is_asset_backfill: asset_graph = ExternalAssetGraph.from_workspace(workspace) - asset_backfill_data = self.get_asset_backfill_data(asset_graph) - return asset_backfill_data.get_partition_names() if asset_backfill_data else None + try: + asset_backfill_data = self.get_asset_backfill_data(asset_graph) + except DagsterDefinitionChangedDeserializationError: + return None + + return asset_backfill_data.get_partition_names() else: if self.partition_names is None: check.failed("Non-asset backfills should have a non-null partition_names field") diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index dbd35771ccef3..1d96dece12266 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -429,7 +429,6 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: """Returns an AssetGraphSubset representing the set of assets that are currently targeted by an active asset backfill. """ - from dagster._core.execution.asset_backfill import AssetBackfillData from dagster._core.execution.backfill import BulkActionStatus asset_backfills = [ @@ -440,26 +439,15 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: result = AssetGraphSubset() for asset_backfill in asset_backfills: - if asset_backfill.serialized_asset_backfill_data: - try: - asset_backfill_data = AssetBackfillData.from_serialized( - asset_backfill.serialized_asset_backfill_data, - self.asset_graph, - asset_backfill.backfill_timestamp, - ) - except DagsterDefinitionChangedDeserializationError: - self._logger.warning( - f"Not considering assets in backfill {asset_backfill.backfill_id} since its" - " data could not be deserialized" - ) - # Backfill can't be loaded, so no risk of the assets interfering - continue - elif asset_backfill.asset_backfill_data: - asset_backfill_data = asset_backfill.asset_backfill_data - else: - check.failed( - "Expected either serialized_asset_backfill_data or asset_backfill_data field" + try: + asset_backfill_data = asset_backfill.get_asset_backfill_data(self.asset_graph) + except DagsterDefinitionChangedDeserializationError: + self._logger.warning( + f"Not considering assets in backfill {asset_backfill.backfill_id} since its" + " data could not be deserialized" ) + # Backfill can't be loaded, so no risk of the assets interfering + continue result |= asset_backfill_data.target_subset diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 0848df2ff6aa4..9b08bf763b2af 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -1340,13 +1340,14 @@ def test_raise_error_on_partitions_defs_removed( instance.add_backfill(backfill) - errors = list( - execute_backfill_iteration( + errors = [ + e + for e in execute_backfill_iteration( partitions_defs_changes_location_2_workspace_context, get_default_daemon_logger("BackfillDaemon"), ) - ) - + if e is not None + ] assert len(errors) == 1 assert ("had a PartitionsDefinition at storage-time, but no longer does") in errors[0].message diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index 39d7e9bca6329..d7e9d7b2066a8 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -319,12 +319,6 @@ def repo(): partitions_subsets_by_asset_key={}, non_partitioned_asset_keys=set(), ) - partitions_defs_ids_by_asset_key = { - asset_key: check.not_none( - repo.asset_graph.get_partitions_def(asset_key) - ).get_serializable_unique_identifier(dynamic_partitions_store=instance) - for asset_key in target_subset.partitions_subsets_by_asset_key.keys() - } asset_backfill_data = AssetBackfillData( latest_storage_id=0, target_subset=target_subset, @@ -333,7 +327,6 @@ def repo(): requested_subset=empty_subset, failed_and_downstream_subset=empty_subset, backfill_start_time=test_time, - partitions_def_ids_by_asset_key=partitions_defs_ids_by_asset_key, ) backfill = PartitionBackfill( backfill_id=f"backfill{i}",