From 9e5022b7d058fd3ac064969910898fae2fc7b3a4 Mon Sep 17 00:00:00 2001
From: Owen Kephart
Date: Wed, 13 Dec 2023 10:52:12 -0800
Subject: [PATCH] PoC: AssetConditionEvaluation GQL

---
 .../src/graphql/possibleTypes.generated.json  | 687 +++++++++++++-
 .../ui-core/src/graphql/schema.graphql        |  75 ++
 .../packages/ui-core/src/graphql/types.ts     | 268 ++++++
 .../fetch_asset_condition_evaluations.py      | 132 +++
 .../schema/asset_condition_evaluations.py     | 308 ++++++
 .../dagster_graphql/schema/asset_graph.py     |   4 +-
 .../dagster_graphql/schema/roots/query.py     |  63 ++
 .../test_asset_condition_evaluations.py       | 895 ++++++++++++++++++
 .../auto_materialize_rule_evaluation.py       |   1 +
 9 files changed, 2429 insertions(+), 4 deletions(-)
 create mode 100644 python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py
 create mode 100644 python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py
 create mode 100644 python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py

diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json
index 1a95e3474a195..b52ff13572230 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json
@@ -1 +1,686 @@
[687 lines of generated union/interface-membership JSON elided: the single-line map is rewritten as pretty-printed JSON, and in this excerpt the memberships carry no change beyond formatting; the substantive additions are expressed by the schema.graphql diff below.]
diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
index 441b2cdb5d8cf..a9eded8e57181 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
@@ -3162,6 +3162,11 @@ type Query {
     limit: Int!
     cursor: String
   ): AutoMaterializeAssetEvaluationRecordsOrError
+  assetConditionEvaluationsOrError(
+    assetKey: AssetKeyInput!
+    limit: Int!
+    cursor: String
+  ): AssetConditionEvaluationRecordsOrError
   autoMaterializeEvaluationsForEvaluationId(
     evaluationId: Int!
   ): AutoMaterializeAssetEvaluationRecordsOrError
@@ -3390,6 +3395,76 @@ type AutoMaterializeAssetEvaluationNeedsMigrationError implements Error {
   message: String!
 }
 
+union AssetConditionEvaluationRecordsOrError =
+    AssetConditionEvaluationRecords
+  | AutoMaterializeAssetEvaluationNeedsMigrationError
+
+type AssetConditionEvaluationRecords {
+  records: [AssetConditionEvaluationRecord!]!
+}
+
+type AssetConditionEvaluationRecord {
+  id: ID!
+  evaluationId: Int!
+  runIds: [String!]!
+  timestamp: Float!
+  assetKey: AssetKey!
+  numRequested: Int!
+  evaluation: AssetConditionEvaluation!
+}
+
+union AssetConditionEvaluation =
+    UnpartitionedAssetConditionEvaluation
+  | PartitionedAssetConditionEvaluation
+  | SpecificPartitionAssetConditionEvaluation
+
+type UnpartitionedAssetConditionEvaluation {
+  description: String!
+  startTimestamp: Float
+  endTimestamp: Float
+  metadataEntries: [MetadataEntry!]!
+  status: AssetConditionEvaluationStatus!
+  childEvaluations: [UnpartitionedAssetConditionEvaluation!]
+}
+
+enum AssetConditionEvaluationStatus {
+  TRUE
+  FALSE
+  SKIPPED
+}
+
+type PartitionedAssetConditionEvaluation {
+  description: String!
+  startTimestamp: Float
+  endTimestamp: Float
+  trueSubset: AssetSubset!
+  falseSubset: AssetSubset!
+  candidateSubset: AssetSubset
+  numTrue: Int!
+  numFalse: Int!
+  numSkipped: Int!
+  childEvaluations: [PartitionedAssetConditionEvaluation!]
+}
+
+type AssetSubset {
+  assetKey: AssetKey!
+  subsetValue: AssetSubsetValue!
+}
+
+type AssetSubsetValue {
+  boolValue: Boolean
+  partitionKeys: [String!]
+  partitionKeyRanges: [PartitionKeyRange!]
+  isPartitioned: Boolean!
+}
+
+type SpecificPartitionAssetConditionEvaluation {
+  description: String!
+  metadataEntries: [MetadataEntry!]!
+  status: AssetConditionEvaluationStatus!
+  childEvaluations: [SpecificPartitionAssetConditionEvaluation!]
+}
+
 type Mutation {
   launchPipelineExecution(executionParams: ExecutionParams!): LaunchRunResult!
   launchRun(executionParams: ExecutionParams!): LaunchRunResult!
diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
index 8949dd53e7ba7..3d4cb117e2a02 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
[268 lines of generated TypeScript elided: codegen'd type definitions and mock builders (the AssetConditionEvaluation union, AssetConditionEvaluationRecord/Records, the AssetConditionEvaluationStatus enum, AssetSubset/AssetSubsetValue, the three evaluation object types, and the new Query field plus its args) mirroring the schema.graphql additions above.]
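A usage sketch, not part of the patch: the query below exercises the new field as it appears in this generated schema snapshot, using dagster-graphql's test helper. Note that the snapshot's field name (assetConditionEvaluationsOrError) lags the Python resolvers later in the patch, which register assetConditionEvaluationRecordsOrError. The `context` fixture and the `my_asset` key are assumed for illustration.

# Hypothetical usage sketch (not part of this patch). Assumes an existing
# graphql test context fixture and an asset key "my_asset".
from dagster_graphql.test.utils import execute_dagster_graphql

QUERY = """
query AssetConditionEvaluations($assetKey: AssetKeyInput!, $limit: Int!, $cursor: String) {
  assetConditionEvaluationsOrError(assetKey: $assetKey, limit: $limit, cursor: $cursor) {
    ... on AssetConditionEvaluationRecords {
      records {
        evaluationId
        timestamp
        numRequested
        evaluation {
          ... on UnpartitionedAssetConditionEvaluation {
            description
            status
          }
          ... on PartitionedAssetConditionEvaluation {
            description
            numTrue
            numFalse
            numSkipped
          }
        }
      }
    }
    ... on AutoMaterializeAssetEvaluationNeedsMigrationError {
      message
    }
  }
}
"""

result = execute_dagster_graphql(
    context, QUERY, variables={"assetKey": {"path": ["my_asset"]}, "limit": 10, "cursor": None}
)
assert not result.errors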
diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py
new file mode 100644
index 0000000000000..b96f42a563e6e
--- /dev/null
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py
@@ -0,0 +1,132 @@
+from typing import TYPE_CHECKING, Optional, Sequence
+
+import dagster._check as check
+from dagster import AssetKey
+from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord
+
+from dagster_graphql.implementation.fetch_assets import get_asset_nodes_by_asset_key
+from dagster_graphql.schema.asset_condition_evaluations import (
+    GrapheneAssetConditionEvaluation,
+    GrapheneAssetConditionEvaluationRecord,
+    GrapheneAssetConditionEvaluationRecords,
+    GrapheneAssetConditionEvaluationRecordsOrError,
+    GrapheneSpecificPartitionAssetConditionEvaluation,
+)
+from dagster_graphql.schema.auto_materialize_asset_evaluations import (
+    GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
+)
+from dagster_graphql.schema.inputs import GrapheneAssetKeyInput
+
+if TYPE_CHECKING:
+    from ..schema.util import ResolveInfo
+
+
+def _get_migration_error(
+    graphene_info: "ResolveInfo",
+) -> Optional[GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError]:
+    if graphene_info.context.instance.schedule_storage is None:
+        return GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError(
+            message="Instance does not have schedule storage configured, cannot fetch evaluations."
+        )
+    if not graphene_info.context.instance.schedule_storage.supports_auto_materialize_asset_evaluations:
+        return GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError(
+            message=(
+                "Auto materialize evaluations are not getting logged. Run `dagster instance"
+                " migrate` to enable."
+            )
+        )
+    return None
+
+
+def _get_graphene_records_from_evaluations(
+    graphene_info: "ResolveInfo",
+    evaluation_records: Sequence[AutoMaterializeAssetEvaluationRecord],
+) -> GrapheneAssetConditionEvaluationRecords:
+    asset_keys = {record.asset_key for record in evaluation_records}
+
+    partitions_defs = {}
+
+    nodes = get_asset_nodes_by_asset_key(graphene_info)
+    for asset_key in asset_keys:
+        asset_node = nodes.get(asset_key)
+        partitions_defs[asset_key] = (
+            asset_node.external_asset_node.partitions_def_data.get_partitions_definition()
+            if asset_node and asset_node.external_asset_node.partitions_def_data
+            else None
+        )
+
+    return GrapheneAssetConditionEvaluationRecords(
+        records=[
+            GrapheneAssetConditionEvaluationRecord(
+                evaluation, partitions_defs[evaluation.asset_key], graphene_info.context.instance
+            )
+            for evaluation in evaluation_records
+        ]
+    )
+
+
+def fetch_asset_condition_evaluation_record_for_partition(
+    graphene_info: "ResolveInfo",
+    graphene_asset_key: GrapheneAssetKeyInput,
+    evaluation_id: int,
+    partition_key: str,
+) -> GrapheneAssetConditionEvaluation:
+    asset_key = AssetKey.from_graphql_input(graphene_asset_key)
+    schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
+    record = next(
+        iter(
+            schedule_storage.get_auto_materialize_asset_evaluations(
+                asset_key, cursor=evaluation_id + 1, limit=1
+            )
+        )
+    )
+    asset_node = get_asset_nodes_by_asset_key(graphene_info).get(asset_key)
+    partitions_def = (
+        asset_node.external_asset_node.partitions_def_data.get_partitions_definition()
+        if asset_node and asset_node.external_asset_node.partitions_def_data
+        else None
+    )
+    return GrapheneSpecificPartitionAssetConditionEvaluation(
+        record.get_evaluation_with_run_ids(partitions_def).evaluation, partition_key
+    )
+
+
+def fetch_asset_condition_evaluation_records_for_asset_key(
+    graphene_info: "ResolveInfo",
+    graphene_asset_key: GrapheneAssetKeyInput,
+    limit: int,
+    cursor: Optional[str],
+) -> GrapheneAssetConditionEvaluationRecordsOrError:
+    """Fetch asset policy evaluations from storage."""
+    migration_error = _get_migration_error(graphene_info)
+    if migration_error:
+        return migration_error
+
+    asset_key = AssetKey.from_graphql_input(graphene_asset_key)
+
+    schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
+    return _get_graphene_records_from_evaluations(
+        graphene_info,
+        schedule_storage.get_auto_materialize_asset_evaluations(
+            asset_key=asset_key,
+            limit=limit,
+            cursor=int(cursor) if cursor else None,
+        ),
+    )
+
+
+def fetch_asset_condition_evaluation_records_for_evaluation_id(
+    graphene_info: "ResolveInfo",
+    evaluation_id: int,
+) -> GrapheneAssetConditionEvaluationRecordsOrError:
+    migration_error = _get_migration_error(graphene_info)
+    if migration_error:
+        return migration_error
+
+    schedule_storage = check.not_none(graphene_info.context.instance.schedule_storage)
+    return _get_graphene_records_from_evaluations(
+        graphene_info,
+        schedule_storage.get_auto_materialize_evaluations_for_evaluation_id(
+            evaluation_id=evaluation_id
+        ),
+    )
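One behavior worth spelling out, since fetch_asset_condition_evaluation_record_for_partition depends on it: the storage cursor acts as an exclusive upper bound on evaluation_id, with records returned newest-first, which is why cursor=evaluation_id + 1 with limit=1 lands on exactly the requested record. A paging sketch under that assumption (illustrative, not part of the patch):

# Paging sketch (illustrative; assumes the cursor is an exclusive upper bound
# on evaluation_id and that records come back newest-first, which is the
# contract the cursor=evaluation_id + 1, limit=1 lookup above relies on).
def iter_evaluation_records(schedule_storage, asset_key, page_size=20):
    cursor = None
    while True:
        page = schedule_storage.get_auto_materialize_asset_evaluations(
            asset_key=asset_key, limit=page_size, cursor=cursor
        )
        if not page:
            return
        yield from page
        # the next page starts strictly below the oldest evaluation_id seen
        cursor = page[-1].evaluation_id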
diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py
new file mode 100644
index 0000000000000..9b81ff2204b70
--- /dev/null
+++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py
@@ -0,0 +1,308 @@
+import enum
+from typing import Optional, Sequence, Union
+
+import graphene
+import pendulum
+from dagster._core.definitions.asset_condition import AssetConditionEvaluation
+from dagster._core.definitions.asset_subset import AssetSubset
+from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
+from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
+from dagster._core.instance import DynamicPartitionsStore
+from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord
+
+from dagster_graphql.schema.auto_materialize_asset_evaluations import (
+    GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
+)
+from dagster_graphql.schema.metadata import GrapheneMetadataEntry
+
+from .asset_key import GrapheneAssetKey
+from .partition_sets import GraphenePartitionKeyRange
+from .util import ResolveInfo, non_null_list
+
+
+class AssetConditionEvaluationStatus(enum.Enum):
+    TRUE = "TRUE"
+    FALSE = "FALSE"
+    SKIPPED = "SKIPPED"
+
+
+GrapheneAssetConditionEvaluationStatus = graphene.Enum.from_enum(AssetConditionEvaluationStatus)
+
+
+class GrapheneAssetSubsetValue(graphene.ObjectType):
+    class Meta:
+        name = "AssetSubsetValue"
+
+    boolValue = graphene.Field(graphene.Boolean)
+    partitionKeys = graphene.List(graphene.NonNull(graphene.String))
+    partitionKeyRanges = graphene.List(graphene.NonNull(GraphenePartitionKeyRange))
+
+    isPartitioned = graphene.NonNull(graphene.Boolean)
+
+    def __init__(self, value: Union[bool, PartitionsSubset]):
+        bool_value, partition_keys, partition_key_ranges = None, None, None
+        if isinstance(value, bool):
+            bool_value = value
+        elif isinstance(value, BaseTimeWindowPartitionsSubset):
+            partition_key_ranges = [
+                GraphenePartitionKeyRange(start, end)
+                for start, end in value.get_partition_key_ranges(value.partitions_def)
+            ]
+        else:
+            partition_keys = value.get_partition_keys()
+
+        super().__init__(
+            boolValue=bool_value,
+            partitionKeys=partition_keys,
+            partitionKeyRanges=partition_key_ranges,
+        )
+
+    def resolve_isPartitioned(self, graphene_info: ResolveInfo) -> bool:
+        # a plain bool value means the subset is unpartitioned
+        return self.boolValue is None
+
+
+class GrapheneAssetSubset(graphene.ObjectType):
+    assetKey = graphene.NonNull(GrapheneAssetKey)
+    subsetValue = graphene.NonNull(GrapheneAssetSubsetValue)
+
+    class Meta:
+        name = "AssetSubset"
+
+    def __init__(self, asset_subset: AssetSubset):
+        super().__init__(
+            assetKey=GrapheneAssetKey(path=asset_subset.asset_key.path),
+            subsetValue=GrapheneAssetSubsetValue(asset_subset.subset_value),
+        )
+
+
+class GrapheneUnpartitionedAssetConditionEvaluation(graphene.ObjectType):
+    description = graphene.NonNull(graphene.String)
+
+    startTimestamp = graphene.Field(graphene.Float)
+    endTimestamp = graphene.Field(graphene.Float)
+
+    metadataEntries = non_null_list(GrapheneMetadataEntry)
+    status = graphene.NonNull(GrapheneAssetConditionEvaluationStatus)
+
+    childEvaluations = graphene.Field(
+        graphene.List(graphene.NonNull(lambda: GrapheneUnpartitionedAssetConditionEvaluation))
+    )
+
+    class Meta:
+        name = "UnpartitionedAssetConditionEvaluation"
+
+    def __init__(self, evaluation: AssetConditionEvaluation):
+        # keep a handle on the evaluation for resolve_metadataEntries
+        self._evaluation = evaluation
+
+        if evaluation.true_subset.bool_value:
+            status = AssetConditionEvaluationStatus.TRUE
+        elif evaluation.candidate_subset and evaluation.candidate_subset.bool_value:
+            status = AssetConditionEvaluationStatus.FALSE
+        else:
+            status = AssetConditionEvaluationStatus.SKIPPED
+
+        super().__init__(
+            description=evaluation.condition_snapshot.description,
+            startTimestamp=evaluation.start_timestamp,
+            endTimestamp=evaluation.end_timestamp,
+            status=status,
+            childEvaluations=[
+                GrapheneUnpartitionedAssetConditionEvaluation(child)
+                for child in evaluation.child_evaluations
+            ],
+        )
+
+    def resolve_metadataEntries(
+        self, graphene_info: ResolveInfo
+    ) -> Sequence[GrapheneMetadataEntry]:
+        # metadata, if any, comes from the first subset-with-metadata on the
+        # stored evaluation
+        metadata = next(
+            (subset.metadata for subset in self._evaluation.subsets_with_metadata),
+            {},
+        )
+        return [GrapheneMetadataEntry(key=key, value=value) for key, value in metadata.items()]
+
+
+class GraphenePartitionedAssetConditionEvaluation(graphene.ObjectType):
+    description = graphene.NonNull(graphene.String)
+
+    startTimestamp = graphene.Field(graphene.Float)
+    endTimestamp = graphene.Field(graphene.Float)
+
+    trueSubset = graphene.NonNull(GrapheneAssetSubset)
+    falseSubset = graphene.NonNull(GrapheneAssetSubset)
+    candidateSubset = graphene.Field(GrapheneAssetSubset)
+
+    numTrue = graphene.NonNull(graphene.Int)
+    numFalse = graphene.NonNull(graphene.Int)
+    numSkipped = graphene.NonNull(graphene.Int)
+
+    childEvaluations = graphene.Field(
+        graphene.List(graphene.NonNull(lambda: GraphenePartitionedAssetConditionEvaluation))
+    )
+
+    class Meta:
+        name = "PartitionedAssetConditionEvaluation"
+
+    def __init__(
+        self,
+        evaluation: AssetConditionEvaluation,
+        partitions_def: Optional[PartitionsDefinition],
+        dynamic_partitions_store: DynamicPartitionsStore,
+    ):
+        self._partitions_def = partitions_def
+        self._true_subset = evaluation.true_subset
+
+        self._all_subset = AssetSubset.all(
+            evaluation.asset_key, partitions_def, dynamic_partitions_store, pendulum.now("UTC")
+        )
+
+        # if the candidate_subset is unset, then we evaluated all partitions
+        self._candidate_subset = evaluation.candidate_subset or self._all_subset
+
+        super().__init__(
+            description=evaluation.condition_snapshot.description,
+            startTimestamp=evaluation.start_timestamp,
+            endTimestamp=evaluation.end_timestamp,
+            trueSubset=GrapheneAssetSubset(evaluation.true_subset),
+            candidateSubset=GrapheneAssetSubset(self._candidate_subset),
+            childEvaluations=[
+                GraphenePartitionedAssetConditionEvaluation(
+                    child, partitions_def, dynamic_partitions_store
+                )
+                for child in evaluation.child_evaluations
+            ],
+        )
+
+    def resolve_numTrue(self, graphene_info: ResolveInfo) -> int:
+        return self._true_subset.size
+
+    def resolve_numFalse(self, graphene_info: ResolveInfo) -> int:
+        return self._candidate_subset.size - self._true_subset.size
+
+    def resolve_falseSubset(self, graphene_info: ResolveInfo) -> GrapheneAssetSubset:
+        return GrapheneAssetSubset(self._candidate_subset - self._true_subset)
+
+    def resolve_numSkipped(self, graphene_info: ResolveInfo) -> int:
+        return self._all_subset.size - self._candidate_subset.size
+
+
+class GrapheneSpecificPartitionAssetConditionEvaluation(graphene.ObjectType):
+    description = graphene.NonNull(graphene.String)
+
+    metadataEntries = non_null_list(GrapheneMetadataEntry)
+    status = graphene.NonNull(GrapheneAssetConditionEvaluationStatus)
+
+    childEvaluations = graphene.Field(
+        graphene.List(graphene.NonNull(lambda: GrapheneSpecificPartitionAssetConditionEvaluation))
+    )
+
+    class Meta:
+        name = "SpecificPartitionAssetConditionEvaluation"
+
+    def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str):
+        self._evaluation = evaluation
+        self._partition_key = partition_key
+
+        if partition_key in evaluation.true_subset.subset_value:
+            status = AssetConditionEvaluationStatus.TRUE
+        elif (
+            evaluation.candidate_subset is None
+            or partition_key in evaluation.candidate_subset.subset_value
+        ):
+            status = AssetConditionEvaluationStatus.FALSE
+        else:
+            status = AssetConditionEvaluationStatus.SKIPPED
+
+        super().__init__(
+            description=evaluation.condition_snapshot.description,
+            status=status,
+            childEvaluations=[
+                GrapheneSpecificPartitionAssetConditionEvaluation(child, partition_key)
+                for child in evaluation.child_evaluations
+            ],
+        )
+
+    def resolve_metadataEntries(
+        self, graphene_info: ResolveInfo
+    ) -> Sequence[GrapheneMetadataEntry]:
+        # find the metadata associated with a subset that contains this partition key
+        metadata = next(
+            (
+                subset.metadata
+                for subset in self._evaluation.subsets_with_metadata
+                if self._partition_key in subset.subset.subset_value
+            ),
+            {},
+        )
+        return [GrapheneMetadataEntry(key=key, value=value) for key, value in metadata.items()]
+
+
+class GrapheneAssetConditionEvaluation(graphene.Union):
+    class Meta:
+        types = (
+            GrapheneUnpartitionedAssetConditionEvaluation,
+            GraphenePartitionedAssetConditionEvaluation,
+            GrapheneSpecificPartitionAssetConditionEvaluation,
+        )
+        name = "AssetConditionEvaluation"
+
+
+class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType):
+    id = graphene.NonNull(graphene.ID)
+    evaluationId = graphene.NonNull(graphene.Int)
+    runIds = non_null_list(graphene.String)
+    timestamp = graphene.NonNull(graphene.Float)
+
+    assetKey = graphene.NonNull(GrapheneAssetKey)
+    numRequested = graphene.NonNull(graphene.Int)
+
+    evaluation = graphene.NonNull(GrapheneAssetConditionEvaluation)
+
+    class Meta:
+        name = "AssetConditionEvaluationRecord"
+
+    def __init__(
+        self,
+        record: AutoMaterializeAssetEvaluationRecord,
+        partitions_def: Optional[PartitionsDefinition],
+        dynamic_partitions_store: DynamicPartitionsStore,
+        partition_key: Optional[str] = None,
+    ):
+        evaluation_with_run_ids = record.get_evaluation_with_run_ids(partitions_def)
+        if evaluation_with_run_ids.evaluation.true_subset.is_partitioned:
+            if partition_key is None:
+                evaluation = GraphenePartitionedAssetConditionEvaluation(
+                    evaluation_with_run_ids.evaluation, partitions_def, dynamic_partitions_store
+                )
+            else:
+                evaluation = GrapheneSpecificPartitionAssetConditionEvaluation(
+                    evaluation_with_run_ids.evaluation, partition_key
+                )
+        else:
+            evaluation = GrapheneUnpartitionedAssetConditionEvaluation(
+                evaluation_with_run_ids.evaluation
+            )
+
+        super().__init__(
+            id=record.id,
+            evaluationId=record.evaluation_id,
+            timestamp=record.timestamp,
+            runIds=evaluation_with_run_ids.run_ids,
+            assetKey=GrapheneAssetKey(path=record.asset_key.path),
+            numRequested=evaluation_with_run_ids.evaluation.true_subset.size,
+            evaluation=evaluation,
+        )
+
+
+class GrapheneAssetConditionEvaluationRecords(graphene.ObjectType):
+    records = non_null_list(GrapheneAssetConditionEvaluationRecord)
+
+    class Meta:
+        name = "AssetConditionEvaluationRecords"
+
+
+class GrapheneAssetConditionEvaluationRecordsOrError(graphene.Union):
+    class Meta:
+        types = (
+            GrapheneAssetConditionEvaluationRecords,
+            GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError,
+        )
+        name = "AssetConditionEvaluationRecordsOrError"
graphene_info): - from dagster._daemon.asset_daemon import ( - get_current_evaluation_id, - ) + from dagster._daemon.asset_daemon import get_current_evaluation_id instance = graphene_info.context.instance if instance.auto_materialize_use_automation_policy_sensors: diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 080d035c97711..e0baa9225b6ec 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -23,6 +23,11 @@ from dagster_graphql.implementation.asset_checks_loader import AssetChecksLoader from dagster_graphql.implementation.execution.backfill import get_asset_backfill_preview +from dagster_graphql.implementation.fetch_asset_condition_evaluations import ( + fetch_asset_condition_evaluation_record_for_partition, + fetch_asset_condition_evaluation_records_for_asset_key, + fetch_asset_condition_evaluation_records_for_evaluation_id, +) from dagster_graphql.implementation.fetch_auto_materialize_asset_evaluations import ( fetch_auto_materialize_asset_evaluations, fetch_auto_materialize_asset_evaluations_for_evaluation_id, @@ -30,6 +35,10 @@ from dagster_graphql.implementation.fetch_env_vars import get_utilized_env_vars_or_error from dagster_graphql.implementation.fetch_logs import get_captured_log_metadata from dagster_graphql.implementation.fetch_runs import get_assets_latest_info +from dagster_graphql.schema.asset_condition_evaluations import ( + GrapheneAssetConditionEvaluationRecordsOrError, + GrapheneSpecificPartitionAssetConditionEvaluation, +) from dagster_graphql.schema.auto_materialize_asset_evaluations import ( GrapheneAutoMaterializeAssetEvaluationRecordsOrError, ) @@ -513,6 +522,28 @@ class Meta: ), ) + assetConditionEvaluationForPartition = graphene.Field( + GrapheneSpecificPartitionAssetConditionEvaluation, + assetKey=graphene.Argument(graphene.NonNull(GrapheneAssetKeyInput)), + evaluationId=graphene.Argument(graphene.NonNull(graphene.Int)), + partition=graphene.Argument(graphene.NonNull(graphene.String)), + description="Retrieve the condition evaluation for an asset and partition.", + ) + + assetConditionEvaluationRecordsOrError = graphene.Field( + GrapheneAssetConditionEvaluationRecordsOrError, + assetKey=graphene.Argument(graphene.NonNull(GrapheneAssetKeyInput)), + limit=graphene.Argument(graphene.NonNull(graphene.Int)), + cursor=graphene.Argument(graphene.String), + description="Retrieve the condition evaluation records for an asset.", + ) + + assetConditionEvaluationsForEvaluationId = graphene.Field( + GrapheneAssetConditionEvaluationRecordsOrError, + evaluationId=graphene.Argument(graphene.NonNull(graphene.Int)), + description=("Retrieve the condition evaluation records for a given evaluation ID."), + ) + autoMaterializeTicks = graphene.Field( non_null_list(GrapheneInstigationTick), dayRange=graphene.Int(), @@ -1092,6 +1123,38 @@ def resolve_autoMaterializeEvaluationsForEvaluationId( graphene_info=graphene_info, evaluation_id=evaluationId ) + def resolve_assetConditionEvaluationForPartition( + self, + graphene_info: ResolveInfo, + assetKey: GrapheneAssetKeyInput, + evaluationId: int, + partition: str, + ): + return fetch_asset_condition_evaluation_record_for_partition( + graphene_info=graphene_info, + graphene_asset_key=assetKey, + evaluation_id=evaluationId, + partition_key=partition, + ) + + def resolve_assetConditionEvaluationRecordsOrError( + self, + 
graphene_info: ResolveInfo,
+        assetKey: GrapheneAssetKeyInput,
+        limit: int,
+        cursor: Optional[str] = None,
+    ):
+        return fetch_asset_condition_evaluation_records_for_asset_key(
+            graphene_info=graphene_info, graphene_asset_key=assetKey, cursor=cursor, limit=limit
+        )
+
+    def resolve_assetConditionEvaluationsForEvaluationId(
+        self, graphene_info: ResolveInfo, evaluationId: int
+    ):
+        return fetch_asset_condition_evaluation_records_for_evaluation_id(
+            graphene_info=graphene_info, evaluation_id=evaluationId
+        )
+
+    def resolve_autoMaterializeTicks(
+        self,
+        graphene_info,
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py
new file mode 100644
index 0000000000000..d62d4b8524bcf
--- /dev/null
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py
@@ -0,0 +1,895 @@
+from typing import Optional, Sequence
+from unittest.mock import PropertyMock, patch
+
+import dagster._check as check
+import pendulum
+from dagster import AssetKey, RunRequest
+from dagster._core.definitions.asset_condition import (
+    AssetConditionEvaluation,
+    AssetConditionEvaluationWithRunIds,
+    AssetConditionSnapshot,
+    HistoricalAllPartitionsSubset,
+)
+from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
+from dagster._core.definitions.asset_subset import AssetSubset
+from dagster._core.definitions.auto_materialize_rule_evaluation import (
+    deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids,
+)
+from dagster._core.definitions.partition import PartitionsDefinition, StaticPartitionsDefinition
+from dagster._core.definitions.run_request import (
+    InstigatorType,
+)
+from dagster._core.definitions.sensor_definition import SensorType
+from dagster._core.host_representation.origin import (
+    ExternalInstigatorOrigin,
+)
+from dagster._core.scheduler.instigation import (
+    InstigatorState,
+    InstigatorStatus,
+    SensorInstigatorData,
+    TickData,
+    TickStatus,
+)
+from dagster._core.workspace.context import WorkspaceRequestContext
+from dagster._daemon.asset_daemon import (
+    _PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY,
+    _PRE_SENSOR_AUTO_MATERIALIZE_INSTIGATOR_NAME,
+    _PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID,
+    _PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID,
+)
+from dagster._serdes.serdes import serialize_value
+from dagster_graphql.test.utils import execute_dagster_graphql, infer_repository
+
+from dagster_graphql_tests.graphql.graphql_context_test_suite import (
+    ExecutingGraphQLContextTestMatrix,
+)
+
+TICKS_QUERY = """
+query AssetDaemonTicksQuery($dayRange: Int, $dayOffset: Int, $statuses: [InstigationTickStatus!], $limit: Int, $cursor: String, $beforeTimestamp: Float, $afterTimestamp: Float) {
+    autoMaterializeTicks(dayRange: $dayRange, dayOffset: $dayOffset, statuses: $statuses, limit: $limit, cursor: $cursor, beforeTimestamp: $beforeTimestamp, afterTimestamp: $afterTimestamp) {
+        id
+        timestamp
+        endTimestamp
+        status
+        requestedAssetKeys {
+            path
+        }
+        requestedMaterializationsForAssets {
+            assetKey {
+                path
+            }
+            partitionKeys
+        }
+        requestedAssetMaterializationCount
+        autoMaterializeAssetEvaluationId
+    }
+}
+"""
+
+
+def _create_tick(instance, status, timestamp, evaluation_id, run_requests=None, end_timestamp=None):
+    return instance.create_tick(
+        TickData(
+            instigator_origin_id=_PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID,
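+            # every pre-sensor daemon tick is recorded against the same fixed origin and
+            # selector; evaluation_id is what ties a tick to its stored asset evaluations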
instigator_name=_PRE_SENSOR_AUTO_MATERIALIZE_INSTIGATOR_NAME, + instigator_type=InstigatorType.AUTO_MATERIALIZE, + status=status, + timestamp=timestamp, + end_timestamp=end_timestamp, + selector_id=_PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID, + run_ids=[], + auto_materialize_evaluation_id=evaluation_id, + run_requests=run_requests, + ) + ) + + +class TestAutoMaterializeTicks(ExecutingGraphQLContextTestMatrix): + def test_get_tick_range(self, graphql_context): + result = execute_dagster_graphql( + graphql_context, + TICKS_QUERY, + variables={"dayRange": None, "dayOffset": None}, + ) + assert len(result.data["autoMaterializeTicks"]) == 0 + + now = pendulum.now("UTC") + end_timestamp = now.timestamp() + 20 + + success_1 = _create_tick( + graphql_context.instance, + TickStatus.SUCCESS, + now.timestamp(), + end_timestamp=end_timestamp, + evaluation_id=3, + run_requests=[ + RunRequest(asset_selection=[AssetKey("foo"), AssetKey("bar")], partition_key="abc"), + RunRequest(asset_selection=[AssetKey("bar")], partition_key="def"), + RunRequest(asset_selection=[AssetKey("baz")], partition_key=None), + ], + ) + + success_2 = _create_tick( + graphql_context.instance, + TickStatus.SUCCESS, + now.subtract(days=1, hours=1).timestamp(), + evaluation_id=2, + ) + + _create_tick( + graphql_context.instance, + TickStatus.SKIPPED, + now.subtract(days=2, hours=1).timestamp(), + evaluation_id=1, + ) + + result = execute_dagster_graphql( + graphql_context, + TICKS_QUERY, + variables={"dayRange": None, "dayOffset": None}, + ) + assert len(result.data["autoMaterializeTicks"]) == 3 + + result = execute_dagster_graphql( + graphql_context, + TICKS_QUERY, + variables={"dayRange": 1, "dayOffset": None}, + ) + assert len(result.data["autoMaterializeTicks"]) == 1 + tick = result.data["autoMaterializeTicks"][0] + assert tick["endTimestamp"] == end_timestamp + assert tick["autoMaterializeAssetEvaluationId"] == 3 + assert sorted(tick["requestedAssetKeys"], key=lambda x: x["path"][0]) == [ + {"path": ["bar"]}, + {"path": ["baz"]}, + {"path": ["foo"]}, + ] + + asset_materializations = tick["requestedMaterializationsForAssets"] + by_asset_key = { + AssetKey.from_coercible(mat["assetKey"]["path"]).to_user_string(): mat["partitionKeys"] + for mat in asset_materializations + } + + assert {key: sorted(val) for key, val in by_asset_key.items()} == { + "foo": ["abc"], + "bar": ["abc", "def"], + "baz": [], + } + + assert tick["requestedAssetMaterializationCount"] == 4 + + result = execute_dagster_graphql( + graphql_context, + TICKS_QUERY, + variables={ + "beforeTimestamp": success_2.timestamp + 1, + "afterTimestamp": success_2.timestamp - 1, + }, + ) + assert len(result.data["autoMaterializeTicks"]) == 1 + tick = result.data["autoMaterializeTicks"][0] + assert ( + tick["autoMaterializeAssetEvaluationId"] + == success_2.tick_data.auto_materialize_evaluation_id + ) + + result = execute_dagster_graphql( + graphql_context, + TICKS_QUERY, + variables={"dayRange": None, "dayOffset": None, "statuses": ["SUCCESS"]}, + ) + assert len(result.data["autoMaterializeTicks"]) == 2 + + result = execute_dagster_graphql( + graphql_context, + TICKS_QUERY, + variables={"dayRange": None, "dayOffset": None, "statuses": ["SUCCESS"], "limit": 1}, + ) + ticks = result.data["autoMaterializeTicks"] + assert len(ticks) == 1 + assert ticks[0]["timestamp"] == success_1.timestamp + assert ( + ticks[0]["autoMaterializeAssetEvaluationId"] + == success_1.tick_data.auto_materialize_evaluation_id + ) + + cursor = ticks[0]["id"] + + result = execute_dagster_graphql( + 
graphql_context, + TICKS_QUERY, + variables={ + "dayRange": None, + "dayOffset": None, + "statuses": ["SUCCESS"], + "limit": 1, + "cursor": cursor, + }, + ) + ticks = result.data["autoMaterializeTicks"] + assert len(ticks) == 1 + assert ticks[0]["timestamp"] == success_2.timestamp + + +FRAGMENTS = """ +fragment unpartitionedEvaluationFields on UnpartitionedAssetConditionEvaluation { + description + startTimestamp + endTimestamp + status +} + +fragment partitionedEvaluationFields on PartitionedAssetConditionEvaluation { + description + startTimestamp + endTimestamp + numTrue + numFalse + numSkipped + trueSubset { + subsetValue { + isPartitioned + partitionKeys + } + } + falseSubset { + subsetValue { + isPartitioned + partitionKeys + } + } +} + +fragment evaluationFields on AssetConditionEvaluation { + ... on UnpartitionedAssetConditionEvaluation { + ...unpartitionedEvaluationFields + childEvaluations { + ...unpartitionedEvaluationFields + childEvaluations { + ...unpartitionedEvaluationFields + childEvaluations { + ...unpartitionedEvaluationFields + childEvaluations { + ...unpartitionedEvaluationFields + } + } + } + } + } + ... on PartitionedAssetConditionEvaluation { + ...partitionedEvaluationFields + childEvaluations { + ...partitionedEvaluationFields + childEvaluations { + ...partitionedEvaluationFields + childEvaluations { + ...partitionedEvaluationFields + childEvaluations { + ...partitionedEvaluationFields + } + } + } + } + } +} +""" +QUERY = ( + FRAGMENTS + + """ +query GetEvaluationsQuery($assetKey: AssetKeyInput!, $limit: Int!, $cursor: String) { + assetNodeOrError(assetKey: $assetKey) { + ... on AssetNode { + currentAutoMaterializeEvaluationId + automationPolicySensor { + name + } + } + } + assetConditionEvaluationRecordsOrError(assetKey: $assetKey, limit: $limit, cursor: $cursor) { + ... on AssetConditionEvaluationRecords { + records { + id + numRequested + assetKey { + path + } + evaluation { + ...evaluationFields + } + } + } + } +} +""" +) + +QUERY_FOR_SPECIFIC_PARTITION = """ +fragment specificPartitionEvaluationFields on SpecificPartitionAssetConditionEvaluation { + description + status +} +query GetPartitionEvaluationQuery($assetKey: AssetKeyInput!, $partition: String!, $evaluationId: Int!) { + assetConditionEvaluationForPartition(assetKey: $assetKey, partition: $partition, evaluationId: $evaluationId) { + ...specificPartitionEvaluationFields + childEvaluations { + ...specificPartitionEvaluationFields + childEvaluations { + ...specificPartitionEvaluationFields + childEvaluations { + ...specificPartitionEvaluationFields + childEvaluations { + ...specificPartitionEvaluationFields + } + } + } + } + } +} +""" + +QUERY_FOR_EVALUATION_ID = ( + FRAGMENTS + + """ +query GetEvaluationsForEvaluationIdQuery($evaluationId: Int!) { + assetConditionEvaluationsForEvaluationId(evaluationId: $evaluationId) { + ... 
on AssetConditionEvaluationRecords { + records { + id + numRequested + assetKey { + path + } + evaluation { + ...evaluationFields + } + } + } + } +} +""" +) + + +class TestAutoMaterializeAssetEvaluations(ExecutingGraphQLContextTestMatrix): + def test_automation_policy_sensor(self, graphql_context: WorkspaceRequestContext): + sensor_origin = ExternalInstigatorOrigin( + external_repository_origin=infer_repository(graphql_context).get_external_origin(), + instigator_name="my_automation_policy_sensor", + ) + + check.not_none(graphql_context.instance.schedule_storage).add_instigator_state( + InstigatorState( + sensor_origin, + InstigatorType.SENSOR, + status=InstigatorStatus.RUNNING, + instigator_data=SensorInstigatorData( + sensor_type=SensorType.AUTOMATION_POLICY, + cursor=serialize_value(AssetDaemonCursor.empty(12345)), + ), + ) + ) + + with patch( + "dagster._core.instance.DagsterInstance.auto_materialize_use_automation_policy_sensors", + new_callable=PropertyMock, + ) as mock_my_property: + mock_my_property.return_value = True + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={ + "assetKey": {"path": ["fresh_diamond_bottom"]}, + "limit": 10, + "cursor": None, + }, + ) + assert ( + results.data["assetNodeOrError"]["automationPolicySensor"]["name"] + == "my_automation_policy_sensor" + ) + assert results.data["assetNodeOrError"]["currentAutoMaterializeEvaluationId"] == 12345 + + def test_get_historic_rules_without_evaluation_data( + self, graphql_context: WorkspaceRequestContext + ): + evaluation1 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( + '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_one"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 0, "partition_subsets_by_condition": [], "rule_snapshots": null, "run_ids": {"__set__": []}}', + None, + ) + evaluation2 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( + '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_two"]}, "num_discarded": 0, "num_requested": 1, "num_skipped": 0, "partition_subsets_by_condition": [], "rule_snapshots": [{"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnMissingRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "materialization is missing"}], "run_ids": {"__set__": []}}', + None, + ) + check.not_none( + graphql_context.instance.schedule_storage + ).add_auto_materialize_asset_evaluations( + evaluation_id=10, asset_evaluations=[evaluation1, evaluation2] + ) + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["asset_one"]}, "limit": 10, "cursor": None}, + ) + assert len(results.data["assetConditionEvaluationRecordsOrError"]["records"]) == 1 + asset_one_record = results.data["assetConditionEvaluationRecordsOrError"]["records"][0] + assert asset_one_record["assetKey"] == {"path": ["asset_one"]} + assert asset_one_record["evaluation"]["status"] == "SKIPPED" + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["asset_two"]}, "limit": 10, "cursor": None}, + ) + assert len(results.data["assetConditionEvaluationRecordsOrError"]["records"]) == 1 + asset_two_record = results.data["assetConditionEvaluationRecordsOrError"]["records"][0] + assert asset_two_record["evaluation"]["description"] == "All of" + assert 
asset_two_record["evaluation"]["status"] == "SKIPPED" + asset_two_children = asset_two_record["evaluation"]["childEvaluations"] + assert len(asset_two_children) == 2 + assert asset_two_children[0]["description"] == "Any of" + assert asset_two_children[0]["status"] == "SKIPPED" + assert ( + asset_two_children[0]["childEvaluations"][0]["description"] + == "materialization is missing" + ) + + results = execute_dagster_graphql( + graphql_context, + QUERY_FOR_EVALUATION_ID, + variables={"evaluationId": 10}, + ) + + records = results.data["assetConditionEvaluationsForEvaluationId"]["records"] + + assert len(records) == 2 + + # record from both previous queries are contained here + assert any(record == asset_one_record for record in records) + assert any(record == asset_two_record for record in records) + + results = execute_dagster_graphql( + graphql_context, + QUERY_FOR_EVALUATION_ID, + variables={"evaluationId": 12345}, + ) + + records = results.data["assetConditionEvaluationsForEvaluationId"]["records"] + assert len(records) == 0 + + def test_get_historic_evaluation_with_evaluation_data( + self, graphql_context: WorkspaceRequestContext + ): + evaluation = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( + '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["upstream_static_partitioned_asset"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 1, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": {"__class__": "WaitingOnAssetsRuleEvaluationData", "waiting_on_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["blah"]}]}}, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnRequiredButNonexistentParentsRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "required parent partitions do not exist"}}, {"__class__": "SerializedPartitionsSubset", "serialized_partitions_def_class_name": "StaticPartitionsDefinition", "serialized_partitions_def_unique_id": "7c2047f8b02e90a69136c1a657bd99ad80b433a2", "serialized_subset": "{\\"version\\": 1, \\"subset\\": [\\"a\\"]}"}]], "rule_snapshots": null, "run_ids": {"__set__": []}}', + StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"]), + ) + check.not_none( + graphql_context.instance.schedule_storage + ).add_auto_materialize_asset_evaluations( + evaluation_id=10, + asset_evaluations=[evaluation], + ) + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={ + "assetKey": {"path": ["upstream_static_partitioned_asset"]}, + "limit": 10, + "cursor": None, + }, + ) + + records = results.data["assetConditionEvaluationRecordsOrError"]["records"] + assert len(records) == 1 + evaluation = records[0]["evaluation"] + assert evaluation["numTrue"] == 0 + assert evaluation["numFalse"] == 6 + assert evaluation["numSkipped"] == 0 + assert len(evaluation["childEvaluations"]) == 2 + not_skip_evaluation = evaluation["childEvaluations"][1] + assert not_skip_evaluation["description"] == "Not" + assert not_skip_evaluation["numTrue"] == 1 + assert len(not_skip_evaluation["childEvaluations"]) == 1 + assert not_skip_evaluation["childEvaluations"][0]["description"] == "Any of" + assert len(not_skip_evaluation["childEvaluations"][0]["childEvaluations"]) == 2 + + def test_get_evaluations(self, graphql_context: WorkspaceRequestContext): + evaluation1 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( 
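+            # each serialized blob below is a legacy AutoMaterializeAssetEvaluation,
+            # exercising the backcompat deserialization path into
+            # AssetConditionEvaluationWithRunIds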
+ '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_one"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 0, "partition_subsets_by_condition": [], "rule_snapshots": null, "run_ids": {"__set__": []}}', + None, + ) + evaluation2 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( + '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_two"]}, "num_discarded": 0, "num_requested": 1, "num_skipped": 0, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": null, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnMissingRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "materialization is missing"}}, null]], "rule_snapshots": null, "run_ids": {"__set__": []}}', + None, + ) + evaluation3 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( + '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_three"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 1, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": {"__class__": "WaitingOnAssetsRuleEvaluationData", "waiting_on_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["asset_two"]}]}}, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnParentOutdatedRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "waiting on upstream data to be up to date"}}, null]], "rule_snapshots": null, "run_ids": {"__set__": []}}', + None, + ) + evaluation4 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( + '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_four"]}, "num_discarded": 0, "num_requested": 1, "num_skipped": 0, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": {"__class__": "ParentUpdatedRuleEvaluationData", "updated_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["asset_two"]}]}, "will_update_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["asset_three"]}]}}, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnParentUpdatedRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "upstream data has changed since latest materialization"}}, null]], "rule_snapshots": null, "run_ids": {"__set__": []}}', + None, + ) + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["foo"]}, "limit": 10, "cursor": None}, + ) + assert results.data == { + "autoMaterializeAssetEvaluationsOrError": {"records": []}, + "assetNodeOrError": { + "currentAutoMaterializeEvaluationId": None, + "automationPolicySensor": None, + }, + } + + check.not_none( + graphql_context.instance.schedule_storage + ).add_auto_materialize_asset_evaluations( + evaluation_id=10, asset_evaluations=[evaluation1, evaluation2, evaluation3, evaluation4] + ) + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["asset_one"]}, "limit": 10, "cursor": None}, + ) + assert results.data == { + "assetNodeOrError": { + "currentAutoMaterializeEvaluationId": None, + 
"automationPolicySensor": None, + }, + "autoMaterializeAssetEvaluationsOrError": { + "records": [ + { + "numRequested": 0, + "numSkipped": 0, + "numDiscarded": 0, + "rules": [], + "rulesWithRuleEvaluations": [], + "assetKey": {"path": ["asset_one"]}, + } + ], + }, + } + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["asset_two"]}, "limit": 10, "cursor": None}, + ) + assert results.data == { + "assetNodeOrError": { + "currentAutoMaterializeEvaluationId": None, + "automationPolicySensor": None, + }, + "autoMaterializeAssetEvaluationsOrError": { + "records": [ + { + "numRequested": 1, + "numSkipped": 0, + "numDiscarded": 0, + "rulesWithRuleEvaluations": [ + { + "rule": {"decisionType": "MATERIALIZE"}, + "ruleEvaluations": [ + { + "evaluationData": None, + "partitionKeysOrError": None, + } + ], + } + ], + } + ], + }, + } + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["asset_three"]}, "limit": 10, "cursor": None}, + ) + assert results.data == { + "assetNodeOrError": { + "currentAutoMaterializeEvaluationId": None, + "automationPolicySensor": None, + }, + "autoMaterializeAssetEvaluationsOrError": { + "records": [ + { + "numRequested": 0, + "numSkipped": 1, + "numDiscarded": 0, + "rulesWithRuleEvaluations": [ + { + "rule": {"decisionType": "SKIP"}, + "ruleEvaluations": [ + { + "evaluationData": { + "waitingOnAssetKeys": [{"path": ["asset_two"]}], + }, + "partitionKeysOrError": None, + } + ], + } + ], + } + ], + }, + } + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["asset_four"]}, "limit": 10, "cursor": None}, + ) + assert results.data == { + "assetNodeOrError": { + "currentAutoMaterializeEvaluationId": None, + "automationPolicySensor": None, + }, + "autoMaterializeAssetEvaluationsOrError": { + "records": [ + { + "numRequested": 1, + "numSkipped": 0, + "numDiscarded": 0, + "rulesWithRuleEvaluations": [ + { + "rule": {"decisionType": "MATERIALIZE"}, + "ruleEvaluations": [ + { + "evaluationData": { + "updatedAssetKeys": [{"path": ["asset_two"]}], + "willUpdateAssetKeys": [{"path": ["asset_three"]}], + }, + "partitionKeysOrError": None, + } + ], + } + ], + } + ], + }, + } + + def _get_condition_evaluation( + self, + asset_key: AssetKey, + description: str, + partitions_def: PartitionsDefinition, + true_partition_keys: Sequence[str], + candidate_partition_keys: Optional[Sequence[str]] = None, + child_evaluations: Optional[Sequence[AssetConditionEvaluation]] = None, + ) -> AssetConditionEvaluation: + return AssetConditionEvaluation( + condition_snapshot=AssetConditionSnapshot("...", description, "a1b2"), + true_subset=AssetSubset( + asset_key=asset_key, + value=partitions_def.subset_with_partition_keys(true_partition_keys), + ), + candidate_subset=AssetSubset( + asset_key=asset_key, + value=partitions_def.subset_with_partition_keys(candidate_partition_keys), + ) + if candidate_partition_keys + else HistoricalAllPartitionsSubset(), + start_timestamp=123, + end_timestamp=456, + child_evaluations=child_evaluations or [], + ) + + def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequestContext): + asset_key = AssetKey("upstream_static_partitioned_asset") + partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"]) + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={ + "assetKey": {"path": ["upstream_static_partitioned_asset"]}, + "limit": 10, + "cursor": None, + }, + ) + assert 
results.data == { + "assetNodeOrError": { + "currentAutoMaterializeEvaluationId": None, + "automationPolicySensor": None, + }, + "assetConditionEvaluationRecordsOrError": {"records": []}, + } + + evaluation = self._get_condition_evaluation( + asset_key, + "All of", + partitions_def, + ["a", "b"], + child_evaluations=[ + self._get_condition_evaluation( + asset_key, + "Any of", + partitions_def, + ["a", "b", "c"], + child_evaluations=[ + self._get_condition_evaluation( + asset_key, "parent_updated", partitions_def, ["a", "c"] + ), + self._get_condition_evaluation(asset_key, "missing", partitions_def, ["b"]), + self._get_condition_evaluation(asset_key, "other", partitions_def, []), + ], + ), + self._get_condition_evaluation( + asset_key, + "Not", + partitions_def, + ["a", "b"], + candidate_partition_keys=["a", "b", "c"], + child_evaluations=[ + self._get_condition_evaluation( + asset_key, + "Any of", + partitions_def, + ["c"], + ["a", "b", "c"], + child_evaluations=[ + self._get_condition_evaluation( + asset_key, + "parent missing", + partitions_def, + ["c"], + ["a", "b", "c"], + ), + self._get_condition_evaluation( + asset_key, + "parent outdated", + partitions_def, + [], + ["a", "b", "c"], + ), + ], + ), + ], + ), + ], + ) + + check.not_none( + graphql_context.instance.schedule_storage + ).add_auto_materialize_asset_evaluations( + evaluation_id=10, + asset_evaluations=[ + AssetConditionEvaluationWithRunIds(evaluation, frozenset({"runid1", "runid2"})) + ], + ) + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={ + "assetKey": {"path": ["upstream_static_partitioned_asset"]}, + "limit": 10, + "cursor": None, + }, + ) + + records = results.data["assetConditionEvaluationRecordsOrError"]["records"] + assert len(records) == 1 + + assert records[0]["numRequested"] == 2 + evaluation = records[0]["evaluation"] + assert evaluation["description"] == "All of" + assert evaluation["numTrue"] == 2 + assert evaluation["numFalse"] == 4 + assert evaluation["numSkipped"] == 0 + assert set(evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} + assert len(evaluation["childEvaluations"]) == 2 + + not_evaluation = evaluation["childEvaluations"][1] + assert not_evaluation["description"] == "Not" + assert not_evaluation["numTrue"] == 2 + assert not_evaluation["numFalse"] == 1 + assert not_evaluation["numSkipped"] == 3 + assert set(not_evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} + + skip_evaluation = not_evaluation["childEvaluations"][0] + assert skip_evaluation["description"] == "Any of" + assert skip_evaluation["numTrue"] == 1 + assert skip_evaluation["numFalse"] == 2 + assert skip_evaluation["numSkipped"] == 3 + assert set(skip_evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"c"} + + # test one of the true partitions + specific_result = execute_dagster_graphql( + graphql_context, + QUERY_FOR_SPECIFIC_PARTITION, + variables={ + "assetKey": {"path": ["upstream_static_partitioned_asset"]}, + "partition": "b", + "evaluationId": 10, + }, + ) + + evaluation = specific_result.data["assetConditionEvaluationForPartition"] + assert evaluation["description"] == "All of" + assert evaluation["status"] == "TRUE" + assert len(evaluation["childEvaluations"]) == 2 + + not_evaluation = evaluation["childEvaluations"][1] + assert not_evaluation["description"] == "Not" + assert not_evaluation["status"] == "TRUE" + + skip_evaluation = not_evaluation["childEvaluations"][0] + assert skip_evaluation["description"] == "Any of" + assert 
skip_evaluation["status"] == "FALSE" + + # test one of the false partitions + specific_result = execute_dagster_graphql( + graphql_context, + QUERY_FOR_SPECIFIC_PARTITION, + variables={ + "assetKey": {"path": ["upstream_static_partitioned_asset"]}, + "partition": "d", + "evaluationId": 10, + }, + ) + + evaluation = specific_result.data["assetConditionEvaluationForPartition"] + assert evaluation["description"] == "All of" + assert evaluation["status"] == "FALSE" + assert len(evaluation["childEvaluations"]) == 2 + + not_evaluation = evaluation["childEvaluations"][1] + assert not_evaluation["description"] == "Not" + assert not_evaluation["status"] == "SKIPPED" + + skip_evaluation = not_evaluation["childEvaluations"][0] + assert skip_evaluation["description"] == "Any of" + assert skip_evaluation["status"] == "SKIPPED" + + def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext): + graphql_context.instance.daemon_cursor_storage.set_cursor_values( + {_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY: serialize_value(AssetDaemonCursor.empty(0))} + ) + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["asset_two"]}, "limit": 10, "cursor": None}, + ) + assert results.data == { + "assetNodeOrError": { + "currentAutoMaterializeEvaluationId": 0, + "automationPolicySensor": None, + }, + "autoMaterializeAssetEvaluationsOrError": { + "records": [], + }, + } + + graphql_context.instance.daemon_cursor_storage.set_cursor_values( + { + _PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY: ( + serialize_value(AssetDaemonCursor.empty(0).with_updates(0, 1.0, [], [])) + ) + } + ) + + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={"assetKey": {"path": ["asset_two"]}, "limit": 10, "cursor": None}, + ) + assert results.data == { + "assetNodeOrError": { + "currentAutoMaterializeEvaluationId": 42, + "automationPolicySensor": None, + }, + "autoMaterializeAssetEvaluationsOrError": { + "records": [], + }, + } diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py index 89ef6266181b4..bfa971e904202 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py @@ -310,6 +310,7 @@ def _get_child_decision_type_evaluation( rule_snapshot, ) for rule_snapshot in rule_snapshots + or set(partition_subsets_by_condition_by_rule_snapshot.keys()) if rule_snapshot.decision_type == decision_type ]
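
Illustrative usage (a sketch, not part of the patch): a test in the style above could
exercise the new assetConditionEvaluationForPartition field as follows. The asset key,
partition key, and evaluation id are placeholder values; only the field and argument
names come from the schema changes in this diff.

    from dagster_graphql.test.utils import execute_dagster_graphql

    EXAMPLE_PARTITION_QUERY = """
    query ExamplePartitionEvaluation($assetKey: AssetKeyInput!, $partition: String!, $evaluationId: Int!) {
        assetConditionEvaluationForPartition(assetKey: $assetKey, partition: $partition, evaluationId: $evaluationId) {
            description
            status
            childEvaluations {
                description
                status
            }
        }
    }
    """

    def example_fetch_partition_evaluation(graphql_context):
        # resolves to a SpecificPartitionAssetConditionEvaluation tree: one node per
        # condition, each carrying a TRUE/FALSE/SKIPPED status for the given partition
        result = execute_dagster_graphql(
            graphql_context,
            EXAMPLE_PARTITION_QUERY,
            variables={
                "assetKey": {"path": ["my_asset"]},  # placeholder asset key
                "partition": "a",  # placeholder partition key
                "evaluationId": 10,  # placeholder evaluation id
            },
        )
        return result.data["assetConditionEvaluationForPartition"]

The record-level listing for an asset is available through the companion
assetConditionEvaluationRecordsOrError field, as GetEvaluationsQuery above demonstrates.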