From b8ebe0aec2580666ed44cf57f847ae9e03f47bbf Mon Sep 17 00:00:00 2001
From: Claire Lin
Date: Mon, 27 Nov 2023 09:58:57 -0800
Subject: [PATCH] [7/n subset refactor] Use new asset backfill data serialization format (#17929)

At long last, we have arrived at the final part of this stack. This PR
migrates `PartitionBackfill` logic to use the new serialization format of
`AssetBackfillData`.

When new backfills are created from now on, the UI will be able to display
the asset backfill page even if the partitions defs are changed/removed.
For old backfills, the UI will continue to show the "partitions def has
changed" message, and the asset backfill page will be blank.

Description of changes:
- Adds an additional `asset_backfill_data` field to `PartitionBackfill`
- Asset backfills created from now on will use this field with the new
  serialization
- Existing backfills will continue to use `serialized_asset_backfill_data`.
  We could force the daemon to migrate these objects mid-backfill, but the
  value-add is pretty low. It also improves debuggability to keep old
  backfills on the old serialization and new backfills on the new one.
- Serializes the unique ID of each partitions def in a field on
  `AssetBackfillData`. Adds a new method in asset backfill execution that
  uses the unique ID to check if partitions defs have changed, in which
  case we should stop execution.
  - This previously existed in the old serialization version of
    `AssetGraphSubset`, but was unfortunately duplicated across each subset
    type (materialized, in-progress, failed)
- Adds test cases to cover this new surface area
---
 .../implementation/execution/backfill.py      |   2 +-
 .../graphql/test_asset_backfill.py            |  76 +++++++-
 .../graphql/test_partition_backfill.py        |   5 +-
 .../dagster/_core/definitions/partition.py    |   4 +-
 .../definitions/time_window_partitions.py     |   5 +-
 .../dagster/_core/execution/asset_backfill.py | 159 ++++++++++++---
 .../dagster/_core/execution/backfill.py       | 120 +++++++-----
 .../dagster/dagster/_serdes/__init__.py       |   1 +
 .../_utils/caching_instance_queryer.py        |  10 +-
 .../execution_tests/test_asset_backfill.py    |  14 +-
 .../dagster_tests/daemon_tests/conftest.py    |  50 +++++
 .../daemon_tests/test_backfill.py             | 181 ++++++++++++++++--
 .../location_1.py                             |  22 +++
 .../location_2.py                             |  20 ++
 14 files changed, 547 insertions(+), 122 deletions(-)
 create mode 100644 python_modules/dagster/dagster_tests/daemon_tests/test_locations/partitions_defs_changes_locations/location_1.py
 create mode 100644 python_modules/dagster/dagster_tests/daemon_tests/test_locations/partitions_defs_changes_locations/location_2.py

diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
index ddee5287d7639..d971f979eadf8 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
@@ -300,7 +300,7 @@ def cancel_partition_backfill(
     if not backfill:
         check.failed(f"No backfill found for id: {backfill_id}")

-    if backfill.serialized_asset_backfill_data:
+    if backfill.is_asset_backfill:
         asset_graph = ExternalAssetGraph.from_workspace(graphene_info.context)
         _assert_permission_for_asset_graph(
             graphene_info,
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py
index
35cc1a7e7a36f..d417fa980ca3e 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -1,5 +1,6 @@ from typing import Optional, Tuple +import mock from dagster import ( AssetKey, DailyPartitionsDefinition, @@ -421,6 +422,71 @@ def test_launch_asset_backfill(): ) +def test_remove_partitions_defs_after_backfill_backcompat(): + repo = get_repo() + all_asset_keys = repo.asset_graph.materializable_asset_keys + + with instance_for_test() as instance: + with define_out_of_process_context(__file__, "get_repo", instance) as context: + launch_backfill_result = execute_dagster_graphql( + context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": ["a", "b"], + "assetSelection": [key.to_graphql_input() for key in all_asset_keys], + } + }, + ) + backfill_id, asset_backfill_data = _get_backfill_data( + launch_backfill_result, instance, repo + ) + assert asset_backfill_data.target_subset.asset_keys == all_asset_keys + + # Replace the asset backfill data with the backcompat serialization + backfill = instance.get_backfills()[0] + backcompat_backfill = backfill._replace( + asset_backfill_data=None, + serialized_asset_backfill_data=backfill.asset_backfill_data.serialize( + instance, asset_graph=repo.asset_graph + ), + ) + + with mock.patch( + "dagster._core.instance.DagsterInstance.get_backfills", + return_value=[backcompat_backfill], + ): + # When the partitions defs are unchanged, the backfill data can be fetched + with define_out_of_process_context(__file__, "get_repo", instance) as context: + get_backfills_result = execute_dagster_graphql( + context, GET_PARTITION_BACKFILLS_QUERY, variables={} + ) + assert not get_backfills_result.errors + assert get_backfills_result.data + + backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"] + assert len(backfill_results) == 1 + assert backfill_results[0]["numPartitions"] == 2 + assert backfill_results[0]["id"] == backfill_id + assert set(backfill_results[0]["partitionNames"]) == {"a", "b"} + + # When the partitions defs are changed, the backfill data cannot be fetched + with define_out_of_process_context( + __file__, "get_repo_with_non_partitioned_asset", instance + ) as context: + get_backfills_result = execute_dagster_graphql( + context, GET_PARTITION_BACKFILLS_QUERY, variables={} + ) + assert not get_backfills_result.errors + assert get_backfills_result.data + + backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"] + assert len(backfill_results) == 1 + assert backfill_results[0]["numPartitions"] == 0 + assert backfill_results[0]["id"] == backfill_id + assert set(backfill_results[0]["partitionNames"]) == set() + + def test_remove_partitions_defs_after_backfill(): repo = get_repo() all_asset_keys = repo.asset_graph.materializable_asset_keys @@ -454,11 +520,11 @@ def test_remove_partitions_defs_after_backfill(): assert get_backfills_result.data backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"] assert len(backfill_results) == 1 - assert backfill_results[0]["numPartitions"] == 0 + assert backfill_results[0]["numPartitions"] == 2 assert backfill_results[0]["id"] == backfill_id assert backfill_results[0]["partitionSet"] is None assert backfill_results[0]["partitionSetName"] is None - assert set(backfill_results[0]["partitionNames"]) == set() + assert set(backfill_results[0]["partitionNames"]) == {"a", 
"b"} # on PartitionBackfill single_backfill_result = execute_dagster_graphql( @@ -790,11 +856,9 @@ def _get_backfill_data( assert len(backfills) == 1 backfill = backfills[0] assert backfill.backfill_id == backfill_id - assert backfill.serialized_asset_backfill_data + assert backfill.asset_backfill_data - return backfill_id, AssetBackfillData.from_serialized( - backfill.serialized_asset_backfill_data, repo.asset_graph, backfill.backfill_timestamp - ) + return backfill_id, backfill.asset_backfill_data def _get_error_message(launch_backfill_result: GqlResult) -> Optional[str]: diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index f6cffd301b897..3219f4c83ba2d 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -13,7 +13,6 @@ from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.external_asset_graph import ExternalAssetGraph from dagster._core.execution.asset_backfill import ( - AssetBackfillData, AssetBackfillIterationResult, execute_asset_backfill_iteration, execute_asset_backfill_iteration_inner, @@ -209,9 +208,7 @@ def _execute_asset_backfill_iteration_no_side_effects( However, does not execute side effects i.e. launching runs. """ backfill = graphql_context.instance.get_backfill(backfill_id) - asset_backfill_data = AssetBackfillData.from_serialized( - backfill.serialized_asset_backfill_data, asset_graph, backfill.backfill_timestamp - ) + asset_backfill_data = backfill.asset_backfill_data result = None for result in execute_asset_backfill_iteration_inner( backfill_id=backfill_id, diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py index 4bfbb0e163cd6..a37c87b2b1a97 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition.py +++ b/python_modules/dagster/dagster/_core/definitions/partition.py @@ -1266,7 +1266,9 @@ def __eq__(self, other: object) -> bool: ) def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset": - return other + return other.empty_subset(other.partitions_def).with_partition_keys( + set(self.get_partition_keys()) & set(other.get_partition_keys()) + ) def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset": if self == other: diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index fd761723893a3..484aaa33f7d36 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -47,7 +47,6 @@ create_pendulum_time, to_timezone, ) -from dagster._utils import utc_datetime_from_timestamp from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE from dagster._utils.schedules import ( cron_string_iterator, @@ -200,9 +199,7 @@ def unpack( whitelist_map, context, ) - unpacked_datetime = pendulum.instance( - utc_datetime_from_timestamp(unpacked.timestamp), tz=unpacked.timezone - ).in_tz(tz=unpacked.timezone) + unpacked_datetime = pendulum.from_timestamp(unpacked.timestamp, unpacked.timezone) check.invariant(unpacked_datetime.tzinfo is not None) return unpacked_datetime diff --git 
a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index c5b1f6ecf653d..7749e0e2fdb22 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -35,10 +35,17 @@ from dagster._core.definitions.assets_job import is_base_asset_job_name from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey from dagster._core.definitions.external_asset_graph import ExternalAssetGraph -from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset +from dagster._core.definitions.partition import ( + AllPartitionsSubset, + PartitionsDefinition, + PartitionsSubset, +) from dagster._core.definitions.run_request import RunRequest from dagster._core.definitions.selector import JobSubsetSelector, PartitionsByAssetSelector -from dagster._core.definitions.time_window_partitions import DatetimeFieldSerializer +from dagster._core.definitions.time_window_partitions import ( + DatetimeFieldSerializer, + TimeWindowPartitionsSubset, +) from dagster._core.errors import ( DagsterAssetBackfillDataLoadError, DagsterBackfillFailedError, @@ -389,7 +396,11 @@ def get_partition_names(self) -> Optional[Sequence[str]]: @classmethod def empty( - cls, target_subset: AssetGraphSubset, backfill_start_time: datetime + cls, + target_subset: AssetGraphSubset, + backfill_start_time: datetime, + asset_graph: AssetGraph, + dynamic_partitions_store: DynamicPartitionsStore, ) -> "AssetBackfillData": return cls( target_subset=target_subset, @@ -482,7 +493,7 @@ def from_partitions_by_assets( partitions_subsets_by_asset_key=partitions_subsets_by_asset_key, non_partitioned_asset_keys=non_partitioned_asset_keys, ) - return cls.empty(target_subset, backfill_start_time) + return cls.empty(target_subset, backfill_start_time, asset_graph, dynamic_partitions_store) @classmethod def from_asset_partitions( @@ -546,7 +557,7 @@ def from_asset_partitions( else: check.failed("Either partition_names must not be None or all_partitions must be True") - return cls.empty(target_subset, backfill_start_time) + return cls.empty(target_subset, backfill_start_time, asset_graph, dynamic_partitions_store) def serialize( self, dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph @@ -744,31 +755,89 @@ def _submit_runs_and_update_backfill_in_chunks( yield backfill_data_with_submitted_runs -def execute_asset_backfill_iteration( +def _check_target_partitions_subset_is_valid( + asset_key: AssetKey, + asset_graph: AssetGraph, + target_partitions_subset: Optional[PartitionsSubset], + instance_queryer: CachingInstanceQueryer, +) -> None: + """Checks for any partitions definition changes since backfill launch that should mark + the backfill as failed. 
+ """ + if asset_key not in asset_graph.all_asset_keys: + raise DagsterDefinitionChangedDeserializationError( + f"Asset {asset_key} existed at storage-time, but no longer does" + ) + + partitions_def = asset_graph.get_partitions_def(asset_key) + + if target_partitions_subset: # Asset was partitioned at storage time + if partitions_def is None: + raise DagsterDefinitionChangedDeserializationError( + f"Asset {asset_key} had a PartitionsDefinition at storage-time, but no longer" + " does" + ) + + # If the asset was time-partitioned at storage time but the time partitions def + # has changed, mark the backfill as failed + if isinstance( + target_partitions_subset, TimeWindowPartitionsSubset + ) and target_partitions_subset.partitions_def.get_serializable_unique_identifier( + instance_queryer + ) != partitions_def.get_serializable_unique_identifier(instance_queryer): + raise DagsterDefinitionChangedDeserializationError( + f"This partitions definition for asset {asset_key} has changed since this backfill" + " was stored. Changing the partitions definition for a time-partitioned " + "asset during a backfill is not supported." + ) + + else: + # Check that all target partitions still exist. If so, the backfill can continue.a + existent_partitions_subset = ( + AllPartitionsSubset( + partitions_def, + dynamic_partitions_store=instance_queryer, + current_time=instance_queryer.evaluation_time, + ) + & target_partitions_subset + ) + removed_partitions_subset = target_partitions_subset - existent_partitions_subset + if len(removed_partitions_subset) > 0: + raise DagsterDefinitionChangedDeserializationError( + f"Targeted partitions for asset {asset_key} have been removed since this backfill was stored. " + f"The following partitions were removed: {removed_partitions_subset.get_partition_keys()}" + ) + + else: # Asset unpartitioned at storage time + if partitions_def is not None: + raise DagsterDefinitionChangedDeserializationError( + f"Asset {asset_key} was not partitioned at storage-time, but is now" + ) + + +def _check_validity_and_deserialize_asset_backfill_data( + workspace_context: BaseWorkspaceRequestContext, backfill: "PartitionBackfill", + asset_graph: AssetGraph, + instance_queryer: CachingInstanceQueryer, logger: logging.Logger, - workspace_process_context: IWorkspaceProcessContext, - instance: DagsterInstance, -) -> Iterable[None]: - """Runs an iteration of the backfill, including submitting runs and updating the backfill object - in the DB. - - This is a generator so that we can return control to the daemon and let it heartbeat during - expensive operations. +) -> Optional[AssetBackfillData]: + """Attempts to deserialize asset backfill data. If the asset backfill data is valid, + returns the deserialized data, else returns None. 
""" - from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill - - workspace_context = workspace_process_context.create_request_context() unloadable_locations = _get_unloadable_location_names(workspace_context, logger) - asset_graph = ExternalAssetGraph.from_workspace(workspace_context) - - if backfill.serialized_asset_backfill_data is None: - check.failed("Asset backfill missing serialized_asset_backfill_data") try: - previous_asset_backfill_data = AssetBackfillData.from_serialized( - backfill.serialized_asset_backfill_data, asset_graph, backfill.backfill_timestamp - ) + asset_backfill_data = backfill.get_asset_backfill_data(asset_graph) + for asset_key in asset_backfill_data.target_subset.asset_keys: + _check_target_partitions_subset_is_valid( + asset_key, + asset_graph, + asset_backfill_data.target_subset.get_partitions_subset(asset_key) + if asset_key in asset_backfill_data.target_subset.partitions_subsets_by_asset_key + else None, + instance_queryer, + ) except DagsterDefinitionChangedDeserializationError as ex: unloadable_locations_error = ( "This could be because it's inside a code location that's failing to load:" @@ -782,17 +851,45 @@ def execute_asset_backfill_iteration( " partition in the asset graph. The backfill will resume once it is available" f" again.\n{ex}. {unloadable_locations_error}" ) - yield None - return + return None else: raise DagsterAssetBackfillDataLoadError(f"{ex}. {unloadable_locations_error}") - backfill_start_time = utc_datetime_from_timestamp(backfill.backfill_timestamp) + return asset_backfill_data + +def execute_asset_backfill_iteration( + backfill: "PartitionBackfill", + logger: logging.Logger, + workspace_process_context: IWorkspaceProcessContext, + instance: DagsterInstance, +) -> Iterable[None]: + """Runs an iteration of the backfill, including submitting runs and updating the backfill object + in the DB. + + This is a generator so that we can return control to the daemon and let it heartbeat during + expensive operations. 
+ """ + from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill + + workspace_context = workspace_process_context.create_request_context() + + asset_graph = ExternalAssetGraph.from_workspace(workspace_context) + + if not backfill.is_asset_backfill: + check.failed("Backfill must be an asset backfill") + + backfill_start_time = pendulum.from_timestamp(backfill.backfill_timestamp, "UTC") instance_queryer = CachingInstanceQueryer( instance=instance, asset_graph=asset_graph, evaluation_time=backfill_start_time ) + previous_asset_backfill_data = _check_validity_and_deserialize_asset_backfill_data( + workspace_context, backfill, asset_graph, instance_queryer, logger + ) + if previous_asset_backfill_data is None: + return + if backfill.status == BulkActionStatus.REQUESTED: result = None for result in execute_asset_backfill_iteration_inner( @@ -835,7 +932,9 @@ def execute_asset_backfill_iteration( # Refetch, in case the backfill was canceled in the meantime backfill = cast(PartitionBackfill, instance.get_backfill(backfill.backfill_id)) updated_backfill = backfill.with_asset_backfill_data( - updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph + updated_asset_backfill_data, + dynamic_partitions_store=instance, + asset_graph=asset_graph, ) if updated_asset_backfill_data.is_complete(): # The asset backfill is complete when all runs to be requested have finished (success, @@ -886,7 +985,9 @@ def execute_asset_backfill_iteration( ) updated_backfill = backfill.with_asset_backfill_data( - updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph + updated_asset_backfill_data, + dynamic_partitions_store=instance, + asset_graph=asset_graph, ) # The asset backfill is successfully canceled when all requested runs have finished (success, # failure, or cancellation). 
Since the AssetBackfillData object stores materialization states diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 81492d36c0472..8d91f73615425 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -1,6 +1,8 @@ from enum import Enum from typing import Mapping, NamedTuple, Optional, Sequence, Union +import pendulum + from dagster import _check as check from dagster._core.definitions import AssetKey from dagster._core.definitions.asset_graph import AssetGraph @@ -13,7 +15,6 @@ from dagster._core.storage.tags import USER_TAG from dagster._core.workspace.workspace import IWorkspace from dagster._serdes import whitelist_for_serdes -from dagster._utils import utc_datetime_from_timestamp from dagster._utils.error import SerializableErrorInfo from ..definitions.selector import PartitionsByAssetSelector @@ -56,6 +57,7 @@ class PartitionBackfill( ("reexecution_steps", Optional[Sequence[str]]), # only used by asset backfills ("serialized_asset_backfill_data", Optional[str]), + ("asset_backfill_data", Optional[AssetBackfillData]), ], ), ): @@ -73,6 +75,7 @@ def __new__( last_submitted_partition_name: Optional[str] = None, reexecution_steps: Optional[Sequence[str]] = None, serialized_asset_backfill_data: Optional[str] = None, + asset_backfill_data: Optional[AssetBackfillData] = None, ): check.invariant( not (asset_selection and reexecution_steps), @@ -101,6 +104,7 @@ def __new__( check.opt_str_param(last_submitted_partition_name, "last_submitted_partition_name"), check.opt_nullable_sequence_param(reexecution_steps, "reexecution_steps", of_type=str), check.opt_str_param(serialized_asset_backfill_data, "serialized_asset_backfill_data"), + check.opt_inst_param(asset_backfill_data, "asset_backfill_data", AssetBackfillData), ) @property @@ -109,7 +113,21 @@ def selector_id(self): @property def is_asset_backfill(self) -> bool: - return self.serialized_asset_backfill_data is not None + return ( + self.serialized_asset_backfill_data is not None or self.asset_backfill_data is not None + ) + + def get_asset_backfill_data(self, asset_graph: AssetGraph) -> AssetBackfillData: + if self.serialized_asset_backfill_data: + asset_backfill_data = AssetBackfillData.from_serialized( + self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp + ) + elif self.asset_backfill_data: + asset_backfill_data = self.asset_backfill_data + else: + check.failed("Expected either serialized_asset_backfill_data or asset_backfill_data") + + return asset_backfill_data @property def bulk_action_type(self) -> BulkActionType: @@ -132,10 +150,14 @@ def user(self) -> Optional[str]: return None def is_valid_serialization(self, workspace: IWorkspace) -> bool: - if self.serialized_asset_backfill_data is not None: - return AssetBackfillData.is_valid_serialization( - self.serialized_asset_backfill_data, ExternalAssetGraph.from_workspace(workspace) - ) + if self.is_asset_backfill: + if self.serialized_asset_backfill_data: + return AssetBackfillData.is_valid_serialization( + self.serialized_asset_backfill_data, + ExternalAssetGraph.from_workspace(workspace), + ) + else: + return True else: return True @@ -148,14 +170,10 @@ def get_backfill_status_per_asset_key( if not self.is_valid_serialization(workspace): return [] - if self.serialized_asset_backfill_data is not None: + if self.is_asset_backfill: asset_graph = ExternalAssetGraph.from_workspace(workspace) try: - 
asset_backfill_data = AssetBackfillData.from_serialized( - self.serialized_asset_backfill_data, - asset_graph, - self.backfill_timestamp, - ) + asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: return [] @@ -169,13 +187,10 @@ def get_target_partitions_subset( if not self.is_valid_serialization(workspace): return None - if self.serialized_asset_backfill_data is not None: + if self.is_asset_backfill: + asset_graph = ExternalAssetGraph.from_workspace(workspace) try: - asset_backfill_data = AssetBackfillData.from_serialized( - self.serialized_asset_backfill_data, - ExternalAssetGraph.from_workspace(workspace), - self.backfill_timestamp, - ) + asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: return None @@ -189,14 +204,10 @@ def get_target_root_partitions_subset( if not self.is_valid_serialization(workspace): return None - if self.serialized_asset_backfill_data is not None: + if self.is_asset_backfill: + asset_graph = ExternalAssetGraph.from_workspace(workspace) try: - asset_graph = ExternalAssetGraph.from_workspace(workspace) - asset_backfill_data = AssetBackfillData.from_serialized( - self.serialized_asset_backfill_data, - asset_graph, - self.backfill_timestamp, - ) + asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: return None @@ -208,13 +219,10 @@ def get_num_partitions(self, workspace: IWorkspace) -> Optional[int]: if not self.is_valid_serialization(workspace): return 0 - if self.serialized_asset_backfill_data is not None: + if self.is_asset_backfill: + asset_graph = ExternalAssetGraph.from_workspace(workspace) try: - asset_backfill_data = AssetBackfillData.from_serialized( - self.serialized_asset_backfill_data, - ExternalAssetGraph.from_workspace(workspace), - self.backfill_timestamp, - ) + asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: return 0 @@ -229,13 +237,10 @@ def get_partition_names(self, workspace: IWorkspace) -> Optional[Sequence[str]]: if not self.is_valid_serialization(workspace): return [] - if self.serialized_asset_backfill_data is not None: + if self.is_asset_backfill: + asset_graph = ExternalAssetGraph.from_workspace(workspace) try: - asset_backfill_data = AssetBackfillData.from_serialized( - self.serialized_asset_backfill_data, - ExternalAssetGraph.from_workspace(workspace), - self.backfill_timestamp, - ) + asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: return None @@ -285,6 +290,7 @@ def with_status(self, status): error=self.error, asset_selection=self.asset_selection, serialized_asset_backfill_data=self.serialized_asset_backfill_data, + asset_backfill_data=self.asset_backfill_data, ) def with_partition_checkpoint(self, last_submitted_partition_name): @@ -302,6 +308,7 @@ def with_partition_checkpoint(self, last_submitted_partition_name): error=self.error, asset_selection=self.asset_selection, serialized_asset_backfill_data=self.serialized_asset_backfill_data, + asset_backfill_data=self.asset_backfill_data, ) def with_error(self, error): @@ -319,6 +326,7 @@ def with_error(self, error): error=error, asset_selection=self.asset_selection, serialized_asset_backfill_data=self.serialized_asset_backfill_data, + asset_backfill_data=self.asset_backfill_data, ) def with_asset_backfill_data( @@ -327,6 +335,7 @@ def with_asset_backfill_data( 
dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph, ) -> "PartitionBackfill": + is_backcompat = self.serialized_asset_backfill_data is not None return PartitionBackfill( status=self.status, backfill_id=self.backfill_id, @@ -341,7 +350,10 @@ def with_asset_backfill_data( asset_selection=self.asset_selection, serialized_asset_backfill_data=asset_backfill_data.serialize( dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph - ), + ) + if is_backcompat + else None, + asset_backfill_data=asset_backfill_data if not is_backcompat else None, ) @classmethod @@ -364,6 +376,14 @@ def from_asset_partitions( the anchor asset, as well as all partitions of other selected assets that are downstream of those partitions of the anchor asset. """ + asset_backfill_data = AssetBackfillData.from_asset_partitions( + asset_graph=asset_graph, + partition_names=partition_names, + asset_selection=asset_selection, + dynamic_partitions_store=dynamic_partitions_store, + all_partitions=all_partitions, + backfill_start_time=pendulum.from_timestamp(backfill_timestamp, tz="UTC"), + ) return cls( backfill_id=backfill_id, status=BulkActionStatus.REQUESTED, @@ -371,14 +391,8 @@ def from_asset_partitions( tags=tags, backfill_timestamp=backfill_timestamp, asset_selection=asset_selection, - serialized_asset_backfill_data=AssetBackfillData.from_asset_partitions( - asset_graph=asset_graph, - partition_names=partition_names, - asset_selection=asset_selection, - dynamic_partitions_store=dynamic_partitions_store, - all_partitions=all_partitions, - backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp), - ).serialize(dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph), + serialized_asset_backfill_data=None, + asset_backfill_data=asset_backfill_data, ) @classmethod @@ -391,16 +405,18 @@ def from_partitions_by_assets( dynamic_partitions_store: DynamicPartitionsStore, partitions_by_assets: Sequence[PartitionsByAssetSelector], ): + asset_backfill_data = AssetBackfillData.from_partitions_by_assets( + asset_graph=asset_graph, + dynamic_partitions_store=dynamic_partitions_store, + backfill_start_time=pendulum.from_timestamp(backfill_timestamp, tz="UTC"), + partitions_by_assets=partitions_by_assets, + ) return cls( backfill_id=backfill_id, status=BulkActionStatus.REQUESTED, from_failure=False, tags=tags, backfill_timestamp=backfill_timestamp, - serialized_asset_backfill_data=AssetBackfillData.from_partitions_by_assets( - asset_graph=asset_graph, - dynamic_partitions_store=dynamic_partitions_store, - backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp), - partitions_by_assets=partitions_by_assets, - ).serialize(dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph), + serialized_asset_backfill_data=None, + asset_backfill_data=asset_backfill_data, ) diff --git a/python_modules/dagster/dagster/_serdes/__init__.py b/python_modules/dagster/dagster/_serdes/__init__.py index da2553c4b94d3..1686360303226 100644 --- a/python_modules/dagster/dagster/_serdes/__init__.py +++ b/python_modules/dagster/dagster/_serdes/__init__.py @@ -6,6 +6,7 @@ from .serdes import ( EnumSerializer as EnumSerializer, NamedTupleSerializer as NamedTupleSerializer, + SerializableNonScalarKeyMapping as SerializableNonScalarKeyMapping, WhitelistMap as WhitelistMap, deserialize_value as deserialize_value, pack_value as pack_value, diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py 
b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index ca8829130379b..1d96dece12266 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -429,7 +429,6 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: """Returns an AssetGraphSubset representing the set of assets that are currently targeted by an active asset backfill. """ - from dagster._core.execution.asset_backfill import AssetBackfillData from dagster._core.execution.backfill import BulkActionStatus asset_backfills = [ @@ -440,15 +439,8 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: result = AssetGraphSubset() for asset_backfill in asset_backfills: - if asset_backfill.serialized_asset_backfill_data is None: - check.failed("Asset backfill missing serialized_asset_backfill_data") - try: - asset_backfill_data = AssetBackfillData.from_serialized( - asset_backfill.serialized_asset_backfill_data, - self.asset_graph, - asset_backfill.backfill_timestamp, - ) + asset_backfill_data = asset_backfill.get_asset_backfill_data(self.asset_graph) except DagsterDefinitionChangedDeserializationError: self._logger.warning( f"Not considering assets in backfill {asset_backfill.backfill_id} since its" diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index f959c1794b0b5..d887dd4cccf51 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -290,7 +290,12 @@ def test_scenario_to_completion(scenario: AssetBackfillScenario, failures: str, else: assert False - backfill_data = AssetBackfillData.empty(target_subset, scenario.evaluation_time) + backfill_data = AssetBackfillData.empty( + target_subset, + scenario.evaluation_time, + asset_graph, + dynamic_partitions_store=instance, + ) if failures == "no_failures": fail_asset_partitions: Set[AssetKeyPartitionKey] = set() @@ -423,7 +428,12 @@ def make_backfill_data( else: assert False - return AssetBackfillData.empty(target_subset, current_time or pendulum.now("UTC")) + return AssetBackfillData.empty( + target_subset, + current_time or pendulum.now("UTC"), + asset_graph, + dynamic_partitions_store=instance, + ) def make_random_subset( diff --git a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py index 93004dacd55e2..2d072c0e4ca1d 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py @@ -100,3 +100,53 @@ def unloadable_location_fixture(instance_module_scoped) -> Iterator[WorkspacePro workspace_load_target=invalid_workspace_load_target(), instance=instance_module_scoped ) as workspace_context: yield workspace_context + + +def partitions_def_changes_workspace_1_load_target(attribute=None): + return InProcessTestWorkspaceLoadTarget( + InProcessCodeLocationOrigin( + loadable_target_origin=LoadableTargetOrigin( + executable_path=sys.executable, + module_name="dagster_tests.daemon_tests.test_locations.partitions_defs_changes_locations.location_1", + working_directory=os.getcwd(), + attribute=attribute, + ), + location_name="partitions_def_changes_1", + ) + ) + + 
+@pytest.fixture(name="partitions_defs_changes_location_1_workspace_context", scope="module") +def partitions_defs_changes_location_1_fixture( + instance_module_scoped +) -> Iterator[WorkspaceProcessContext]: + with create_test_daemon_workspace_context( + workspace_load_target=partitions_def_changes_workspace_1_load_target(), + instance=instance_module_scoped, + ) as workspace_context: + yield workspace_context + + +def partitions_def_changes_workspace_2_load_target(attribute=None): + return InProcessTestWorkspaceLoadTarget( + InProcessCodeLocationOrigin( + loadable_target_origin=LoadableTargetOrigin( + executable_path=sys.executable, + module_name="dagster_tests.daemon_tests.test_locations.partitions_defs_changes_locations.location_2", + working_directory=os.getcwd(), + attribute=attribute, + ), + location_name="partitions_def_changes_1", + ) + ) + + +@pytest.fixture(name="partitions_defs_changes_location_2_workspace_context", scope="module") +def partitions_defs_changes_location_2_fixture( + instance_module_scoped +) -> Iterator[WorkspaceProcessContext]: + with create_test_daemon_workspace_context( + workspace_load_target=partitions_def_changes_workspace_2_load_target(), + instance=instance_module_scoped, + ) as workspace_context: + yield workspace_context diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 7fb78209316b1..704732a29f680 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -3,8 +3,8 @@ import string import sys import time -from typing import cast +import dagster._check as check import mock import pendulum import pytest @@ -41,7 +41,7 @@ PartitionsByAssetSelector, PartitionsSelector, ) -from dagster._core.execution.asset_backfill import RUN_CHUNK_SIZE, AssetBackfillData +from dagster._core.execution.asset_backfill import RUN_CHUNK_SIZE from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill from dagster._core.host_representation import ( ExternalRepository, @@ -1109,11 +1109,7 @@ def _override_backfill_cancellation(backfill: PartitionBackfill): # Check that the requested subset only contains runs that were submitted updated_backfill = instance.get_backfill(backfill_id) assert updated_backfill - updated_asset_backfill_data = AssetBackfillData.from_serialized( - cast(str, updated_backfill.serialized_asset_backfill_data), - asset_graph, - backfill.backfill_timestamp, - ) + updated_asset_backfill_data = check.not_none(backfill.asset_backfill_data) assert all( len(partitions_subset) == RUN_CHUNK_SIZE for partitions_subset in updated_asset_backfill_data.requested_subset.partitions_subsets_by_asset_key.values() @@ -1211,12 +1207,11 @@ def test_asset_backfill_with_multi_run_backfill_policy( updated_backfill = instance.get_backfill(backfill_id) assert updated_backfill - updated_asset_backfill_data = AssetBackfillData.from_serialized( - cast(str, updated_backfill.serialized_asset_backfill_data), - asset_graph, - backfill.backfill_timestamp, - ) - assert list(updated_asset_backfill_data.requested_subset.iterate_asset_partitions()) == [ + assert list( + check.not_none( + updated_backfill.asset_backfill_data + ).requested_subset.iterate_asset_partitions() + ) == [ AssetKeyPartitionKey(asset_with_multi_run_backfill_policy.key, partition) for partition in partitions ] @@ -1252,8 +1247,166 @@ def test_error_code_location( assert len(errors) == 1 assert ( - 
"dagster._core.errors.DagsterAssetBackfillDataLoadError: Asset asset_a existed at" + "dagster._core.errors.DagsterAssetBackfillDataLoadError: Asset AssetKey(['asset_a']) existed at" " storage-time, but no longer does. This could be because it's inside a code location" " that's failing to load" in errors[0].message ) assert "Failure loading location" in caplog.text + + +@pytest.mark.parametrize("backcompat_serialization", [True, False]) +def test_raise_error_on_asset_backfill_partitions_defs_changes( + caplog, + instance, + partitions_defs_changes_location_1_workspace_context, + partitions_defs_changes_location_2_workspace_context, + backcompat_serialization: bool, +): + asset_selection = [AssetKey("time_partitions_def_changes")] + partition_keys = ["2023-01-01"] + backfill_id = "dummy_backfill" + asset_graph = ExternalAssetGraph.from_workspace( + partitions_defs_changes_location_1_workspace_context.create_request_context() + ) + + backfill = PartitionBackfill.from_asset_partitions( + asset_graph=asset_graph, + backfill_id=backfill_id, + tags={}, + backfill_timestamp=pendulum.now().timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + ) + + if backcompat_serialization: + backfill = backfill._replace( + serialized_asset_backfill_data=check.not_none(backfill.asset_backfill_data).serialize( + instance, asset_graph + ), + asset_backfill_data=None, + ) + + instance.add_backfill(backfill) + + errors = list( + execute_backfill_iteration( + partitions_defs_changes_location_2_workspace_context, + get_default_daemon_logger("BackfillDaemon"), + ) + ) + + assert len(errors) == 1 + error_msg = check.not_none(errors[0]).message + assert ("partitions definition has changed") in error_msg or ( + "partitions definition for asset AssetKey(['time_partitions_def_changes']) has changed" + ) in error_msg + + +@pytest.mark.parametrize("backcompat_serialization", [True, False]) +def test_raise_error_on_partitions_defs_removed( + caplog, + instance, + partitions_defs_changes_location_1_workspace_context, + partitions_defs_changes_location_2_workspace_context, + backcompat_serialization: bool, +): + asset_selection = [AssetKey("partitions_def_removed")] + partition_keys = ["2023-01-01"] + backfill_id = "dummy_backfill" + asset_graph = ExternalAssetGraph.from_workspace( + partitions_defs_changes_location_1_workspace_context.create_request_context() + ) + + backfill = PartitionBackfill.from_asset_partitions( + asset_graph=asset_graph, + backfill_id=backfill_id, + tags={}, + backfill_timestamp=pendulum.now().timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + ) + + if backcompat_serialization: + backfill = backfill._replace( + serialized_asset_backfill_data=check.not_none(backfill.asset_backfill_data).serialize( + instance, asset_graph + ), + asset_backfill_data=None, + ) + + instance.add_backfill(backfill) + + errors = [ + e + for e in execute_backfill_iteration( + partitions_defs_changes_location_2_workspace_context, + get_default_daemon_logger("BackfillDaemon"), + ) + if e is not None + ] + assert len(errors) == 1 + assert ("had a PartitionsDefinition at storage-time, but no longer does") in errors[0].message + + +def test_raise_error_on_target_static_partition_removed( + caplog, + instance, + partitions_defs_changes_location_1_workspace_context, + partitions_defs_changes_location_2_workspace_context, +): + asset_selection = 
[AssetKey("static_partition_removed")] + partition_keys = ["a"] + asset_graph = ExternalAssetGraph.from_workspace( + partitions_defs_changes_location_1_workspace_context.create_request_context() + ) + + backfill = PartitionBackfill.from_asset_partitions( + asset_graph=asset_graph, + backfill_id="dummy_backfill", + tags={}, + backfill_timestamp=pendulum.now().timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + ) + instance.add_backfill(backfill) + # When a static partitions def is changed, but all target partitions still exist, + # backfill executes successfully + errors = [ + e + for e in execute_backfill_iteration( + partitions_defs_changes_location_2_workspace_context, + get_default_daemon_logger("BackfillDaemon"), + ) + if e is not None + ] + assert len(errors) == 0 + + backfill = PartitionBackfill.from_asset_partitions( + asset_graph=asset_graph, + backfill_id="dummy_backfill_2", + tags={}, + backfill_timestamp=pendulum.now().timestamp(), + asset_selection=asset_selection, + partition_names=["c"], + dynamic_partitions_store=instance, + all_partitions=False, + ) + instance.add_backfill(backfill) + # When a static partitions def is changed, but any target partitions is removed, + # error is raised + errors = [ + e + for e in execute_backfill_iteration( + partitions_defs_changes_location_2_workspace_context, + get_default_daemon_logger("BackfillDaemon"), + ) + if e is not None + ] + assert len(errors) == 1 + assert ("The following partitions were removed: {'c'}.") in errors[0].message diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_locations/partitions_defs_changes_locations/location_1.py b/python_modules/dagster/dagster_tests/daemon_tests/test_locations/partitions_defs_changes_locations/location_1.py new file mode 100644 index 0000000000000..64206df3aa385 --- /dev/null +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_locations/partitions_defs_changes_locations/location_1.py @@ -0,0 +1,22 @@ +from dagster import DailyPartitionsDefinition, StaticPartitionsDefinition, asset + + +@asset( + partitions_def=DailyPartitionsDefinition("2023-01-01"), +) +def time_partitions_def_changes(): + pass + + +@asset( + partitions_def=DailyPartitionsDefinition("2023-01-01"), +) +def partitions_def_removed(): + pass + + +@asset( + partitions_def=StaticPartitionsDefinition(["a", "b", "c"]), +) +def static_partition_removed(): + pass diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_locations/partitions_defs_changes_locations/location_2.py b/python_modules/dagster/dagster_tests/daemon_tests/test_locations/partitions_defs_changes_locations/location_2.py new file mode 100644 index 0000000000000..8c2de48680f36 --- /dev/null +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_locations/partitions_defs_changes_locations/location_2.py @@ -0,0 +1,20 @@ +from dagster import DailyPartitionsDefinition, StaticPartitionsDefinition, asset + + +@asset( # partitions def changed to start in June instead of Jan + partitions_def=DailyPartitionsDefinition("2023-06-01"), +) +def time_partitions_def_changes(): + pass + + +@asset # partitions def removed +def partitions_def_removed(): + pass + + +@asset( # partition "c" removed + partitions_def=StaticPartitionsDefinition(["a", "b"]), +) +def static_partition_removed(): + pass