diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py index 800f7f9f91f11..2101c527eae27 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py @@ -7,7 +7,12 @@ import dagster._check as check from dagster._annotations import deprecated from dagster._core.definitions.events import AssetKey -from dagster._core.events import EngineEventData +from dagster._core.events import ( + EngineEventData, + DagsterEventType, + AssetMaterialization, + AssetObservation, +) from dagster._core.instance import ( DagsterInstance, ) @@ -46,6 +51,7 @@ from ...schema.errors import GrapheneRunNotFoundError from ...schema.roots.mutation import ( GrapheneAssetWipeSuccess, + GrapheneReportRunlessAssetEventsSuccess, GrapheneDeletePipelineRunSuccess, GrapheneTerminateRunFailure, GrapheneTerminateRunsResult, @@ -370,3 +376,43 @@ def wipe_assets( instance = graphene_info.context.instance instance.wipe_assets(asset_keys) return GrapheneAssetWipeSuccess(assetKeys=asset_keys) + + +def create_asset_event( + event_type: DagsterEventType, + asset_key: AssetKey, + partition_key: Optional[str], + description: Optional[str], +) -> Union[AssetMaterialization, AssetObservation]: + if event_type == DagsterEventType.ASSET_MATERIALIZATION: + return AssetMaterialization( + asset_key=asset_key, partition=partition_key, description=description + ) + elif event_type == DagsterEventType.ASSET_OBSERVATION: + return AssetObservation( + asset_key=asset_key, partition=partition_key, description=description + ) + else: + check.failed(f"Unexpected event type {event_type}") + + +def report_runless_asset_events( + graphene_info: "ResolveInfo", + event_type: DagsterEventType, + asset_key: AssetKey, + partition_keys: Optional[Sequence[str]] = None, + description: Optional[str] = None, +) -> "GrapheneAssetWipeSuccess": + instance = graphene_info.context.instance + + if partition_keys is not None: + for partition_key in partition_keys: + instance.report_runless_asset_event( + create_asset_event(event_type, asset_key, partition_key, description) + ) + else: + instance.report_runless_asset_event( + create_asset_event(event_type, asset_key, None, description) + ) + + return GrapheneReportRunlessAssetEventsSuccess(asset_key=asset_key) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py index a70cbb9f7cedf..f6290b755e5d5 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py @@ -1,6 +1,7 @@ import graphene import pendulum from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter +from dagster._core.events import DagsterEventType from .pipelines.status import GrapheneRunStatus from .runs import GrapheneRunConfigData @@ -188,6 +189,34 @@ class Meta: name = "LaunchBackfillParams" +class GrapheneAssetEventType(graphene.Enum): + """The event type of an asset event.""" + + ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION" + ASSET_OBSERVATION = "ASSET_OBSERVATION" + + class Meta: + name = "AssetEventType" + + def to_dagster_event_type(self): + if self == GrapheneAssetEventType.ASSET_MATERIALIZATION: + return DagsterEventType.ASSET_MATERIALIZATION + elif self == GrapheneAssetEventType.ASSET_OBSERVATION: + return DagsterEventType.ASSET_OBSERVATION + else: + check.assert_never(self) + + +class GrapheneReportRunlessAssetEventsParams(graphene.InputObjectType): + eventType = graphene.NonNull(GrapheneAssetEventType) + assetKey = graphene.NonNull(GrapheneAssetKeyInput) + partitionNames = graphene.InputField(graphene.List(graphene.String)) + description = graphene.String() + + class Meta: + name = "ReportRunlessAssetEventsParams" + + class GrapheneSensorSelector(graphene.InputObjectType): repositoryName = graphene.NonNull(graphene.String) repositoryLocationName = graphene.NonNull(graphene.String) @@ -348,4 +377,5 @@ class Meta: GrapheneStepExecution, GrapheneStepOutputHandle, GrapheneInputTag, + GrapheneReportRunlessAssetEventsParams, ] diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index f0cd518b3f602..63781537bd84f 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -25,6 +25,7 @@ terminate_pipeline_execution, terminate_pipeline_execution_for_runs, wipe_assets, + report_runless_asset_events, ) from ...implementation.external import fetch_workspace, get_full_external_job_or_raise from ...implementation.telemetry import log_ui_telemetry_event @@ -58,6 +59,7 @@ from ..external import GrapheneWorkspace, GrapheneWorkspaceLocationEntry from ..inputs import ( GrapheneAssetKeyInput, + GrapheneReportRunlessAssetEventsParams, GrapheneExecutionParams, GrapheneLaunchBackfillParams, GrapheneReexecutionParams, @@ -673,6 +675,50 @@ def mutate(self, graphene_info: ResolveInfo, assetKeys: Sequence[GrapheneAssetKe ) +class GrapheneReportRunlessAssetEventsSuccess(graphene.ObjectType): + """Output indicating that runless asset events were reported.""" + + assetKey = GrapheneAssetKey + + class Meta: + name = "ReportRunlessAssetEventsSuccess" + + +class GrapheneReportRunlessAssetEventsResult(graphene.Union): + """The output from reporting runless events.""" + + class Meta: + types = ( + GrapheneAssetNotFoundError, + GrapheneUnauthorizedError, + GraphenePythonError, + GrapheneReportRunlessAssetEventsSuccess, + ) + name = "ReportRunlessAssetEventsResult" + + +class GrapheneReportRunlessAssetEventsMutation(graphene.Mutation): + """Reports runless events for an asset or a subset of its partitions.""" + + Output = graphene.NonNull(GrapheneReportRunlessAssetEventsResult) + + class Arguments: + eventParams = graphene.Argument(GrapheneReportRunlessAssetEventsParams) + + class Meta: + name = "ReportRunlessAssetEventsMutation" + + @capture_error + @check_permission(Permissions.REPORT_RUNLESS_ASSET_EVENTS) + def mutate( + self, graphene_info: ResolveInfo, eventParams: GrapheneReportRunlessAssetEventsParams + ): + event_type = eventParams.eventType.to_dagster_event_type() + asset_key = AssetKey.from_graphql_input(eventParams["assetKey"]) + + return report_runless_asset_events(graphene_info, AssetKey.from_graphql_input(eventParams)) + + class GrapheneLogTelemetrySuccess(graphene.ObjectType): """Output indicating that telemetry was logged.""" @@ -818,6 +864,7 @@ class Meta: reloadWorkspace = GrapheneReloadWorkspaceMutation.Field() shutdownRepositoryLocation = GrapheneShutdownRepositoryLocationMutation.Field() wipeAssets = GrapheneAssetWipeMutation.Field() + reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field() launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field() resumePartitionBackfill = GrapheneResumeBackfillMutation.Field() cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field() diff --git a/python_modules/dagster/dagster/_core/workspace/permissions.py b/python_modules/dagster/dagster/_core/workspace/permissions.py index 61e0103f55e9e..5089b198797e2 100644 --- a/python_modules/dagster/dagster/_core/workspace/permissions.py +++ b/python_modules/dagster/dagster/_core/workspace/permissions.py @@ -15,6 +15,7 @@ class Permissions(str, Enum): RELOAD_REPOSITORY_LOCATION = "reload_repository_location" RELOAD_WORKSPACE = "reload_workspace" WIPE_ASSETS = "wipe_assets" + REPORT_RUNLESS_ASSET_EVENTS = "report_runless_asset_events" LAUNCH_PARTITION_BACKFILL = "launch_partition_backfill" CANCEL_PARTITION_BACKFILL = "cancel_partition_backfill" EDIT_DYNAMIC_PARTITIONS = "edit_dynamic_partitions" @@ -37,6 +38,7 @@ def __str__(self) -> str: Permissions.RELOAD_REPOSITORY_LOCATION: False, Permissions.RELOAD_WORKSPACE: False, Permissions.WIPE_ASSETS: False, + Permissions.REPORT_RUNLESS_ASSET_EVENTS: False, Permissions.LAUNCH_PARTITION_BACKFILL: False, Permissions.CANCEL_PARTITION_BACKFILL: False, Permissions.EDIT_DYNAMIC_PARTITIONS: False, @@ -56,6 +58,7 @@ def __str__(self) -> str: Permissions.RELOAD_REPOSITORY_LOCATION: True, Permissions.RELOAD_WORKSPACE: True, Permissions.WIPE_ASSETS: True, + Permissions.REPORT_RUNLESS_ASSET_EVENTS: True, Permissions.LAUNCH_PARTITION_BACKFILL: True, Permissions.CANCEL_PARTITION_BACKFILL: True, Permissions.EDIT_DYNAMIC_PARTITIONS: True,