From 2567e2dcd87caf8d1c8a3512ccc84efccd8c65ba Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Mon, 9 Oct 2023 09:46:25 -0700 Subject: [PATCH] [gql] Report runless asset events (#17019) This PR adds graphQL to enable reporting asset materializations in the UI. It creates a `reportRunlessAssetEvents` mutation with the following arguments: - event type -- currently only accepts observations and materializations, though we can use this to support asset checks in the UI in the future - asset key -- our planned UIs currently only support targeting one asset at a time - partition keys (optional) - description (optional) In the next PR I plan on enabling displaying the creator of a runless event. --- .../packages/ui-core/src/app/Permissions.tsx | 1 + .../ui-core/src/graphql/permissions.json | 2 +- .../src/graphql/possibleTypes.generated.json | 2 +- .../ui-core/src/graphql/schema.graphql | 24 ++++ .../packages/ui-core/src/graphql/types.ts | 74 ++++++++++ .../implementation/execution/__init__.py | 50 ++++++- .../dagster_graphql/schema/inputs.py | 31 ++++ .../dagster_graphql/schema/roots/mutation.py | 54 +++++++ .../graphql/test_assets.py | 132 ++++++++++++++++++ .../dagster/_core/workspace/permissions.py | 3 + 10 files changed, 370 insertions(+), 3 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/app/Permissions.tsx b/js_modules/dagster-ui/packages/ui-core/src/app/Permissions.tsx index dc6ba9ccabdb3..eaf3f3463445c 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/app/Permissions.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/app/Permissions.tsx @@ -21,6 +21,7 @@ export const EXPECTED_PERMISSIONS = { reload_repository_location: true, reload_workspace: true, wipe_assets: true, + report_runless_asset_events: true, launch_partition_backfill: true, cancel_partition_backfill: true, edit_dynamic_partitions: true, diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/permissions.json b/js_modules/dagster-ui/packages/ui-core/src/graphql/permissions.json index 5903ba86906aa..67183cb895eed 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/permissions.json +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/permissions.json @@ -1 +1 @@ -[{"permission":"launch_pipeline_execution"},{"permission":"launch_pipeline_reexecution"},{"permission":"start_schedule"},{"permission":"stop_running_schedule"},{"permission":"edit_sensor"},{"permission":"update_sensor_cursor"},{"permission":"terminate_pipeline_execution"},{"permission":"delete_pipeline_run"},{"permission":"reload_repository_location"},{"permission":"reload_workspace"},{"permission":"wipe_assets"},{"permission":"launch_partition_backfill"},{"permission":"cancel_partition_backfill"},{"permission":"edit_dynamic_partitions"},{"permission":"toggle_auto_materialize"},{"permission":"edit_concurrency_limit"}] \ No newline at end of file +[{"permission":"launch_pipeline_execution"},{"permission":"launch_pipeline_reexecution"},{"permission":"start_schedule"},{"permission":"stop_running_schedule"},{"permission":"edit_sensor"},{"permission":"update_sensor_cursor"},{"permission":"terminate_pipeline_execution"},{"permission":"delete_pipeline_run"},{"permission":"reload_repository_location"},{"permission":"reload_workspace"},{"permission":"wipe_assets"},{"permission":"report_runless_asset_events"},{"permission":"launch_partition_backfill"},{"permission":"cancel_partition_backfill"},{"permission":"edit_dynamic_partitions"},{"permission":"toggle_auto_materialize"},{"permission":"edit_concurrency_limit"}] \ No newline at end of file diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json index 3391240ca55ea..573722bd647f2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json @@ -1 +1 @@ -{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","NullMetadataEntry"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","AssetWipeSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"]} \ No newline at end of file +{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","NullMetadataEntry"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"]} \ No newline at end of file diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 509bdbe931300..81d671c77bb78 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -2244,6 +2244,18 @@ input InputTag { value: String! } +input ReportRunlessAssetEventsParams { + eventType: AssetEventType! + assetKey: AssetKeyInput! + partitionKeys: [String] + description: String +} + +enum AssetEventType { + ASSET_MATERIALIZATION + ASSET_OBSERVATION +} + type DaemonHealth { id: String! daemonStatus(daemonType: String): DaemonStatus! @@ -3331,6 +3343,9 @@ type Mutation { repositoryLocationName: String! ): ShutdownRepositoryLocationMutationResult! wipeAssets(assetKeys: [AssetKeyInput!]!): AssetWipeMutationResult! + reportRunlessAssetEvents( + eventParams: ReportRunlessAssetEventsParams! + ): ReportRunlessAssetEventsResult! launchPartitionBackfill(backfillParams: LaunchBackfillParams!): LaunchBackfillResult! resumePartitionBackfill(backfillId: String!): ResumeBackfillResult! cancelPartitionBackfill(backfillId: String!): CancelBackfillResult! @@ -3381,6 +3396,15 @@ type AssetWipeSuccess { assetKeys: [AssetKey!]! } +union ReportRunlessAssetEventsResult = + UnauthorizedError + | PythonError + | ReportRunlessAssetEventsSuccess + +type ReportRunlessAssetEventsSuccess { + assetKey: AssetKey! +} + union ResumeBackfillResult = ResumeBackfillSuccess | UnauthorizedError | PythonError type ResumeBackfillSuccess { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 328d96a73ebea..07dba0bd51c31 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -251,6 +251,11 @@ export type AssetDependency = { inputName: Scalars['String']; }; +export enum AssetEventType { + ASSET_MATERIALIZATION = 'ASSET_MATERIALIZATION', + ASSET_OBSERVATION = 'ASSET_OBSERVATION', +} + export type AssetFreshnessInfo = { __typename: 'AssetFreshnessInfo'; currentLagMinutes: Maybe; @@ -2025,6 +2030,7 @@ export type Mutation = { logTelemetry: LogTelemetryMutationResult; reloadRepositoryLocation: ReloadRepositoryLocationMutationResult; reloadWorkspace: ReloadWorkspaceMutationResult; + reportRunlessAssetEvents: ReportRunlessAssetEventsResult; resumePartitionBackfill: ResumeBackfillResult; scheduleDryRun: ScheduleDryRunResult; sensorDryRun: SensorDryRunResult; @@ -2098,6 +2104,10 @@ export type MutationReloadRepositoryLocationArgs = { repositoryLocationName: Scalars['String']; }; +export type MutationReportRunlessAssetEventsArgs = { + eventParams: ReportRunlessAssetEventsParams; +}; + export type MutationResumePartitionBackfillArgs = { backfillId: Scalars['String']; }; @@ -3154,6 +3164,23 @@ export type ReloadWorkspaceMutation = { export type ReloadWorkspaceMutationResult = PythonError | UnauthorizedError | Workspace; +export type ReportRunlessAssetEventsParams = { + assetKey: AssetKeyInput; + description?: InputMaybe; + eventType: AssetEventType; + partitionKeys?: InputMaybe>>; +}; + +export type ReportRunlessAssetEventsResult = + | PythonError + | ReportRunlessAssetEventsSuccess + | UnauthorizedError; + +export type ReportRunlessAssetEventsSuccess = { + __typename: 'ReportRunlessAssetEventsSuccess'; + assetKey: AssetKey; +}; + export type RepositoriesOrError = PythonError | RepositoryConnection; export type Repository = { @@ -8309,6 +8336,12 @@ export const buildMutation = ( : relationshipsToOmit.has('PythonError') ? ({} as PythonError) : buildPythonError({}, relationshipsToOmit), + reportRunlessAssetEvents: + overrides && overrides.hasOwnProperty('reportRunlessAssetEvents') + ? overrides.reportRunlessAssetEvents! + : relationshipsToOmit.has('PythonError') + ? ({} as PythonError) + : buildPythonError({}, relationshipsToOmit), resumePartitionBackfill: overrides && overrides.hasOwnProperty('resumePartitionBackfill') ? overrides.resumePartitionBackfill! @@ -10209,6 +10242,47 @@ export const buildReloadWorkspaceMutation = ( }; }; +export const buildReportRunlessAssetEventsParams = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): ReportRunlessAssetEventsParams => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('ReportRunlessAssetEventsParams'); + return { + assetKey: + overrides && overrides.hasOwnProperty('assetKey') + ? overrides.assetKey! + : relationshipsToOmit.has('AssetKeyInput') + ? ({} as AssetKeyInput) + : buildAssetKeyInput({}, relationshipsToOmit), + description: + overrides && overrides.hasOwnProperty('description') ? overrides.description! : 'dolores', + eventType: + overrides && overrides.hasOwnProperty('eventType') + ? overrides.eventType! + : AssetEventType.ASSET_MATERIALIZATION, + partitionKeys: + overrides && overrides.hasOwnProperty('partitionKeys') ? overrides.partitionKeys! : [], + }; +}; + +export const buildReportRunlessAssetEventsSuccess = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'ReportRunlessAssetEventsSuccess'} & ReportRunlessAssetEventsSuccess => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('ReportRunlessAssetEventsSuccess'); + return { + __typename: 'ReportRunlessAssetEventsSuccess', + assetKey: + overrides && overrides.hasOwnProperty('assetKey') + ? overrides.assetKey! + : relationshipsToOmit.has('AssetKey') + ? ({} as AssetKey) + : buildAssetKey({}, relationshipsToOmit), + }; +}; + export const buildRepository = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py index 800f7f9f91f11..6f5dcdbebdb3d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py @@ -7,7 +7,12 @@ import dagster._check as check from dagster._annotations import deprecated from dagster._core.definitions.events import AssetKey -from dagster._core.events import EngineEventData +from dagster._core.events import ( + AssetMaterialization, + AssetObservation, + DagsterEventType, + EngineEventData, +) from dagster._core.instance import ( DagsterInstance, ) @@ -47,6 +52,7 @@ from ...schema.roots.mutation import ( GrapheneAssetWipeSuccess, GrapheneDeletePipelineRunSuccess, + GrapheneReportRunlessAssetEventsSuccess, GrapheneTerminateRunFailure, GrapheneTerminateRunsResult, GrapheneTerminateRunSuccess, @@ -370,3 +376,45 @@ def wipe_assets( instance = graphene_info.context.instance instance.wipe_assets(asset_keys) return GrapheneAssetWipeSuccess(assetKeys=asset_keys) + + +def create_asset_event( + event_type: DagsterEventType, + asset_key: AssetKey, + partition_key: Optional[str], + description: Optional[str], +) -> Union[AssetMaterialization, AssetObservation]: + if event_type == DagsterEventType.ASSET_MATERIALIZATION: + return AssetMaterialization( + asset_key=asset_key, partition=partition_key, description=description + ) + elif event_type == DagsterEventType.ASSET_OBSERVATION: + return AssetObservation( + asset_key=asset_key, partition=partition_key, description=description + ) + else: + check.failed(f"Unexpected event type {event_type}") + + +def report_runless_asset_events( + graphene_info: "ResolveInfo", + event_type: DagsterEventType, + asset_key: AssetKey, + partition_keys: Optional[Sequence[str]] = None, + description: Optional[str] = None, +) -> "GrapheneReportRunlessAssetEventsSuccess": + from ...schema.roots.mutation import GrapheneReportRunlessAssetEventsSuccess + + instance = graphene_info.context.instance + + if partition_keys is not None: + for partition_key in partition_keys: + instance.report_runless_asset_event( + create_asset_event(event_type, asset_key, partition_key, description) + ) + else: + instance.report_runless_asset_event( + create_asset_event(event_type, asset_key, None, description) + ) + + return GrapheneReportRunlessAssetEventsSuccess(assetKey=asset_key) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py index a70cbb9f7cedf..fdb370caaeefa 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py @@ -1,6 +1,8 @@ import graphene import pendulum +from dagster._core.events import DagsterEventType from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter +from dagster._utils import check from .pipelines.status import GrapheneRunStatus from .runs import GrapheneRunConfigData @@ -188,6 +190,34 @@ class Meta: name = "LaunchBackfillParams" +class GrapheneRunlessAssetEventType(graphene.Enum): + """The event type of an asset event.""" + + ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION" + ASSET_OBSERVATION = "ASSET_OBSERVATION" + + class Meta: + name = "AssetEventType" + + def to_dagster_event_type(self) -> DagsterEventType: + if self == GrapheneRunlessAssetEventType.ASSET_MATERIALIZATION: + return DagsterEventType.ASSET_MATERIALIZATION + elif self == GrapheneRunlessAssetEventType.ASSET_OBSERVATION: + return DagsterEventType.ASSET_OBSERVATION + else: + check.assert_never(self) + + +class GrapheneReportRunlessAssetEventsParams(graphene.InputObjectType): + eventType = graphene.NonNull(GrapheneRunlessAssetEventType) + assetKey = graphene.NonNull(GrapheneAssetKeyInput) + partitionKeys = graphene.InputField(graphene.List(graphene.String)) + description = graphene.String() + + class Meta: + name = "ReportRunlessAssetEventsParams" + + class GrapheneSensorSelector(graphene.InputObjectType): repositoryName = graphene.NonNull(graphene.String) repositoryLocationName = graphene.NonNull(graphene.String) @@ -348,4 +378,5 @@ class Meta: GrapheneStepExecution, GrapheneStepOutputHandle, GrapheneInputTag, + GrapheneReportRunlessAssetEventsParams, ] diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index f0cd518b3f602..ad56b3aa73a77 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -22,6 +22,7 @@ from ...implementation.execution import ( delete_pipeline_run, + report_runless_asset_events, terminate_pipeline_execution, terminate_pipeline_execution_for_runs, wipe_assets, @@ -61,6 +62,7 @@ GrapheneExecutionParams, GrapheneLaunchBackfillParams, GrapheneReexecutionParams, + GrapheneReportRunlessAssetEventsParams, GrapheneRepositorySelector, ) from ..partition_sets import GrapheneAddDynamicPartitionResult @@ -673,6 +675,57 @@ def mutate(self, graphene_info: ResolveInfo, assetKeys: Sequence[GrapheneAssetKe ) +class GrapheneReportRunlessAssetEventsSuccess(graphene.ObjectType): + """Output indicating that runless asset events were reported.""" + + assetKey = graphene.NonNull(GrapheneAssetKey) + + class Meta: + name = "ReportRunlessAssetEventsSuccess" + + +class GrapheneReportRunlessAssetEventsResult(graphene.Union): + """The output from reporting runless events.""" + + class Meta: + types = ( + GrapheneUnauthorizedError, + GraphenePythonError, + GrapheneReportRunlessAssetEventsSuccess, + ) + name = "ReportRunlessAssetEventsResult" + + +class GrapheneReportRunlessAssetEventsMutation(graphene.Mutation): + """Reports runless events for an asset or a subset of its partitions.""" + + Output = graphene.NonNull(GrapheneReportRunlessAssetEventsResult) + + class Arguments: + eventParams = graphene.Argument(graphene.NonNull(GrapheneReportRunlessAssetEventsParams)) + + class Meta: + name = "ReportRunlessAssetEventsMutation" + + @capture_error + @check_permission(Permissions.REPORT_RUNLESS_ASSET_EVENTS) + def mutate( + self, graphene_info: ResolveInfo, eventParams: GrapheneReportRunlessAssetEventsParams + ): + event_type = eventParams["eventType"].to_dagster_event_type() + asset_key = AssetKey.from_graphql_input(eventParams["assetKey"]) + partition_keys = eventParams.get("partitionKeys", None) + description = eventParams.get("description", None) + + return report_runless_asset_events( + graphene_info, + event_type=event_type, + asset_key=asset_key, + partition_keys=partition_keys, + description=description, + ) + + class GrapheneLogTelemetrySuccess(graphene.ObjectType): """Output indicating that telemetry was logged.""" @@ -818,6 +871,7 @@ class Meta: reloadWorkspace = GrapheneReloadWorkspaceMutation.Field() shutdownRepositoryLocation = GrapheneShutdownRepositoryLocationMutation.Field() wipeAssets = GrapheneAssetWipeMutation.Field() + reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field() launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field() resumePartitionBackfill = GrapheneResumeBackfillMutation.Field() cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field() diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index d32900f98af7c..385e5198e44f1 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -3,6 +3,7 @@ import time from typing import Dict, List, Optional, Sequence +import pytest from dagster import ( AssetKey, AssetMaterialization, @@ -19,6 +20,7 @@ from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionKey from dagster._core.events.log import EventLogEntry from dagster._core.storage.dagster_run import DagsterRunStatus +from dagster._core.storage.event_log.base import EventRecordsFilter from dagster._core.test_utils import instance_for_test, poll_for_finished_run from dagster._core.workspace.context import WorkspaceRequestContext from dagster._utils import Counter, safe_tempfile_path, traced_counter @@ -39,6 +41,7 @@ from dagster_graphql_tests.graphql.graphql_context_test_suite import ( AllRepositoryGraphQLContextTestMatrix, ExecutingGraphQLContextTestMatrix, + ReadonlyGraphQLContextTestMatrix, ) GET_ASSET_KEY_QUERY = """ @@ -99,6 +102,24 @@ } """ +REPORT_RUNLESS_ASSET_EVENTS = """ +mutation reportRunlessAssetEvents($eventParams: ReportRunlessAssetEventsParams!) { + reportRunlessAssetEvents(eventParams: $eventParams) { + __typename + ... on PythonError { + message + stack + } + ... on ReportRunlessAssetEventsSuccess { + assetKey { + path + } + } + } +} +""" + + GET_ASSET_MATERIALIZATION_TIMESTAMP = """ query AssetQuery($assetKey: AssetKeyInput!, $asOf: String) { assetOrError(assetKey: $assetKey) { @@ -774,6 +795,89 @@ def test_asset_wipe(self, graphql_context: WorkspaceRequestContext): asset_keys = graphql_context.instance.all_asset_keys() assert AssetKey("a") not in asset_keys + @pytest.mark.parametrize( + "event_type,asset_key,partitions,description", + [ + ( + DagsterEventType.ASSET_MATERIALIZATION, + AssetKey("asset1"), + None, + None, + ), + ( + DagsterEventType.ASSET_MATERIALIZATION, + AssetKey("asset1"), + None, + "runless materialization", + ), + ( + DagsterEventType.ASSET_MATERIALIZATION, + AssetKey("asset1"), + ["partition1", "partition2"], + None, + ), + ( + DagsterEventType.ASSET_OBSERVATION, + AssetKey("asset1"), + ["partition1", "partition2"], + "runless observation", + ), + ], + ) + def test_report_runless_asset_events( + self, + graphql_context: WorkspaceRequestContext, + event_type: DagsterEventType, + asset_key: AssetKey, + partitions: Optional[Sequence[str]], + description: Optional[str], + ): + assert graphql_context.instance.all_asset_keys() == [] + + result = execute_dagster_graphql( + graphql_context, + REPORT_RUNLESS_ASSET_EVENTS, + variables={ + "eventParams": { + "eventType": event_type.value, + "assetKey": {"path": asset_key.path}, + "partitionKeys": partitions, + "description": description, + } + }, + ) + + assert result.data + assert result.data["reportRunlessAssetEvents"] + assert ( + result.data["reportRunlessAssetEvents"]["__typename"] + == "ReportRunlessAssetEventsSuccess" + ) + + event_records = graphql_context.instance.get_event_records( + EventRecordsFilter( + event_type=event_type, + asset_key=asset_key, + ), + ascending=True, + ) + if partitions: + assert len(event_records) == len(partitions) + else: + assert len(event_records) == 1 + + for i in range(len(event_records)): + assert event_records[i].event_log_entry.dagster_event_type == event_type + assert event_records[i].partition_key == (partitions[i] if partitions else None) + if event_type == DagsterEventType.ASSET_MATERIALIZATION: + materialization = event_records[i].asset_materialization + assert materialization + assert materialization.description == description + else: + observation = event_records[i].asset_observation + assert observation + assert observation.description == description + def test_asset_asof_timestamp(self, graphql_context: WorkspaceRequestContext): _create_run(graphql_context, "asset_tag_job") result = execute_dagster_graphql( @@ -2222,6 +2326,34 @@ def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext): assert a["hasAssetChecks"] is False, f"Asset {a['assetKey']} has asset checks" +class TestAssetEventsReadOnly(ReadonlyGraphQLContextTestMatrix): + def test_report_runless_asset_events_permissions( + self, + graphql_context: WorkspaceRequestContext, + ): + assert graphql_context.instance.all_asset_keys() == [] + + result = execute_dagster_graphql( + graphql_context, + REPORT_RUNLESS_ASSET_EVENTS, + variables={ + "eventParams": { + "eventType": DagsterEventType.ASSET_MATERIALIZATION, + "assetKey": {"path": ["asset1"]}, + } + }, + ) + + assert result.data + assert result.data["reportRunlessAssetEvents"] + assert result.data["reportRunlessAssetEvents"]["__typename"] == "UnauthorizedError" + + event_records = graphql_context.instance.get_event_records( + EventRecordsFilter(DagsterEventType.ASSET_MATERIALIZATION) + ) + assert len(event_records) == 0 + + class TestPersistentInstanceAssetInProgress(ExecutingGraphQLContextTestMatrix): def test_asset_in_progress(self, graphql_context: WorkspaceRequestContext): selector = infer_pipeline_selector(graphql_context, "hanging_job") diff --git a/python_modules/dagster/dagster/_core/workspace/permissions.py b/python_modules/dagster/dagster/_core/workspace/permissions.py index 61e0103f55e9e..5089b198797e2 100644 --- a/python_modules/dagster/dagster/_core/workspace/permissions.py +++ b/python_modules/dagster/dagster/_core/workspace/permissions.py @@ -15,6 +15,7 @@ class Permissions(str, Enum): RELOAD_REPOSITORY_LOCATION = "reload_repository_location" RELOAD_WORKSPACE = "reload_workspace" WIPE_ASSETS = "wipe_assets" + REPORT_RUNLESS_ASSET_EVENTS = "report_runless_asset_events" LAUNCH_PARTITION_BACKFILL = "launch_partition_backfill" CANCEL_PARTITION_BACKFILL = "cancel_partition_backfill" EDIT_DYNAMIC_PARTITIONS = "edit_dynamic_partitions" @@ -37,6 +38,7 @@ def __str__(self) -> str: Permissions.RELOAD_REPOSITORY_LOCATION: False, Permissions.RELOAD_WORKSPACE: False, Permissions.WIPE_ASSETS: False, + Permissions.REPORT_RUNLESS_ASSET_EVENTS: False, Permissions.LAUNCH_PARTITION_BACKFILL: False, Permissions.CANCEL_PARTITION_BACKFILL: False, Permissions.EDIT_DYNAMIC_PARTITIONS: False, @@ -56,6 +58,7 @@ def __str__(self) -> str: Permissions.RELOAD_REPOSITORY_LOCATION: True, Permissions.RELOAD_WORKSPACE: True, Permissions.WIPE_ASSETS: True, + Permissions.REPORT_RUNLESS_ASSET_EVENTS: True, Permissions.LAUNCH_PARTITION_BACKFILL: True, Permissions.CANCEL_PARTITION_BACKFILL: True, Permissions.EDIT_DYNAMIC_PARTITIONS: True,