Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gql] Expose partition mapping on AssetDependency #18119

Merged
merged 6 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.event_api import EventRecordsFilter
from dagster._core.events import DagsterEventType
Expand Down Expand Up @@ -79,6 +80,7 @@
from .errors import GrapheneAssetNotFoundError
from .freshness_policy import GrapheneAssetFreshnessInfo, GrapheneFreshnessPolicy
from .logs.events import GrapheneMaterializationEvent, GrapheneObservationEvent
from .partition_mappings import GraphenePartitionMapping
from .pipelines.pipeline import (
GrapheneAssetPartitionStatuses,
GrapheneDefaultPartitionStatuses,
Expand Down Expand Up @@ -117,6 +119,7 @@ class Meta:

asset = graphene.NonNull("dagster_graphql.schema.asset_graph.GrapheneAssetNode")
inputName = graphene.NonNull(graphene.String)
partitionMapping = graphene.Field(GraphenePartitionMapping)

def __init__(
self,
Expand All @@ -127,6 +130,7 @@ def __init__(
asset_checks_loader: AssetChecksLoader,
materialization_loader: Optional[BatchMaterializationLoader] = None,
depended_by_loader: Optional[CrossRepoAssetDependedByLoader] = None,
partition_mapping: Optional[PartitionMapping] = None,
):
self._repository_location = check.inst_param(
repository_location, "repository_location", CodeLocation
Expand All @@ -144,6 +148,9 @@ def __init__(
self._depended_by_loader = check.opt_inst_param(
depended_by_loader, "depended_by_loader", CrossRepoAssetDependedByLoader
)
self._partition_mapping = check.opt_inst_param(
partition_mapping, "partition_mapping", PartitionMapping
)
super().__init__(inputName=input_name)

def resolve_asset(self, _graphene_info: ResolveInfo):
Expand All @@ -160,6 +167,13 @@ def resolve_asset(self, _graphene_info: ResolveInfo):
materialization_loader=self._latest_materialization_loader,
)

def resolve_partitionMapping(
    self, _graphene_info: ResolveInfo
) -> Optional[GraphenePartitionMapping]:
    """Resolve the ``partitionMapping`` field of this dependency.

    Returns:
        A ``GraphenePartitionMapping`` wrapping the partition mapping stored on
        this object, or ``None`` when no partition mapping was stored.
    """
    # Compare against None explicitly rather than relying on truthiness:
    # partition mappings declared as field-less NamedTuples are empty tuples
    # and therefore falsy, so a bare `if mapping:` would drop them.
    if self._partition_mapping is not None:
        return GraphenePartitionMapping(self._partition_mapping)
    return None


class GrapheneAssetLatestInfo(graphene.ObjectType):
id = graphene.NonNull(graphene.ID)
Expand Down Expand Up @@ -799,6 +813,7 @@ def resolve_dependencies(self, graphene_info: ResolveInfo) -> Sequence[GrapheneA
asset_key=dep.upstream_asset_key,
materialization_loader=materialization_loader,
asset_checks_loader=asset_checks_loader,
partition_mapping=dep.partition_mapping,
)
for dep in self._external_asset_node.dependencies
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import graphene
from dagster._core.definitions.partition_mapping import PartitionMapping


class GraphenePartitionMapping(graphene.ObjectType):
    """GraphQL object exposing a partition mapping's class name and description."""

    className = graphene.NonNull(graphene.String)
    description = graphene.NonNull(graphene.String)

    class Meta:
        name = "PartitionMapping"

    def __init__(self, partition_mapping: PartitionMapping):
        # Both GraphQL fields are derived from the mapping at construction time.
        mapping_class_name = type(partition_mapping).__name__
        mapping_description = partition_mapping.description
        super().__init__(
            className=mapping_class_name,
            description=mapping_description,
        )
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
AssetCheckResult,
AssetCheckSpec,
AssetExecutionContext,
AssetIn,
AssetKey,
AssetMaterialization,
AssetObservation,
Expand Down Expand Up @@ -59,6 +60,7 @@
TableConstraints,
TableRecord,
TableSchema,
TimeWindowPartitionMapping,
WeeklyPartitionsDefinition,
_check as check,
asset,
Expand Down Expand Up @@ -1464,7 +1466,12 @@ def upstream_time_partitioned_asset():
return 1


@asset(partitions_def=hourly_partition)
@asset(
partitions_def=hourly_partition,
ins={
"upstream_time_partitioned_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping())
},
)
def downstream_time_partitioned_asset(
upstream_time_partitioned_asset,
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,29 @@
}
"""

# GraphQL query: for the asset node identified by $assetKey, fetch its asset key
# path and, for each dependency, the upstream asset key path plus the partition
# mapping's className and description.
GET_ASSET_DEPENDENCIES_PARTITION_MAPPING = """
query AssetNodeQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
...on AssetNode {
assetKey {
path
}
dependencies {
asset {
assetKey {
path
}
}
partitionMapping {
className
description
}
}
}
}
}
"""


def _create_run(
graphql_context: WorkspaceRequestContext,
Expand Down Expand Up @@ -2376,6 +2399,27 @@ def test_get_backfill_policy(self, graphql_context: WorkspaceRequestContext):
== "Backfills in multiple runs, with a maximum of 10 partitions per run"
)

def test_get_partition_mapping(self, graphql_context: WorkspaceRequestContext):
    """Check that the dependency's partition mapping is exposed through GraphQL."""
    response = execute_dagster_graphql(
        graphql_context,
        GET_ASSET_DEPENDENCIES_PARTITION_MAPPING,
        variables={"assetKey": {"path": ["downstream_time_partitioned_asset"]}},
    )

    # The queried node must be the downstream asset itself.
    queried_path = response.data["assetNodeOrError"]["assetKey"]["path"]
    assert queried_path == ["downstream_time_partitioned_asset"]

    # It has exactly one dependency: the upstream time-partitioned asset.
    deps = response.data["assetNodeOrError"]["dependencies"]
    assert len(deps) == 1
    only_dep = deps[0]
    assert only_dep["asset"]["assetKey"]["path"] == ["upstream_time_partitioned_asset"]

    # The mapping's class name and description come back on the dependency.
    mapping = only_dep["partitionMapping"]
    assert mapping["className"] == "TimeWindowPartitionMapping"
    expected_description = (
        "Maps a downstream partition to any upstream partition with an overlapping time window."
    )
    assert mapping["description"] == expected_description


class TestAssetEventsReadOnly(ReadonlyGraphQLContextTestMatrix):
def test_report_runless_asset_events_permissions(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import collections.abc
import itertools
from abc import ABC, abstractmethod
from abc import ABC, abstractmethod, abstractproperty
from collections import defaultdict
from datetime import datetime
from typing import (
Expand Down Expand Up @@ -99,6 +99,11 @@ def get_upstream_mapped_partitions_result_for_partitions(
when downstream_partitions_subset contains 2023-05-01 and 2023-06-01.
"""

# `abc.abstractproperty` is deprecated; stacking `property` on top of
# `abstractmethod` is the supported way to declare an abstract property.
@property
@abstractmethod
def description(self) -> str:
    """A human-readable description of the partition mapping, displayed in the Dagster UI."""
    raise NotImplementedError()


@whitelist_for_serdes
class IdentityPartitionMapping(PartitionMapping, NamedTuple("_IdentityPartitionMapping", [])):
Expand Down Expand Up @@ -159,6 +164,13 @@ def get_downstream_partitions_for_partitions(
list(downstream_partition_keys & upstream_partition_keys)
)

@property
def description(self) -> str:
    """Return the fixed human-readable summary of this mapping."""
    assumption = "Assumes upstream and downstream assets share the same partitions definition. "
    behavior = "Maps each partition in the downstream asset to the same partition in the upstream asset."
    return assumption + behavior


@whitelist_for_serdes
class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [])):
Expand Down Expand Up @@ -191,6 +203,10 @@ def get_downstream_partitions_for_partitions(
) -> PartitionsSubset:
raise NotImplementedError()

@property
def description(self) -> str:
    """Return the fixed human-readable summary of this mapping."""
    summary = "Each downstream partition depends on all partitions of the upstream asset."
    return summary


@whitelist_for_serdes
class LastPartitionMapping(PartitionMapping, NamedTuple("_LastPartitionMapping", [])):
Expand Down Expand Up @@ -236,6 +252,10 @@ def get_downstream_partitions_for_partitions(
else:
return downstream_partitions_def.empty_subset()

@property
def description(self) -> str:
    """Return the fixed human-readable summary of this mapping."""
    summary = "Each downstream partition depends on the last partition of the upstream asset."
    return summary


@whitelist_for_serdes
class SpecificPartitionsPartitionMapping(
Expand Down Expand Up @@ -292,6 +312,10 @@ def get_downstream_partitions_for_partitions(
)
return downstream_partitions_def.empty_subset()

@property
def description(self) -> str:
    """Return a summary that embeds ``self.partition_keys``.

    The keys are interpolated in full, so the text grows with the number of keys.
    """
    summary = f"Each downstream partition depends on the following upstream partitions: {self.partition_keys}"
    return summary
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, this could be a large number of partitions. Should we be concerned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little concerning, but for StaticPartitionsDefinitions our description string is all of the keys concatenated with a comma, which is probably worse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could just truncate and add an ellipsis on the front end if the string is too long?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. That sounds fine to me for now.



class DimensionDependency(NamedTuple):
partition_mapping: PartitionMapping
Expand Down Expand Up @@ -579,6 +603,14 @@ def __new__(cls, partition_dimension_name: Optional[str] = None):
),
)

@property
def description(self) -> str:
    """Return the fixed human-readable summary of this mapping."""
    fragments = (
        "Assumes that the single-dimension partitions definition is a dimension of the ",
        "multi-partitions definition. For a single-dimension partition key X, any ",
        "multi-partition key with X in the matching dimension is a dependency.",
    )
    return "".join(fragments)

def get_dimension_dependencies(
self,
upstream_partitions_def: PartitionsDefinition,
Expand Down Expand Up @@ -728,6 +760,18 @@ def __new__(
),
)

@property
def description(self) -> str:
    """Return one sentence per upstream dimension, describing how it is mapped.

    Each sentence names the upstream dimension, the downstream dimension it maps
    to, and the class name of the partition mapping used. Sentences are joined
    with a newline followed by a single space.
    """
    sentences = []
    for (
        upstream_dim,
        downstream_mapping,
    ) in self.downstream_mappings_by_upstream_dimension.items():
        sentences.append(
            f"Upstream dimension '{upstream_dim}' mapped to downstream dimension "
            f"'{downstream_mapping.dimension_name}' using {type(downstream_mapping.partition_mapping).__name__}."
        )
    return "\n ".join(sentences)

def get_dimension_dependencies(
self,
upstream_partitions_def: PartitionsDefinition,
Expand Down Expand Up @@ -903,6 +947,13 @@ def get_upstream_mapped_partitions_result_for_partitions(

return UpstreamPartitionsResult(upstream_subset.with_partition_keys(upstream_keys), [])

@property
def description(self) -> str:
    """Return a summary that embeds the upstream-to-downstream partition key mapping.

    The mapping stored in ``self.downstream_partition_keys_by_upstream_partition_key``
    is interpolated in full on its own line.
    """
    summary = (
        f"Maps upstream partitions to their downstream dependencies according to the "
        f"following mapping: \n{self.downstream_partition_keys_by_upstream_partition_key}"
    )
    return summary


class InferSingleToMultiDimensionDepsResult(
NamedTuple(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ def __new__(
),
)

@property
def description(self) -> str:
    """Return a human-readable summary of this mapping.

    The base sentence is always present; a second sentence reporting
    ``start_offset`` and ``end_offset`` is appended only when at least one of
    them is non-zero.
    """
    base = "Maps a downstream partition to any upstream partition with an overlapping time window."

    # No offsets configured: the base sentence is the whole description.
    if self.start_offset == 0 and self.end_offset == 0:
        return base

    offsets_note = (
        f" The start and end of the upstream time window is offsetted by "
        f"{self.start_offset} and {self.end_offset} partitions respectively."
    )
    return base + offsets_note

def get_upstream_mapped_partitions_result_for_partitions(
self,
downstream_partitions_subset: Optional[PartitionsSubset],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def get_downstream_partitions_for_partitions(
) -> PartitionsSubset:
raise NotImplementedError()

@property
def description(self) -> str:
    """Stub: this mapping provides no description and always raises."""
    raise NotImplementedError

class MyIOManager(IOManager):
def handle_output(self, context, obj):
assert context.asset_partition_key == "2"
Expand Down
Loading