compare serializable partitions defs ids
clairelin135 committed Nov 21, 2023
1 parent baef1b3 commit 336e81c
Showing 3 changed files with 125 additions and 34 deletions.
149 changes: 120 additions & 29 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -145,6 +145,10 @@ class AssetBackfillData(NamedTuple):
requested_subset: AssetGraphSubset
failed_and_downstream_subset: AssetGraphSubset
backfill_start_time: datetime
partitions_ids_by_serialized_asset_key: Optional[Mapping[str, str]]

# TODO add __new__ that asserts that partitions_ids_by_serialized_asset_key
# contains all keys for target subset

def replace_requested_subset(self, requested_subset: AssetGraphSubset) -> "AssetBackfillData":
return AssetBackfillData(
@@ -155,6 +159,7 @@ def replace_requested_subset(self, requested_subset: AssetGraphSubset) -> "AssetBackfillData":
failed_and_downstream_subset=self.failed_and_downstream_subset,
requested_subset=requested_subset,
backfill_start_time=self.backfill_start_time,
partitions_ids_by_serialized_asset_key=self.partitions_ids_by_serialized_asset_key,
)

def is_complete(self) -> bool:
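For illustration only: the TODO above is left open in this commit. A minimal sketch of what such a __new__ invariant check could look like, using a simplified stand-in rather than the real AssetBackfillData (the class, fields, and error handling below are assumptions, not part of this change):

    from typing import AbstractSet, Mapping, NamedTuple, Optional

    class BackfillDataSketch(
        NamedTuple(
            "_BackfillDataSketch",
            [
                ("target_asset_keys", AbstractSet[str]),
                ("partitions_ids_by_serialized_asset_key", Optional[Mapping[str, str]]),
            ],
        )
    ):
        def __new__(cls, target_asset_keys, partitions_ids_by_serialized_asset_key):
            # Enforce the invariant from the TODO: every targeted asset key
            # must have a stored partitions def identifier (None is allowed,
            # since from_serialized constructs the object without the mapping).
            if partitions_ids_by_serialized_asset_key is not None:
                missing = set(target_asset_keys) - set(partitions_ids_by_serialized_asset_key)
                if missing:
                    raise ValueError(f"Missing partitions def IDs for: {sorted(missing)}")
            return super().__new__(cls, target_asset_keys, partitions_ids_by_serialized_asset_key)

    BackfillDataSketch({"a"}, {"a": "id-1"})  # passes the invariant
    # BackfillDataSketch({"a", "b"}, {"a": "id-1"})  # would raise ValueError: missing "b"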
@@ -389,8 +394,18 @@ def get_partition_names(self) -> Optional[Sequence[str]]:

@classmethod
def empty(
cls, target_subset: AssetGraphSubset, backfill_start_time: datetime
cls,
target_subset: AssetGraphSubset,
backfill_start_time: datetime,
asset_graph: AssetGraph,
dynamic_partitions_store: DynamicPartitionsStore,
) -> "AssetBackfillData":
partition_ids_by_serialized_asset_key = {
asset_key.to_string(): check.not_none(
asset_graph.get_partitions_def(asset_key)
).get_serializable_unique_identifier(dynamic_partitions_store)
for asset_key in target_subset.partitions_subsets_by_asset_key.keys()
}
return cls(
target_subset=target_subset,
requested_runs_for_target_roots=False,
@@ -399,6 +414,7 @@ def empty(
failed_and_downstream_subset=AssetGraphSubset(),
latest_storage_id=None,
backfill_start_time=backfill_start_time,
partitions_ids_by_serialized_asset_key=partition_ids_by_serialized_asset_key,
)
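For context, the mapping built in empty() keys each serialized asset key (AssetKey.to_string()) to the opaque identifier returned by get_serializable_unique_identifier. A hypothetical illustration of its shape (the identifier values below are made up):

    partition_ids_by_serialized_asset_key = {
        '["daily_asset"]': "b8d91c3e...",    # fingerprint of the asset's partitions definition
        '["dynamic_asset"]': "4f02aa71...",  # derived via the dynamic_partitions_store
    }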

@classmethod
@@ -414,10 +430,12 @@ def from_serialized(
) -> "AssetBackfillData":
storage_dict = json.loads(serialized)

target_subset = AssetGraphSubset.from_storage_dict(
storage_dict["serialized_target_subset"], asset_graph
)

return cls(
target_subset=AssetGraphSubset.from_storage_dict(
storage_dict["serialized_target_subset"], asset_graph
),
target_subset=target_subset,
requested_runs_for_target_roots=storage_dict["requested_runs_for_target_roots"],
requested_subset=AssetGraphSubset.from_storage_dict(
storage_dict["serialized_requested_subset"], asset_graph
@@ -430,6 +448,7 @@ def from_serialized(
),
latest_storage_id=storage_dict["latest_storage_id"],
backfill_start_time=utc_datetime_from_timestamp(backfill_start_timestamp),
partitions_ids_by_serialized_asset_key=None,
)

@classmethod
@@ -482,7 +501,7 @@ def from_partitions_by_assets(
partitions_subsets_by_asset_key=partitions_subsets_by_asset_key,
non_partitioned_asset_keys=non_partitioned_asset_keys,
)
return cls.empty(target_subset, backfill_start_time)
return cls.empty(target_subset, backfill_start_time, asset_graph, dynamic_partitions_store)

@classmethod
def from_asset_partitions(
@@ -546,7 +565,7 @@ def from_asset_partitions(
else:
check.failed("Either partition_names must not be None or all_partitions must be True")

return cls.empty(target_subset, backfill_start_time)
return cls.empty(target_subset, backfill_start_time, asset_graph, dynamic_partitions_store)

def serialize(
self, dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph
@@ -744,31 +763,73 @@ def _submit_runs_and_update_backfill_in_chunks(
yield backfill_data_with_submitted_runs


def execute_asset_backfill_iteration(
backfill: "PartitionBackfill",
logger: logging.Logger,
workspace_process_context: IWorkspaceProcessContext,
instance: DagsterInstance,
) -> Iterable[None]:
"""Runs an iteration of the backfill, including submitting runs and updating the backfill object
in the DB.
def _check_no_partitions_def_changes_to_asset(
asset_key: AssetKey,
asset_graph: AssetGraph,
serialized_partitions_def_id: Optional[str],
instance_queryer: CachingInstanceQueryer,
) -> None:
if asset_key not in asset_graph.all_asset_keys:
raise DagsterDefinitionChangedDeserializationError(
f"Asset {asset_key} existed at storage-time, but no longer does"
)

This is a generator so that we can return control to the daemon and let it heartbeat during
expensive operations.
"""
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
partitions_def = asset_graph.get_partitions_def(asset_key)

if serialized_partitions_def_id: # Asset was partitioned at storage time
if partitions_def is None:
raise DagsterDefinitionChangedDeserializationError(
f"Asset {asset_key} had a PartitionsDefinition at storage-time, but no longer"
" does"
)

if (
partitions_def.get_serializable_unique_identifier(instance_queryer)
!= serialized_partitions_def_id
):
raise DagsterDefinitionChangedDeserializationError(
f"This partitions definition for asset {asset_key} has changed since this backfill"
" was stored"
)

else: # Asset unpartitioned at storage time
if partitions_def is not None:
raise DagsterDefinitionChangedDeserializationError(
f"Asset {asset_key} was not partitioned at storage-time, but is now"
)

workspace_context = workspace_process_context.create_request_context()
unloadable_locations = _get_unloadable_location_names(workspace_context, logger)
asset_graph = ExternalAssetGraph.from_workspace(workspace_context)

if backfill.serialized_asset_backfill_data is None:
check.failed("Asset backfill missing serialized_asset_backfill_data")
def _check_and_deserialize_asset_backfill_data(
workspace_context: BaseWorkspaceRequestContext,
backfill: "PartitionBackfill",
asset_graph: AssetGraph,
instance_queryer: CachingInstanceQueryer,
logger: logging.Logger,
) -> Optional[AssetBackfillData]:
unloadable_locations = _get_unloadable_location_names(workspace_context, logger)

try:
previous_asset_backfill_data = AssetBackfillData.from_serialized(
backfill.serialized_asset_backfill_data, asset_graph, backfill.backfill_timestamp
)
if backfill.serialized_asset_backfill_data:
asset_backfill_data = AssetBackfillData.from_serialized(
backfill.serialized_asset_backfill_data,
asset_graph,
backfill.backfill_timestamp,
)
elif backfill.asset_backfill_data:
asset_backfill_data = backfill.asset_backfill_data
partitions_ids_by_serialized_asset_key = check.not_none(
asset_backfill_data.partitions_ids_by_serialized_asset_key
)
for asset_key in asset_backfill_data.target_subset.asset_keys:
_check_no_partitions_def_changes_to_asset(
asset_key,
asset_graph,
partitions_ids_by_serialized_asset_key.get(asset_key.to_string()),
instance_queryer,
)
else:
check.failed("Backfill missing asset_backfill_data")

except DagsterDefinitionChangedDeserializationError as ex:
unloadable_locations_error = (
"This could be because it's inside a code location that's failing to load:"
@@ -782,17 +843,45 @@
" partition in the asset graph. The backfill will resume once it is available"
f" again.\n{ex}. {unloadable_locations_error}"
)
yield None
return
return None
else:
raise DagsterAssetBackfillDataLoadError(f"{ex}. {unloadable_locations_error}")

backfill_start_time = utc_datetime_from_timestamp(backfill.backfill_timestamp)
return asset_backfill_data


def execute_asset_backfill_iteration(
backfill: "PartitionBackfill",
logger: logging.Logger,
workspace_process_context: IWorkspaceProcessContext,
instance: DagsterInstance,
) -> Iterable[None]:
"""Runs an iteration of the backfill, including submitting runs and updating the backfill object
in the DB.

This is a generator so that we can return control to the daemon and let it heartbeat during
expensive operations.
"""
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill

workspace_context = workspace_process_context.create_request_context()

asset_graph = ExternalAssetGraph.from_workspace(workspace_context)

if not backfill.is_asset_backfill:
check.failed("Backfill must be an asset backfill")

backfill_start_time = utc_datetime_from_timestamp(backfill.backfill_timestamp)
instance_queryer = CachingInstanceQueryer(
instance=instance, asset_graph=asset_graph, evaluation_time=backfill_start_time
)

previous_asset_backfill_data = _check_and_deserialize_asset_backfill_data(
workspace_context, backfill, asset_graph, instance_queryer, logger
)
if previous_asset_backfill_data is None:
return

if backfill.status == BulkActionStatus.REQUESTED:
result = None
for result in execute_asset_backfill_iteration_inner(
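As the docstring notes, execute_asset_backfill_iteration yields so the daemon can heartbeat during expensive operations. A minimal sketch of that consumption pattern, assuming a hypothetical heartbeat callback (run_with_heartbeats is not a dagster API):

    from typing import Callable, Iterable

    def run_with_heartbeats(iteration: Iterable[None], heartbeat: Callable[[], None]) -> None:
        # Each yield hands control back to this loop, which can record a
        # heartbeat before resuming the next expensive chunk of work.
        for _ in iteration:
            heartbeat()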
@@ -937,6 +1026,7 @@ def get_canceling_asset_backfill_iteration_data(
failed_and_downstream_subset=failed_and_downstream_subset,
requested_subset=asset_backfill_data.requested_subset,
backfill_start_time=backfill_start_time,
partitions_ids_by_serialized_asset_key=asset_backfill_data.partitions_ids_by_serialized_asset_key,
)

yield updated_backfill_data
@@ -1220,6 +1310,7 @@ def execute_asset_backfill_iteration_inner(
requested_subset=asset_backfill_data.requested_subset
| AssetGraphSubset.from_asset_partition_set(set(asset_partitions_to_request), asset_graph),
backfill_start_time=backfill_start_time,
partitions_ids_by_serialized_asset_key=asset_backfill_data.partitions_ids_by_serialized_asset_key,
)
yield AssetBackfillIterationResult(run_requests, updated_asset_backfill_data)

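Taken together, this file's changes implement a store-and-compare scheme: empty() snapshots a serializable identifier for each targeted asset's PartitionsDefinition at backfill creation, and _check_and_deserialize_asset_backfill_data recomputes and compares those identifiers on each iteration, raising DagsterDefinitionChangedDeserializationError on drift. A self-contained sketch of the same pattern, with hypothetical stand-ins rather than the dagster types:

    from typing import Dict, Optional

    class DefinitionChangedError(Exception):
        """A stored identifier no longer matches the live definition."""

    def snapshot_ids(live_ids_by_key: Dict[str, Optional[str]]) -> Dict[str, str]:
        # Storage time: record an identifier for every partitioned asset key;
        # unpartitioned assets (None) are simply absent from the snapshot.
        return {key: live_id for key, live_id in live_ids_by_key.items() if live_id is not None}

    def check_unchanged(key: str, stored_id: Optional[str], live_id: Optional[str]) -> None:
        # Load time: mirror the three failure modes checked in
        # _check_no_partitions_def_changes_to_asset above.
        if stored_id is not None and live_id is None:
            raise DefinitionChangedError(f"{key} had a partitions def at storage time, but no longer does")
        if stored_id is None and live_id is not None:
            raise DefinitionChangedError(f"{key} was not partitioned at storage time, but is now")
        if stored_id != live_id:
            raise DefinitionChangedError(f"The partitions def for {key} changed since the backfill was stored")

    stored = snapshot_ids({"daily_asset": "v1", "unpartitioned_asset": None})
    check_unchanged("daily_asset", stored.get("daily_asset"), "v1")  # unchanged: passes
    # check_unchanged("daily_asset", stored.get("daily_asset"), "v2")  # would raise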
4 changes: 1 addition & 3 deletions python_modules/dagster/dagster/_core/execution/backfill.py
@@ -162,9 +162,7 @@ def get_backfill_status_per_asset_key(
if self.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data,
asset_graph,
self.backfill_timestamp,
self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp
)
except DagsterDefinitionChangedDeserializationError:
return []
6 changes: 4 additions & 2 deletions (third changed file; path not shown)
@@ -290,7 +290,9 @@ def test_scenario_to_completion(scenario: AssetBackfillScenario, failures: str,
else:
assert False

backfill_data = AssetBackfillData.empty(target_subset, scenario.evaluation_time)
backfill_data = AssetBackfillData.empty(
target_subset, scenario.evaluation_time, asset_graph
)

if failures == "no_failures":
fail_asset_partitions: Set[AssetKeyPartitionKey] = set()
@@ -423,7 +425,7 @@ def make_backfill_data(
else:
assert False

return AssetBackfillData.empty(target_subset, current_time or pendulum.now("UTC"))
return AssetBackfillData.empty(target_subset, current_time or pendulum.now("UTC"), asset_graph)


def make_random_subset(
