Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7/n subset refactor] Use new asset backfill data serialization format #17929

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def cancel_partition_backfill(
if not backfill:
check.failed(f"No backfill found for id: {backfill_id}")

if backfill.serialized_asset_backfill_data:
if backfill.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(graphene_info.context)
_assert_permission_for_asset_graph(
graphene_info,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Optional, Tuple

import mock
from dagster import (
AssetKey,
DailyPartitionsDefinition,
Expand Down Expand Up @@ -421,6 +422,71 @@ def test_launch_asset_backfill():
)


def test_remove_partitions_defs_after_backfill_backcompat():
repo = get_repo()
all_asset_keys = repo.asset_graph.materializable_asset_keys

with instance_for_test() as instance:
with define_out_of_process_context(__file__, "get_repo", instance) as context:
launch_backfill_result = execute_dagster_graphql(
context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"partitionNames": ["a", "b"],
"assetSelection": [key.to_graphql_input() for key in all_asset_keys],
}
},
)
backfill_id, asset_backfill_data = _get_backfill_data(
launch_backfill_result, instance, repo
)
assert asset_backfill_data.target_subset.asset_keys == all_asset_keys

# Replace the asset backfill data with the backcompat serialization
backfill = instance.get_backfills()[0]
backcompat_backfill = backfill._replace(
asset_backfill_data=None,
serialized_asset_backfill_data=backfill.asset_backfill_data.serialize(
instance, asset_graph=repo.asset_graph
),
)

with mock.patch(
"dagster._core.instance.DagsterInstance.get_backfills",
return_value=[backcompat_backfill],
):
# When the partitions defs are unchanged, the backfill data can be fetched
with define_out_of_process_context(__file__, "get_repo", instance) as context:
get_backfills_result = execute_dagster_graphql(
context, GET_PARTITION_BACKFILLS_QUERY, variables={}
)
assert not get_backfills_result.errors
assert get_backfills_result.data

backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]
assert len(backfill_results) == 1
assert backfill_results[0]["numPartitions"] == 2
assert backfill_results[0]["id"] == backfill_id
assert set(backfill_results[0]["partitionNames"]) == {"a", "b"}

# When the partitions defs are changed, the backfill data cannot be fetched
with define_out_of_process_context(
__file__, "get_repo_with_non_partitioned_asset", instance
) as context:
get_backfills_result = execute_dagster_graphql(
context, GET_PARTITION_BACKFILLS_QUERY, variables={}
)
assert not get_backfills_result.errors
assert get_backfills_result.data

backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]
assert len(backfill_results) == 1
assert backfill_results[0]["numPartitions"] == 0
assert backfill_results[0]["id"] == backfill_id
assert set(backfill_results[0]["partitionNames"]) == set()


def test_remove_partitions_defs_after_backfill():
repo = get_repo()
all_asset_keys = repo.asset_graph.materializable_asset_keys
Expand Down Expand Up @@ -454,11 +520,11 @@ def test_remove_partitions_defs_after_backfill():
assert get_backfills_result.data
backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]
assert len(backfill_results) == 1
assert backfill_results[0]["numPartitions"] == 0
assert backfill_results[0]["numPartitions"] == 2
assert backfill_results[0]["id"] == backfill_id
assert backfill_results[0]["partitionSet"] is None
assert backfill_results[0]["partitionSetName"] is None
assert set(backfill_results[0]["partitionNames"]) == set()
assert set(backfill_results[0]["partitionNames"]) == {"a", "b"}

# on PartitionBackfill
single_backfill_result = execute_dagster_graphql(
Expand Down Expand Up @@ -790,11 +856,9 @@ def _get_backfill_data(
assert len(backfills) == 1
backfill = backfills[0]
assert backfill.backfill_id == backfill_id
assert backfill.serialized_asset_backfill_data
assert backfill.asset_backfill_data

return backfill_id, AssetBackfillData.from_serialized(
backfill.serialized_asset_backfill_data, repo.asset_graph, backfill.backfill_timestamp
)
return backfill_id, backfill.asset_backfill_data


def _get_error_message(launch_backfill_result: GqlResult) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.execution.asset_backfill import (
AssetBackfillData,
AssetBackfillIterationResult,
execute_asset_backfill_iteration,
execute_asset_backfill_iteration_inner,
Expand Down Expand Up @@ -209,9 +208,7 @@ def _execute_asset_backfill_iteration_no_side_effects(
However, does not execute side effects i.e. launching runs.
"""
backfill = graphql_context.instance.get_backfill(backfill_id)
asset_backfill_data = AssetBackfillData.from_serialized(
backfill.serialized_asset_backfill_data, asset_graph, backfill.backfill_timestamp
)
asset_backfill_data = backfill.asset_backfill_data
result = None
for result in execute_asset_backfill_iteration_inner(
backfill_id=backfill_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,9 @@ def __eq__(self, other: object) -> bool:
)

def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset":
return other
return other.empty_subset(other.partitions_def).with_partition_keys(
set(self.get_partition_keys()) & set(other.get_partition_keys())
)

def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset":
if self == other:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
create_pendulum_time,
to_timezone,
)
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
cron_string_iterator,
Expand Down Expand Up @@ -200,9 +199,7 @@ def unpack(
whitelist_map,
context,
)
unpacked_datetime = pendulum.instance(
utc_datetime_from_timestamp(unpacked.timestamp), tz=unpacked.timezone
).in_tz(tz=unpacked.timezone)
unpacked_datetime = pendulum.from_timestamp(unpacked.timestamp, unpacked.timezone)
check.invariant(unpacked_datetime.tzinfo is not None)
return unpacked_datetime

Expand Down
159 changes: 130 additions & 29 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,17 @@
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
from dagster._core.definitions.partition import (
AllPartitionsSubset,
PartitionsDefinition,
PartitionsSubset,
)
from dagster._core.definitions.run_request import RunRequest
from dagster._core.definitions.selector import JobSubsetSelector, PartitionsByAssetSelector
from dagster._core.definitions.time_window_partitions import DatetimeFieldSerializer
from dagster._core.definitions.time_window_partitions import (
DatetimeFieldSerializer,
TimeWindowPartitionsSubset,
)
from dagster._core.errors import (
DagsterAssetBackfillDataLoadError,
DagsterBackfillFailedError,
Expand Down Expand Up @@ -389,7 +396,11 @@ def get_partition_names(self) -> Optional[Sequence[str]]:

@classmethod
def empty(
cls, target_subset: AssetGraphSubset, backfill_start_time: datetime
cls,
target_subset: AssetGraphSubset,
backfill_start_time: datetime,
asset_graph: AssetGraph,
dynamic_partitions_store: DynamicPartitionsStore,
) -> "AssetBackfillData":
return cls(
target_subset=target_subset,
Expand Down Expand Up @@ -482,7 +493,7 @@ def from_partitions_by_assets(
partitions_subsets_by_asset_key=partitions_subsets_by_asset_key,
non_partitioned_asset_keys=non_partitioned_asset_keys,
)
return cls.empty(target_subset, backfill_start_time)
return cls.empty(target_subset, backfill_start_time, asset_graph, dynamic_partitions_store)

@classmethod
def from_asset_partitions(
Expand Down Expand Up @@ -546,7 +557,7 @@ def from_asset_partitions(
else:
check.failed("Either partition_names must not be None or all_partitions must be True")

return cls.empty(target_subset, backfill_start_time)
return cls.empty(target_subset, backfill_start_time, asset_graph, dynamic_partitions_store)

def serialize(
self, dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph
Expand Down Expand Up @@ -744,31 +755,89 @@ def _submit_runs_and_update_backfill_in_chunks(
yield backfill_data_with_submitted_runs


def execute_asset_backfill_iteration(
def _check_target_partitions_subset_is_valid(
asset_key: AssetKey,
asset_graph: AssetGraph,
target_partitions_subset: Optional[PartitionsSubset],
instance_queryer: CachingInstanceQueryer,
) -> None:
"""Checks for any partitions definition changes since backfill launch that should mark
the backfill as failed.
"""
if asset_key not in asset_graph.all_asset_keys:
raise DagsterDefinitionChangedDeserializationError(
f"Asset {asset_key} existed at storage-time, but no longer does"
)

partitions_def = asset_graph.get_partitions_def(asset_key)

if target_partitions_subset: # Asset was partitioned at storage time
if partitions_def is None:
raise DagsterDefinitionChangedDeserializationError(
f"Asset {asset_key} had a PartitionsDefinition at storage-time, but no longer"
" does"
)

# If the asset was time-partitioned at storage time but the time partitions def
# has changed, mark the backfill as failed
if isinstance(
target_partitions_subset, TimeWindowPartitionsSubset
) and target_partitions_subset.partitions_def.get_serializable_unique_identifier(
instance_queryer
) != partitions_def.get_serializable_unique_identifier(instance_queryer):
raise DagsterDefinitionChangedDeserializationError(
f"This partitions definition for asset {asset_key} has changed since this backfill"
" was stored. Changing the partitions definition for a time-partitioned "
"asset during a backfill is not supported."
)

else:
# Check that all target partitions still exist. If so, the backfill can continue.a
existent_partitions_subset = (
AllPartitionsSubset(
partitions_def,
dynamic_partitions_store=instance_queryer,
current_time=instance_queryer.evaluation_time,
)
& target_partitions_subset
)
removed_partitions_subset = target_partitions_subset - existent_partitions_subset
if len(removed_partitions_subset) > 0:
raise DagsterDefinitionChangedDeserializationError(
f"Targeted partitions for asset {asset_key} have been removed since this backfill was stored. "
f"The following partitions were removed: {removed_partitions_subset.get_partition_keys()}"
)

else: # Asset unpartitioned at storage time
if partitions_def is not None:
raise DagsterDefinitionChangedDeserializationError(
f"Asset {asset_key} was not partitioned at storage-time, but is now"
)


def _check_validity_and_deserialize_asset_backfill_data(
workspace_context: BaseWorkspaceRequestContext,
backfill: "PartitionBackfill",
asset_graph: AssetGraph,
instance_queryer: CachingInstanceQueryer,
logger: logging.Logger,
workspace_process_context: IWorkspaceProcessContext,
instance: DagsterInstance,
) -> Iterable[None]:
"""Runs an iteration of the backfill, including submitting runs and updating the backfill object
in the DB.

This is a generator so that we can return control to the daemon and let it heartbeat during
expensive operations.
) -> Optional[AssetBackfillData]:
sryza marked this conversation as resolved.
Show resolved Hide resolved
"""Attempts to deserialize asset backfill data. If the asset backfill data is valid,
returns the deserialized data, else returns None.
"""
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill

workspace_context = workspace_process_context.create_request_context()
unloadable_locations = _get_unloadable_location_names(workspace_context, logger)
asset_graph = ExternalAssetGraph.from_workspace(workspace_context)

if backfill.serialized_asset_backfill_data is None:
check.failed("Asset backfill missing serialized_asset_backfill_data")

try:
previous_asset_backfill_data = AssetBackfillData.from_serialized(
backfill.serialized_asset_backfill_data, asset_graph, backfill.backfill_timestamp
)
asset_backfill_data = backfill.get_asset_backfill_data(asset_graph)
for asset_key in asset_backfill_data.target_subset.asset_keys:
_check_target_partitions_subset_is_valid(
asset_key,
asset_graph,
asset_backfill_data.target_subset.get_partitions_subset(asset_key)
if asset_key in asset_backfill_data.target_subset.partitions_subsets_by_asset_key
else None,
instance_queryer,
)
except DagsterDefinitionChangedDeserializationError as ex:
unloadable_locations_error = (
"This could be because it's inside a code location that's failing to load:"
Expand All @@ -782,17 +851,45 @@ def execute_asset_backfill_iteration(
" partition in the asset graph. The backfill will resume once it is available"
f" again.\n{ex}. {unloadable_locations_error}"
)
yield None
return
return None
else:
raise DagsterAssetBackfillDataLoadError(f"{ex}. {unloadable_locations_error}")

backfill_start_time = utc_datetime_from_timestamp(backfill.backfill_timestamp)
return asset_backfill_data


def execute_asset_backfill_iteration(
backfill: "PartitionBackfill",
logger: logging.Logger,
workspace_process_context: IWorkspaceProcessContext,
instance: DagsterInstance,
) -> Iterable[None]:
"""Runs an iteration of the backfill, including submitting runs and updating the backfill object
in the DB.

This is a generator so that we can return control to the daemon and let it heartbeat during
expensive operations.
"""
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill

workspace_context = workspace_process_context.create_request_context()

asset_graph = ExternalAssetGraph.from_workspace(workspace_context)

if not backfill.is_asset_backfill:
check.failed("Backfill must be an asset backfill")

backfill_start_time = pendulum.from_timestamp(backfill.backfill_timestamp, "UTC")
instance_queryer = CachingInstanceQueryer(
instance=instance, asset_graph=asset_graph, evaluation_time=backfill_start_time
)

previous_asset_backfill_data = _check_validity_and_deserialize_asset_backfill_data(
workspace_context, backfill, asset_graph, instance_queryer, logger
)
if previous_asset_backfill_data is None:
return

if backfill.status == BulkActionStatus.REQUESTED:
result = None
for result in execute_asset_backfill_iteration_inner(
Expand Down Expand Up @@ -835,7 +932,9 @@ def execute_asset_backfill_iteration(
# Refetch, in case the backfill was canceled in the meantime
backfill = cast(PartitionBackfill, instance.get_backfill(backfill.backfill_id))
updated_backfill = backfill.with_asset_backfill_data(
updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph
updated_asset_backfill_data,
dynamic_partitions_store=instance,
asset_graph=asset_graph,
)
if updated_asset_backfill_data.is_complete():
# The asset backfill is complete when all runs to be requested have finished (success,
Expand Down Expand Up @@ -886,7 +985,9 @@ def execute_asset_backfill_iteration(
)

updated_backfill = backfill.with_asset_backfill_data(
updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph
updated_asset_backfill_data,
dynamic_partitions_store=instance,
asset_graph=asset_graph,
)
# The asset backfill is successfully canceled when all requested runs have finished (success,
# failure, or cancellation). Since the AssetBackfillData object stores materialization states
Expand Down
Loading