Skip to content

Commit

Permalink
background asset wipe
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Fulwiler committed Nov 25, 2024
1 parent 4db6e7e commit 00cb5c7
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dagster._check as check
from dagster._annotations import deprecated
from dagster._core.definitions.events import AssetKey, AssetPartitionWipeRange
from dagster._core.definitions.partition import T_str
from dagster._core.events import (
AssetMaterialization,
AssetObservation,
Expand Down Expand Up @@ -57,6 +58,7 @@
GraphenePipelineRunLogsSubscriptionSuccess,
)
from dagster_graphql.schema.roots.mutation import (
GrapheneAssetWipeInProgress,
GrapheneAssetWipeSuccess,
GrapheneDeletePipelineRunSuccess,
GrapheneReportRunlessAssetEventsSuccess,
Expand Down Expand Up @@ -337,49 +339,92 @@ def _enqueue(new_event):
subscription.dispose()


def background_wipe_assets(
graphene_info: "ResolveInfo", asset_partition_ranges: Sequence[AssetPartitionWipeRange]
) -> Union[
"GrapheneAssetWipeInProgress",
"GrapheneAssetNotFoundError",
]:
instance = graphene_info.context.instance
from dagster_graphql.schema.errors import GrapheneAssetNotFoundError
from dagster_graphql.schema.roots.mutation import GrapheneAssetWipeInProgress

try:
whole_assets_to_wipe, asset_partitions_to_wipe = get_asset_wipe_work(
graphene_info, instance, asset_partition_ranges
)
except AssetNotFoundError as e:
return GrapheneAssetNotFoundError(asset_key=e.asset_key)

work_token = instance.background_asset_wipe(whole_assets_to_wipe, asset_partitions_to_wipe)
return GrapheneAssetWipeInProgress(work_token=work_token)


class AssetNotFoundError(BaseException):
def __init__(self, asset_key: AssetKey):
self.asset_key = asset_key


def wipe_assets(
graphene_info: "ResolveInfo", asset_partition_ranges: Sequence[AssetPartitionWipeRange]
) -> Union[
"GrapheneAssetWipeSuccess", "GrapheneUnsupportedOperationError", "GrapheneAssetNotFoundError"
"GrapheneAssetWipeSuccess",
"GrapheneUnsupportedOperationError",
"GrapheneAssetNotFoundError",
]:
instance = graphene_info.context.instance
from dagster_graphql.schema.backfill import GrapheneAssetPartitionRange
from dagster_graphql.schema.errors import (
GrapheneAssetNotFoundError,
GrapheneUnsupportedOperationError,
)
from dagster_graphql.schema.roots.mutation import GrapheneAssetWipeSuccess

instance = graphene_info.context.instance
try:
whole_assets_to_wipe, asset_partitions_to_wipe = get_asset_wipe_work(
graphene_info, instance, asset_partition_ranges
)
except AssetNotFoundError as e:
return GrapheneAssetNotFoundError(asset_key=e.asset_key)

for asset_key, partition_keys in asset_partitions_to_wipe:
try:
instance.wipe_asset_partitions(asset_key, partition_keys)
# NotImplementedError will be thrown if the underlying EventLogStorage does not support
# partitioned asset wipe.
except NotImplementedError:
return GrapheneUnsupportedOperationError("Partitioned asset wipe is not supported yet.")

instance.wipe_assets(whole_assets_to_wipe)

result_ranges = [
GrapheneAssetPartitionRange(asset_key=apr.asset_key, partition_range=apr.partition_range)
for apr in asset_partition_ranges
]
return GrapheneAssetWipeSuccess(assetPartitionRanges=result_ranges)


def get_asset_wipe_work(
graphene_info: "ResolveInfo",
instance: DagsterInstance,
asset_partition_ranges: Sequence[AssetPartitionWipeRange],
) -> Tuple[List[AssetKey], List[Tuple[AssetKey, Sequence[T_str]]]]:
whole_assets_to_wipe: List[AssetKey] = []
asset_partitions_to_wipe: List[Tuple[AssetKey, Sequence[T_str]]] = []
for apr in asset_partition_ranges:
if apr.partition_range is None:
whole_assets_to_wipe.append(apr.asset_key)
else:
if apr.asset_key not in graphene_info.context.asset_graph.asset_node_snaps_by_key:
return GrapheneAssetNotFoundError(asset_key=apr.asset_key)
raise AssetNotFoundError(asset_key=apr.asset_key)

node = graphene_info.context.asset_graph.asset_node_snaps_by_key[apr.asset_key]
partitions_def = check.not_none(node.partitions).get_partitions_definition()
partition_keys = partitions_def.get_partition_keys_in_range(
apr.partition_range, dynamic_partitions_store=instance
)
try:
instance.wipe_asset_partitions(apr.asset_key, partition_keys)

# NotImplementedError will be thrown if the underlying EventLogStorage does not support
# partitioned asset wipe.
except NotImplementedError:
return GrapheneUnsupportedOperationError(
"Partitioned asset wipe is not supported yet."
)

instance.wipe_assets(whole_assets_to_wipe)

result_ranges = [
GrapheneAssetPartitionRange(asset_key=apr.asset_key, partition_range=apr.partition_range)
for apr in asset_partition_ranges
]
return GrapheneAssetWipeSuccess(assetPartitionRanges=result_ranges)
asset_partitions_to_wipe.append((apr.asset_key, partition_keys))
return whole_assets_to_wipe, asset_partitions_to_wipe


def create_asset_event(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
GrapheneStopSensorMutation,
)
from dagster_graphql.schema.util import ResolveInfo, non_null_list
from python_modules.dagster.dagster._core.definitions.events import BackgroundAssetWipeWorkToken


def create_execution_params(graphene_info, graphql_execution_params):
Expand Down Expand Up @@ -711,6 +712,15 @@ class Meta:
name = "AssetWipeSuccess"


class GrapheneAssetWipeInProgress(graphene.ObjectType):
"""Output indicating that asset history deletion is in progress."""

workToken = graphene.NonNull(BackgroundAssetWipeWorkToken)

class Meta:
name = "AssetWipeInProgress"


class GrapheneAssetWipeMutationResult(graphene.Union):
"""The output from deleting asset history."""

Expand All @@ -725,6 +735,53 @@ class Meta:
name = "AssetWipeMutationResult"


class GrapheneBackgroundAssetWipeMutationResult(graphene.Union):
"""The output from deleting asset history."""

class Meta:
types = (
GrapheneAssetNotFoundError,
GrapheneUnauthorizedError,
GraphenePythonError,
GrapheneUnsupportedOperationError,
GrapheneAssetWipeInProgress,
)
name = "BackgroundAssetWipeMutationResult"


class GrapheneBackgroundAssetWipeMutation(graphene.Mutation):
"""Deletes asset history from storage in the background, returning instead a work
token that can be used to check the progress of the delete.
"""

Output = graphene.NonNull(GrapheneBackgroundAssetWipeMutationResult)

class Arguments:
assetPartitionRanges = graphene.Argument(non_null_list(GraphenePartitionsByAssetSelector))

class Meta:
name = "BackgroundAssetWipeMutation"

@capture_error
@check_permission(Permissions.WIPE_ASSETS)
def mutate(
self,
graphene_info: ResolveInfo,
assetPartitionRanges: Sequence[GraphenePartitionsByAssetSelector],
) -> GrapheneBackgroundAssetWipeMutationResult:
normalized_ranges = [
AssetPartitionWipeRange(
AssetKey.from_graphql_input(ap.assetKey),
PartitionKeyRange(start=ap.partitions.range.start, end=ap.partitions.range.end)
if ap.partitions
else None,
)
for ap in assetPartitionRanges
]

return wipe_assets(graphene_info, normalized_ranges)


class GrapheneAssetWipeMutation(graphene.Mutation):
"""Deletes asset history from storage."""

Expand Down Expand Up @@ -1004,6 +1061,7 @@ class Meta:
reloadWorkspace = GrapheneReloadWorkspaceMutation.Field()
shutdownRepositoryLocation = GrapheneShutdownRepositoryLocationMutation.Field()
wipeAssets = GrapheneAssetWipeMutation.Field()
backgroundWipeAssets = GrapheneBackgroundAssetWipeMutation.Field()
reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field()
launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field()
resumePartitionBackfill = GrapheneResumeBackfillMutation.Field()
Expand Down
8 changes: 8 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.utils import DEFAULT_OUTPUT, check_valid_name
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, REPORTING_USER_TAG
from dagster._record import record
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import NamedTupleSerializer

Expand All @@ -58,8 +59,15 @@ class AssetKeyPartitionKey(NamedTuple):
partition_key: Optional[str] = None


# This is a "work token" associated with the background process responsible
# for wiping the asset
BackgroundAssetWipeWorkToken = str


# This is currently used only for the asset partition wipe codepath. In the future, we can rename
# to AssetPartitionRange or similar for more general use.
@whitelist_for_serdes
@record
class AssetPartitionWipeRange(NamedTuple):
"""An AssetKey with a partition range."""

Expand Down
9 changes: 8 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
AssetCheckEvaluationPlanned,
)
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
from dagster._core.definitions.events import AssetKey, AssetObservation
from dagster._core.definitions.events import (
AssetKey,
AssetObservation,
BackgroundAssetWipeWorkToken,
)
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.errors import (
DagsterHomeNotSetError,
Expand Down Expand Up @@ -3276,5 +3280,8 @@ def get_asset_check_support(self) -> "AssetCheckInstanceSupport":
def backfill_log_storage_enabled(self) -> bool:
return False

def background_asset_wipe(self) -> BackgroundAssetWipeWorkToken:
raise NotImplementedError()

def da_request_backfills(self) -> bool:
return False

0 comments on commit 00cb5c7

Please sign in to comment.