Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 22, 2023
1 parent 0888b69 commit 05a0f5d
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 82 deletions.
42 changes: 20 additions & 22 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,12 +751,15 @@ def _submit_runs_and_update_backfill_in_chunks(
yield backfill_data_with_submitted_runs


def _check_no_partitions_def_changes_to_asset(
def _check_target_partitions_subset_is_valid(
asset_key: AssetKey,
asset_graph: AssetGraph,
target_partitions_subset: Optional[PartitionsSubset],
instance_queryer: CachingInstanceQueryer,
) -> None:
"""Checks for any partitions definition changes since backfill launch that should mark
the backfill as failed.
"""
if asset_key not in asset_graph.all_asset_keys:
raise DagsterDefinitionChangedDeserializationError(
f"Asset {asset_key} existed at storage-time, but no longer does"
Expand All @@ -771,6 +774,8 @@ def _check_no_partitions_def_changes_to_asset(
" does"
)

# If the asset was time-partitioned at storage time but the time partitions def
# has changed, mark the backfill as failed
if isinstance(
target_partitions_subset, TimeWindowPartitionsSubset
) and target_partitions_subset.partitions_def.get_serializable_unique_identifier(
Expand All @@ -783,6 +788,7 @@ def _check_no_partitions_def_changes_to_asset(
)

else:
# Check that all target partitions still exist. If so, the backfill can continue.
for target_key in target_partitions_subset.get_partition_keys():
if not partitions_def.has_partition_key(
target_key, instance_queryer.evaluation_time, instance_queryer
Expand All @@ -799,37 +805,29 @@ def _check_no_partitions_def_changes_to_asset(
)


def _check_and_deserialize_asset_backfill_data(
def _check_validity_and_deserialize_asset_backfill_data(
workspace_context: BaseWorkspaceRequestContext,
backfill: "PartitionBackfill",
asset_graph: AssetGraph,
instance_queryer: CachingInstanceQueryer,
logger: logging.Logger,
) -> Optional[AssetBackfillData]:
"""Attempts to deserialize asset backfill data. If the asset backfill data is valid,
returns the deserialized data, else returns None.
"""
unloadable_locations = _get_unloadable_location_names(workspace_context, logger)

try:
if backfill.serialized_asset_backfill_data:
asset_backfill_data = AssetBackfillData.from_serialized(
backfill.serialized_asset_backfill_data,
asset_backfill_data = backfill.get_asset_backfill_data(asset_graph)
for asset_key in asset_backfill_data.target_subset.asset_keys:
_check_target_partitions_subset_is_valid(
asset_key,
asset_graph,
backfill.backfill_timestamp,
asset_backfill_data.target_subset.get_partitions_subset(asset_key)
if asset_key in asset_backfill_data.target_subset.partitions_subsets_by_asset_key
else None,
instance_queryer,
)
elif backfill.asset_backfill_data:
asset_backfill_data = backfill.asset_backfill_data
for asset_key in asset_backfill_data.target_subset.asset_keys:
_check_no_partitions_def_changes_to_asset(
asset_key,
asset_graph,
asset_backfill_data.target_subset.get_partitions_subset(asset_key)
if asset_key
in asset_backfill_data.target_subset.partitions_subsets_by_asset_key
else None,
instance_queryer,
)
else:
check.failed("Backfill missing asset_backfill_data")

except DagsterDefinitionChangedDeserializationError as ex:
unloadable_locations_error = (
"This could be because it's inside a code location that's failing to load:"
Expand Down Expand Up @@ -876,7 +874,7 @@ def execute_asset_backfill_iteration(
instance=instance, asset_graph=asset_graph, evaluation_time=backfill_start_time
)

previous_asset_backfill_data = _check_and_deserialize_asset_backfill_data(
previous_asset_backfill_data = _check_validity_and_deserialize_asset_backfill_data(
workspace_context, backfill, asset_graph, instance_queryer, logger
)
if previous_asset_backfill_data is None:
Expand Down
63 changes: 34 additions & 29 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,11 @@ def is_asset_backfill(self) -> bool:
self.serialized_asset_backfill_data is not None or self.asset_backfill_data is not None
)

def get_asset_backfill_data(self, asset_graph: AssetGraph) -> Optional[AssetBackfillData]:
def get_asset_backfill_data(self, asset_graph: AssetGraph) -> AssetBackfillData:
if self.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp
)
except DagsterDefinitionChangedDeserializationError:
return None
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp
)
elif self.asset_backfill_data:
asset_backfill_data = self.asset_backfill_data
else:
Expand Down Expand Up @@ -175,12 +172,12 @@ def get_backfill_status_per_asset_key(

if self.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(workspace)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return (
asset_backfill_data.get_backfill_status_per_asset_key(asset_graph)
if asset_backfill_data
else []
)
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
return []

return asset_backfill_data.get_backfill_status_per_asset_key(asset_graph)
else:
return []

Expand All @@ -192,12 +189,12 @@ def get_target_partitions_subset(

if self.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(workspace)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return (
asset_backfill_data.get_target_partitions_subset(asset_key)
if asset_backfill_data
else None
)
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
return None

return asset_backfill_data.get_target_partitions_subset(asset_key)
else:
return None

Expand All @@ -209,12 +206,12 @@ def get_target_root_partitions_subset(

if self.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(workspace)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return (
asset_backfill_data.get_target_root_partitions_subset(asset_graph)
if asset_backfill_data
else None
)
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
return None

return asset_backfill_data.get_target_root_partitions_subset(asset_graph)
else:
return None

Expand All @@ -224,8 +221,12 @@ def get_num_partitions(self, workspace: IWorkspace) -> Optional[int]:

if self.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(workspace)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return asset_backfill_data.get_num_partitions() if asset_backfill_data else 0
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
return 0

return asset_backfill_data.get_num_partitions()
else:
if self.partition_names is None:
check.failed("Non-asset backfills should have a non-null partition_names field")
Expand All @@ -238,8 +239,12 @@ def get_partition_names(self, workspace: IWorkspace) -> Optional[Sequence[str]]:

if self.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(workspace)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return asset_backfill_data.get_partition_names() if asset_backfill_data else None
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
return None

return asset_backfill_data.get_partition_names()
else:
if self.partition_names is None:
check.failed("Non-asset backfills should have a non-null partition_names field")
Expand Down
28 changes: 8 additions & 20 deletions python_modules/dagster/dagster/_utils/caching_instance_queryer.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset:
"""Returns an AssetGraphSubset representing the set of assets that are currently targeted by
an active asset backfill.
"""
from dagster._core.execution.asset_backfill import AssetBackfillData
from dagster._core.execution.backfill import BulkActionStatus

asset_backfills = [
Expand All @@ -440,26 +439,15 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset:

result = AssetGraphSubset()
for asset_backfill in asset_backfills:
if asset_backfill.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
asset_backfill.serialized_asset_backfill_data,
self.asset_graph,
asset_backfill.backfill_timestamp,
)
except DagsterDefinitionChangedDeserializationError:
self._logger.warning(
f"Not considering assets in backfill {asset_backfill.backfill_id} since its"
" data could not be deserialized"
)
# Backfill can't be loaded, so no risk of the assets interfering
continue
elif asset_backfill.asset_backfill_data:
asset_backfill_data = asset_backfill.asset_backfill_data
else:
check.failed(
"Expected either serialized_asset_backfill_data or asset_backfill_data field"
try:
asset_backfill_data = asset_backfill.get_asset_backfill_data(self.asset_graph)
except DagsterDefinitionChangedDeserializationError:
self._logger.warning(
f"Not considering assets in backfill {asset_backfill.backfill_id} since its"
" data could not be deserialized"
)
# Backfill can't be loaded, so no risk of the assets interfering
continue

result |= asset_backfill_data.target_subset

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1340,13 +1340,14 @@ def test_raise_error_on_partitions_defs_removed(

instance.add_backfill(backfill)

errors = list(
execute_backfill_iteration(
errors = [
e
for e in execute_backfill_iteration(
partitions_defs_changes_location_2_workspace_context,
get_default_daemon_logger("BackfillDaemon"),
)
)

if e is not None
]
assert len(errors) == 1
assert ("had a PartitionsDefinition at storage-time, but no longer does") in errors[0].message

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,6 @@ def repo():
partitions_subsets_by_asset_key={},
non_partitioned_asset_keys=set(),
)
partitions_defs_ids_by_asset_key = {
asset_key: check.not_none(
repo.asset_graph.get_partitions_def(asset_key)
).get_serializable_unique_identifier(dynamic_partitions_store=instance)
for asset_key in target_subset.partitions_subsets_by_asset_key.keys()
}
asset_backfill_data = AssetBackfillData(
latest_storage_id=0,
target_subset=target_subset,
Expand All @@ -333,7 +327,6 @@ def repo():
requested_subset=empty_subset,
failed_and_downstream_subset=empty_subset,
backfill_start_time=test_time,
partitions_def_ids_by_asset_key=partitions_defs_ids_by_asset_key,
)
backfill = PartitionBackfill(
backfill_id=f"backfill{i}",
Expand Down

0 comments on commit 05a0f5d

Please sign in to comment.