diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index c587ac9dea652..8db96aabc9d61 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -713,6 +713,12 @@ enum BackfillPolicyType { type AssetDependency { asset: AssetNode! inputName: String! + partitionMapping: PartitionMapping +} + +type PartitionMapping { + className: String! + description: String! } type AssetFreshnessInfo { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index e100bd492dba5..1caa06136569d 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -253,6 +253,7 @@ export type AssetDependency = { __typename: 'AssetDependency'; asset: AssetNode; inputName: Scalars['String']; + partitionMapping: Maybe; }; export enum AssetEventType { @@ -2454,6 +2455,12 @@ export type PartitionKeys = { export type PartitionKeysOrError = PartitionKeys | PartitionSubsetDeserializationError; +export type PartitionMapping = { + __typename: 'PartitionMapping'; + className: Scalars['String']; + description: Scalars['String']; +}; + export type PartitionRangeSelector = { end: Scalars['String']; start: Scalars['String']; @@ -4896,6 +4903,12 @@ export const buildAssetDependency = ( : buildAssetNode({}, relationshipsToOmit), inputName: overrides && overrides.hasOwnProperty('inputName') ? overrides.inputName! : 'aspernatur', + partitionMapping: + overrides && overrides.hasOwnProperty('partitionMapping') + ? overrides.partitionMapping! + : relationshipsToOmit.has('PartitionMapping') + ? ({} as PartitionMapping) + : buildPartitionMapping({}, relationshipsToOmit), }; }; @@ -9118,6 +9131,22 @@ export const buildPartitionKeys = ( }; }; +export const buildPartitionMapping = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'PartitionMapping'} & PartitionMapping => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('PartitionMapping'); + return { + __typename: 'PartitionMapping', + className: overrides && overrides.hasOwnProperty('className') ? overrides.className! : 'quos', + description: + overrides && overrides.hasOwnProperty('description') + ? overrides.description! + : 'voluptatibus', + }; +}; + export const buildPartitionRangeSelector = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index ce5a2f937bb31..b4c3ee4d93c61 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -13,6 +13,7 @@ ) from dagster._core.definitions.external_asset_graph import ExternalAssetGraph from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition +from dagster._core.definitions.partition_mapping import PartitionMapping from dagster._core.errors import DagsterInvariantViolationError from dagster._core.event_api import EventRecordsFilter from dagster._core.events import DagsterEventType @@ -79,6 +80,7 @@ from .errors import GrapheneAssetNotFoundError from .freshness_policy import GrapheneAssetFreshnessInfo, GrapheneFreshnessPolicy from .logs.events import GrapheneMaterializationEvent, GrapheneObservationEvent +from .partition_mappings import GraphenePartitionMapping from .pipelines.pipeline import ( GrapheneAssetPartitionStatuses, GrapheneDefaultPartitionStatuses, @@ -117,6 +119,7 @@ class Meta: asset = graphene.NonNull("dagster_graphql.schema.asset_graph.GrapheneAssetNode") inputName = graphene.NonNull(graphene.String) + partitionMapping = graphene.Field(GraphenePartitionMapping) def __init__( self, @@ -127,6 +130,7 @@ def __init__( asset_checks_loader: AssetChecksLoader, materialization_loader: Optional[BatchMaterializationLoader] = None, depended_by_loader: Optional[CrossRepoAssetDependedByLoader] = None, + partition_mapping: Optional[PartitionMapping] = None, ): self._repository_location = check.inst_param( repository_location, "repository_location", CodeLocation @@ -144,6 +148,9 @@ def __init__( self._depended_by_loader = check.opt_inst_param( depended_by_loader, "depended_by_loader", CrossRepoAssetDependedByLoader ) + self._partition_mapping = check.opt_inst_param( + partition_mapping, "partition_mapping", PartitionMapping + ) super().__init__(inputName=input_name) def resolve_asset(self, _graphene_info: ResolveInfo): @@ -160,6 +167,13 @@ def resolve_asset(self, _graphene_info: ResolveInfo): materialization_loader=self._latest_materialization_loader, ) + def resolve_partitionMapping( + self, _graphene_info: ResolveInfo + ) -> Optional[GraphenePartitionMapping]: + if self._partition_mapping: + return GraphenePartitionMapping(self._partition_mapping) + return None + class GrapheneAssetLatestInfo(graphene.ObjectType): id = graphene.NonNull(graphene.ID) @@ -799,6 +813,7 @@ def resolve_dependencies(self, graphene_info: ResolveInfo) -> Sequence[GrapheneA asset_key=dep.upstream_asset_key, materialization_loader=materialization_loader, asset_checks_loader=asset_checks_loader, + partition_mapping=dep.partition_mapping, ) for dep in self._external_asset_node.dependencies ] diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py b/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py new file mode 100644 index 0000000000000..92cfe194b7b0d --- /dev/null +++ b/python_modules/dagster-graphql/dagster_graphql/schema/partition_mappings.py @@ -0,0 +1,19 @@ +import graphene +from dagster._core.definitions.partition_mapping import PartitionMapping + + +class GraphenePartitionMapping(graphene.ObjectType): + className = graphene.NonNull(graphene.String) + description = graphene.NonNull(graphene.String) + + class Meta: + name = "PartitionMapping" + + def __init__( + self, + partition_mapping: PartitionMapping, + ): + super().__init__( + className=type(partition_mapping).__name__, + description=partition_mapping.description, + ) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index d20dc7cc40a67..e0ef4db05b9e0 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -16,6 +16,7 @@ AssetCheckResult, AssetCheckSpec, AssetExecutionContext, + AssetIn, AssetKey, AssetMaterialization, AssetObservation, @@ -59,6 +60,7 @@ TableConstraints, TableRecord, TableSchema, + TimeWindowPartitionMapping, WeeklyPartitionsDefinition, _check as check, asset, @@ -1464,7 +1466,12 @@ def upstream_time_partitioned_asset(): return 1 -@asset(partitions_def=hourly_partition) +@asset( + partitions_def=hourly_partition, + ins={ + "upstream_time_partitioned_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping()) + }, +) def downstream_time_partitioned_asset( upstream_time_partitioned_asset, ): diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index 1473c99e48e00..b92d6efc7858c 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -646,6 +646,29 @@ } """ +GET_ASSET_DEPENDENCIES_PARTITION_MAPPING = """ + query AssetNodeQuery($assetKey: AssetKeyInput!) { + assetNodeOrError(assetKey: $assetKey) { + ...on AssetNode { + assetKey { + path + } + dependencies { + asset { + assetKey { + path + } + } + partitionMapping { + className + description + } + } + } + } + } +""" + def _create_run( graphql_context: WorkspaceRequestContext, @@ -2376,6 +2399,27 @@ def test_get_backfill_policy(self, graphql_context: WorkspaceRequestContext): == "Backfills in multiple runs, with a maximum of 10 partitions per run" ) + def test_get_partition_mapping(self, graphql_context: WorkspaceRequestContext): + result = execute_dagster_graphql( + graphql_context, + GET_ASSET_DEPENDENCIES_PARTITION_MAPPING, + variables={ + "assetKey": {"path": ["downstream_time_partitioned_asset"]}, + }, + ) + + assert result.data["assetNodeOrError"]["assetKey"]["path"] == [ + "downstream_time_partitioned_asset" + ] + dependencies = result.data["assetNodeOrError"]["dependencies"] + assert len(dependencies) == 1 + assert dependencies[0]["asset"]["assetKey"]["path"] == ["upstream_time_partitioned_asset"] + assert dependencies[0]["partitionMapping"]["className"] == "TimeWindowPartitionMapping" + assert ( + dependencies[0]["partitionMapping"]["description"] + == "Maps a downstream partition to any upstream partition with an overlapping time window." + ) + class TestAssetEventsReadOnly(ReadonlyGraphQLContextTestMatrix): def test_report_runless_asset_events_permissions( diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index a5690fefc84be..66cc6092a7a56 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -1,6 +1,6 @@ import collections.abc import itertools -from abc import ABC, abstractmethod +from abc import ABC, abstractmethod, abstractproperty from collections import defaultdict from datetime import datetime from typing import ( @@ -99,6 +99,11 @@ def get_upstream_mapped_partitions_result_for_partitions( when downstream_partitions_subset contains 2023-05-01 and 2023-06-01. """ + @abstractproperty + def description(self) -> str: + """A human-readable description of the partition mapping, displayed in the Dagster UI.""" + raise NotImplementedError() + @whitelist_for_serdes class IdentityPartitionMapping(PartitionMapping, NamedTuple("_IdentityPartitionMapping", [])): @@ -159,6 +164,13 @@ def get_downstream_partitions_for_partitions( list(downstream_partition_keys & upstream_partition_keys) ) + @property + def description(self) -> str: + return ( + "Assumes upstream and downstream assets share the same partitions definition. " + "Maps each partition in the downstream asset to the same partition in the upstream asset." + ) + @whitelist_for_serdes class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [])): @@ -191,6 +203,10 @@ def get_downstream_partitions_for_partitions( ) -> PartitionsSubset: raise NotImplementedError() + @property + def description(self) -> str: + return "Each downstream partition depends on all partitions of the upstream asset." + @whitelist_for_serdes class LastPartitionMapping(PartitionMapping, NamedTuple("_LastPartitionMapping", [])): @@ -236,6 +252,10 @@ def get_downstream_partitions_for_partitions( else: return downstream_partitions_def.empty_subset() + @property + def description(self) -> str: + return "Each downstream partition depends on the last partition of the upstream asset." + @whitelist_for_serdes class SpecificPartitionsPartitionMapping( @@ -292,6 +312,10 @@ def get_downstream_partitions_for_partitions( ) return downstream_partitions_def.empty_subset() + @property + def description(self) -> str: + return f"Each downstream partition depends on the following upstream partitions: {self.partition_keys}" + class DimensionDependency(NamedTuple): partition_mapping: PartitionMapping @@ -579,6 +603,14 @@ def __new__(cls, partition_dimension_name: Optional[str] = None): ), ) + @property + def description(self) -> str: + return ( + "Assumes that the single-dimension partitions definition is a dimension of the " + "multi-partitions definition. For a single-dimension partition key X, any " + "multi-partition key with X in the matching dimension is a dependency." + ) + def get_dimension_dependencies( self, upstream_partitions_def: PartitionsDefinition, @@ -728,6 +760,18 @@ def __new__( ), ) + @property + def description(self) -> str: + return "\n ".join( + [ + ( + f"Upstream dimension '{upstream_dim}' mapped to downstream dimension " + f"'{downstream_mapping.dimension_name}' using {type(downstream_mapping.partition_mapping).__name__}." + ) + for upstream_dim, downstream_mapping in self.downstream_mappings_by_upstream_dimension.items() + ] + ) + def get_dimension_dependencies( self, upstream_partitions_def: PartitionsDefinition, @@ -903,6 +947,13 @@ def get_upstream_mapped_partitions_result_for_partitions( return UpstreamPartitionsResult(upstream_subset.with_partition_keys(upstream_keys), []) + @property + def description(self) -> str: + return ( + f"Maps upstream partitions to their downstream dependencies according to the " + f"following mapping: \n{self.downstream_partition_keys_by_upstream_partition_key}" + ) + class InferSingleToMultiDimensionDepsResult( NamedTuple( diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py index d2839f8e126d4..b7596a9fb8d29 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py @@ -105,6 +105,20 @@ def __new__( ), ) + @property + def description(self) -> str: + description_str = ( + "Maps a downstream partition to any upstream partition with an overlapping time window." + ) + + if self.start_offset != 0 or self.end_offset != 0: + description_str += ( + f" The start and end of the upstream time window is offsetted by " + f"{self.start_offset} and {self.end_offset} partitions respectively." + ) + + return description_str + def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index 9ae28328803c3..7c1a07f8916a7 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -91,6 +91,10 @@ def get_downstream_partitions_for_partitions( ) -> PartitionsSubset: raise NotImplementedError() + @property + def description(self) -> str: + raise NotImplementedError() + class MyIOManager(IOManager): def handle_output(self, context, obj): assert context.asset_partition_key == "2" diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py index 25226168cdb21..17279c0e00926 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py @@ -953,3 +953,26 @@ def test_dynamic_dimension_multipartition_mapping(): dynamic_partitions_store=instance, ) assert result.partitions_subset == foo_bar.empty_subset().with_partition_keys(["2|a", "1|a"]) + + +def test_description(): + description = MultiPartitionMapping( + { + "abc": DimensionPartitionMapping( + dimension_name="abc", partition_mapping=IdentityPartitionMapping() + ), + "daily": DimensionPartitionMapping( + dimension_name="weekly", + partition_mapping=TimeWindowPartitionMapping(), + ), + } + ).description + assert ( + "'abc' mapped to downstream dimension 'abc' using IdentityPartitionMapping" in description + ) + assert ( + "'daily' mapped to downstream dimension 'weekly' using TimeWindowPartitionMapping" + in description + ) + + assert MultiPartitionMapping({}).description == "" diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 2b80ce3ca60f5..48fa89820ea1d 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -240,6 +240,10 @@ def get_downstream_partitions_for_partitions( ) -> PartitionsSubset: raise NotImplementedError() + @property + def description(self) -> str: + raise NotImplementedError() + @asset(partitions_def=StaticPartitionsDefinition(["1", "2", "3"])) def parent(): ... diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py index 173c50405345b..ab5b225455b0b 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py @@ -312,6 +312,10 @@ def get_downstream_partitions_for_partitions( ): raise NotImplementedError() + @property + def description(self) -> str: + raise NotImplementedError() + partitions_def = DailyPartitionsDefinition(start_date="2020-02-01") @asset(partitions_def=partitions_def)