From 96a6b6903bc631d470d2dd1fce00007b350abd71 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Fri, 10 Nov 2023 16:35:57 -0800 Subject: [PATCH] backcompat arg --- .../implementation/execution/backfill.py | 2 +- .../graphql/test_asset_backfill.py | 10 +++--- .../graphql/test_partition_backfill.py | 5 +-- .../dagster/_core/execution/asset_backfill.py | 8 +++-- .../dagster/_core/execution/backfill.py | 9 +++-- .../_utils/caching_instance_queryer.py | 34 +++++++++++-------- .../execution_tests/test_asset_backfill.py | 12 +++++-- .../daemon_tests/test_backfill.py | 23 +++++-------- .../auto_materialize_tests/base_scenario.py | 7 ++++ 9 files changed, 64 insertions(+), 46 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index ddee5287d7639..d971f979eadf8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -300,7 +300,7 @@ def cancel_partition_backfill( if not backfill: check.failed(f"No backfill found for id: {backfill_id}") - if backfill.serialized_asset_backfill_data: + if backfill.is_asset_backfill: asset_graph = ExternalAssetGraph.from_workspace(graphene_info.context) _assert_permission_for_asset_graph( graphene_info, diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py index 35cc1a7e7a36f..97ef327520f75 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -454,11 +454,11 @@ def test_remove_partitions_defs_after_backfill(): assert get_backfills_result.data backfill_results = 
get_backfills_result.data["partitionBackfillsOrError"]["results"] assert len(backfill_results) == 1 - assert backfill_results[0]["numPartitions"] == 0 + assert backfill_results[0]["numPartitions"] == 2 assert backfill_results[0]["id"] == backfill_id assert backfill_results[0]["partitionSet"] is None assert backfill_results[0]["partitionSetName"] is None - assert set(backfill_results[0]["partitionNames"]) == set() + assert set(backfill_results[0]["partitionNames"]) == {"a", "b"} # on PartitionBackfill single_backfill_result = execute_dagster_graphql( @@ -790,11 +790,9 @@ def _get_backfill_data( assert len(backfills) == 1 backfill = backfills[0] assert backfill.backfill_id == backfill_id - assert backfill.serialized_asset_backfill_data + assert backfill.asset_backfill_data - return backfill_id, AssetBackfillData.from_serialized( - backfill.serialized_asset_backfill_data, repo.asset_graph, backfill.backfill_timestamp - ) + return backfill_id, backfill.asset_backfill_data def _get_error_message(launch_backfill_result: GqlResult) -> Optional[str]: diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index f6cffd301b897..3219f4c83ba2d 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -13,7 +13,6 @@ from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.external_asset_graph import ExternalAssetGraph from dagster._core.execution.asset_backfill import ( - AssetBackfillData, AssetBackfillIterationResult, execute_asset_backfill_iteration, execute_asset_backfill_iteration_inner, @@ -209,9 +208,7 @@ def _execute_asset_backfill_iteration_no_side_effects( However, does not execute side effects i.e. launching runs. 
""" backfill = graphql_context.instance.get_backfill(backfill_id) - asset_backfill_data = AssetBackfillData.from_serialized( - backfill.serialized_asset_backfill_data, asset_graph, backfill.backfill_timestamp - ) + asset_backfill_data = backfill.asset_backfill_data result = None for result in execute_asset_backfill_iteration_inner( backfill_id=backfill_id, diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 697d33ba35195..9cb7d707d2bb6 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -924,7 +924,9 @@ def execute_asset_backfill_iteration( # Refetch, in case the backfill was canceled in the meantime backfill = cast(PartitionBackfill, instance.get_backfill(backfill.backfill_id)) updated_backfill = backfill.with_asset_backfill_data( - updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph + updated_asset_backfill_data, + dynamic_partitions_store=instance, + asset_graph=asset_graph, ) if updated_asset_backfill_data.is_complete(): # The asset backfill is complete when all runs to be requested have finished (success, @@ -975,7 +977,9 @@ def execute_asset_backfill_iteration( ) updated_backfill = backfill.with_asset_backfill_data( - updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph + updated_asset_backfill_data, + dynamic_partitions_store=instance, + asset_graph=asset_graph, ) # The asset backfill is successfully canceled when all requested runs have finished (success, # failure, or cancellation). 
Since the AssetBackfillData object stores materialization states diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 2340dcaa27812..99382f3f1ab58 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -372,6 +372,7 @@ def with_asset_backfill_data( dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph, ) -> "PartitionBackfill": + is_backcompat = self.serialized_asset_backfill_data is not None return PartitionBackfill( status=self.status, backfill_id=self.backfill_id, @@ -384,8 +385,12 @@ def with_asset_backfill_data( last_submitted_partition_name=self.last_submitted_partition_name, error=self.error, asset_selection=self.asset_selection, - serialized_asset_backfill_data=None, - asset_backfill_data=asset_backfill_data, + serialized_asset_backfill_data=asset_backfill_data.serialize( + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph + ) + if is_backcompat + else None, + asset_backfill_data=asset_backfill_data if not is_backcompat else None, ) @classmethod diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index ca8829130379b..dbd35771ccef3 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -440,22 +440,26 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: result = AssetGraphSubset() for asset_backfill in asset_backfills: - if asset_backfill.serialized_asset_backfill_data is None: - check.failed("Asset backfill missing serialized_asset_backfill_data") - - try: - asset_backfill_data = AssetBackfillData.from_serialized( - asset_backfill.serialized_asset_backfill_data, - self.asset_graph, - asset_backfill.backfill_timestamp, - ) - 
except DagsterDefinitionChangedDeserializationError: - self._logger.warning( - f"Not considering assets in backfill {asset_backfill.backfill_id} since its" - " data could not be deserialized" + if asset_backfill.serialized_asset_backfill_data: + try: + asset_backfill_data = AssetBackfillData.from_serialized( + asset_backfill.serialized_asset_backfill_data, + self.asset_graph, + asset_backfill.backfill_timestamp, + ) + except DagsterDefinitionChangedDeserializationError: + self._logger.warning( + f"Not considering assets in backfill {asset_backfill.backfill_id} since its" + " data could not be deserialized" + ) + # Backfill can't be loaded, so no risk of the assets interfering + continue + elif asset_backfill.asset_backfill_data: + asset_backfill_data = asset_backfill.asset_backfill_data + else: + check.failed( + "Expected either serialized_asset_backfill_data or asset_backfill_data field" ) - # Backfill can't be loaded, so no risk of the assets interfering - continue result |= asset_backfill_data.target_subset diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index be11b3a419125..d887dd4cccf51 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -291,7 +291,10 @@ def test_scenario_to_completion(scenario: AssetBackfillScenario, failures: str, assert False backfill_data = AssetBackfillData.empty( - target_subset, scenario.evaluation_time, asset_graph + target_subset, + scenario.evaluation_time, + asset_graph, + dynamic_partitions_store=instance, ) if failures == "no_failures": @@ -425,7 +428,12 @@ def make_backfill_data( else: assert False - return AssetBackfillData.empty(target_subset, current_time or pendulum.now("UTC"), asset_graph) + return AssetBackfillData.empty( + target_subset, + 
current_time or pendulum.now("UTC"), + asset_graph, + dynamic_partitions_store=instance, + ) def make_random_subset( diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 7fb78209316b1..d7a1b03424b35 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -3,8 +3,8 @@ import string import sys import time -from typing import cast +import dagster._check as check import mock import pendulum import pytest @@ -41,7 +41,7 @@ PartitionsByAssetSelector, PartitionsSelector, ) -from dagster._core.execution.asset_backfill import RUN_CHUNK_SIZE, AssetBackfillData +from dagster._core.execution.asset_backfill import RUN_CHUNK_SIZE from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill from dagster._core.host_representation import ( ExternalRepository, @@ -1109,11 +1109,7 @@ def _override_backfill_cancellation(backfill: PartitionBackfill): # Check that the requested subset only contains runs that were submitted updated_backfill = instance.get_backfill(backfill_id) assert updated_backfill - updated_asset_backfill_data = AssetBackfillData.from_serialized( - cast(str, updated_backfill.serialized_asset_backfill_data), - asset_graph, - backfill.backfill_timestamp, - ) + updated_asset_backfill_data = check.not_none(updated_backfill.asset_backfill_data) assert all( len(partitions_subset) == RUN_CHUNK_SIZE for partitions_subset in updated_asset_backfill_data.requested_subset.partitions_subsets_by_asset_key.values() @@ -1211,12 +1207,11 @@ def test_asset_backfill_with_multi_run_backfill_policy( updated_backfill = instance.get_backfill(backfill_id) assert updated_backfill - updated_asset_backfill_data = AssetBackfillData.from_serialized( - cast(str, updated_backfill.serialized_asset_backfill_data), - asset_graph, - backfill.backfill_timestamp, - ) - assert
list(updated_asset_backfill_data.requested_subset.iterate_asset_partitions()) == [ + assert list( + check.not_none( + updated_backfill.asset_backfill_data + ).requested_subset.iterate_asset_partitions() + ) == [ AssetKeyPartitionKey(asset_with_multi_run_backfill_policy.key, partition) for partition in partitions ] @@ -1252,7 +1247,7 @@ def test_error_code_location( assert len(errors) == 1 assert ( - "dagster._core.errors.DagsterAssetBackfillDataLoadError: Asset asset_a existed at" + "dagster._core.errors.DagsterAssetBackfillDataLoadError: Asset AssetKey(['asset_a']) existed at" " storage-time, but no longer does. This could be because it's inside a code location" " that's failing to load" in errors[0].message ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index d7e9d7b2066a8..c300ae2d19a22 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -319,6 +319,12 @@ def repo(): partitions_subsets_by_asset_key={}, non_partitioned_asset_keys=set(), ) + partition_ids_by_serialized_asset_key = { + asset_key.to_string(): check.not_none( + repo.asset_graph.get_partitions_def(asset_key) + ).get_serializable_unique_identifier(dynamic_partitions_store=instance) + for asset_key in target_subset.partitions_subsets_by_asset_key.keys() + } asset_backfill_data = AssetBackfillData( latest_storage_id=0, target_subset=target_subset, @@ -327,6 +333,7 @@ def repo(): requested_subset=empty_subset, failed_and_downstream_subset=empty_subset, backfill_start_time=test_time, + partitions_ids_by_serialized_asset_key=partition_ids_by_serialized_asset_key, ) backfill = PartitionBackfill( backfill_id=f"backfill{i}",