From 3e884b3bc6355f0652f51705968e03d9c2c41570 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Wed, 3 Jan 2024 10:11:17 -0500 Subject: [PATCH] Return tree instead of recursive resolver --- .../src/graphql/possibleTypes.generated.json | 2 +- .../ui-core/src/graphql/schema.graphql | 89 ++-- .../packages/ui-core/src/graphql/types.ts | 153 +++--- .../fetch_asset_condition_evaluations.py | 9 +- .../schema/asset_condition_evaluations.py | 164 +++--- .../dagster_graphql/schema/roots/query.py | 4 +- .../test_asset_condition_evaluations.py | 473 ++++++------------ ...test_auto_materialize_asset_evaluations.py | 241 +-------- .../_core/definitions/asset_condition.py | 5 +- .../_core/definitions/asset_daemon_cursor.py | 2 +- .../auto_materialize_rule_evaluation.py | 30 +- 11 files changed, 420 insertions(+), 752 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json index 8a37a392790a0..4d529a469d3a4 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json @@ -1 +1 @@ -{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"Instigator":["Schedule","Sensor"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","UnauthorizedError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError","AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","RepositoryNotFoundError","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","JobMetadataEntry","NullMetadataEntry"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetConditionEvaluationRecordsOrError":["AssetConditionEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"AssetConditionEvaluation":["UnpartitionedAssetConditionEvaluation","PartitionedAssetConditionEvaluation","SpecificPartitionAssetConditionEvaluation"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"]} \ No newline at end of file +{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"Instigator":["Schedule","Sensor"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","UnauthorizedError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError","AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","RepositoryNotFoundError","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","JobMetadataEntry","NullMetadataEntry"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetConditionEvaluationNode":["UnpartitionedAssetConditionEvaluationNode","PartitionedAssetConditionEvaluationNode","SpecificPartitionAssetConditionEvaluationNode"],"AssetConditionEvaluationRecordsOrError":["AssetConditionEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"]} \ No newline at end of file diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 63e00745f9c39..afa97e7306daa 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -3182,7 +3182,7 @@ type Query { assetKey: AssetKeyInput! evaluationId: Int! partition: String! - ): SpecificPartitionAssetConditionEvaluation + ): AssetConditionEvaluation assetConditionEvaluationRecordsOrError( assetKey: AssetKeyInput! limit: Int! @@ -3416,11 +3416,24 @@ type AutoMaterializeAssetEvaluationNeedsMigrationError implements Error { message: String! } -type SpecificPartitionAssetConditionEvaluation { +type AssetConditionEvaluation { + rootUniqueId: String! + evaluationNodes: [AssetConditionEvaluationNode!]! +} + +union AssetConditionEvaluationNode = + UnpartitionedAssetConditionEvaluationNode + | PartitionedAssetConditionEvaluationNode + | SpecificPartitionAssetConditionEvaluationNode + +type UnpartitionedAssetConditionEvaluationNode { + uniqueId: String! description: String! + startTimestamp: Float + endTimestamp: Float metadataEntries: [MetadataEntry!]! status: AssetConditionEvaluationStatus! - childEvaluations: [SpecificPartitionAssetConditionEvaluation!] + childUniqueIds: [String!]! } enum AssetConditionEvaluationStatus { @@ -3429,49 +3442,17 @@ enum AssetConditionEvaluationStatus { SKIPPED } -union AssetConditionEvaluationRecordsOrError = - AssetConditionEvaluationRecords - | AutoMaterializeAssetEvaluationNeedsMigrationError - -type AssetConditionEvaluationRecords { - records: [AssetConditionEvaluationRecord!]! -} - -type AssetConditionEvaluationRecord { - id: ID! - evaluationId: Int! - runIds: [String!]! - timestamp: Float! - assetKey: AssetKey! - numRequested: Int! - evaluation: AssetConditionEvaluation! -} - -union AssetConditionEvaluation = - UnpartitionedAssetConditionEvaluation - | PartitionedAssetConditionEvaluation - | SpecificPartitionAssetConditionEvaluation - -type UnpartitionedAssetConditionEvaluation { - description: String! - startTimestamp: Float - endTimestamp: Float - metadataEntries: [MetadataEntry!]! - status: AssetConditionEvaluationStatus! - childEvaluations: [UnpartitionedAssetConditionEvaluation!] -} - -type PartitionedAssetConditionEvaluation { +type PartitionedAssetConditionEvaluationNode { + uniqueId: String! description: String! startTimestamp: Float endTimestamp: Float trueSubset: AssetSubset! - falseSubset: AssetSubset! candidateSubset: AssetSubset numTrue: Int! - numFalse: Int! - numSkipped: Int! - childEvaluations: [PartitionedAssetConditionEvaluation!] + numFalse: Int + numSkipped: Int + childUniqueIds: [String!]! } type AssetSubset { @@ -3486,6 +3467,34 @@ type AssetSubsetValue { isPartitioned: Boolean! } +type SpecificPartitionAssetConditionEvaluationNode { + uniqueId: String! + description: String! + metadataEntries: [MetadataEntry!]! + status: AssetConditionEvaluationStatus! + childUniqueIds: [String!]! +} + +union AssetConditionEvaluationRecordsOrError = + AssetConditionEvaluationRecords + | AutoMaterializeAssetEvaluationNeedsMigrationError + +type AssetConditionEvaluationRecords { + records: [AssetConditionEvaluationRecord!]! +} + +type AssetConditionEvaluationRecord { + id: ID! + evaluationId: Int! + runIds: [String!]! + timestamp: Float! + assetKey: AssetKey! + numRequested: Int! + startTimestamp: Float + endTimestamp: Float + evaluation: AssetConditionEvaluation! +} + type Mutation { launchPipelineExecution(executionParams: ExecutionParams!): LaunchRunResult! launchRun(executionParams: ExecutionParams!): LaunchRunResult! diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 2018b8539765d..e6313a312410e 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -245,19 +245,27 @@ export type AssetChecksOrError = | AssetCheckNeedsUserCodeUpgrade | AssetChecks; -export type AssetConditionEvaluation = - | PartitionedAssetConditionEvaluation - | SpecificPartitionAssetConditionEvaluation - | UnpartitionedAssetConditionEvaluation; +export type AssetConditionEvaluation = { + __typename: 'AssetConditionEvaluation'; + evaluationNodes: Array; + rootUniqueId: Scalars['String']; +}; + +export type AssetConditionEvaluationNode = + | PartitionedAssetConditionEvaluationNode + | SpecificPartitionAssetConditionEvaluationNode + | UnpartitionedAssetConditionEvaluationNode; export type AssetConditionEvaluationRecord = { __typename: 'AssetConditionEvaluationRecord'; assetKey: AssetKey; + endTimestamp: Maybe; evaluation: AssetConditionEvaluation; evaluationId: Scalars['Int']; id: Scalars['ID']; numRequested: Scalars['Int']; runIds: Array; + startTimestamp: Maybe; timestamp: Scalars['Float']; }; @@ -2674,18 +2682,18 @@ export type PartitionTags = { export type PartitionTagsOrError = PartitionTags | PythonError; -export type PartitionedAssetConditionEvaluation = { - __typename: 'PartitionedAssetConditionEvaluation'; +export type PartitionedAssetConditionEvaluationNode = { + __typename: 'PartitionedAssetConditionEvaluationNode'; candidateSubset: Maybe; - childEvaluations: Maybe>; + childUniqueIds: Array; description: Scalars['String']; endTimestamp: Maybe; - falseSubset: AssetSubset; - numFalse: Scalars['Int']; - numSkipped: Scalars['Int']; + numFalse: Maybe; + numSkipped: Maybe; numTrue: Scalars['Int']; startTimestamp: Maybe; trueSubset: AssetSubset; + uniqueId: Scalars['String']; }; export type Partitions = { @@ -3022,7 +3030,7 @@ export type Query = { allTopLevelResourceDetailsOrError: ResourcesOrError; assetBackfillPreview: Array; assetCheckExecutions: Array; - assetConditionEvaluationForPartition: Maybe; + assetConditionEvaluationForPartition: Maybe; assetConditionEvaluationRecordsOrError: Maybe; assetConditionEvaluationsForEvaluationId: Maybe; assetNodeDefinitionCollisions: Array; @@ -4200,12 +4208,13 @@ export type SolidStepStatusUnavailableError = Error & { message: Scalars['String']; }; -export type SpecificPartitionAssetConditionEvaluation = { - __typename: 'SpecificPartitionAssetConditionEvaluation'; - childEvaluations: Maybe>; +export type SpecificPartitionAssetConditionEvaluationNode = { + __typename: 'SpecificPartitionAssetConditionEvaluationNode'; + childUniqueIds: Array; description: Scalars['String']; metadataEntries: Array; status: AssetConditionEvaluationStatus; + uniqueId: Scalars['String']; }; export type StaleCause = { @@ -4517,14 +4526,15 @@ export type UnknownPipeline = PipelineReference & { solidSelection: Maybe>; }; -export type UnpartitionedAssetConditionEvaluation = { - __typename: 'UnpartitionedAssetConditionEvaluation'; - childEvaluations: Maybe>; +export type UnpartitionedAssetConditionEvaluationNode = { + __typename: 'UnpartitionedAssetConditionEvaluationNode'; + childUniqueIds: Array; description: Scalars['String']; endTimestamp: Maybe; metadataEntries: Array; startTimestamp: Maybe; status: AssetConditionEvaluationStatus; + uniqueId: Scalars['String']; }; export type UnpartitionedAssetStatus = { @@ -5040,6 +5050,21 @@ export const buildAssetChecks = ( }; }; +export const buildAssetConditionEvaluation = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'AssetConditionEvaluation'} & AssetConditionEvaluation => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('AssetConditionEvaluation'); + return { + __typename: 'AssetConditionEvaluation', + evaluationNodes: + overrides && overrides.hasOwnProperty('evaluationNodes') ? overrides.evaluationNodes! : [], + rootUniqueId: + overrides && overrides.hasOwnProperty('rootUniqueId') ? overrides.rootUniqueId! : 'eos', + }; +}; + export const buildAssetConditionEvaluationRecord = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), @@ -5054,12 +5079,14 @@ export const buildAssetConditionEvaluationRecord = ( : relationshipsToOmit.has('AssetKey') ? ({} as AssetKey) : buildAssetKey({}, relationshipsToOmit), + endTimestamp: + overrides && overrides.hasOwnProperty('endTimestamp') ? overrides.endTimestamp! : 4.33, evaluation: overrides && overrides.hasOwnProperty('evaluation') ? overrides.evaluation! - : relationshipsToOmit.has('PartitionedAssetConditionEvaluation') - ? ({} as PartitionedAssetConditionEvaluation) - : buildPartitionedAssetConditionEvaluation({}, relationshipsToOmit), + : relationshipsToOmit.has('AssetConditionEvaluation') + ? ({} as AssetConditionEvaluation) + : buildAssetConditionEvaluation({}, relationshipsToOmit), evaluationId: overrides && overrides.hasOwnProperty('evaluationId') ? overrides.evaluationId! : 5501, id: @@ -5069,6 +5096,8 @@ export const buildAssetConditionEvaluationRecord = ( numRequested: overrides && overrides.hasOwnProperty('numRequested') ? overrides.numRequested! : 2364, runIds: overrides && overrides.hasOwnProperty('runIds') ? overrides.runIds! : [], + startTimestamp: + overrides && overrides.hasOwnProperty('startTimestamp') ? overrides.startTimestamp! : 6.66, timestamp: overrides && overrides.hasOwnProperty('timestamp') ? overrides.timestamp! : 6.88, }; }; @@ -9721,43 +9750,40 @@ export const buildPartitionTags = ( }; }; -export const buildPartitionedAssetConditionEvaluation = ( - overrides?: Partial, +export const buildPartitionedAssetConditionEvaluationNode = ( + overrides?: Partial, _relationshipsToOmit: Set = new Set(), -): {__typename: 'PartitionedAssetConditionEvaluation'} & PartitionedAssetConditionEvaluation => { +): { + __typename: 'PartitionedAssetConditionEvaluationNode'; +} & PartitionedAssetConditionEvaluationNode => { const relationshipsToOmit: Set = new Set(_relationshipsToOmit); - relationshipsToOmit.add('PartitionedAssetConditionEvaluation'); + relationshipsToOmit.add('PartitionedAssetConditionEvaluationNode'); return { - __typename: 'PartitionedAssetConditionEvaluation', + __typename: 'PartitionedAssetConditionEvaluationNode', candidateSubset: overrides && overrides.hasOwnProperty('candidateSubset') ? overrides.candidateSubset! : relationshipsToOmit.has('AssetSubset') ? ({} as AssetSubset) : buildAssetSubset({}, relationshipsToOmit), - childEvaluations: - overrides && overrides.hasOwnProperty('childEvaluations') ? overrides.childEvaluations! : [], + childUniqueIds: + overrides && overrides.hasOwnProperty('childUniqueIds') ? overrides.childUniqueIds! : [], description: - overrides && overrides.hasOwnProperty('description') ? overrides.description! : 'non', + overrides && overrides.hasOwnProperty('description') ? overrides.description! : 'quam', endTimestamp: - overrides && overrides.hasOwnProperty('endTimestamp') ? overrides.endTimestamp! : 6.63, - falseSubset: - overrides && overrides.hasOwnProperty('falseSubset') - ? overrides.falseSubset! - : relationshipsToOmit.has('AssetSubset') - ? ({} as AssetSubset) - : buildAssetSubset({}, relationshipsToOmit), - numFalse: overrides && overrides.hasOwnProperty('numFalse') ? overrides.numFalse! : 7739, - numSkipped: overrides && overrides.hasOwnProperty('numSkipped') ? overrides.numSkipped! : 7712, - numTrue: overrides && overrides.hasOwnProperty('numTrue') ? overrides.numTrue! : 6991, + overrides && overrides.hasOwnProperty('endTimestamp') ? overrides.endTimestamp! : 9.74, + numFalse: overrides && overrides.hasOwnProperty('numFalse') ? overrides.numFalse! : 4729, + numSkipped: overrides && overrides.hasOwnProperty('numSkipped') ? overrides.numSkipped! : 5678, + numTrue: overrides && overrides.hasOwnProperty('numTrue') ? overrides.numTrue! : 3015, startTimestamp: - overrides && overrides.hasOwnProperty('startTimestamp') ? overrides.startTimestamp! : 3.43, + overrides && overrides.hasOwnProperty('startTimestamp') ? overrides.startTimestamp! : 5.96, trueSubset: overrides && overrides.hasOwnProperty('trueSubset') ? overrides.trueSubset! : relationshipsToOmit.has('AssetSubset') ? ({} as AssetSubset) : buildAssetSubset({}, relationshipsToOmit), + uniqueId: overrides && overrides.hasOwnProperty('uniqueId') ? overrides.uniqueId! : 'sed', }; }; @@ -10453,9 +10479,9 @@ export const buildQuery = ( assetConditionEvaluationForPartition: overrides && overrides.hasOwnProperty('assetConditionEvaluationForPartition') ? overrides.assetConditionEvaluationForPartition! - : relationshipsToOmit.has('SpecificPartitionAssetConditionEvaluation') - ? ({} as SpecificPartitionAssetConditionEvaluation) - : buildSpecificPartitionAssetConditionEvaluation({}, relationshipsToOmit), + : relationshipsToOmit.has('AssetConditionEvaluation') + ? ({} as AssetConditionEvaluation) + : buildAssetConditionEvaluation({}, relationshipsToOmit), assetConditionEvaluationRecordsOrError: overrides && overrides.hasOwnProperty('assetConditionEvaluationRecordsOrError') ? overrides.assetConditionEvaluationRecordsOrError! @@ -12610,26 +12636,28 @@ export const buildSolidStepStatusUnavailableError = ( }; }; -export const buildSpecificPartitionAssetConditionEvaluation = ( - overrides?: Partial, +export const buildSpecificPartitionAssetConditionEvaluationNode = ( + overrides?: Partial, _relationshipsToOmit: Set = new Set(), ): { - __typename: 'SpecificPartitionAssetConditionEvaluation'; -} & SpecificPartitionAssetConditionEvaluation => { + __typename: 'SpecificPartitionAssetConditionEvaluationNode'; +} & SpecificPartitionAssetConditionEvaluationNode => { const relationshipsToOmit: Set = new Set(_relationshipsToOmit); - relationshipsToOmit.add('SpecificPartitionAssetConditionEvaluation'); + relationshipsToOmit.add('SpecificPartitionAssetConditionEvaluationNode'); return { - __typename: 'SpecificPartitionAssetConditionEvaluation', - childEvaluations: - overrides && overrides.hasOwnProperty('childEvaluations') ? overrides.childEvaluations! : [], + __typename: 'SpecificPartitionAssetConditionEvaluationNode', + childUniqueIds: + overrides && overrides.hasOwnProperty('childUniqueIds') ? overrides.childUniqueIds! : [], description: - overrides && overrides.hasOwnProperty('description') ? overrides.description! : 'vel', + overrides && overrides.hasOwnProperty('description') ? overrides.description! : 'ut', metadataEntries: overrides && overrides.hasOwnProperty('metadataEntries') ? overrides.metadataEntries! : [], status: overrides && overrides.hasOwnProperty('status') ? overrides.status! : AssetConditionEvaluationStatus.FALSE, + uniqueId: + overrides && overrides.hasOwnProperty('uniqueId') ? overrides.uniqueId! : 'repudiandae', }; }; @@ -13290,30 +13318,31 @@ export const buildUnknownPipeline = ( }; }; -export const buildUnpartitionedAssetConditionEvaluation = ( - overrides?: Partial, +export const buildUnpartitionedAssetConditionEvaluationNode = ( + overrides?: Partial, _relationshipsToOmit: Set = new Set(), ): { - __typename: 'UnpartitionedAssetConditionEvaluation'; -} & UnpartitionedAssetConditionEvaluation => { + __typename: 'UnpartitionedAssetConditionEvaluationNode'; +} & UnpartitionedAssetConditionEvaluationNode => { const relationshipsToOmit: Set = new Set(_relationshipsToOmit); - relationshipsToOmit.add('UnpartitionedAssetConditionEvaluation'); + relationshipsToOmit.add('UnpartitionedAssetConditionEvaluationNode'); return { - __typename: 'UnpartitionedAssetConditionEvaluation', - childEvaluations: - overrides && overrides.hasOwnProperty('childEvaluations') ? overrides.childEvaluations! : [], + __typename: 'UnpartitionedAssetConditionEvaluationNode', + childUniqueIds: + overrides && overrides.hasOwnProperty('childUniqueIds') ? overrides.childUniqueIds! : [], description: - overrides && overrides.hasOwnProperty('description') ? overrides.description! : 'deserunt', + overrides && overrides.hasOwnProperty('description') ? overrides.description! : 'veniam', endTimestamp: - overrides && overrides.hasOwnProperty('endTimestamp') ? overrides.endTimestamp! : 7.57, + overrides && overrides.hasOwnProperty('endTimestamp') ? overrides.endTimestamp! : 3.21, metadataEntries: overrides && overrides.hasOwnProperty('metadataEntries') ? overrides.metadataEntries! : [], startTimestamp: - overrides && overrides.hasOwnProperty('startTimestamp') ? overrides.startTimestamp! : 0.96, + overrides && overrides.hasOwnProperty('startTimestamp') ? overrides.startTimestamp! : 2.94, status: overrides && overrides.hasOwnProperty('status') ? overrides.status! : AssetConditionEvaluationStatus.FALSE, + uniqueId: overrides && overrides.hasOwnProperty('uniqueId') ? overrides.uniqueId! : 'et', }; }; diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py index b96f42a563e6e..7f8358454b80f 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py @@ -10,7 +10,6 @@ GrapheneAssetConditionEvaluationRecord, GrapheneAssetConditionEvaluationRecords, GrapheneAssetConditionEvaluationRecordsOrError, - GrapheneSpecificPartitionAssetConditionEvaluation, ) from dagster_graphql.schema.auto_materialize_asset_evaluations import ( GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError, @@ -58,7 +57,7 @@ def _get_graphene_records_from_evaluations( return GrapheneAssetConditionEvaluationRecords( records=[ GrapheneAssetConditionEvaluationRecord( - evaluation, partitions_defs[evaluation.asset_key], graphene_info.context.instance + evaluation, partitions_defs[evaluation.asset_key] ) for evaluation in evaluation_records ] @@ -86,8 +85,10 @@ def fetch_asset_condition_evaluation_record_for_partition( if asset_node and asset_node.external_asset_node.partitions_def_data else None ) - return GrapheneSpecificPartitionAssetConditionEvaluation( - record.get_evaluation_with_run_ids(partitions_def).evaluation, partition_key + return GrapheneAssetConditionEvaluation( + record.get_evaluation_with_run_ids(partitions_def).evaluation, + partitions_def, + partition_key, ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py index 9b81ff2204b70..02aca5d068d11 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py @@ -1,15 +1,15 @@ import enum +import itertools from typing import Optional, Sequence, Union import graphene -import pendulum from dagster._core.definitions.asset_condition import AssetConditionEvaluation from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset -from dagster._core.instance import DynamicPartitionsStore from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord +from dagster_graphql.implementation.events import iterate_metadata_entries from dagster_graphql.schema.auto_materialize_asset_evaluations import ( GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError, ) @@ -48,6 +48,7 @@ def __init__(self, value: Union[bool, PartitionsSubset]): GraphenePartitionKeyRange(start, end) for start, end in value.get_partition_key_ranges(value.partitions_def) ] + partition_keys = value.get_partition_keys() else: partition_keys = value.get_partition_keys() @@ -75,7 +76,8 @@ def __init__(self, asset_subset: AssetSubset): ) -class GrapheneUnpartitionedAssetConditionEvaluation(graphene.ObjectType): +class GrapheneUnpartitionedAssetConditionEvaluationNode(graphene.ObjectType): + uniqueId = graphene.NonNull(graphene.String) description = graphene.NonNull(graphene.String) startTimestamp = graphene.Field(graphene.Float) @@ -84,29 +86,31 @@ class GrapheneUnpartitionedAssetConditionEvaluation(graphene.ObjectType): metadataEntries = non_null_list(GrapheneMetadataEntry) status = graphene.NonNull(GrapheneAssetConditionEvaluationStatus) - childEvaluations = graphene.Field( - graphene.List(graphene.NonNull(lambda: GrapheneUnpartitionedAssetConditionEvaluation)) - ) + childUniqueIds = non_null_list(graphene.String) class Meta: - name = "UnpartitionedAssetConditionEvaluation" + name = "UnpartitionedAssetConditionEvaluationNode" def __init__(self, evaluation: AssetConditionEvaluation): + self._evaluation = evaluation if evaluation.true_subset.bool_value: status = AssetConditionEvaluationStatus.TRUE - elif evaluation.candidate_subset and evaluation.candidate_subset.bool_value: + elif ( + isinstance(evaluation.candidate_subset, AssetSubset) + and evaluation.candidate_subset.bool_value + ): status = AssetConditionEvaluationStatus.FALSE else: status = AssetConditionEvaluationStatus.SKIPPED super().__init__( + uniqueId=evaluation.condition_snapshot.unique_id, description=evaluation.condition_snapshot.description, startTimestamp=evaluation.start_timestamp, endTimestamp=evaluation.end_timestamp, status=status, - childEvaluations=[ - GrapheneUnpartitionedAssetConditionEvaluation(child) - for child in evaluation.child_evaluations + childUniqueIds=[ + child.condition_snapshot.unique_id for child in evaluation.child_evaluations ], ) @@ -117,85 +121,65 @@ def resolve_metadataEntries( (subset.metadata for subset in self._evaluation.subsets_with_metadata), {}, ) - return [GrapheneMetadataEntry(key=key, value=value) for key, value in metadata.items()] + return list(iterate_metadata_entries(metadata)) -class GraphenePartitionedAssetConditionEvaluation(graphene.ObjectType): +class GraphenePartitionedAssetConditionEvaluationNode(graphene.ObjectType): + uniqueId = graphene.NonNull(graphene.String) description = graphene.NonNull(graphene.String) startTimestamp = graphene.Field(graphene.Float) endTimestamp = graphene.Field(graphene.Float) trueSubset = graphene.NonNull(GrapheneAssetSubset) - falseSubset = graphene.NonNull(GrapheneAssetSubset) candidateSubset = graphene.Field(GrapheneAssetSubset) numTrue = graphene.NonNull(graphene.Int) - numFalse = graphene.NonNull(graphene.Int) - numSkipped = graphene.NonNull(graphene.Int) + numFalse = graphene.Field(graphene.Int) + numSkipped = graphene.Field(graphene.Int) - childEvaluations = graphene.Field( - graphene.List(graphene.NonNull(lambda: GraphenePartitionedAssetConditionEvaluation)) - ) + childUniqueIds = non_null_list(graphene.String) class Meta: - name = "PartitionedAssetConditionEvaluation" + name = "PartitionedAssetConditionEvaluationNode" def __init__( self, evaluation: AssetConditionEvaluation, partitions_def: Optional[PartitionsDefinition], - dynamic_partitions_store: DynamicPartitionsStore, ): self._partitions_def = partitions_def self._true_subset = evaluation.true_subset - self._all_subset = AssetSubset.all( - evaluation.asset_key, partitions_def, dynamic_partitions_store, pendulum.now("UTC") - ) - - # if the candidate_subset is unset, then we evaluated all partitions - self._candidate_subset = evaluation.candidate_subset or self._all_subset - super().__init__( + uniqueId=evaluation.condition_snapshot.unique_id, description=evaluation.condition_snapshot.description, startTimestamp=evaluation.start_timestamp, endTimestamp=evaluation.end_timestamp, trueSubset=GrapheneAssetSubset(evaluation.true_subset), - candidateSubset=GrapheneAssetSubset(self._candidate_subset), - childEvaluations=[ - GraphenePartitionedAssetConditionEvaluation( - child, partitions_def, dynamic_partitions_store - ) - for child in evaluation.child_evaluations + candidateSubset=GrapheneAssetSubset(evaluation.candidate_subset) + if isinstance(evaluation.candidate_subset, AssetSubset) + else None, + childUniqueIds=[ + child.condition_snapshot.unique_id for child in evaluation.child_evaluations ], ) def resolve_numTrue(self, graphene_info: ResolveInfo) -> int: return self._true_subset.size - def resolve_numFalse(self, graphene_info: ResolveInfo) -> int: - return self._candidate_subset.size - self._true_subset.size - - def resolve_falseSubset(self, graphene_info: ResolveInfo) -> GrapheneAssetSubset: - return GrapheneAssetSubset(self._candidate_subset - self._true_subset) - - def resolve_numSkipped(self, graphene_info: ResolveInfo) -> int: - return self._all_subset.size - self._candidate_subset.size - -class GrapheneSpecificPartitionAssetConditionEvaluation(graphene.ObjectType): +class GrapheneSpecificPartitionAssetConditionEvaluationNode(graphene.ObjectType): + uniqueId = graphene.NonNull(graphene.String) description = graphene.NonNull(graphene.String) metadataEntries = non_null_list(GrapheneMetadataEntry) status = graphene.NonNull(GrapheneAssetConditionEvaluationStatus) - childEvaluations = graphene.Field( - graphene.List(graphene.NonNull(lambda: GrapheneSpecificPartitionAssetConditionEvaluation)) - ) + childUniqueIds = non_null_list(graphene.String) class Meta: - name = "SpecificPartitionAssetConditionEvaluation" + name = "SpecificPartitionAssetConditionEvaluationNode" def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str): self._evaluation = evaluation @@ -204,7 +188,7 @@ def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str): if partition_key in evaluation.true_subset.subset_value: status = AssetConditionEvaluationStatus.TRUE elif ( - evaluation.candidate_subset is None + not isinstance(evaluation.candidate_subset, AssetSubset) or partition_key in evaluation.candidate_subset.subset_value ): status = AssetConditionEvaluationStatus.FALSE @@ -212,11 +196,11 @@ def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str): status = AssetConditionEvaluationStatus.SKIPPED super().__init__( + uniqueId=evaluation.condition_snapshot.unique_id, description=evaluation.condition_snapshot.description, status=status, - childEvaluations=[ - GrapheneSpecificPartitionAssetConditionEvaluation(child, partition_key) - for child in evaluation.child_evaluations + childUniqueIds=[ + child.condition_snapshot.unique_id for child in evaluation.child_evaluations ], ) @@ -232,18 +216,60 @@ def resolve_metadataEntries( ), {}, ) - return [GrapheneMetadataEntry(key=key, value=value) for key, value in metadata.items()] + return list(iterate_metadata_entries(metadata)) -class GrapheneAssetConditionEvaluation(graphene.Union): +class GrapheneAssetConditionEvaluationNode(graphene.Union): class Meta: types = ( - GrapheneUnpartitionedAssetConditionEvaluation, - GraphenePartitionedAssetConditionEvaluation, - GrapheneSpecificPartitionAssetConditionEvaluation, + GrapheneUnpartitionedAssetConditionEvaluationNode, + GraphenePartitionedAssetConditionEvaluationNode, + GrapheneSpecificPartitionAssetConditionEvaluationNode, ) + name = "AssetConditionEvaluationNode" + + +class GrapheneAssetConditionEvaluation(graphene.ObjectType): + rootUniqueId = graphene.NonNull(graphene.String) + evaluationNodes = non_null_list(GrapheneAssetConditionEvaluationNode) + + class Meta: name = "AssetConditionEvaluation" + def __init__( + self, + evaluation: AssetConditionEvaluation, + partitions_def: Optional[PartitionsDefinition], + partition_key: Optional[str] = None, + ): + # flatten the evaluation tree into a list of nodes + def _flatten(e: AssetConditionEvaluation) -> Sequence[AssetConditionEvaluation]: + return list(itertools.chain([e], *(_flatten(ce) for ce in e.child_evaluations))) + + all_nodes = _flatten(evaluation) + + if evaluation.true_subset.is_partitioned: + if partition_key is None: + evaluationNodes = [ + GraphenePartitionedAssetConditionEvaluationNode(evaluation, partitions_def) + for evaluation in all_nodes + ] + else: + evaluationNodes = [ + GrapheneSpecificPartitionAssetConditionEvaluationNode(evaluation, partition_key) + for evaluation in all_nodes + ] + else: + evaluationNodes = [ + GrapheneUnpartitionedAssetConditionEvaluationNode(evaluation) + for evaluation in all_nodes + ] + + super().__init__( + rootUniqueId=evaluation.condition_snapshot.unique_id, + evaluationNodes=evaluationNodes, + ) + class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType): id = graphene.NonNull(graphene.ID) @@ -254,6 +280,9 @@ class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType): assetKey = graphene.NonNull(GrapheneAssetKey) numRequested = graphene.NonNull(graphene.Int) + startTimestamp = graphene.Field(graphene.Float) + endTimestamp = graphene.Field(graphene.Float) + evaluation = graphene.NonNull(GrapheneAssetConditionEvaluation) class Meta: @@ -263,23 +292,8 @@ def __init__( self, record: AutoMaterializeAssetEvaluationRecord, partitions_def: Optional[PartitionsDefinition], - dynamic_partitions_store: DynamicPartitionsStore, - partition_key: Optional[str] = None, ): evaluation_with_run_ids = record.get_evaluation_with_run_ids(partitions_def) - if evaluation_with_run_ids.evaluation.true_subset.is_partitioned: - if partition_key is None: - evaluation = GraphenePartitionedAssetConditionEvaluation( - evaluation_with_run_ids.evaluation, partitions_def, dynamic_partitions_store - ) - else: - evaluation = GrapheneSpecificPartitionAssetConditionEvaluation( - evaluation_with_run_ids.evaluation, partition_key - ) - else: - evaluation = GrapheneUnpartitionedAssetConditionEvaluation( - evaluation_with_run_ids.evaluation - ) super().__init__( id=record.id, @@ -288,7 +302,11 @@ def __init__( runIds=evaluation_with_run_ids.run_ids, assetKey=GrapheneAssetKey(path=record.asset_key.path), numRequested=evaluation_with_run_ids.evaluation.true_subset.size, - evaluation=evaluation, + startTimestamp=evaluation_with_run_ids.evaluation.start_timestamp, + endTimestamp=evaluation_with_run_ids.evaluation.end_timestamp, + evaluation=GrapheneAssetConditionEvaluation( + evaluation_with_run_ids.evaluation, partitions_def + ), ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 193b5e84b4a0a..431ac626f9867 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -36,8 +36,8 @@ from dagster_graphql.implementation.fetch_logs import get_captured_log_metadata from dagster_graphql.implementation.fetch_runs import get_assets_latest_info from dagster_graphql.schema.asset_condition_evaluations import ( + GrapheneAssetConditionEvaluation, GrapheneAssetConditionEvaluationRecordsOrError, - GrapheneSpecificPartitionAssetConditionEvaluation, ) from dagster_graphql.schema.auto_materialize_asset_evaluations import ( GrapheneAutoMaterializeAssetEvaluationRecordsOrError, @@ -523,7 +523,7 @@ class Meta: ) assetConditionEvaluationForPartition = graphene.Field( - GrapheneSpecificPartitionAssetConditionEvaluation, + GrapheneAssetConditionEvaluation, assetKey=graphene.Argument(graphene.NonNull(GrapheneAssetKeyInput)), evaluationId=graphene.Argument(graphene.NonNull(graphene.Int)), partition=graphene.Argument(graphene.NonNull(graphene.String)), diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py index d62d4b8524bcf..aa13fc7e764bd 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py @@ -1,4 +1,5 @@ -from typing import Optional, Sequence +import random +from typing import Any, Mapping, Optional, Sequence from unittest.mock import PropertyMock, patch import dagster._check as check @@ -8,7 +9,7 @@ AssetConditionEvaluation, AssetConditionEvaluationWithRunIds, AssetConditionSnapshot, - HistoricalAllPartitionsSubset, + HistoricalAllPartitionsSubsetSentinel, ) from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor from dagster._core.definitions.asset_subset import AssetSubset @@ -36,6 +37,7 @@ _PRE_SENSOR_AUTO_MATERIALIZE_INSTIGATOR_NAME, _PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID, _PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID, + asset_daemon_cursor_to_instigator_serialized_cursor, ) from dagster._serdes.serdes import serialize_value from dagster_graphql.test.utils import execute_dagster_graphql, infer_repository @@ -213,67 +215,60 @@ def test_get_tick_range(self, graphql_context): FRAGMENTS = """ -fragment unpartitionedEvaluationFields on UnpartitionedAssetConditionEvaluation { - description - startTimestamp - endTimestamp - status -} - -fragment partitionedEvaluationFields on PartitionedAssetConditionEvaluation { - description - startTimestamp - endTimestamp - numTrue - numFalse - numSkipped - trueSubset { - subsetValue { - isPartitioned - partitionKeys +fragment evaluationFields on AssetConditionEvaluation { + rootUniqueId + evaluationNodes { + ... on UnpartitionedAssetConditionEvaluationNode { + description + startTimestamp + endTimestamp + status + uniqueId + childUniqueIds } - } - falseSubset { - subsetValue { - isPartitioned - partitionKeys + ... on PartitionedAssetConditionEvaluationNode { + description + startTimestamp + endTimestamp + numTrue + trueSubset { + subsetValue { + isPartitioned + partitionKeys + } + } + uniqueId + childUniqueIds + } + ... on SpecificPartitionAssetConditionEvaluationNode { + description + status + uniqueId + childUniqueIds } } } +""" -fragment evaluationFields on AssetConditionEvaluation { - ... on UnpartitionedAssetConditionEvaluation { - ...unpartitionedEvaluationFields - childEvaluations { - ...unpartitionedEvaluationFields - childEvaluations { - ...unpartitionedEvaluationFields - childEvaluations { - ...unpartitionedEvaluationFields - childEvaluations { - ...unpartitionedEvaluationFields - } +AUTOMATION_POLICY_SENSORS_QUERY = """ +query GetEvaluationsQuery($assetKey: AssetKeyInput!) { + assetNodeOrError(assetKey: $assetKey) { + ... on AssetNode { + currentAutoMaterializeEvaluationId + targetingInstigators { + ... on Schedule { + name } - } - } - } - ... on PartitionedAssetConditionEvaluation { - ...partitionedEvaluationFields - childEvaluations { - ...partitionedEvaluationFields - childEvaluations { - ...partitionedEvaluationFields - childEvaluations { - ...partitionedEvaluationFields - childEvaluations { - ...partitionedEvaluationFields - } + ... on Sensor { + name } } } } } """ + + QUERY = ( FRAGMENTS + """ @@ -281,9 +276,6 @@ def test_get_tick_range(self, graphql_context): assetNodeOrError(assetKey: $assetKey) { ... on AssetNode { currentAutoMaterializeEvaluationId - automationPolicySensor { - name - } } } assetConditionEvaluationRecordsOrError(assetKey: $assetKey, limit: $limit, cursor: $cursor) { @@ -304,29 +296,16 @@ def test_get_tick_range(self, graphql_context): """ ) -QUERY_FOR_SPECIFIC_PARTITION = """ -fragment specificPartitionEvaluationFields on SpecificPartitionAssetConditionEvaluation { - description - status -} +QUERY_FOR_SPECIFIC_PARTITION = ( + FRAGMENTS + + """ query GetPartitionEvaluationQuery($assetKey: AssetKeyInput!, $partition: String!, $evaluationId: Int!) { assetConditionEvaluationForPartition(assetKey: $assetKey, partition: $partition, evaluationId: $evaluationId) { - ...specificPartitionEvaluationFields - childEvaluations { - ...specificPartitionEvaluationFields - childEvaluations { - ...specificPartitionEvaluationFields - childEvaluations { - ...specificPartitionEvaluationFields - childEvaluations { - ...specificPartitionEvaluationFields - } - } - } - } + ...evaluationFields } } """ +) QUERY_FOR_EVALUATION_ID = ( FRAGMENTS @@ -365,11 +344,22 @@ def test_automation_policy_sensor(self, graphql_context: WorkspaceRequestContext status=InstigatorStatus.RUNNING, instigator_data=SensorInstigatorData( sensor_type=SensorType.AUTOMATION_POLICY, - cursor=serialize_value(AssetDaemonCursor.empty(12345)), + cursor=asset_daemon_cursor_to_instigator_serialized_cursor( + AssetDaemonCursor.empty(12345) + ), ), ) ) + results = execute_dagster_graphql( + graphql_context, + AUTOMATION_POLICY_SENSORS_QUERY, + variables={ + "assetKey": {"path": ["fresh_diamond_bottom"]}, + }, + ) + assert not results.data["assetNodeOrError"]["currentAutoMaterializeEvaluationId"] + with patch( "dagster._core.instance.DagsterInstance.auto_materialize_use_automation_policy_sensors", new_callable=PropertyMock, @@ -378,16 +368,15 @@ def test_automation_policy_sensor(self, graphql_context: WorkspaceRequestContext results = execute_dagster_graphql( graphql_context, - QUERY, + AUTOMATION_POLICY_SENSORS_QUERY, variables={ "assetKey": {"path": ["fresh_diamond_bottom"]}, - "limit": 10, - "cursor": None, }, ) - assert ( - results.data["assetNodeOrError"]["automationPolicySensor"]["name"] - == "my_automation_policy_sensor" + + assert any( + instigator["name"] == "my_automation_policy_sensor" + for instigator in results.data["assetNodeOrError"]["targetingInstigators"] ) assert results.data["assetNodeOrError"]["currentAutoMaterializeEvaluationId"] == 12345 @@ -416,7 +405,7 @@ def test_get_historic_rules_without_evaluation_data( assert len(results.data["assetConditionEvaluationRecordsOrError"]["records"]) == 1 asset_one_record = results.data["assetConditionEvaluationRecordsOrError"]["records"][0] assert asset_one_record["assetKey"] == {"path": ["asset_one"]} - assert asset_one_record["evaluation"]["status"] == "SKIPPED" + assert asset_one_record["evaluation"]["evaluationNodes"][0]["status"] == "FALSE" results = execute_dagster_graphql( graphql_context, @@ -425,16 +414,22 @@ def test_get_historic_rules_without_evaluation_data( ) assert len(results.data["assetConditionEvaluationRecordsOrError"]["records"]) == 1 asset_two_record = results.data["assetConditionEvaluationRecordsOrError"]["records"][0] - assert asset_two_record["evaluation"]["description"] == "All of" - assert asset_two_record["evaluation"]["status"] == "SKIPPED" - asset_two_children = asset_two_record["evaluation"]["childEvaluations"] - assert len(asset_two_children) == 2 - assert asset_two_children[0]["description"] == "Any of" - assert asset_two_children[0]["status"] == "SKIPPED" - assert ( - asset_two_children[0]["childEvaluations"][0]["description"] - == "materialization is missing" + asset_two_root = asset_two_record["evaluation"]["evaluationNodes"][0] + + assert asset_two_root["description"] == "All of" + assert asset_two_root["status"] == "FALSE" + assert len(asset_two_root["childUniqueIds"]) == 2 + + asset_two_child = self._get_node( + asset_two_root["childUniqueIds"][0], asset_two_record["evaluation"]["evaluationNodes"] + ) + assert asset_two_child["description"] == "Any of" + assert asset_two_child["status"] == "FALSE" + + asset_two_missing_node = self._get_node( + asset_two_child["childUniqueIds"][0], asset_two_record["evaluation"]["evaluationNodes"] ) + assert asset_two_missing_node["description"] == "materialization is missing" results = execute_dagster_graphql( graphql_context, @@ -450,6 +445,7 @@ def test_get_historic_rules_without_evaluation_data( assert any(record == asset_one_record for record in records) assert any(record == asset_two_record for record in records) + # this evaluationId doesn't exist results = execute_dagster_graphql( graphql_context, QUERY_FOR_EVALUATION_ID, @@ -463,7 +459,7 @@ def test_get_historic_evaluation_with_evaluation_data( self, graphql_context: WorkspaceRequestContext ): evaluation = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( - '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["upstream_static_partitioned_asset"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 1, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": {"__class__": "WaitingOnAssetsRuleEvaluationData", "waiting_on_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["blah"]}]}}, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnRequiredButNonexistentParentsRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "required parent partitions do not exist"}}, {"__class__": "SerializedPartitionsSubset", "serialized_partitions_def_class_name": "StaticPartitionsDefinition", "serialized_partitions_def_unique_id": "7c2047f8b02e90a69136c1a657bd99ad80b433a2", "serialized_subset": "{\\"version\\": 1, \\"subset\\": [\\"a\\"]}"}]], "rule_snapshots": null, "run_ids": {"__set__": []}}', + '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["upstream_static_partitioned_asset"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 1, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": {"__class__": "WaitingOnAssetsRuleEvaluationData", "waiting_on_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["blah"]}]}}, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnRequiredButNonexistentParentsRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "required parent partitions do not exist"}}, {"__class__": "SerializedPartitionsSubset", "serialized_partitions_def_class_name": "StaticPartitionsDefinition", "serialized_partitions_def_unique_id": "7c2047f8b02e90a69136c1a657bd99ad80b433a2", "serialized_subset": "{\\"version\\": 1, \\"subset\\": [\\"a\\"]}"}]], "rule_snapshots": [{"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnMissingRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "materialization is missing"}], "run_ids": {"__set__": []}}', StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"]), ) check.not_none( @@ -485,178 +481,27 @@ def test_get_historic_evaluation_with_evaluation_data( records = results.data["assetConditionEvaluationRecordsOrError"]["records"] assert len(records) == 1 - evaluation = records[0]["evaluation"] - assert evaluation["numTrue"] == 0 - assert evaluation["numFalse"] == 6 - assert evaluation["numSkipped"] == 0 - assert len(evaluation["childEvaluations"]) == 2 - not_skip_evaluation = evaluation["childEvaluations"][1] - assert not_skip_evaluation["description"] == "Not" - assert not_skip_evaluation["numTrue"] == 1 - assert len(not_skip_evaluation["childEvaluations"]) == 1 - assert not_skip_evaluation["childEvaluations"][0]["description"] == "Any of" - assert len(not_skip_evaluation["childEvaluations"][0]["childEvaluations"]) == 2 - - def test_get_evaluations(self, graphql_context: WorkspaceRequestContext): - evaluation1 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( - '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_one"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 0, "partition_subsets_by_condition": [], "rule_snapshots": null, "run_ids": {"__set__": []}}', - None, - ) - evaluation2 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( - '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_two"]}, "num_discarded": 0, "num_requested": 1, "num_skipped": 0, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": null, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnMissingRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "materialization is missing"}}, null]], "rule_snapshots": null, "run_ids": {"__set__": []}}', - None, - ) - evaluation3 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( - '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_three"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 1, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": {"__class__": "WaitingOnAssetsRuleEvaluationData", "waiting_on_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["asset_two"]}]}}, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnParentOutdatedRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "waiting on upstream data to be up to date"}}, null]], "rule_snapshots": null, "run_ids": {"__set__": []}}', - None, - ) - evaluation4 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( - '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_four"]}, "num_discarded": 0, "num_requested": 1, "num_skipped": 0, "partition_subsets_by_condition": [[{"__class__": "AutoMaterializeRuleEvaluation", "evaluation_data": {"__class__": "ParentUpdatedRuleEvaluationData", "updated_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["asset_two"]}]}, "will_update_asset_keys": {"__frozenset__": [{"__class__": "AssetKey", "path": ["asset_three"]}]}}, "rule_snapshot": {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnParentUpdatedRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "upstream data has changed since latest materialization"}}, null]], "rule_snapshots": null, "run_ids": {"__set__": []}}', - None, - ) - results = execute_dagster_graphql( - graphql_context, - QUERY, - variables={"assetKey": {"path": ["foo"]}, "limit": 10, "cursor": None}, - ) - assert results.data == { - "autoMaterializeAssetEvaluationsOrError": {"records": []}, - "assetNodeOrError": { - "currentAutoMaterializeEvaluationId": None, - "automationPolicySensor": None, - }, - } - check.not_none( - graphql_context.instance.schedule_storage - ).add_auto_materialize_asset_evaluations( - evaluation_id=10, asset_evaluations=[evaluation1, evaluation2, evaluation3, evaluation4] - ) + evaluation = records[0]["evaluation"] + rootNode = evaluation["evaluationNodes"][0] + assert rootNode["uniqueId"] == evaluation["rootUniqueId"] - results = execute_dagster_graphql( - graphql_context, - QUERY, - variables={"assetKey": {"path": ["asset_one"]}, "limit": 10, "cursor": None}, - ) - assert results.data == { - "assetNodeOrError": { - "currentAutoMaterializeEvaluationId": None, - "automationPolicySensor": None, - }, - "autoMaterializeAssetEvaluationsOrError": { - "records": [ - { - "numRequested": 0, - "numSkipped": 0, - "numDiscarded": 0, - "rules": [], - "rulesWithRuleEvaluations": [], - "assetKey": {"path": ["asset_one"]}, - } - ], - }, - } + assert rootNode["numTrue"] == 0 + assert len(rootNode["childUniqueIds"]) == 2 - results = execute_dagster_graphql( - graphql_context, - QUERY, - variables={"assetKey": {"path": ["asset_two"]}, "limit": 10, "cursor": None}, - ) - assert results.data == { - "assetNodeOrError": { - "currentAutoMaterializeEvaluationId": None, - "automationPolicySensor": None, - }, - "autoMaterializeAssetEvaluationsOrError": { - "records": [ - { - "numRequested": 1, - "numSkipped": 0, - "numDiscarded": 0, - "rulesWithRuleEvaluations": [ - { - "rule": {"decisionType": "MATERIALIZE"}, - "ruleEvaluations": [ - { - "evaluationData": None, - "partitionKeysOrError": None, - } - ], - } - ], - } - ], - }, - } + notSkipNode = self._get_node(rootNode["childUniqueIds"][1], evaluation["evaluationNodes"]) + assert notSkipNode["description"] == "Not" + assert notSkipNode["numTrue"] == 0 + assert len(notSkipNode["childUniqueIds"]) == 1 - results = execute_dagster_graphql( - graphql_context, - QUERY, - variables={"assetKey": {"path": ["asset_three"]}, "limit": 10, "cursor": None}, - ) - assert results.data == { - "assetNodeOrError": { - "currentAutoMaterializeEvaluationId": None, - "automationPolicySensor": None, - }, - "autoMaterializeAssetEvaluationsOrError": { - "records": [ - { - "numRequested": 0, - "numSkipped": 1, - "numDiscarded": 0, - "rulesWithRuleEvaluations": [ - { - "rule": {"decisionType": "SKIP"}, - "ruleEvaluations": [ - { - "evaluationData": { - "waitingOnAssetKeys": [{"path": ["asset_two"]}], - }, - "partitionKeysOrError": None, - } - ], - } - ], - } - ], - }, - } + skipNode = self._get_node(notSkipNode["childUniqueIds"][0], evaluation["evaluationNodes"]) + assert skipNode["description"] == "Any of" + assert len(skipNode["childUniqueIds"]) == 1 - results = execute_dagster_graphql( - graphql_context, - QUERY, - variables={"assetKey": {"path": ["asset_four"]}, "limit": 10, "cursor": None}, - ) - assert results.data == { - "assetNodeOrError": { - "currentAutoMaterializeEvaluationId": None, - "automationPolicySensor": None, - }, - "autoMaterializeAssetEvaluationsOrError": { - "records": [ - { - "numRequested": 1, - "numSkipped": 0, - "numDiscarded": 0, - "rulesWithRuleEvaluations": [ - { - "rule": {"decisionType": "MATERIALIZE"}, - "ruleEvaluations": [ - { - "evaluationData": { - "updatedAssetKeys": [{"path": ["asset_two"]}], - "willUpdateAssetKeys": [{"path": ["asset_three"]}], - }, - "partitionKeysOrError": None, - } - ], - } - ], - } - ], - }, - } + def _get_node( + self, unique_id: str, evaluations: Sequence[Mapping[str, Any]] + ) -> Mapping[str, Any]: + return next(iter([node for node in evaluations if node["uniqueId"] == unique_id])) def _get_condition_evaluation( self, @@ -668,7 +513,9 @@ def _get_condition_evaluation( child_evaluations: Optional[Sequence[AssetConditionEvaluation]] = None, ) -> AssetConditionEvaluation: return AssetConditionEvaluation( - condition_snapshot=AssetConditionSnapshot("...", description, "a1b2"), + condition_snapshot=AssetConditionSnapshot( + "...", description, str(random.randint(0, 100000000)) + ), true_subset=AssetSubset( asset_key=asset_key, value=partitions_def.subset_with_partition_keys(true_partition_keys), @@ -678,10 +525,11 @@ def _get_condition_evaluation( value=partitions_def.subset_with_partition_keys(candidate_partition_keys), ) if candidate_partition_keys - else HistoricalAllPartitionsSubset(), + else HistoricalAllPartitionsSubsetSentinel(), start_timestamp=123, end_timestamp=456, child_evaluations=child_evaluations or [], + subsets_with_metadata=[], ) def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequestContext): @@ -697,10 +545,7 @@ def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequest }, ) assert results.data == { - "assetNodeOrError": { - "currentAutoMaterializeEvaluationId": None, - "automationPolicySensor": None, - }, + "assetNodeOrError": {"currentAutoMaterializeEvaluationId": None}, "assetConditionEvaluationRecordsOrError": {"records": []}, } @@ -782,26 +627,26 @@ def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequest assert records[0]["numRequested"] == 2 evaluation = records[0]["evaluation"] - assert evaluation["description"] == "All of" - assert evaluation["numTrue"] == 2 - assert evaluation["numFalse"] == 4 - assert evaluation["numSkipped"] == 0 - assert set(evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} - assert len(evaluation["childEvaluations"]) == 2 - - not_evaluation = evaluation["childEvaluations"][1] - assert not_evaluation["description"] == "Not" - assert not_evaluation["numTrue"] == 2 - assert not_evaluation["numFalse"] == 1 - assert not_evaluation["numSkipped"] == 3 - assert set(not_evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} - - skip_evaluation = not_evaluation["childEvaluations"][0] - assert skip_evaluation["description"] == "Any of" - assert skip_evaluation["numTrue"] == 1 - assert skip_evaluation["numFalse"] == 2 - assert skip_evaluation["numSkipped"] == 3 - assert set(skip_evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"c"} + + # all nodes in the tree + assert len(evaluation["evaluationNodes"]) == 9 + + rootNode = evaluation["evaluationNodes"][0] + assert rootNode["uniqueId"] == evaluation["rootUniqueId"] + assert rootNode["description"] == "All of" + assert rootNode["numTrue"] == 2 + assert set(rootNode["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} + assert len(rootNode["childUniqueIds"]) == 2 + + notNode = self._get_node(rootNode["childUniqueIds"][1], evaluation["evaluationNodes"]) + assert notNode["description"] == "Not" + assert notNode["numTrue"] == 2 + assert set(notNode["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} + + skipNode = self._get_node(notNode["childUniqueIds"][0], evaluation["evaluationNodes"]) + assert skipNode["description"] == "Any of" + assert skipNode["numTrue"] == 1 + assert set(skipNode["trueSubset"]["subsetValue"]["partitionKeys"]) == {"c"} # test one of the true partitions specific_result = execute_dagster_graphql( @@ -815,17 +660,22 @@ def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequest ) evaluation = specific_result.data["assetConditionEvaluationForPartition"] - assert evaluation["description"] == "All of" - assert evaluation["status"] == "TRUE" - assert len(evaluation["childEvaluations"]) == 2 + assert len(evaluation["evaluationNodes"]) == 9 + + rootNode = evaluation["evaluationNodes"][0] + assert rootNode["uniqueId"] == evaluation["rootUniqueId"] + + assert rootNode["description"] == "All of" + assert rootNode["status"] == "TRUE" + assert len(rootNode["childUniqueIds"]) == 2 - not_evaluation = evaluation["childEvaluations"][1] - assert not_evaluation["description"] == "Not" - assert not_evaluation["status"] == "TRUE" + notNode = self._get_node(rootNode["childUniqueIds"][1], evaluation["evaluationNodes"]) + assert notNode["description"] == "Not" + assert notNode["status"] == "TRUE" - skip_evaluation = not_evaluation["childEvaluations"][0] - assert skip_evaluation["description"] == "Any of" - assert skip_evaluation["status"] == "FALSE" + skipNode = self._get_node(notNode["childUniqueIds"][0], evaluation["evaluationNodes"]) + assert skipNode["description"] == "Any of" + assert skipNode["status"] == "FALSE" # test one of the false partitions specific_result = execute_dagster_graphql( @@ -839,17 +689,22 @@ def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequest ) evaluation = specific_result.data["assetConditionEvaluationForPartition"] - assert evaluation["description"] == "All of" - assert evaluation["status"] == "FALSE" - assert len(evaluation["childEvaluations"]) == 2 + assert len(evaluation["evaluationNodes"]) == 9 - not_evaluation = evaluation["childEvaluations"][1] - assert not_evaluation["description"] == "Not" - assert not_evaluation["status"] == "SKIPPED" + rootNode = evaluation["evaluationNodes"][0] + assert rootNode["uniqueId"] == evaluation["rootUniqueId"] - skip_evaluation = not_evaluation["childEvaluations"][0] - assert skip_evaluation["description"] == "Any of" - assert skip_evaluation["status"] == "SKIPPED" + assert rootNode["description"] == "All of" + assert rootNode["status"] == "FALSE" + assert len(rootNode["childUniqueIds"]) == 2 + + notNode = self._get_node(rootNode["childUniqueIds"][1], evaluation["evaluationNodes"]) + assert notNode["description"] == "Not" + assert notNode["status"] == "SKIPPED" + + skipNode = self._get_node(notNode["childUniqueIds"][0], evaluation["evaluationNodes"]) + assert skipNode["description"] == "Any of" + assert skipNode["status"] == "SKIPPED" def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext): graphql_context.instance.daemon_cursor_storage.set_cursor_values( @@ -862,13 +717,8 @@ def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext): variables={"assetKey": {"path": ["asset_two"]}, "limit": 10, "cursor": None}, ) assert results.data == { - "assetNodeOrError": { - "currentAutoMaterializeEvaluationId": 0, - "automationPolicySensor": None, - }, - "autoMaterializeAssetEvaluationsOrError": { - "records": [], - }, + "assetNodeOrError": {"currentAutoMaterializeEvaluationId": 0}, + "assetConditionEvaluationRecordsOrError": {"records": []}, } graphql_context.instance.daemon_cursor_storage.set_cursor_values( @@ -885,11 +735,8 @@ def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext): variables={"assetKey": {"path": ["asset_two"]}, "limit": 10, "cursor": None}, ) assert results.data == { - "assetNodeOrError": { - "currentAutoMaterializeEvaluationId": 42, - "automationPolicySensor": None, - }, - "autoMaterializeAssetEvaluationsOrError": { + "assetNodeOrError": {"currentAutoMaterializeEvaluationId": 42}, + "assetConditionEvaluationRecordsOrError": { "records": [], }, } diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_auto_materialize_asset_evaluations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_auto_materialize_asset_evaluations.py index b39307ad339dd..cd404c34d66d8 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_auto_materialize_asset_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_auto_materialize_asset_evaluations.py @@ -1,8 +1,4 @@ -from unittest.mock import PropertyMock, patch - import dagster._check as check -import pendulum -from dagster import AssetKey, RunRequest from dagster._core.definitions.asset_daemon_cursor import ( AssetDaemonCursor, ) @@ -11,203 +7,17 @@ deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids, ) from dagster._core.definitions.partition import StaticPartitionsDefinition -from dagster._core.definitions.run_request import ( - InstigatorType, -) -from dagster._core.definitions.sensor_definition import SensorType -from dagster._core.host_representation.origin import ( - ExternalInstigatorOrigin, -) -from dagster._core.scheduler.instigation import ( - InstigatorState, - InstigatorStatus, - SensorInstigatorData, - TickData, - TickStatus, -) from dagster._core.workspace.context import WorkspaceRequestContext from dagster._daemon.asset_daemon import ( _PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY, - _PRE_SENSOR_AUTO_MATERIALIZE_INSTIGATOR_NAME, - _PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID, - _PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID, - asset_daemon_cursor_to_instigator_serialized_cursor, ) from dagster._serdes.serdes import serialize_value -from dagster_graphql.test.utils import execute_dagster_graphql, infer_repository +from dagster_graphql.test.utils import execute_dagster_graphql from dagster_graphql_tests.graphql.graphql_context_test_suite import ( ExecutingGraphQLContextTestMatrix, ) -TICKS_QUERY = """ -query AssetDameonTicksQuery($dayRange: Int, $dayOffset: Int, $statuses: [InstigationTickStatus!], $limit: Int, $cursor: String, $beforeTimestamp: Float, $afterTimestamp: Float) { - autoMaterializeTicks(dayRange: $dayRange, dayOffset: $dayOffset, statuses: $statuses, limit: $limit, cursor: $cursor, beforeTimestamp: $beforeTimestamp, afterTimestamp: $afterTimestamp) { - id - timestamp - endTimestamp - status - requestedAssetKeys { - path - } - requestedMaterializationsForAssets { - assetKey { - path - } - partitionKeys - } - requestedAssetMaterializationCount - autoMaterializeAssetEvaluationId - } -} -""" - - -def _create_tick(instance, status, timestamp, evaluation_id, run_requests=None, end_timestamp=None): - return instance.create_tick( - TickData( - instigator_origin_id=_PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID, - instigator_name=_PRE_SENSOR_AUTO_MATERIALIZE_INSTIGATOR_NAME, - instigator_type=InstigatorType.AUTO_MATERIALIZE, - status=status, - timestamp=timestamp, - end_timestamp=end_timestamp, - selector_id=_PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID, - run_ids=[], - auto_materialize_evaluation_id=evaluation_id, - run_requests=run_requests, - ) - ) - - -class TestAutoMaterializeTicks(ExecutingGraphQLContextTestMatrix): - def test_get_tick_range(self, graphql_context): - result = execute_dagster_graphql( - graphql_context, - TICKS_QUERY, - variables={"dayRange": None, "dayOffset": None}, - ) - assert len(result.data["autoMaterializeTicks"]) == 0 - - now = pendulum.now("UTC") - end_timestamp = now.timestamp() + 20 - - success_1 = _create_tick( - graphql_context.instance, - TickStatus.SUCCESS, - now.timestamp(), - end_timestamp=end_timestamp, - evaluation_id=3, - run_requests=[ - RunRequest(asset_selection=[AssetKey("foo"), AssetKey("bar")], partition_key="abc"), - RunRequest(asset_selection=[AssetKey("bar")], partition_key="def"), - RunRequest(asset_selection=[AssetKey("baz")], partition_key=None), - ], - ) - - success_2 = _create_tick( - graphql_context.instance, - TickStatus.SUCCESS, - now.subtract(days=1, hours=1).timestamp(), - evaluation_id=2, - ) - - _create_tick( - graphql_context.instance, - TickStatus.SKIPPED, - now.subtract(days=2, hours=1).timestamp(), - evaluation_id=1, - ) - - result = execute_dagster_graphql( - graphql_context, - TICKS_QUERY, - variables={"dayRange": None, "dayOffset": None}, - ) - assert len(result.data["autoMaterializeTicks"]) == 3 - - result = execute_dagster_graphql( - graphql_context, - TICKS_QUERY, - variables={"dayRange": 1, "dayOffset": None}, - ) - assert len(result.data["autoMaterializeTicks"]) == 1 - tick = result.data["autoMaterializeTicks"][0] - assert tick["endTimestamp"] == end_timestamp - assert tick["autoMaterializeAssetEvaluationId"] == 3 - assert sorted(tick["requestedAssetKeys"], key=lambda x: x["path"][0]) == [ - {"path": ["bar"]}, - {"path": ["baz"]}, - {"path": ["foo"]}, - ] - - asset_materializations = tick["requestedMaterializationsForAssets"] - by_asset_key = { - AssetKey.from_coercible(mat["assetKey"]["path"]).to_user_string(): mat["partitionKeys"] - for mat in asset_materializations - } - - assert {key: sorted(val) for key, val in by_asset_key.items()} == { - "foo": ["abc"], - "bar": ["abc", "def"], - "baz": [], - } - - assert tick["requestedAssetMaterializationCount"] == 4 - - result = execute_dagster_graphql( - graphql_context, - TICKS_QUERY, - variables={ - "beforeTimestamp": success_2.timestamp + 1, - "afterTimestamp": success_2.timestamp - 1, - }, - ) - assert len(result.data["autoMaterializeTicks"]) == 1 - tick = result.data["autoMaterializeTicks"][0] - assert ( - tick["autoMaterializeAssetEvaluationId"] - == success_2.tick_data.auto_materialize_evaluation_id - ) - - result = execute_dagster_graphql( - graphql_context, - TICKS_QUERY, - variables={"dayRange": None, "dayOffset": None, "statuses": ["SUCCESS"]}, - ) - assert len(result.data["autoMaterializeTicks"]) == 2 - - result = execute_dagster_graphql( - graphql_context, - TICKS_QUERY, - variables={"dayRange": None, "dayOffset": None, "statuses": ["SUCCESS"], "limit": 1}, - ) - ticks = result.data["autoMaterializeTicks"] - assert len(ticks) == 1 - assert ticks[0]["timestamp"] == success_1.timestamp - assert ( - ticks[0]["autoMaterializeAssetEvaluationId"] - == success_1.tick_data.auto_materialize_evaluation_id - ) - - cursor = ticks[0]["id"] - - result = execute_dagster_graphql( - graphql_context, - TICKS_QUERY, - variables={ - "dayRange": None, - "dayOffset": None, - "statuses": ["SUCCESS"], - "limit": 1, - "cursor": cursor, - }, - ) - ticks = result.data["autoMaterializeTicks"] - assert len(ticks) == 1 - assert ticks[0]["timestamp"] == success_2.timestamp - - AUTOMATION_POLICY_SENSORS_QUERY = """ query GetEvaluationsQuery($assetKey: AssetKeyInput!) { assetNodeOrError(assetKey: $assetKey) { @@ -343,55 +153,6 @@ def test_get_tick_range(self, graphql_context): class TestAutoMaterializeAssetEvaluations(ExecutingGraphQLContextTestMatrix): - def test_automation_policy_sensor(self, graphql_context: WorkspaceRequestContext): - sensor_origin = ExternalInstigatorOrigin( - external_repository_origin=infer_repository(graphql_context).get_external_origin(), - instigator_name="my_automation_policy_sensor", - ) - - check.not_none(graphql_context.instance.schedule_storage).add_instigator_state( - InstigatorState( - sensor_origin, - InstigatorType.SENSOR, - status=InstigatorStatus.RUNNING, - instigator_data=SensorInstigatorData( - sensor_type=SensorType.AUTOMATION_POLICY, - cursor=asset_daemon_cursor_to_instigator_serialized_cursor( - AssetDaemonCursor.empty(12345) - ), - ), - ) - ) - - results = execute_dagster_graphql( - graphql_context, - AUTOMATION_POLICY_SENSORS_QUERY, - variables={ - "assetKey": {"path": ["fresh_diamond_bottom"]}, - }, - ) - assert not results.data["assetNodeOrError"]["currentAutoMaterializeEvaluationId"] - - with patch( - "dagster._core.instance.DagsterInstance.auto_materialize_use_automation_policy_sensors", - new_callable=PropertyMock, - ) as mock_my_property: - mock_my_property.return_value = True - - results = execute_dagster_graphql( - graphql_context, - AUTOMATION_POLICY_SENSORS_QUERY, - variables={ - "assetKey": {"path": ["fresh_diamond_bottom"]}, - }, - ) - - assert any( - instigator["name"] == "my_automation_policy_sensor" - for instigator in results.data["assetNodeOrError"]["targetingInstigators"] - ) - assert results.data["assetNodeOrError"]["currentAutoMaterializeEvaluationId"] == 12345 - def test_get_historic_rules(self, graphql_context: WorkspaceRequestContext): evaluation1 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( '{"__class__": "AutoMaterializeAssetEvaluation", "asset_key": {"__class__": "AssetKey", "path": ["asset_one"]}, "num_discarded": 0, "num_requested": 0, "num_skipped": 0, "partition_subsets_by_condition": [], "rule_snapshots": [{"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnParentMissingRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "waiting on upstream data to be present"}, {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnParentUpdatedRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "upstream data has changed since latest materialization"}, {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnRequiredButNonexistentParentsRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "required parent partitions do not exist"}, {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnBackfillInProgressRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "targeted by an in-progress backfill"}, {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "SkipOnParentOutdatedRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.SKIP"}, "description": "waiting on upstream data to be up to date"}, {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnRequiredForFreshnessRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "required to meet this or downstream asset\'s freshness policy"}, {"__class__": "AutoMaterializeRuleSnapshot", "class_name": "MaterializeOnMissingRule", "decision_type": {"__enum__": "AutoMaterializeDecisionType.MATERIALIZE"}, "description": "materialization is missing"}], "run_ids": {"__set__": []}}', diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition.py index 91a57e0efe5d5..40136daaea996 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition.py @@ -167,10 +167,7 @@ def equivalent_to_stored_evaluation(self, other: Optional["AssetConditionEvaluat return ( other is not None and self.condition_snapshot == other.condition_snapshot - # if any partitions are requested, then the state of the world must have meaninfully - # changed since the previous evaluation - and self.true_subset.size == 0 - and other.true_subset.size == 0 + and self.true_subset == other.true_subset # the candidate subset gets modified during serialization and get_serializable_candidate_subset(self.candidate_subset) == get_serializable_candidate_subset(other.candidate_subset) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 163f0714e0a45..1d2587eafe53e 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -255,7 +255,7 @@ def backcompat_deserialize_asset_daemon_cursor_str( previous_evaluation_state.append(backcompat_evaluation_state) return AssetDaemonCursor( - evaluation_id=default_evaluation_id, + evaluation_id=data.get("evaluation_id") or default_evaluation_id, previous_evaluation_state=previous_evaluation_state, last_observe_request_timestamp_by_asset_key=last_observe_request_timestamp_by_asset_key, ) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py index b3fd30e875665..1a25874a0ea8d 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py @@ -200,9 +200,11 @@ def partitions_def(self) -> Optional[PartitionsDefinition]: raise NotImplementedError() def _get_empty_subset(self, asset_key: AssetKey, is_partitioned: bool) -> AssetSubset: - # We know this asset is partitioned, but we don't know what its partitions def is, so we - # just use a DefaultPartitionsSubset - if is_partitioned and self.partitions_def is None: + if not is_partitioned: + return AssetSubset(asset_key, False) + elif self.partitions_def is None: + # We know this asset is partitioned, but we don't know what its partitions def was, so we + # just use a DefaultPartitionsSubset return AssetSubset(asset_key, DefaultPartitionsSubset(set())) else: return AssetSubset.empty(asset_key, self.partitions_def) @@ -310,8 +312,9 @@ def _get_child_decision_type_evaluation( is_partitioned, rule_snapshot, ) - for rule_snapshot in rule_snapshots - or set(partition_subsets_by_condition_by_rule_snapshot.keys()) + for rule_snapshot in ( + set(rule_snapshots) | set(partition_subsets_by_condition_by_rule_snapshot.keys()) + ) if rule_snapshot.decision_type == decision_type ] @@ -329,11 +332,7 @@ def _get_child_decision_type_evaluation( decision_type_snapshot = AssetConditionSnapshot( class_name=OrAssetCondition.__name__, description="Any of", unique_id=unique_id ) - initial = ( - AssetSubset(asset_key, DefaultPartitionsSubset(set())) - if is_partitioned - else AssetSubset.empty(asset_key, None) - ) + initial = self._get_empty_subset(asset_key, is_partitioned) evaluation = AssetConditionEvaluation( condition_snapshot=decision_type_snapshot, true_subset=reduce( @@ -362,7 +361,7 @@ def _get_child_decision_type_evaluation( # In reality, we'd like to invert the inner true_subset here, but this is an # expensive operation, and error-prone as the set of all partitions may have changed # since the evaluation was stored. Instead, we just use an empty subset. - true_subset = AssetSubset(asset_key, evaluation.true_subset.subset_value.empty_subset()) + true_subset = self._get_empty_subset(asset_key, is_partitioned) else: true_subset = evaluation.true_subset._replace( value=not evaluation.true_subset.bool_value @@ -403,7 +402,14 @@ def unpack( cast(Sequence[AutoMaterializeRuleSnapshot], unpacked_dict.get("rule_snapshots", [])) or [] ) - is_partitioned = any(tup[1] is not None for tup in partition_subsets_by_condition) + is_partitioned = ( + any(tup[1] is not None for tup in partition_subsets_by_condition) + if partition_subsets_by_condition + # if we don't have any partition_subsets_by_condition to look at, we can't tell if this + # asset was partitioned at the time that the evaluation was stored, so instead we assume + # that its current partition status is the same as its partition status at storage time. + else self.partitions_def is not None + ) # get the sub-evaluations for each decision type materialize_evaluation = check.not_none(