Skip to content

Commit

Permalink
backcompat arg
Browse files (browse the repository at this point in the history)
  • Loading branch information
clairelin135 committed Nov 21, 2023
1 parent 336e81c commit 18f4f88
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def cancel_partition_backfill(
if not backfill:
check.failed(f"No backfill found for id: {backfill_id}")

if backfill.serialized_asset_backfill_data:
if backfill.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(graphene_info.context)
_assert_permission_for_asset_graph(
graphene_info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,11 @@ def test_remove_partitions_defs_after_backfill():
assert get_backfills_result.data
backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]
assert len(backfill_results) == 1
assert backfill_results[0]["numPartitions"] == 0
assert backfill_results[0]["numPartitions"] == 2
assert backfill_results[0]["id"] == backfill_id
assert backfill_results[0]["partitionSet"] is None
assert backfill_results[0]["partitionSetName"] is None
assert set(backfill_results[0]["partitionNames"]) == set()
assert set(backfill_results[0]["partitionNames"]) == {"a", "b"}

# on PartitionBackfill
single_backfill_result = execute_dagster_graphql(
Expand Down Expand Up @@ -790,11 +790,9 @@ def _get_backfill_data(
assert len(backfills) == 1
backfill = backfills[0]
assert backfill.backfill_id == backfill_id
assert backfill.serialized_asset_backfill_data
assert backfill.asset_backfill_data

return backfill_id, AssetBackfillData.from_serialized(
backfill.serialized_asset_backfill_data, repo.asset_graph, backfill.backfill_timestamp
)
return backfill_id, backfill.asset_backfill_data


def _get_error_message(launch_backfill_result: GqlResult) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.execution.asset_backfill import (
AssetBackfillData,
AssetBackfillIterationResult,
execute_asset_backfill_iteration,
execute_asset_backfill_iteration_inner,
Expand Down Expand Up @@ -209,9 +208,7 @@ def _execute_asset_backfill_iteration_no_side_effects(
However, does not execute side effects i.e. launching runs.
"""
backfill = graphql_context.instance.get_backfill(backfill_id)
asset_backfill_data = AssetBackfillData.from_serialized(
backfill.serialized_asset_backfill_data, asset_graph, backfill.backfill_timestamp
)
asset_backfill_data = backfill.asset_backfill_data
result = None
for result in execute_asset_backfill_iteration_inner(
backfill_id=backfill_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,9 @@ def execute_asset_backfill_iteration(
# Refetch, in case the backfill was canceled in the meantime
backfill = cast(PartitionBackfill, instance.get_backfill(backfill.backfill_id))
updated_backfill = backfill.with_asset_backfill_data(
updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph
updated_asset_backfill_data,
dynamic_partitions_store=instance,
asset_graph=asset_graph,
)
if updated_asset_backfill_data.is_complete():
# The asset backfill is complete when all runs to be requested have finished (success,
Expand Down Expand Up @@ -975,7 +977,9 @@ def execute_asset_backfill_iteration(
)

updated_backfill = backfill.with_asset_backfill_data(
updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph
updated_asset_backfill_data,
dynamic_partitions_store=instance,
asset_graph=asset_graph,
)
# The asset backfill is successfully canceled when all requested runs have finished (success,
# failure, or cancellation). Since the AssetBackfillData object stores materialization states
Expand Down
9 changes: 7 additions & 2 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ def with_asset_backfill_data(
dynamic_partitions_store: DynamicPartitionsStore,
asset_graph: AssetGraph,
) -> "PartitionBackfill":
is_backcompat = self.serialized_asset_backfill_data is not None
return PartitionBackfill(
status=self.status,
backfill_id=self.backfill_id,
Expand All @@ -384,8 +385,12 @@ def with_asset_backfill_data(
last_submitted_partition_name=self.last_submitted_partition_name,
error=self.error,
asset_selection=self.asset_selection,
serialized_asset_backfill_data=None,
asset_backfill_data=asset_backfill_data,
serialized_asset_backfill_data=asset_backfill_data.serialize(
dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph
)
if is_backcompat
else None,
asset_backfill_data=asset_backfill_data if not is_backcompat else None,
)

@classmethod
Expand Down
34 changes: 19 additions & 15 deletions python_modules/dagster/dagster/_utils/caching_instance_queryer.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,22 +440,26 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset:

result = AssetGraphSubset()
for asset_backfill in asset_backfills:
if asset_backfill.serialized_asset_backfill_data is None:
check.failed("Asset backfill missing serialized_asset_backfill_data")

try:
asset_backfill_data = AssetBackfillData.from_serialized(
asset_backfill.serialized_asset_backfill_data,
self.asset_graph,
asset_backfill.backfill_timestamp,
)
except DagsterDefinitionChangedDeserializationError:
self._logger.warning(
f"Not considering assets in backfill {asset_backfill.backfill_id} since its"
" data could not be deserialized"
if asset_backfill.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
asset_backfill.serialized_asset_backfill_data,
self.asset_graph,
asset_backfill.backfill_timestamp,
)
except DagsterDefinitionChangedDeserializationError:
self._logger.warning(
f"Not considering assets in backfill {asset_backfill.backfill_id} since its"
" data could not be deserialized"
)
# Backfill can't be loaded, so no risk of the assets interfering
continue
elif asset_backfill.asset_backfill_data:
asset_backfill_data = asset_backfill.asset_backfill_data
else:
check.failed(
"Expected either serialized_asset_backfill_data or asset_backfill_data field"
)
# Backfill can't be loaded, so no risk of the assets interfering
continue

result |= asset_backfill_data.target_subset

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ def test_scenario_to_completion(scenario: AssetBackfillScenario, failures: str,
assert False

backfill_data = AssetBackfillData.empty(
target_subset, scenario.evaluation_time, asset_graph
target_subset,
scenario.evaluation_time,
asset_graph,
dynamic_partitions_store=instance,
)

if failures == "no_failures":
Expand Down Expand Up @@ -425,7 +428,12 @@ def make_backfill_data(
else:
assert False

return AssetBackfillData.empty(target_subset, current_time or pendulum.now("UTC"), asset_graph)
return AssetBackfillData.empty(
target_subset,
current_time or pendulum.now("UTC"),
asset_graph,
dynamic_partitions_store=instance,
)


def make_random_subset(
Expand Down
23 changes: 9 additions & 14 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import string
import sys
import time
from typing import cast

import dagster._check as check
import mock
import pendulum
import pytest
Expand Down Expand Up @@ -41,7 +41,7 @@
PartitionsByAssetSelector,
PartitionsSelector,
)
from dagster._core.execution.asset_backfill import RUN_CHUNK_SIZE, AssetBackfillData
from dagster._core.execution.asset_backfill import RUN_CHUNK_SIZE
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.host_representation import (
ExternalRepository,
Expand Down Expand Up @@ -1109,11 +1109,7 @@ def _override_backfill_cancellation(backfill: PartitionBackfill):
# Check that the requested subset only contains runs that were submitted
updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
updated_asset_backfill_data = AssetBackfillData.from_serialized(
cast(str, updated_backfill.serialized_asset_backfill_data),
asset_graph,
backfill.backfill_timestamp,
)
updated_asset_backfill_data = check.not_none(backfill.asset_backfill_data)
assert all(
len(partitions_subset) == RUN_CHUNK_SIZE
for partitions_subset in updated_asset_backfill_data.requested_subset.partitions_subsets_by_asset_key.values()
Expand Down Expand Up @@ -1211,12 +1207,11 @@ def test_asset_backfill_with_multi_run_backfill_policy(

updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
updated_asset_backfill_data = AssetBackfillData.from_serialized(
cast(str, updated_backfill.serialized_asset_backfill_data),
asset_graph,
backfill.backfill_timestamp,
)
assert list(updated_asset_backfill_data.requested_subset.iterate_asset_partitions()) == [
assert list(
check.not_none(
updated_backfill.asset_backfill_data
).requested_subset.iterate_asset_partitions()
) == [
AssetKeyPartitionKey(asset_with_multi_run_backfill_policy.key, partition)
for partition in partitions
]
Expand Down Expand Up @@ -1252,7 +1247,7 @@ def test_error_code_location(

assert len(errors) == 1
assert (
"dagster._core.errors.DagsterAssetBackfillDataLoadError: Asset asset_a existed at"
"dagster._core.errors.DagsterAssetBackfillDataLoadError: Asset AssetKey(['asset_a']) existed at"
" storage-time, but no longer does. This could be because it's inside a code location"
" that's failing to load" in errors[0].message
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,12 @@ def repo():
partitions_subsets_by_asset_key={},
non_partitioned_asset_keys=set(),
)
partition_ids_by_serialized_asset_key = {
asset_key.to_string(): check.not_none(
repo.asset_graph.get_partitions_def(asset_key)
).get_serializable_unique_identifier(dynamic_partitions_store=instance)
for asset_key in target_subset.partitions_subsets_by_asset_key.keys()
}
asset_backfill_data = AssetBackfillData(
latest_storage_id=0,
target_subset=target_subset,
Expand All @@ -327,6 +333,7 @@ def repo():
requested_subset=empty_subset,
failed_and_downstream_subset=empty_subset,
backfill_start_time=test_time,
partitions_ids_by_serialized_asset_key=partition_ids_by_serialized_asset_key,
)
backfill = PartitionBackfill(
backfill_id=f"backfill{i}",
Expand Down

0 comments on commit 18f4f88

Please sign in to comment.