Skip to content

Commit

Permalink
add graphql
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Oct 30, 2023
1 parent 2285c96 commit 0be6e6f
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 0 deletions.
12 changes: 12 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
GrapheneRegularDagsterType,
to_dagster_type,
)
from .backfill import GrapheneBackfillPolicy
from .errors import GrapheneAssetNotFoundError
from .freshness_policy import GrapheneAssetFreshnessInfo, GrapheneFreshnessPolicy
from .logs.events import GrapheneMaterializationEvent, GrapheneObservationEvent
Expand Down Expand Up @@ -206,6 +207,7 @@ class GrapheneAssetNode(graphene.ObjectType):
beforeTimestampMillis=graphene.String(),
limit=graphene.Int(),
)
backfillPolicy = graphene.Field(GrapheneBackfillPolicy)
computeKind = graphene.String()
configField = graphene.Field(GrapheneConfigTypeField)
dataVersion = graphene.Field(graphene.String(), partition=graphene.String())
Expand Down Expand Up @@ -803,6 +805,13 @@ def resolve_autoMaterializePolicy(
return GrapheneAutoMaterializePolicy(self._external_asset_node.auto_materialize_policy)
return None

def resolve_backfillPolicy(
self, _graphene_info: ResolveInfo
) -> Optional[GrapheneBackfillPolicy]:
if self._external_asset_node.backfill_policy:
return GrapheneBackfillPolicy(self._external_asset_node.backfill_policy)
return None

def resolve_jobNames(self, _graphene_info: ResolveInfo) -> Sequence[str]:
return self._external_asset_node.job_names

Expand Down
27 changes: 27 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type
from dagster._core.workspace.permissions import Permissions

from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType

from ..implementation.fetch_partition_sets import (
partition_status_counts_from_run_partition_data,
partition_statuses_from_run_partition_data,
Expand Down Expand Up @@ -447,3 +449,28 @@ class GraphenePartitionBackfillsOrError(graphene.Union):
class Meta:
types = (GraphenePartitionBackfills, GraphenePythonError)
name = "PartitionBackfillsOrError"


GrapheneBackfillPolicyType = graphene.Enum.from_enum(BackfillPolicyType)


class GrapheneBackfillPolicy(graphene.ObjectType):
maxPartitionsPerRun = graphene.Field(graphene.Int())
description = graphene.NonNull(graphene.String)
policyType = graphene.NonNull(GrapheneBackfillPolicyType)

class Meta:
name = "BackfillPolicy"

def __init__(self, backfill_policy: BackfillPolicy):
self._backfill_policy = check.inst_param(backfill_policy, "backfill_policy", BackfillPolicy)
super().__init__(
maxPartitionsPerRun=backfill_policy.max_partitions_per_run,
policyType=backfill_policy.policy_type,
)

def resolve_description(self, _graphene_info: ResolveInfo) -> str:
if self._backfill_policy.max_partitions_per_run is None:
return "Backfills all partitions in a single run"
else:
return f"Backfills in multiple runs, with a maximum of {self._backfill_policy.max_partitions_per_run} partitions per run"
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
static_partitioned_config,
usable_as_dagster_type,
with_resources,
BackfillPolicy,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.decorators.sensor_decorator import sensor
Expand Down Expand Up @@ -1772,6 +1773,22 @@ def dynamic_in_multipartitions_fail(context, dynamic_in_multipartitions_success)
raise Exception("oops")


@asset(
partitions_def=DailyPartitionsDefinition("2023-01-01"),
backfill_policy=BackfillPolicy.single_run(),
)
def single_run_backfill_policy_asset(context):
pass


@asset(
partitions_def=DailyPartitionsDefinition("2023-01-03"),
backfill_policy=BackfillPolicy.multi_run(10),
)
def multi_run_backfill_policy_asset(context):
pass


named_groups_job = build_assets_job(
"named_groups_job",
[
Expand Down Expand Up @@ -1924,6 +1941,8 @@ def define_asset_jobs():
checked_multi_asset_job,
check_in_op_asset,
asset_check_job,
single_run_backfill_policy_asset,
multi_run_backfill_policy_asset,
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,23 @@
}
"""

GET_ASSET_BACKFILL_POLICY = """
query AssetNodeQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
...on AssetNode {
assetKey {
path
}
backfillPolicy {
maxPartitionsPerRun
policyType
description
}
}
}
}
"""


def _create_run(
graphql_context: WorkspaceRequestContext,
Expand Down Expand Up @@ -2322,6 +2339,43 @@ def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext):
else:
assert a["hasAssetChecks"] is False, f"Asset {a['assetKey']} has asset checks"

def test_get_backfill_policy(self, graphql_context: WorkspaceRequestContext):
result = execute_dagster_graphql(
graphql_context,
GET_ASSET_BACKFILL_POLICY,
variables={
"assetKey": {"path": ["single_run_backfill_policy_asset"]},
},
)

assert result.data["assetNodeOrError"]["assetKey"]["path"] == [
"single_run_backfill_policy_asset"
]
assert result.data["assetNodeOrError"]["backfillPolicy"]["policyType"] == "SINGLE_RUN"
assert result.data["assetNodeOrError"]["backfillPolicy"]["maxPartitionsPerRun"] == None
assert (
result.data["assetNodeOrError"]["backfillPolicy"]["description"]
== "Backfills all partitions in a single run"
)

result = execute_dagster_graphql(
graphql_context,
GET_ASSET_BACKFILL_POLICY,
variables={
"assetKey": {"path": ["multi_run_backfill_policy_asset"]},
},
)

assert result.data["assetNodeOrError"]["assetKey"]["path"] == [
"multi_run_backfill_policy_asset"
]
assert result.data["assetNodeOrError"]["backfillPolicy"]["policyType"] == "MULTI_RUN"
assert result.data["assetNodeOrError"]["backfillPolicy"]["maxPartitionsPerRun"] == 10
assert (
result.data["assetNodeOrError"]["backfillPolicy"]["description"]
== "Backfills in multiple runs, with a maximum of 10 partitions per run"
)


class TestAssetEventsReadOnly(ReadonlyGraphQLContextTestMatrix):
def test_report_runless_asset_events_permissions(
Expand Down

0 comments on commit 0be6e6f

Please sign in to comment.