Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 21, 2023
1 parent 1b0494c commit 5732fb7
Showing 1 changed file with 40 additions and 83 deletions.
123 changes: 40 additions & 83 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ def is_asset_backfill(self) -> bool:
self.serialized_asset_backfill_data is not None or self.asset_backfill_data is not None
)

def get_asset_backfill_data(self, asset_graph: AssetGraph) -> Optional[AssetBackfillData]:
if self.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp
)
except DagsterDefinitionChangedDeserializationError:
return None
elif self.asset_backfill_data:
asset_backfill_data = self.asset_backfill_data
else:
check.failed("Expected either serialized_asset_backfill_data or asset_backfill_data")

return asset_backfill_data

@property
def bulk_action_type(self) -> BulkActionType:
if self.is_asset_backfill:
Expand Down Expand Up @@ -160,21 +175,12 @@ def get_backfill_status_per_asset_key(

if self.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(workspace)
if self.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp
)
except DagsterDefinitionChangedDeserializationError:
return []
elif self.asset_backfill_data:
asset_backfill_data = self.asset_backfill_data
else:
check.failed(
"Expected either serialized_asset_backfill_data or asset_backfill_data"
)

return asset_backfill_data.get_backfill_status_per_asset_key(asset_graph)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return (
asset_backfill_data.get_backfill_status_per_asset_key(asset_graph)
if asset_backfill_data
else []
)
else:
return []

Expand All @@ -185,23 +191,13 @@ def get_target_partitions_subset(
return None

if self.is_asset_backfill:
if self.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data,
ExternalAssetGraph.from_workspace(workspace),
self.backfill_timestamp,
)
except DagsterDefinitionChangedDeserializationError:
return None
elif self.asset_backfill_data:
asset_backfill_data = self.asset_backfill_data
else:
check.failed(
"Expected either serialized_asset_backfill_data or asset_backfill_data"
)

return asset_backfill_data.get_target_partitions_subset(asset_key)
asset_graph = ExternalAssetGraph.from_workspace(workspace)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return (
asset_backfill_data.get_target_partitions_subset(asset_key)
if asset_backfill_data
else None
)
else:
return None

Expand All @@ -213,23 +209,12 @@ def get_target_root_partitions_subset(

if self.is_asset_backfill:
asset_graph = ExternalAssetGraph.from_workspace(workspace)
if self.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data,
asset_graph,
self.backfill_timestamp,
)
except DagsterDefinitionChangedDeserializationError:
return None
elif self.asset_backfill_data:
asset_backfill_data = self.asset_backfill_data
else:
check.failed(
"Expected either serialized_asset_backfill_data or asset_backfill_data"
)

return asset_backfill_data.get_target_root_partitions_subset(asset_graph)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return (
asset_backfill_data.get_target_root_partitions_subset(asset_graph)
if asset_backfill_data
else None
)
else:
return None

Expand All @@ -238,23 +223,9 @@ def get_num_partitions(self, workspace: IWorkspace) -> Optional[int]:
return 0

if self.is_asset_backfill:
if self.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data,
ExternalAssetGraph.from_workspace(workspace),
self.backfill_timestamp,
)
except DagsterDefinitionChangedDeserializationError:
return 0
elif self.asset_backfill_data:
asset_backfill_data = self.asset_backfill_data
else:
check.failed(
"Expected either serialized_asset_backfill_data or asset_backfill_data"
)

return asset_backfill_data.get_num_partitions()
asset_graph = ExternalAssetGraph.from_workspace(workspace)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return asset_backfill_data.get_num_partitions() if asset_backfill_data else 0
else:
if self.partition_names is None:
check.failed("Non-asset backfills should have a non-null partition_names field")
Expand All @@ -266,23 +237,9 @@ def get_partition_names(self, workspace: IWorkspace) -> Optional[Sequence[str]]:
return []

if self.is_asset_backfill:
if self.serialized_asset_backfill_data:
try:
asset_backfill_data = AssetBackfillData.from_serialized(
self.serialized_asset_backfill_data,
ExternalAssetGraph.from_workspace(workspace),
self.backfill_timestamp,
)
except DagsterDefinitionChangedDeserializationError:
return None
elif self.asset_backfill_data:
asset_backfill_data = self.asset_backfill_data
else:
check.failed(
"Expected either serialized_asset_backfill_data or asset_backfill_data"
)

return asset_backfill_data.get_partition_names()
asset_graph = ExternalAssetGraph.from_workspace(workspace)
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
return asset_backfill_data.get_partition_names() if asset_backfill_data else None
else:
if self.partition_names is None:
check.failed("Non-asset backfills should have a non-null partition_names field")
Expand Down

0 comments on commit 5732fb7

Please sign in to comment.