diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py index b96f42a563e6e..b04af439c46ac 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_condition_evaluations.py @@ -10,7 +10,6 @@ GrapheneAssetConditionEvaluationRecord, GrapheneAssetConditionEvaluationRecords, GrapheneAssetConditionEvaluationRecordsOrError, - GrapheneSpecificPartitionAssetConditionEvaluation, ) from dagster_graphql.schema.auto_materialize_asset_evaluations import ( GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError, @@ -86,8 +85,11 @@ def fetch_asset_condition_evaluation_record_for_partition( if asset_node and asset_node.external_asset_node.partitions_def_data else None ) - return GrapheneSpecificPartitionAssetConditionEvaluation( - record.get_evaluation_with_run_ids(partitions_def).evaluation, partition_key + return GrapheneAssetConditionEvaluation( + record.get_evaluation_with_run_ids(partitions_def).evaluation, + partitions_def, + graphene_info.context.instance, + partition_key, ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py index 9b81ff2204b70..e5e14d39519f4 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py @@ -1,4 +1,5 @@ import enum +import itertools from typing import Optional, Sequence, Union import graphene @@ -10,6 +11,7 @@ from dagster._core.instance import DynamicPartitionsStore from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord +from dagster_graphql.implementation.events import iterate_metadata_entries from dagster_graphql.schema.auto_materialize_asset_evaluations import ( GrapheneAutoMaterializeAssetEvaluationNeedsMigrationError, ) @@ -48,6 +50,7 @@ def __init__(self, value: Union[bool, PartitionsSubset]): GraphenePartitionKeyRange(start, end) for start, end in value.get_partition_key_ranges(value.partitions_def) ] + partition_keys = value.get_partition_keys() else: partition_keys = value.get_partition_keys() @@ -75,7 +78,8 @@ def __init__(self, asset_subset: AssetSubset): ) -class GrapheneUnpartitionedAssetConditionEvaluation(graphene.ObjectType): +class GrapheneUnpartitionedAssetConditionEvaluationNode(graphene.ObjectType): + uniqueId = graphene.NonNull(graphene.String) description = graphene.NonNull(graphene.String) startTimestamp = graphene.Field(graphene.Float) @@ -84,29 +88,31 @@ class GrapheneUnpartitionedAssetConditionEvaluation(graphene.ObjectType): metadataEntries = non_null_list(GrapheneMetadataEntry) status = graphene.NonNull(GrapheneAssetConditionEvaluationStatus) - childEvaluations = graphene.Field( - graphene.List(graphene.NonNull(lambda: GrapheneUnpartitionedAssetConditionEvaluation)) - ) + childUniqueIds = non_null_list(graphene.String) class Meta: - name = "UnpartitionedAssetConditionEvaluation" + name = "UnpartitionedAssetConditionEvaluationNode" def __init__(self, evaluation: AssetConditionEvaluation): + self._evaluation = evaluation if evaluation.true_subset.bool_value: status = AssetConditionEvaluationStatus.TRUE - elif evaluation.candidate_subset and evaluation.candidate_subset.bool_value: + elif ( + isinstance(evaluation.candidate_subset, AssetSubset) + and evaluation.candidate_subset.bool_value + ): status = AssetConditionEvaluationStatus.FALSE else: status = AssetConditionEvaluationStatus.SKIPPED super().__init__( + uniqueId=evaluation.condition_snapshot.unique_id, description=evaluation.condition_snapshot.description, startTimestamp=evaluation.start_timestamp, endTimestamp=evaluation.end_timestamp, status=status, - childEvaluations=[ - GrapheneUnpartitionedAssetConditionEvaluation(child) - for child in evaluation.child_evaluations + childUniqueIds=[ + child.condition_snapshot.unique_id for child in evaluation.child_evaluations ], ) @@ -117,10 +123,11 @@ def resolve_metadataEntries( (subset.metadata for subset in self._evaluation.subsets_with_metadata), {}, ) - return [GrapheneMetadataEntry(key=key, value=value) for key, value in metadata.items()] + return list(iterate_metadata_entries(metadata)) -class GraphenePartitionedAssetConditionEvaluation(graphene.ObjectType): +class GraphenePartitionedAssetConditionEvaluationNode(graphene.ObjectType): + uniqueId = graphene.NonNull(graphene.String) description = graphene.NonNull(graphene.String) startTimestamp = graphene.Field(graphene.Float) @@ -134,12 +141,10 @@ class GraphenePartitionedAssetConditionEvaluation(graphene.ObjectType): numFalse = graphene.NonNull(graphene.Int) numSkipped = graphene.NonNull(graphene.Int) - childEvaluations = graphene.Field( - graphene.List(graphene.NonNull(lambda: GraphenePartitionedAssetConditionEvaluation)) - ) + childUniqueIds = non_null_list(graphene.String) class Meta: - name = "PartitionedAssetConditionEvaluation" + name = "PartitionedAssetConditionEvaluationNode" def __init__( self, @@ -154,20 +159,22 @@ def __init__( evaluation.asset_key, partitions_def, dynamic_partitions_store, pendulum.now("UTC") ) - # if the candidate_subset is unset, then we evaluated all partitions - self._candidate_subset = evaluation.candidate_subset or self._all_subset + # if the candidate_subset is a HistoricalAssetSubset, then we evaluated all partitions + self._candidate_subset = ( + evaluation.candidate_subset + if isinstance(evaluation.candidate_subset, AssetSubset) + else self._all_subset + ) super().__init__( + uniqueId=evaluation.condition_snapshot.unique_id, description=evaluation.condition_snapshot.description, startTimestamp=evaluation.start_timestamp, endTimestamp=evaluation.end_timestamp, trueSubset=GrapheneAssetSubset(evaluation.true_subset), candidateSubset=GrapheneAssetSubset(self._candidate_subset), - childEvaluations=[ - GraphenePartitionedAssetConditionEvaluation( - child, partitions_def, dynamic_partitions_store - ) - for child in evaluation.child_evaluations + childUniqueIds=[ + child.condition_snapshot.unique_id for child in evaluation.child_evaluations ], ) @@ -184,18 +191,17 @@ def resolve_numSkipped(self, graphene_info: ResolveInfo) -> int: return self._all_subset.size - self._candidate_subset.size -class GrapheneSpecificPartitionAssetConditionEvaluation(graphene.ObjectType): +class GrapheneSpecificPartitionAssetConditionEvaluationNode(graphene.ObjectType): + uniqueId = graphene.NonNull(graphene.String) description = graphene.NonNull(graphene.String) metadataEntries = non_null_list(GrapheneMetadataEntry) status = graphene.NonNull(GrapheneAssetConditionEvaluationStatus) - childEvaluations = graphene.Field( - graphene.List(graphene.NonNull(lambda: GrapheneSpecificPartitionAssetConditionEvaluation)) - ) + childUniqueIds = non_null_list(graphene.String) class Meta: - name = "SpecificPartitionAssetConditionEvaluation" + name = "SpecificPartitionAssetConditionEvaluationNode" def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str): self._evaluation = evaluation @@ -204,7 +210,7 @@ def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str): if partition_key in evaluation.true_subset.subset_value: status = AssetConditionEvaluationStatus.TRUE elif ( - evaluation.candidate_subset is None + not isinstance(evaluation.candidate_subset, AssetSubset) or partition_key in evaluation.candidate_subset.subset_value ): status = AssetConditionEvaluationStatus.FALSE @@ -212,11 +218,11 @@ def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str): status = AssetConditionEvaluationStatus.SKIPPED super().__init__( + uniqueId=evaluation.condition_snapshot.unique_id, description=evaluation.condition_snapshot.description, status=status, - childEvaluations=[ - GrapheneSpecificPartitionAssetConditionEvaluation(child, partition_key) - for child in evaluation.child_evaluations + childUniqueIds=[ + child.condition_snapshot.unique_id for child in evaluation.child_evaluations ], ) @@ -232,18 +238,63 @@ def resolve_metadataEntries( ), {}, ) - return [GrapheneMetadataEntry(key=key, value=value) for key, value in metadata.items()] + return list(iterate_metadata_entries(metadata)) -class GrapheneAssetConditionEvaluation(graphene.Union): +class GrapheneAssetConditionEvaluationNode(graphene.Union): class Meta: types = ( - GrapheneUnpartitionedAssetConditionEvaluation, - GraphenePartitionedAssetConditionEvaluation, - GrapheneSpecificPartitionAssetConditionEvaluation, + GrapheneUnpartitionedAssetConditionEvaluationNode, + GraphenePartitionedAssetConditionEvaluationNode, + GrapheneSpecificPartitionAssetConditionEvaluationNode, ) + name = "AssetConditionEvaluationNode" + + +class GrapheneAssetConditionEvaluation(graphene.ObjectType): + rootUniqueId = graphene.NonNull(graphene.String) + evaluationNodes = non_null_list(GrapheneAssetConditionEvaluationNode) + + class Meta: name = "AssetConditionEvaluation" + def __init__( + self, + evaluation: AssetConditionEvaluation, + partitions_def: Optional[PartitionsDefinition], + dynamic_partitions_store: DynamicPartitionsStore, + partition_key: Optional[str] = None, + ): + # flatten the evaluation tree into a list of nodes + def _flatten(e: AssetConditionEvaluation) -> Sequence[AssetConditionEvaluation]: + return list(itertools.chain([e], *(_flatten(ce) for ce in e.child_evaluations))) + + all_nodes = _flatten(evaluation) + + if evaluation.true_subset.is_partitioned: + if partition_key is None: + evaluationNodes = [ + GraphenePartitionedAssetConditionEvaluationNode( + evaluation, partitions_def, dynamic_partitions_store + ) + for evaluation in all_nodes + ] + else: + evaluationNodes = [ + GrapheneSpecificPartitionAssetConditionEvaluationNode(evaluation, partition_key) + for evaluation in all_nodes + ] + else: + evaluationNodes = [ + GrapheneUnpartitionedAssetConditionEvaluationNode(evaluation) + for evaluation in all_nodes + ] + + super().__init__( + rootUniqueId=evaluation.condition_snapshot.unique_id, + evaluationNodes=evaluationNodes, + ) + class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType): id = graphene.NonNull(graphene.ID) @@ -254,6 +305,9 @@ class GrapheneAssetConditionEvaluationRecord(graphene.ObjectType): assetKey = graphene.NonNull(GrapheneAssetKey) numRequested = graphene.NonNull(graphene.Int) + startTimestamp = graphene.Field(graphene.Float) + endTimestamp = graphene.Field(graphene.Float) + evaluation = graphene.NonNull(GrapheneAssetConditionEvaluation) class Meta: @@ -264,22 +318,8 @@ def __init__( record: AutoMaterializeAssetEvaluationRecord, partitions_def: Optional[PartitionsDefinition], dynamic_partitions_store: DynamicPartitionsStore, - partition_key: Optional[str] = None, ): evaluation_with_run_ids = record.get_evaluation_with_run_ids(partitions_def) - if evaluation_with_run_ids.evaluation.true_subset.is_partitioned: - if partition_key is None: - evaluation = GraphenePartitionedAssetConditionEvaluation( - evaluation_with_run_ids.evaluation, partitions_def, dynamic_partitions_store - ) - else: - evaluation = GrapheneSpecificPartitionAssetConditionEvaluation( - evaluation_with_run_ids.evaluation, partition_key - ) - else: - evaluation = GrapheneUnpartitionedAssetConditionEvaluation( - evaluation_with_run_ids.evaluation - ) super().__init__( id=record.id, @@ -288,7 +328,11 @@ def __init__( runIds=evaluation_with_run_ids.run_ids, assetKey=GrapheneAssetKey(path=record.asset_key.path), numRequested=evaluation_with_run_ids.evaluation.true_subset.size, - evaluation=evaluation, + startTimestamp=evaluation_with_run_ids.evaluation.start_timestamp, + endTimestamp=evaluation_with_run_ids.evaluation.end_timestamp, + evaluation=GrapheneAssetConditionEvaluation( + evaluation_with_run_ids.evaluation, partitions_def, dynamic_partitions_store + ), ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index e0baa9225b6ec..9228b6a19cf28 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -36,8 +36,8 @@ from dagster_graphql.implementation.fetch_logs import get_captured_log_metadata from dagster_graphql.implementation.fetch_runs import get_assets_latest_info from dagster_graphql.schema.asset_condition_evaluations import ( + GrapheneAssetConditionEvaluation, GrapheneAssetConditionEvaluationRecordsOrError, - GrapheneSpecificPartitionAssetConditionEvaluation, ) from dagster_graphql.schema.auto_materialize_asset_evaluations import ( GrapheneAutoMaterializeAssetEvaluationRecordsOrError, @@ -523,7 +523,7 @@ class Meta: ) assetConditionEvaluationForPartition = graphene.Field( - GrapheneSpecificPartitionAssetConditionEvaluation, + GrapheneAssetConditionEvaluation, assetKey=graphene.Argument(graphene.NonNull(GrapheneAssetKeyInput)), evaluationId=graphene.Argument(graphene.NonNull(graphene.Int)), partition=graphene.Argument(graphene.NonNull(graphene.String)), diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py index d62d4b8524bcf..f45f718058a9d 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py @@ -1,4 +1,5 @@ -from typing import Optional, Sequence +import random +from typing import Any, Mapping, Optional, Sequence from unittest.mock import PropertyMock, patch import dagster._check as check @@ -213,63 +214,44 @@ def test_get_tick_range(self, graphql_context): FRAGMENTS = """ -fragment unpartitionedEvaluationFields on UnpartitionedAssetConditionEvaluation { - description - startTimestamp - endTimestamp - status -} - -fragment partitionedEvaluationFields on PartitionedAssetConditionEvaluation { - description - startTimestamp - endTimestamp - numTrue - numFalse - numSkipped - trueSubset { - subsetValue { - isPartitioned - partitionKeys - } - } - falseSubset { - subsetValue { - isPartitioned - partitionKeys - } - } -} - fragment evaluationFields on AssetConditionEvaluation { - ... on UnpartitionedAssetConditionEvaluation { - ...unpartitionedEvaluationFields - childEvaluations { - ...unpartitionedEvaluationFields - childEvaluations { - ...unpartitionedEvaluationFields - childEvaluations { - ...unpartitionedEvaluationFields - childEvaluations { - ...unpartitionedEvaluationFields - } + rootUniqueId + evaluationNodes { + ... on UnpartitionedAssetConditionEvaluationNode { + description + startTimestamp + endTimestamp + status + uniqueId + childUniqueIds + } + ... on PartitionedAssetConditionEvaluationNode { + description + startTimestamp + endTimestamp + numTrue + numFalse + numSkipped + trueSubset { + subsetValue { + isPartitioned + partitionKeys } } - } - } - ... on PartitionedAssetConditionEvaluation { - ...partitionedEvaluationFields - childEvaluations { - ...partitionedEvaluationFields - childEvaluations { - ...partitionedEvaluationFields - childEvaluations { - ...partitionedEvaluationFields - childEvaluations { - ...partitionedEvaluationFields - } + falseSubset { + subsetValue { + isPartitioned + partitionKeys } } + uniqueId + childUniqueIds + } + ... on SpecificPartitionAssetConditionEvaluationNode { + description + status + uniqueId + childUniqueIds } } } @@ -304,29 +286,16 @@ def test_get_tick_range(self, graphql_context): """ ) -QUERY_FOR_SPECIFIC_PARTITION = """ -fragment specificPartitionEvaluationFields on SpecificPartitionAssetConditionEvaluation { - description - status -} +QUERY_FOR_SPECIFIC_PARTITION = ( + FRAGMENTS + + """ query GetPartitionEvaluationQuery($assetKey: AssetKeyInput!, $partition: String!, $evaluationId: Int!) { assetConditionEvaluationForPartition(assetKey: $assetKey, partition: $partition, evaluationId: $evaluationId) { - ...specificPartitionEvaluationFields - childEvaluations { - ...specificPartitionEvaluationFields - childEvaluations { - ...specificPartitionEvaluationFields - childEvaluations { - ...specificPartitionEvaluationFields - childEvaluations { - ...specificPartitionEvaluationFields - } - } - } - } + ...evaluationFields } } """ +) QUERY_FOR_EVALUATION_ID = ( FRAGMENTS @@ -416,7 +385,7 @@ def test_get_historic_rules_without_evaluation_data( assert len(results.data["assetConditionEvaluationRecordsOrError"]["records"]) == 1 asset_one_record = results.data["assetConditionEvaluationRecordsOrError"]["records"][0] assert asset_one_record["assetKey"] == {"path": ["asset_one"]} - assert asset_one_record["evaluation"]["status"] == "SKIPPED" + assert asset_one_record["evaluation"]["evaluationNodes"][0]["status"] == "SKIPPED" results = execute_dagster_graphql( graphql_context, @@ -425,16 +394,22 @@ def test_get_historic_rules_without_evaluation_data( ) assert len(results.data["assetConditionEvaluationRecordsOrError"]["records"]) == 1 asset_two_record = results.data["assetConditionEvaluationRecordsOrError"]["records"][0] - assert asset_two_record["evaluation"]["description"] == "All of" - assert asset_two_record["evaluation"]["status"] == "SKIPPED" - asset_two_children = asset_two_record["evaluation"]["childEvaluations"] - assert len(asset_two_children) == 2 - assert asset_two_children[0]["description"] == "Any of" - assert asset_two_children[0]["status"] == "SKIPPED" - assert ( - asset_two_children[0]["childEvaluations"][0]["description"] - == "materialization is missing" + asset_two_root = asset_two_record["evaluation"]["evaluationNodes"][0] + + assert asset_two_root["description"] == "All of" + assert asset_two_root["status"] == "SKIPPED" + assert len(asset_two_root["childUniqueIds"]) == 2 + + asset_two_child = self._get_node( + asset_two_root["childUniqueIds"][0], asset_two_record["evaluation"]["evaluationNodes"] ) + assert asset_two_child["description"] == "Any of" + assert asset_two_child["status"] == "SKIPPED" + + asset_two_missing_node = self._get_node( + asset_two_child["childUniqueIds"][0], asset_two_record["evaluation"]["evaluationNodes"] + ) + assert asset_two_missing_node["description"] == "materialization is missing" results = execute_dagster_graphql( graphql_context, @@ -450,6 +425,7 @@ def test_get_historic_rules_without_evaluation_data( assert any(record == asset_one_record for record in records) assert any(record == asset_two_record for record in records) + # this evaluationId doesn't exist results = execute_dagster_graphql( graphql_context, QUERY_FOR_EVALUATION_ID, @@ -485,17 +461,24 @@ def test_get_historic_evaluation_with_evaluation_data( records = results.data["assetConditionEvaluationRecordsOrError"]["records"] assert len(records) == 1 + evaluation = records[0]["evaluation"] - assert evaluation["numTrue"] == 0 - assert evaluation["numFalse"] == 6 - assert evaluation["numSkipped"] == 0 - assert len(evaluation["childEvaluations"]) == 2 - not_skip_evaluation = evaluation["childEvaluations"][1] - assert not_skip_evaluation["description"] == "Not" - assert not_skip_evaluation["numTrue"] == 1 - assert len(not_skip_evaluation["childEvaluations"]) == 1 - assert not_skip_evaluation["childEvaluations"][0]["description"] == "Any of" - assert len(not_skip_evaluation["childEvaluations"][0]["childEvaluations"]) == 2 + rootNode = evaluation["evaluationNodes"][0] + assert rootNode["uniqueId"] == evaluation["rootUniqueId"] + + assert rootNode["numTrue"] == 0 + assert rootNode["numFalse"] == 6 + assert rootNode["numSkipped"] == 0 + assert len(rootNode["childUniqueIds"]) == 2 + + notSkipNode = self._get_node(rootNode["childUniqueIds"][0], evaluation["evaluationNodes"]) + assert notSkipNode["description"] == "Not" + assert notSkipNode["numTrue"] == 1 + assert len(notSkipNode["childUniqueIds"]) == 1 + + skipNode = self._get_node(rootNode["childUniqueIds"][1], evaluation["evaluationNodes"]) + assert skipNode["description"] == "Any of" + assert len(skipNode["childUniqueIds"]) == 2 def test_get_evaluations(self, graphql_context: WorkspaceRequestContext): evaluation1 = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids( @@ -658,6 +641,11 @@ def test_get_evaluations(self, graphql_context: WorkspaceRequestContext): }, } + def _get_node( + self, unique_id: str, evaluations: Sequence[Mapping[str, Any]] + ) -> Mapping[str, Any]: + return next(iter([node for node in evaluations if node["uniqueId"] == unique_id])) + def _get_condition_evaluation( self, asset_key: AssetKey, @@ -668,7 +656,9 @@ def _get_condition_evaluation( child_evaluations: Optional[Sequence[AssetConditionEvaluation]] = None, ) -> AssetConditionEvaluation: return AssetConditionEvaluation( - condition_snapshot=AssetConditionSnapshot("...", description, "a1b2"), + condition_snapshot=AssetConditionSnapshot( + "...", description, str(random.randint(0, 100000000)) + ), true_subset=AssetSubset( asset_key=asset_key, value=partitions_def.subset_with_partition_keys(true_partition_keys), @@ -782,26 +772,32 @@ def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequest assert records[0]["numRequested"] == 2 evaluation = records[0]["evaluation"] - assert evaluation["description"] == "All of" - assert evaluation["numTrue"] == 2 - assert evaluation["numFalse"] == 4 - assert evaluation["numSkipped"] == 0 - assert set(evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} - assert len(evaluation["childEvaluations"]) == 2 - - not_evaluation = evaluation["childEvaluations"][1] - assert not_evaluation["description"] == "Not" - assert not_evaluation["numTrue"] == 2 - assert not_evaluation["numFalse"] == 1 - assert not_evaluation["numSkipped"] == 3 - assert set(not_evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} - - skip_evaluation = not_evaluation["childEvaluations"][0] - assert skip_evaluation["description"] == "Any of" - assert skip_evaluation["numTrue"] == 1 - assert skip_evaluation["numFalse"] == 2 - assert skip_evaluation["numSkipped"] == 3 - assert set(skip_evaluation["trueSubset"]["subsetValue"]["partitionKeys"]) == {"c"} + + # all nodes in the tree + assert len(evaluation["evaluationNodes"]) == 9 + + rootNode = evaluation["evaluationNodes"][0] + assert rootNode["uniqueId"] == evaluation["rootUniqueId"] + assert rootNode["description"] == "All of" + assert rootNode["numTrue"] == 2 + assert rootNode["numFalse"] == 4 + assert rootNode["numSkipped"] == 0 + assert set(rootNode["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} + assert len(rootNode["childUniqueIds"]) == 2 + + notNode = self._get_node(rootNode["childUniqueIds"][1], evaluation["evaluationNodes"]) + assert notNode["description"] == "Not" + assert notNode["numTrue"] == 2 + assert notNode["numFalse"] == 1 + assert notNode["numSkipped"] == 3 + assert set(notNode["trueSubset"]["subsetValue"]["partitionKeys"]) == {"a", "b"} + + skipNode = self._get_node(notNode["childUniqueIds"][0], evaluation["evaluationNodes"]) + assert skipNode["description"] == "Any of" + assert skipNode["numTrue"] == 1 + assert skipNode["numFalse"] == 2 + assert skipNode["numSkipped"] == 3 + assert set(skipNode["trueSubset"]["subsetValue"]["partitionKeys"]) == {"c"} # test one of the true partitions specific_result = execute_dagster_graphql( @@ -815,17 +811,22 @@ def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequest ) evaluation = specific_result.data["assetConditionEvaluationForPartition"] - assert evaluation["description"] == "All of" - assert evaluation["status"] == "TRUE" - assert len(evaluation["childEvaluations"]) == 2 + assert len(evaluation["evaluationNodes"]) == 9 + + rootNode = evaluation["evaluationNodes"][0] + assert rootNode["uniqueId"] == evaluation["rootUniqueId"] + + assert rootNode["description"] == "All of" + assert rootNode["status"] == "TRUE" + assert len(rootNode["childUniqueIds"]) == 2 - not_evaluation = evaluation["childEvaluations"][1] - assert not_evaluation["description"] == "Not" - assert not_evaluation["status"] == "TRUE" + notNode = self._get_node(rootNode["childUniqueIds"][1], evaluation["evaluationNodes"]) + assert notNode["description"] == "Not" + assert notNode["status"] == "TRUE" - skip_evaluation = not_evaluation["childEvaluations"][0] - assert skip_evaluation["description"] == "Any of" - assert skip_evaluation["status"] == "FALSE" + skipNode = self._get_node(notNode["childUniqueIds"][0], evaluation["evaluationNodes"]) + assert skipNode["description"] == "Any of" + assert skipNode["status"] == "FALSE" # test one of the false partitions specific_result = execute_dagster_graphql( @@ -839,17 +840,22 @@ def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequest ) evaluation = specific_result.data["assetConditionEvaluationForPartition"] - assert evaluation["description"] == "All of" - assert evaluation["status"] == "FALSE" - assert len(evaluation["childEvaluations"]) == 2 + assert len(evaluation["evaluationNodes"]) == 9 + + rootNode = evaluation["evaluationNodes"][0] + assert rootNode["uniqueId"] == evaluation["rootUniqueId"] + + assert rootNode["description"] == "All of" + assert rootNode["status"] == "FALSE" + assert len(rootNode["childUniqueIds"]) == 2 - not_evaluation = evaluation["childEvaluations"][1] - assert not_evaluation["description"] == "Not" - assert not_evaluation["status"] == "SKIPPED" + notNode = self._get_node(rootNode["childUniqueIds"][1], evaluation["evaluationNodes"]) + assert notNode["description"] == "Not" + assert notNode["status"] == "SKIPPED" - skip_evaluation = not_evaluation["childEvaluations"][0] - assert skip_evaluation["description"] == "Any of" - assert skip_evaluation["status"] == "SKIPPED" + skipNode = self._get_node(notNode["childUniqueIds"][0], evaluation["evaluationNodes"]) + assert skipNode["description"] == "Any of" + assert skipNode["status"] == "SKIPPED" def _test_current_evaluation_id(self, graphql_context: WorkspaceRequestContext): graphql_context.instance.daemon_cursor_storage.set_cursor_values(