From 5919ac6f4441482d726bc8f2b90f930071b1eed6 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Thu, 16 Nov 2023 16:10:46 -0800 Subject: [PATCH 1/6] claire/expose-partition-mapping-gql --- .../dagster_graphql/schema/asset_graph.py | 14 ++++++++++++++ .../schema/partition_mappings.py | 19 +++++++++++++++++++ .../_core/definitions/partition_mapping.py | 3 +++ .../time_window_partition_mapping.py | 10 ++++++++++ 4 files changed, 46 insertions(+) create mode 100644 python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index ce5a2f937bb31..761a55e9fcff5 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -13,6 +13,7 @@ ) from dagster._core.definitions.external_asset_graph import ExternalAssetGraph from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition +from dagster._core.definitions.partition_mapping import PartitionMapping from dagster._core.errors import DagsterInvariantViolationError from dagster._core.event_api import EventRecordsFilter from dagster._core.events import DagsterEventType @@ -79,6 +80,7 @@ from .errors import GrapheneAssetNotFoundError from .freshness_policy import GrapheneAssetFreshnessInfo, GrapheneFreshnessPolicy from .logs.events import GrapheneMaterializationEvent, GrapheneObservationEvent +from .partition_mappings import GraphenePartitionMapping from .pipelines.pipeline import ( GrapheneAssetPartitionStatuses, GrapheneDefaultPartitionStatuses, @@ -117,6 +119,7 @@ class Meta: asset = graphene.NonNull("dagster_graphql.schema.asset_graph.GrapheneAssetNode") inputName = graphene.NonNull(graphene.String) + partitionMapping = graphene.Field(GraphenePartitionMapping) def __init__( self, @@ -127,7 +130,9 @@ def __init__( asset_checks_loader: AssetChecksLoader, materialization_loader: Optional[BatchMaterializationLoader] = None, depended_by_loader: Optional[CrossRepoAssetDependedByLoader] = None, + partition_mapping: Optional[PartitionMapping] = None, ): + print(partition_mapping) self._repository_location = check.inst_param( repository_location, "repository_location", CodeLocation ) @@ -144,6 +149,9 @@ def __init__( self._depended_by_loader = check.opt_inst_param( depended_by_loader, "depended_by_loader", CrossRepoAssetDependedByLoader ) + self._partition_mapping = check.opt_inst_param( + partition_mapping, "partition_mapping", PartitionMapping + ) super().__init__(inputName=input_name) def resolve_asset(self, _graphene_info: ResolveInfo): @@ -160,6 +168,11 @@ def resolve_asset(self, _graphene_info: ResolveInfo): materialization_loader=self._latest_materialization_loader, ) + def resolve_partitionMapping(self, _graphene_info: ResolveInfo): + if self._partition_mapping: + return GraphenePartitionMapping(self._partition_mapping) + return None + class GrapheneAssetLatestInfo(graphene.ObjectType): id = graphene.NonNull(graphene.ID) @@ -799,6 +812,7 @@ def resolve_dependencies(self, graphene_info: ResolveInfo) -> Sequence[GrapheneA asset_key=dep.upstream_asset_key, materialization_loader=materialization_loader, asset_checks_loader=asset_checks_loader, + partition_mapping=dep.partition_mapping, ) for dep in self._external_asset_node.dependencies ] diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py b/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py new file mode 100644 index 0000000000000..cae9920786354 --- /dev/null +++ b/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py @@ -0,0 +1,19 @@ +import graphene +from dagster._core.definitions.partition_mapping import PartitionMapping + + +class GraphenePartitionMapping(graphene.ObjectType): + name = graphene.NonNull(graphene.String) + description = graphene.NonNull(graphene.String) + + class Meta: + name = "PartitionMapping" + + def __init__( + self, + partition_mapping: PartitionMapping, + ): + super().__init__( + name=type(partition_mapping).__name__, + description=str(partition_mapping), + ) diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index a5690fefc84be..5c734eb4a82d0 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -903,6 +903,9 @@ def get_upstream_mapped_partitions_result_for_partitions( return UpstreamPartitionsResult(upstream_subset.with_partition_keys(upstream_keys), []) + def __str__(self) -> str: + return f"Upstream partitions are dependencies of downstream partitions according to the following mapping: \n{self.downstream_partition_keys_by_upstream_partition_key}" + class InferSingleToMultiDimensionDepsResult( NamedTuple( diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py index d2839f8e126d4..280acece1d8c9 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py @@ -105,6 +105,16 @@ def __new__( ), ) + def __str__(self) -> str: + description = ( + "Maps a downstream partition to any upstream partition with an overlapping time window." + ) + + if self.start_offset != 0 or self.end_offset != 0: + description += f"The start and end of the upstream time window is offsetted by {self.start_offset} and {self.end_offset} partitions respectively." + + return description + def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], From 7f33d755ce950a4c33c4a4ea037dde1f22ffc593 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Thu, 16 Nov 2023 16:33:00 -0800 Subject: [PATCH 2/6] more descriptions --- .../dagster_graphql/schema/asset_graph.py | 1 - .../_core/definitions/partition_mapping.py | 35 +++++++++++++++++++ .../time_window_partition_mapping.py | 5 ++- 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 761a55e9fcff5..6ac77abf2cffa 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -132,7 +132,6 @@ def __init__( depended_by_loader: Optional[CrossRepoAssetDependedByLoader] = None, partition_mapping: Optional[PartitionMapping] = None, ): - print(partition_mapping) self._repository_location = check.inst_param( repository_location, "repository_location", CodeLocation ) diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index 5c734eb4a82d0..2651840377677 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -159,6 +159,12 @@ def get_downstream_partitions_for_partitions( list(downstream_partition_keys & upstream_partition_keys) ) + def __str__(self) -> str: + return ( + "Assumes upstream and downstream assets share the same partitions definition. " + "Maps each partition in the downstream asset to the same partition in the upstream asset." + ) + @whitelist_for_serdes class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [])): @@ -191,6 +197,9 @@ def get_downstream_partitions_for_partitions( ) -> PartitionsSubset: raise NotImplementedError() + def __str__(self) -> str: + return "Each downstream partition depends on all partitions of the upstream asset." + @whitelist_for_serdes class LastPartitionMapping(PartitionMapping, NamedTuple("_LastPartitionMapping", [])): @@ -236,6 +245,9 @@ def get_downstream_partitions_for_partitions( else: return downstream_partitions_def.empty_subset() + def __str__(self) -> str: + return "Each downstream partition depends on the last partition of the upstream asset." + @whitelist_for_serdes class SpecificPartitionsPartitionMapping( @@ -292,6 +304,9 @@ def get_downstream_partitions_for_partitions( ) return downstream_partitions_def.empty_subset() + def __str__(self) -> str: + return f"Each downstream partition depends on the following upstream partitions: {self.partition_keys}" + class DimensionDependency(NamedTuple): partition_mapping: PartitionMapping @@ -579,6 +594,14 @@ def __new__(cls, partition_dimension_name: Optional[str] = None): ), ) + def __str__(self): + description = ( + "Assumes that the single-dimension partitions definition is a dimension of the " + "multipartitions definition. For a single-dimension partition key X, any " + "multi-partition key with X in the matching dimension is a dependency." + ) + return description + def get_dimension_dependencies( self, upstream_partitions_def: PartitionsDefinition, @@ -728,6 +751,18 @@ def __new__( ), ) + def __str__(self) -> str: + description = "\n ".join( + [ + ( + f"Upstream dimension '{upstream_dim}' mapped to downstream dimension " + f"'{downstream_mapping.dimension_name}' using '{type(downstream_mapping).__name__}." + ) + for upstream_dim, downstream_mapping in self.downstream_mappings_by_upstream_dimension.items() + ] + ) + return description + def get_dimension_dependencies( self, upstream_partitions_def: PartitionsDefinition, diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py index 280acece1d8c9..0195d8e238471 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py @@ -111,7 +111,10 @@ def __str__(self) -> str: ) if self.start_offset != 0 or self.end_offset != 0: - description += f"The start and end of the upstream time window is offsetted by {self.start_offset} and {self.end_offset} partitions respectively." + description += ( + f" The start and end of the upstream time window is offsetted by " + f"{self.start_offset} and {self.end_offset} partitions respectively." + ) return description From c26729f947c56f5fe0b1f51edf566a036d68a34f Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Fri, 17 Nov 2023 10:28:21 -0800 Subject: [PATCH 3/6] add test --- .../dagster_graphql_tests/graphql/repo.py | 9 +++- .../graphql/test_assets.py | 44 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index d20dc7cc40a67..e0ef4db05b9e0 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -16,6 +16,7 @@ AssetCheckResult, AssetCheckSpec, AssetExecutionContext, + AssetIn, AssetKey, AssetMaterialization, AssetObservation, @@ -59,6 +60,7 @@ TableConstraints, TableRecord, TableSchema, + TimeWindowPartitionMapping, WeeklyPartitionsDefinition, _check as check, asset, @@ -1464,7 +1466,12 @@ def upstream_time_partitioned_asset(): return 1 -@asset(partitions_def=hourly_partition) +@asset( + partitions_def=hourly_partition, + ins={ + "upstream_time_partitioned_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping()) + }, +) def downstream_time_partitioned_asset( upstream_time_partitioned_asset, ): diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index 1473c99e48e00..0c3805768c945 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -646,6 +646,29 @@ } """ +GET_ASSET_DEPENDENCIES_PARTITION_MAPPING = """ + query AssetNodeQuery($assetKey: AssetKeyInput!) { + assetNodeOrError(assetKey: $assetKey) { + ...on AssetNode { + assetKey { + path + } + dependencies { + asset { + assetKey { + path + } + } + partitionMapping { + name + description + } + } + } + } + } +""" + def _create_run( graphql_context: WorkspaceRequestContext, @@ -2376,6 +2399,27 @@ def test_get_backfill_policy(self, graphql_context: WorkspaceRequestContext): == "Backfills in multiple runs, with a maximum of 10 partitions per run" ) + def test_get_partition_mapping(self, graphql_context: WorkspaceRequestContext): + result = execute_dagster_graphql( + graphql_context, + GET_ASSET_DEPENDENCIES_PARTITION_MAPPING, + variables={ + "assetKey": {"path": ["downstream_time_partitioned_asset"]}, + }, + ) + + assert result.data["assetNodeOrError"]["assetKey"]["path"] == [ + "downstream_time_partitioned_asset" + ] + dependencies = result.data["assetNodeOrError"]["dependencies"] + assert len(dependencies) == 1 + assert dependencies[0]["asset"]["assetKey"]["path"] == ["upstream_time_partitioned_asset"] + assert dependencies[0]["partitionMapping"]["name"] == "TimeWindowPartitionMapping" + assert ( + dependencies[0]["partitionMapping"]["description"] + == "Maps a downstream partition to any upstream partition with an overlapping time window." + ) + class TestAssetEventsReadOnly(ReadonlyGraphQLContextTestMatrix): def test_report_runless_asset_events_permissions( From ea0969b6019b0e5f6afdf965f20f1be8a3d78e3d Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Fri, 17 Nov 2023 10:29:31 -0800 Subject: [PATCH 4/6] make gql --- .../ui-core/src/graphql/schema.graphql | 6 ++++ .../packages/ui-core/src/graphql/types.ts | 29 +++++++++++++++++++ .../_core/definitions/partition_mapping.py | 5 +++- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index cb9fc3a5c90d6..c8a29eac6821a 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -713,6 +713,12 @@ enum BackfillPolicyType { type AssetDependency { asset: AssetNode! inputName: String! + partitionMapping: PartitionMapping +} + +type PartitionMapping { + name: String! + description: String! } type AssetFreshnessInfo { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 6af56ee9bd66d..1476d8fe026e1 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -253,6 +253,7 @@ export type AssetDependency = { __typename: 'AssetDependency'; asset: AssetNode; inputName: Scalars['String']; + partitionMapping: Maybe; }; export enum AssetEventType { @@ -2454,6 +2455,12 @@ export type PartitionKeys = { export type PartitionKeysOrError = PartitionKeys | PartitionSubsetDeserializationError; +export type PartitionMapping = { + __typename: 'PartitionMapping'; + description: Scalars['String']; + name: Scalars['String']; +}; + export type PartitionRangeSelector = { end: Scalars['String']; start: Scalars['String']; @@ -4895,6 +4902,12 @@ export const buildAssetDependency = ( : buildAssetNode({}, relationshipsToOmit), inputName: overrides && overrides.hasOwnProperty('inputName') ? overrides.inputName! : 'aspernatur', + partitionMapping: + overrides && overrides.hasOwnProperty('partitionMapping') + ? overrides.partitionMapping! + : relationshipsToOmit.has('PartitionMapping') + ? ({} as PartitionMapping) + : buildPartitionMapping({}, relationshipsToOmit), }; }; @@ -9117,6 +9130,22 @@ export const buildPartitionKeys = ( }; }; +export const buildPartitionMapping = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'PartitionMapping'} & PartitionMapping => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('PartitionMapping'); + return { + __typename: 'PartitionMapping', + description: + overrides && overrides.hasOwnProperty('description') + ? overrides.description! + : 'voluptatibus', + name: overrides && overrides.hasOwnProperty('name') ? overrides.name! : 'non', + }; +}; + export const buildPartitionRangeSelector = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index 2651840377677..bac99e551eaca 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -939,7 +939,10 @@ def get_upstream_mapped_partitions_result_for_partitions( return UpstreamPartitionsResult(upstream_subset.with_partition_keys(upstream_keys), []) def __str__(self) -> str: - return f"Upstream partitions are dependencies of downstream partitions according to the following mapping: \n{self.downstream_partition_keys_by_upstream_partition_key}" + return ( + f"Maps upstream partitions to their downstream dependencies according to the " + f"following mapping: \n{self.downstream_partition_keys_by_upstream_partition_key}" + ) class InferSingleToMultiDimensionDepsResult( From a02f8da976035dbb33b084724c2207ab0bd08227 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Wed, 22 Nov 2023 16:32:26 -0800 Subject: [PATCH 5/6] address pr feedback, use description property instead --- .../ui-core/src/graphql/schema.graphql | 2 +- .../packages/ui-core/src/graphql/types.ts | 4 +- .../dagster_graphql/schema/asset_graph.py | 4 +- .../schema/partition_mappings.py | 6 +-- .../graphql/test_assets.py | 4 +- .../_core/definitions/partition_mapping.py | 37 ++++++++++++------- .../time_window_partition_mapping.py | 9 +++-- .../test_multipartition_partition_mapping.py | 23 ++++++++++++ 8 files changed, 62 insertions(+), 27 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index c8a29eac6821a..8db184dedb942 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -717,7 +717,7 @@ type AssetDependency { } type PartitionMapping { - name: String! + className: String! description: String! } diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 1476d8fe026e1..a211c192f5c73 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -2457,8 +2457,8 @@ export type PartitionKeysOrError = PartitionKeys | PartitionSubsetDeserializatio export type PartitionMapping = { __typename: 'PartitionMapping'; + className: Scalars['String']; description: Scalars['String']; - name: Scalars['String']; }; export type PartitionRangeSelector = { @@ -9138,11 +9138,11 @@ export const buildPartitionMapping = ( relationshipsToOmit.add('PartitionMapping'); return { __typename: 'PartitionMapping', + className: overrides && overrides.hasOwnProperty('className') ? overrides.className! : 'quos', description: overrides && overrides.hasOwnProperty('description') ? overrides.description! : 'voluptatibus', - name: overrides && overrides.hasOwnProperty('name') ? overrides.name! : 'non', }; }; diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 6ac77abf2cffa..b4c3ee4d93c61 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -167,7 +167,9 @@ def resolve_asset(self, _graphene_info: ResolveInfo): materialization_loader=self._latest_materialization_loader, ) - def resolve_partitionMapping(self, _graphene_info: ResolveInfo): + def resolve_partitionMapping( + self, _graphene_info: ResolveInfo + ) -> Optional[GraphenePartitionMapping]: if self._partition_mapping: return GraphenePartitionMapping(self._partition_mapping) return None diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py b/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py index cae9920786354..92cfe194b7b0d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py @@ -3,7 +3,7 @@ class GraphenePartitionMapping(graphene.ObjectType): - name = graphene.NonNull(graphene.String) + className = graphene.NonNull(graphene.String) description = graphene.NonNull(graphene.String) class Meta: @@ -14,6 +14,6 @@ def __init__( partition_mapping: PartitionMapping, ): super().__init__( - name=type(partition_mapping).__name__, - description=str(partition_mapping), + className=type(partition_mapping).__name__, + description=partition_mapping.description, ) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index 0c3805768c945..b92d6efc7858c 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -660,7 +660,7 @@ } } partitionMapping { - name + className description } } @@ -2414,7 +2414,7 @@ def test_get_partition_mapping(self, graphql_context: WorkspaceRequestContext): dependencies = result.data["assetNodeOrError"]["dependencies"] assert len(dependencies) == 1 assert dependencies[0]["asset"]["assetKey"]["path"] == ["upstream_time_partitioned_asset"] - assert dependencies[0]["partitionMapping"]["name"] == "TimeWindowPartitionMapping" + assert dependencies[0]["partitionMapping"]["className"] == "TimeWindowPartitionMapping" assert ( dependencies[0]["partitionMapping"]["description"] == "Maps a downstream partition to any upstream partition with an overlapping time window." diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index bac99e551eaca..b62a13ead1383 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -1,6 +1,6 @@ import collections.abc import itertools -from abc import ABC, abstractmethod +from abc import ABC, abstractmethod, abstractproperty from collections import defaultdict from datetime import datetime from typing import ( @@ -99,6 +99,10 @@ def get_upstream_mapped_partitions_result_for_partitions( when downstream_partitions_subset contains 2023-05-01 and 2023-06-01. """ + @abstractproperty + def description(self) -> str: + """A human-readable description of the partition mapping, displayed in the Dagster UI""" + @whitelist_for_serdes class IdentityPartitionMapping(PartitionMapping, NamedTuple("_IdentityPartitionMapping", [])): @@ -159,7 +163,8 @@ def get_downstream_partitions_for_partitions( list(downstream_partition_keys & upstream_partition_keys) ) - def __str__(self) -> str: + @property + def description(self) -> str: return ( "Assumes upstream and downstream assets share the same partitions definition. " "Maps each partition in the downstream asset to the same partition in the upstream asset." @@ -197,7 +202,8 @@ def get_downstream_partitions_for_partitions( ) -> PartitionsSubset: raise NotImplementedError() - def __str__(self) -> str: + @property + def description(self) -> str: return "Each downstream partition depends on all partitions of the upstream asset." @@ -245,7 +251,8 @@ def get_downstream_partitions_for_partitions( else: return downstream_partitions_def.empty_subset() - def __str__(self) -> str: + @property + def description(self) -> str: return "Each downstream partition depends on the last partition of the upstream asset." @@ -304,7 +311,8 @@ def get_downstream_partitions_for_partitions( ) return downstream_partitions_def.empty_subset() - def __str__(self) -> str: + @property + def description(self) -> str: return f"Each downstream partition depends on the following upstream partitions: {self.partition_keys}" @@ -594,13 +602,13 @@ def __new__(cls, partition_dimension_name: Optional[str] = None): ), ) - def __str__(self): - description = ( + @property + def description(self) -> str: + return ( "Assumes that the single-dimension partitions definition is a dimension of the " - "multipartitions definition. For a single-dimension partition key X, any " + "multi-partitions definition. For a single-dimension partition key X, any " "multi-partition key with X in the matching dimension is a dependency." ) - return description def get_dimension_dependencies( self, @@ -751,17 +759,17 @@ def __new__( ), ) - def __str__(self) -> str: - description = "\n ".join( + @property + def description(self) -> str: + return "\n ".join( [ ( f"Upstream dimension '{upstream_dim}' mapped to downstream dimension " - f"'{downstream_mapping.dimension_name}' using '{type(downstream_mapping).__name__}." + f"'{downstream_mapping.dimension_name}' using {type(downstream_mapping.partition_mapping).__name__}." ) for upstream_dim, downstream_mapping in self.downstream_mappings_by_upstream_dimension.items() ] ) - return description def get_dimension_dependencies( self, @@ -938,7 +946,8 @@ def get_upstream_mapped_partitions_result_for_partitions( return UpstreamPartitionsResult(upstream_subset.with_partition_keys(upstream_keys), []) - def __str__(self) -> str: + @property + def description(self) -> str: return ( f"Maps upstream partitions to their downstream dependencies according to the " f"following mapping: \n{self.downstream_partition_keys_by_upstream_partition_key}" diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py index 0195d8e238471..b7596a9fb8d29 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py @@ -105,18 +105,19 @@ def __new__( ), ) - def __str__(self) -> str: - description = ( + @property + def description(self) -> str: + description_str = ( "Maps a downstream partition to any upstream partition with an overlapping time window." ) if self.start_offset != 0 or self.end_offset != 0: - description += ( + description_str += ( f" The start and end of the upstream time window is offsetted by " f"{self.start_offset} and {self.end_offset} partitions respectively." ) - return description + return description_str def get_upstream_mapped_partitions_result_for_partitions( self, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py index 25226168cdb21..17279c0e00926 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py @@ -953,3 +953,26 @@ def test_dynamic_dimension_multipartition_mapping(): dynamic_partitions_store=instance, ) assert result.partitions_subset == foo_bar.empty_subset().with_partition_keys(["2|a", "1|a"]) + + +def test_description(): + description = MultiPartitionMapping( + { + "abc": DimensionPartitionMapping( + dimension_name="abc", partition_mapping=IdentityPartitionMapping() + ), + "daily": DimensionPartitionMapping( + dimension_name="weekly", + partition_mapping=TimeWindowPartitionMapping(), + ), + } + ).description + assert ( + "'abc' mapped to downstream dimension 'abc' using IdentityPartitionMapping" in description + ) + assert ( + "'daily' mapped to downstream dimension 'weekly' using TimeWindowPartitionMapping" + in description + ) + + assert MultiPartitionMapping({}).description == "" From aeb1b721eb35d79b6792fa7f1c6fc64eed738a57 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Mon, 27 Nov 2023 09:30:15 -0800 Subject: [PATCH 6/6] fixes --- .../dagster/dagster/_core/definitions/partition_mapping.py | 3 ++- .../partition_mapping_tests/test_asset_partition_mappings.py | 4 ++++ .../dagster_tests/asset_defs_tests/test_asset_graph.py | 4 ++++ .../dagster/dagster_tests/storage_tests/test_fs_io_manager.py | 4 ++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index b62a13ead1383..66cc6092a7a56 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -101,7 +101,8 @@ def get_upstream_mapped_partitions_result_for_partitions( @abstractproperty def description(self) -> str: - """A human-readable description of the partition mapping, displayed in the Dagster UI""" + """A human-readable description of the partition mapping, displayed in the Dagster UI.""" + raise NotImplementedError() @whitelist_for_serdes diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index 9ae28328803c3..7c1a07f8916a7 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -91,6 +91,10 @@ def get_downstream_partitions_for_partitions( ) -> PartitionsSubset: raise NotImplementedError() + @property + def description(self) -> str: + raise NotImplementedError() + class MyIOManager(IOManager): def handle_output(self, context, obj): assert context.asset_partition_key == "2" diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 2b80ce3ca60f5..48fa89820ea1d 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -240,6 +240,10 @@ def get_downstream_partitions_for_partitions( ) -> PartitionsSubset: raise NotImplementedError() + @property + def description(self) -> str: + raise NotImplementedError() + @asset(partitions_def=StaticPartitionsDefinition(["1", "2", "3"])) def parent(): ... diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py index 173c50405345b..ab5b225455b0b 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py @@ -312,6 +312,10 @@ def get_downstream_partitions_for_partitions( ): raise NotImplementedError() + @property + def description(self) -> str: + raise NotImplementedError() + partitions_def = DailyPartitionsDefinition(start_date="2020-02-01") @asset(partitions_def=partitions_def)