Skip to content

Commit

Permalink
filters all the way through to gql layer
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 15, 2024
1 parent d8b072b commit ba9202d
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 12 deletions.
23 changes: 15 additions & 8 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING, Optional

from dagster._core.execution.backfill import BulkActionStatus
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus

if TYPE_CHECKING:
from dagster_graphql.schema.util import ResolveInfo
Expand All @@ -23,11 +23,12 @@ def get_backfills(
status: Optional[BulkActionStatus] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
filters: Optional[BulkActionsFilter] = None,
) -> "GraphenePartitionBackfills":
from ..schema.backfill import GraphenePartitionBackfill, GraphenePartitionBackfills

backfills = graphene_info.context.instance.get_backfills(
status=status, cursor=cursor, limit=limit
status=status, cursor=cursor, limit=limit, filters=filters
)
return GraphenePartitionBackfills(
results=[GraphenePartitionBackfill(backfill) for backfill in backfills]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
)
from dagster._core.definitions.selector import JobSubsetSelector
from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError
from dagster._core.execution.backfill import BulkActionsFilter
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
from dagster._core.storage.event_log.base import AssetRecord
Expand Down Expand Up @@ -490,7 +491,7 @@ def get_runs_feed_entries(
for backfill in instance.get_backfills(
cursor=runs_feed_cursor.backfill_cursor,
limit=limit,
created_before=created_before_cursor,
filters=BulkActionsFilter(created_before=created_before_cursor),
)
]
runs = [
Expand Down
29 changes: 29 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import graphene
from dagster._core.events import DagsterEventType
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus
from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
from dagster._time import datetime_from_timestamp
from dagster._utils import check
Expand Down Expand Up @@ -373,6 +374,33 @@ class Meta:
name = "TagInput"


class GrapheneBulkActionsFilter(graphene.InputObjectType):
status = graphene.List(
graphene.NonNull("dagster_graphql.schema.backfill.GrapheneBulkActionStatus")
)
createdBefore = graphene.InputField(graphene.Float)
createdAfter = graphene.InputField(graphene.Float)

class Meta:
description = """This type represents a filter on Dagster Bulk Actions (backfills)."""
name = "BulkActionsFilter"

def to_selector(self):
if self.status:
status = BulkActionStatus.from_graphql_input(self.status)
else:
status = None

created_before = datetime_from_timestamp(self.createdBefore) if self.createdBefore else None
created_after = datetime_from_timestamp(self.createdAfter) if self.createdAfter else None

return BulkActionsFilter(
status=status,
created_before=created_before,
created_after=created_after,
)


types = [
GrapheneAssetKeyInput,
GrapheneExecutionMetadata,
Expand All @@ -394,4 +422,5 @@ class Meta:
GrapheneStepOutputHandle,
GrapheneTagInput,
GrapheneReportRunlessAssetEventsParams,
GrapheneBulkActionsFilter,
]
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
GrapheneAssetBackfillPreviewParams,
GrapheneAssetGroupSelector,
GrapheneAssetKeyInput,
GrapheneBulkActionsFilter,
GrapheneGraphSelector,
GrapheneInstigationSelector,
GraphenePipelineSelector,
Expand Down Expand Up @@ -474,6 +475,7 @@ class Meta:
status=graphene.Argument(GrapheneBulkActionStatus),
cursor=graphene.String(),
limit=graphene.Int(),
filters=graphene.Argument(GrapheneBulkActionsFilter),
description="Retrieve backfills after applying a status filter, cursor, and limit.",
)

Expand Down Expand Up @@ -1105,12 +1107,14 @@ def resolve_partitionBackfillsOrError(
status: Optional[GrapheneBulkActionStatus] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
filters: Optional[GrapheneBulkActionsFilter] = None,
):
return get_backfills(
graphene_info,
status=BulkActionStatus.from_graphql_input(status) if status else None,
cursor=cursor,
limit=limit,
filters=filters.to_selector() if filters else None,
)

def resolve_assetBackfillPreview(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@
}
"""

BACKFILLS_WITH_FILTERS_QUERY = """
query BackfillsWithFiltersQuery($filters: BulkActionsFilter) {
partitionBackfillsOrError(filters: $filters) {
... on PartitionBackfills {
results {
id
timestamp
status
}
}
}
}
"""


def get_repo() -> RepositoryDefinition:
partitions_def = StaticPartitionsDefinition(["a", "b", "c"])
Expand Down Expand Up @@ -1056,7 +1070,9 @@ def test_asset_backfill_error_raised_upon_invalid_params_provided():


def _get_backfill_data(
launch_backfill_result: GqlResult, instance: DagsterInstance, repo
launch_backfill_result: GqlResult,
instance: DagsterInstance,
repo,
) -> Tuple[str, AssetBackfillData]:
assert launch_backfill_result
assert launch_backfill_result.data
Expand Down Expand Up @@ -1131,3 +1147,56 @@ def override_backfill_storage_setting(self):
)

assert len(backfill_logs.data["partitionBackfillOrError"]["logEvents"]["events"]) > 0


def test_get_backfills_with_filters():
repo = get_repo()
all_asset_keys = repo.asset_graph.materializable_asset_keys

with instance_for_test() as instance:
with define_out_of_process_context(__file__, "get_repo", instance) as context:
# launchPartitionBackfill
all_backfills = []
for _ in range(5):
launch_backfill_result = execute_dagster_graphql(
context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"partitionNames": ["a", "b"],
"assetSelection": [key.to_graphql_input() for key in all_asset_keys],
}
},
)
assert (
"backfillId" in launch_backfill_result.data["launchPartitionBackfill"]
), _get_error_message(launch_backfill_result)

backfill_id = launch_backfill_result.data["launchPartitionBackfill"]["backfillId"]
backfill = instance.get_backfill(backfill_id)
all_backfills.append(backfill)

# on PartitionBackfills
get_backfills_result = execute_dagster_graphql(
context,
BACKFILLS_WITH_FILTERS_QUERY,
variables={"filters": {"createdBefore": all_backfills[3].backfill_timestamp}},
)
assert not get_backfills_result.errors
assert get_backfills_result.data
backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]

for result in backfill_results:
assert result["timestamp"] < all_backfills[3].backfill_timestamp

get_backfills_result = execute_dagster_graphql(
context,
BACKFILLS_WITH_FILTERS_QUERY,
variables={"filters": {"createdAfter": all_backfills[3].backfill_timestamp}},
)
assert not get_backfills_result.errors
assert get_backfills_result.data
backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]

for result in backfill_results:
assert result["timestamp"] > all_backfills[3].backfill_timestamp

0 comments on commit ba9202d

Please sign in to comment.