Skip to content

Commit

Permalink
[gql] Expose partition mapping on AssetDependency (#18119)
Browse files Browse the repository at this point in the history
Adds backend support to resolve
#15720

This PR exposes the partition mapping on `GrapheneAssetDependency`,
which represents the "edge" in between an asset and its upstream. The
partition mapping is only returned if it is defined on the asset (and
not inferred). The reasoning here is to avoid introducing complexity in
the UI when the user hasn't provided an explicit definition.

It also adds descriptions for partition mappings, in case we'd like to
display them.
  • Loading branch information
clairelin135 authored Nov 27, 2023
1 parent b8ebe0a commit c21b2af
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 2 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.event_api import EventRecordsFilter
from dagster._core.events import DagsterEventType
Expand Down Expand Up @@ -79,6 +80,7 @@
from .errors import GrapheneAssetNotFoundError
from .freshness_policy import GrapheneAssetFreshnessInfo, GrapheneFreshnessPolicy
from .logs.events import GrapheneMaterializationEvent, GrapheneObservationEvent
from .partition_mappings import GraphenePartitionMapping
from .pipelines.pipeline import (
GrapheneAssetPartitionStatuses,
GrapheneDefaultPartitionStatuses,
Expand Down Expand Up @@ -117,6 +119,7 @@ class Meta:

asset = graphene.NonNull("dagster_graphql.schema.asset_graph.GrapheneAssetNode")
inputName = graphene.NonNull(graphene.String)
partitionMapping = graphene.Field(GraphenePartitionMapping)

def __init__(
self,
Expand All @@ -127,6 +130,7 @@ def __init__(
asset_checks_loader: AssetChecksLoader,
materialization_loader: Optional[BatchMaterializationLoader] = None,
depended_by_loader: Optional[CrossRepoAssetDependedByLoader] = None,
partition_mapping: Optional[PartitionMapping] = None,
):
self._repository_location = check.inst_param(
repository_location, "repository_location", CodeLocation
Expand All @@ -144,6 +148,9 @@ def __init__(
self._depended_by_loader = check.opt_inst_param(
depended_by_loader, "depended_by_loader", CrossRepoAssetDependedByLoader
)
self._partition_mapping = check.opt_inst_param(
partition_mapping, "partition_mapping", PartitionMapping
)
super().__init__(inputName=input_name)

def resolve_asset(self, _graphene_info: ResolveInfo):
Expand All @@ -160,6 +167,13 @@ def resolve_asset(self, _graphene_info: ResolveInfo):
materialization_loader=self._latest_materialization_loader,
)

def resolve_partitionMapping(
    self, _graphene_info: ResolveInfo
) -> Optional[GraphenePartitionMapping]:
    """Resolve the partition mapping attached to this dependency edge.

    Returns:
        A GraphenePartitionMapping wrapping the stored mapping, or None when
        no partition mapping was supplied for this dependency.
    """
    # Compare against None instead of relying on truthiness: mappings declared
    # as field-less NamedTuples (e.g. IdentityPartitionMapping) are empty
    # tuples and therefore falsy, which would hide them from the API.
    if self._partition_mapping is not None:
        return GraphenePartitionMapping(self._partition_mapping)
    return None


class GrapheneAssetLatestInfo(graphene.ObjectType):
id = graphene.NonNull(graphene.ID)
Expand Down Expand Up @@ -799,6 +813,7 @@ def resolve_dependencies(self, graphene_info: ResolveInfo) -> Sequence[GrapheneA
asset_key=dep.upstream_asset_key,
materialization_loader=materialization_loader,
asset_checks_loader=asset_checks_loader,
partition_mapping=dep.partition_mapping,
)
for dep in self._external_asset_node.dependencies
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import graphene
from dagster._core.definitions.partition_mapping import PartitionMapping


class GraphenePartitionMapping(graphene.ObjectType):
    # GraphQL object exposing a PartitionMapping as two strings: the name of
    # its concrete class and its human-readable description.
    # NOTE(review): documented with comments rather than a class docstring;
    # presumably graphene would publish a docstring as the schema description
    # of this type - confirm before converting.

    class Meta:
        name = "PartitionMapping"

    className = graphene.NonNull(graphene.String)
    description = graphene.NonNull(graphene.String)

    def __init__(self, partition_mapping: PartitionMapping):
        """Populate the GraphQL fields from ``partition_mapping``."""
        mapping_type_name = type(partition_mapping).__name__
        super().__init__(
            className=mapping_type_name,
            description=partition_mapping.description,
        )
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
AssetCheckResult,
AssetCheckSpec,
AssetExecutionContext,
AssetIn,
AssetKey,
AssetMaterialization,
AssetObservation,
Expand Down Expand Up @@ -59,6 +60,7 @@
TableConstraints,
TableRecord,
TableSchema,
TimeWindowPartitionMapping,
WeeklyPartitionsDefinition,
_check as check,
asset,
Expand Down Expand Up @@ -1464,7 +1466,12 @@ def upstream_time_partitioned_asset():
return 1


@asset(partitions_def=hourly_partition)
@asset(
partitions_def=hourly_partition,
ins={
"upstream_time_partitioned_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping())
},
)
def downstream_time_partitioned_asset(
upstream_time_partitioned_asset,
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,29 @@
}
"""

# Query that fetches an asset node by key and, for each of its dependencies,
# the upstream asset key plus the className/description of the partition
# mapping on that dependency.
GET_ASSET_DEPENDENCIES_PARTITION_MAPPING = """
query AssetNodeQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
...on AssetNode {
assetKey {
path
}
dependencies {
asset {
assetKey {
path
}
}
partitionMapping {
className
description
}
}
}
}
}
"""


def _create_run(
graphql_context: WorkspaceRequestContext,
Expand Down Expand Up @@ -2376,6 +2399,27 @@ def test_get_backfill_policy(self, graphql_context: WorkspaceRequestContext):
== "Backfills in multiple runs, with a maximum of 10 partitions per run"
)

def test_get_partition_mapping(self, graphql_context: WorkspaceRequestContext):
    """Check that an explicit partition mapping is exposed on the dependency edge."""
    query_variables = {
        "assetKey": {"path": ["downstream_time_partitioned_asset"]},
    }
    result = execute_dagster_graphql(
        graphql_context,
        GET_ASSET_DEPENDENCIES_PARTITION_MAPPING,
        variables=query_variables,
    )

    asset_node = result.data["assetNodeOrError"]
    assert asset_node["assetKey"]["path"] == ["downstream_time_partitioned_asset"]

    dependencies = asset_node["dependencies"]
    assert len(dependencies) == 1

    upstream_edge = dependencies[0]
    assert upstream_edge["asset"]["assetKey"]["path"] == ["upstream_time_partitioned_asset"]

    # The downstream asset declares a TimeWindowPartitionMapping with no offsets,
    # so only the base description sentence is expected.
    exposed_mapping = upstream_edge["partitionMapping"]
    assert exposed_mapping["className"] == "TimeWindowPartitionMapping"
    assert (
        exposed_mapping["description"]
        == "Maps a downstream partition to any upstream partition with an overlapping time window."
    )


class TestAssetEventsReadOnly(ReadonlyGraphQLContextTestMatrix):
def test_report_runless_asset_events_permissions(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import collections.abc
import itertools
from abc import ABC, abstractmethod
from abc import ABC, abstractmethod, abstractproperty
from collections import defaultdict
from datetime import datetime
from typing import (
Expand Down Expand Up @@ -99,6 +99,11 @@ def get_upstream_mapped_partitions_result_for_partitions(
when downstream_partitions_subset contains 2023-05-01 and 2023-06-01.
"""

# `abc.abstractproperty` is deprecated; stacking `@property` on top of
# `@abstractmethod` is the supported way to declare an abstract property.
@property
@abstractmethod
def description(self) -> str:
    """A human-readable description of the partition mapping, displayed in the Dagster UI."""
    raise NotImplementedError()


@whitelist_for_serdes
class IdentityPartitionMapping(PartitionMapping, NamedTuple("_IdentityPartitionMapping", [])):
Expand Down Expand Up @@ -159,6 +164,13 @@ def get_downstream_partitions_for_partitions(
list(downstream_partition_keys & upstream_partition_keys)
)

@property
def description(self) -> str:
    """Return the human-readable summary of this partition mapping."""
    assumption = "Assumes upstream and downstream assets share the same partitions definition. "
    behavior = "Maps each partition in the downstream asset to the same partition in the upstream asset."
    return assumption + behavior


@whitelist_for_serdes
class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [])):
Expand Down Expand Up @@ -191,6 +203,10 @@ def get_downstream_partitions_for_partitions(
) -> PartitionsSubset:
raise NotImplementedError()

@property
def description(self) -> str:
    """Return the human-readable summary of this partition mapping."""
    summary = "Each downstream partition depends on all partitions of the upstream asset."
    return summary


@whitelist_for_serdes
class LastPartitionMapping(PartitionMapping, NamedTuple("_LastPartitionMapping", [])):
Expand Down Expand Up @@ -236,6 +252,10 @@ def get_downstream_partitions_for_partitions(
else:
return downstream_partitions_def.empty_subset()

@property
def description(self) -> str:
    """Return the human-readable summary of this partition mapping."""
    summary = "Each downstream partition depends on the last partition of the upstream asset."
    return summary


@whitelist_for_serdes
class SpecificPartitionsPartitionMapping(
Expand Down Expand Up @@ -292,6 +312,10 @@ def get_downstream_partitions_for_partitions(
)
return downstream_partitions_def.empty_subset()

@property
def description(self) -> str:
    """Return a summary that lists the upstream partition keys held by this mapping."""
    summary = (
        f"Each downstream partition depends on the following upstream partitions: {self.partition_keys}"
    )
    return summary


class DimensionDependency(NamedTuple):
partition_mapping: PartitionMapping
Expand Down Expand Up @@ -579,6 +603,14 @@ def __new__(cls, partition_dimension_name: Optional[str] = None):
),
)

@property
def description(self) -> str:
    """Return the human-readable summary of this partition mapping."""
    sentences = (
        "Assumes that the single-dimension partitions definition is a dimension of the ",
        "multi-partitions definition. For a single-dimension partition key X, any ",
        "multi-partition key with X in the matching dimension is a dependency.",
    )
    # The fragments already carry their own trailing spaces, so join with "".
    return "".join(sentences)

def get_dimension_dependencies(
self,
upstream_partitions_def: PartitionsDefinition,
Expand Down Expand Up @@ -728,6 +760,18 @@ def __new__(
),
)

@property
def description(self) -> str:
    """Return one sentence per upstream dimension naming its downstream dimension and mapping class."""
    sentences = []
    for (
        upstream_dim,
        downstream_mapping,
    ) in self.downstream_mappings_by_upstream_dimension.items():
        sentences.append(
            f"Upstream dimension '{upstream_dim}' mapped to downstream dimension "
            f"'{downstream_mapping.dimension_name}' using {type(downstream_mapping.partition_mapping).__name__}."
        )
    # Sentences are separated by a newline followed by a single space.
    return "\n ".join(sentences)

def get_dimension_dependencies(
self,
upstream_partitions_def: PartitionsDefinition,
Expand Down Expand Up @@ -903,6 +947,13 @@ def get_upstream_mapped_partitions_result_for_partitions(

return UpstreamPartitionsResult(upstream_subset.with_partition_keys(upstream_keys), [])

@property
def description(self) -> str:
    """Return a summary that embeds the upstream-to-downstream partition key mapping."""
    summary = (
        f"Maps upstream partitions to their downstream dependencies according to the "
        f"following mapping: \n{self.downstream_partition_keys_by_upstream_partition_key}"
    )
    return summary


class InferSingleToMultiDimensionDepsResult(
NamedTuple(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ def __new__(
),
)

@property
def description(self) -> str:
    """Return the summary of this mapping, mentioning the offsets when either is non-zero."""
    base_text = (
        "Maps a downstream partition to any upstream partition with an overlapping time window."
    )

    # With no offsets configured the base sentence is the whole description.
    if self.start_offset == 0 and self.end_offset == 0:
        return base_text

    offset_note = (
        f" The start and end of the upstream time window is offsetted by "
        f"{self.start_offset} and {self.end_offset} partitions respectively."
    )
    return base_text + offset_note

def get_upstream_mapped_partitions_result_for_partitions(
self,
downstream_partitions_subset: Optional[PartitionsSubset],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def get_downstream_partitions_for_partitions(
) -> PartitionsSubset:
raise NotImplementedError()

@property
def description(self) -> str:
    """Unimplemented placeholder: accessing it always raises NotImplementedError."""
    raise NotImplementedError()

class MyIOManager(IOManager):
def handle_output(self, context, obj):
assert context.asset_partition_key == "2"
Expand Down
Loading

1 comment on commit c21b2af

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-lceqvd6e4-elementl.vercel.app

Built with commit c21b2af.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.