Skip to content

Commit

Permalink
[gql] Report runless asset events (#17019)
Browse files Browse the repository at this point in the history
This PR adds graphQL to enable reporting asset materializations in the
UI.

It creates a `reportRunlessAssetEvents` mutation with the following
arguments:
- event type -- currently only accepts observations and
materializations, though we can use this to support asset checks in the
UI in the future
- asset key -- our planned UIs currently only support targeting one
asset at a time
- partition keys (optional)
- description (optional)

In the next PR I plan on enabling displaying the creator of a runless
event.
  • Loading branch information
clairelin135 authored Oct 9, 2023
1 parent 56a9006 commit 2567e2d
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export const EXPECTED_PERMISSIONS = {
reload_repository_location: true,
reload_workspace: true,
wipe_assets: true,
report_runless_asset_events: true,
launch_partition_backfill: true,
cancel_partition_backfill: true,
edit_dynamic_partitions: true,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
import dagster._check as check
from dagster._annotations import deprecated
from dagster._core.definitions.events import AssetKey
from dagster._core.events import EngineEventData
from dagster._core.events import (
AssetMaterialization,
AssetObservation,
DagsterEventType,
EngineEventData,
)
from dagster._core.instance import (
DagsterInstance,
)
Expand Down Expand Up @@ -47,6 +52,7 @@
from ...schema.roots.mutation import (
GrapheneAssetWipeSuccess,
GrapheneDeletePipelineRunSuccess,
GrapheneReportRunlessAssetEventsSuccess,
GrapheneTerminateRunFailure,
GrapheneTerminateRunsResult,
GrapheneTerminateRunSuccess,
Expand Down Expand Up @@ -370,3 +376,45 @@ def wipe_assets(
instance = graphene_info.context.instance
instance.wipe_assets(asset_keys)
return GrapheneAssetWipeSuccess(assetKeys=asset_keys)


def create_asset_event(
event_type: DagsterEventType,
asset_key: AssetKey,
partition_key: Optional[str],
description: Optional[str],
) -> Union[AssetMaterialization, AssetObservation]:
if event_type == DagsterEventType.ASSET_MATERIALIZATION:
return AssetMaterialization(
asset_key=asset_key, partition=partition_key, description=description
)
elif event_type == DagsterEventType.ASSET_OBSERVATION:
return AssetObservation(
asset_key=asset_key, partition=partition_key, description=description
)
else:
check.failed(f"Unexpected event type {event_type}")


def report_runless_asset_events(
graphene_info: "ResolveInfo",
event_type: DagsterEventType,
asset_key: AssetKey,
partition_keys: Optional[Sequence[str]] = None,
description: Optional[str] = None,
) -> "GrapheneReportRunlessAssetEventsSuccess":
from ...schema.roots.mutation import GrapheneReportRunlessAssetEventsSuccess

instance = graphene_info.context.instance

if partition_keys is not None:
for partition_key in partition_keys:
instance.report_runless_asset_event(
create_asset_event(event_type, asset_key, partition_key, description)
)
else:
instance.report_runless_asset_event(
create_asset_event(event_type, asset_key, None, description)
)

return GrapheneReportRunlessAssetEventsSuccess(assetKey=asset_key)
31 changes: 31 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import graphene
import pendulum
from dagster._core.events import DagsterEventType
from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
from dagster._utils import check

from .pipelines.status import GrapheneRunStatus
from .runs import GrapheneRunConfigData
Expand Down Expand Up @@ -188,6 +190,34 @@ class Meta:
name = "LaunchBackfillParams"


class GrapheneRunlessAssetEventType(graphene.Enum):
"""The event type of an asset event."""

ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION"
ASSET_OBSERVATION = "ASSET_OBSERVATION"

class Meta:
name = "AssetEventType"

def to_dagster_event_type(self) -> DagsterEventType:
if self == GrapheneRunlessAssetEventType.ASSET_MATERIALIZATION:
return DagsterEventType.ASSET_MATERIALIZATION
elif self == GrapheneRunlessAssetEventType.ASSET_OBSERVATION:
return DagsterEventType.ASSET_OBSERVATION
else:
check.assert_never(self)


class GrapheneReportRunlessAssetEventsParams(graphene.InputObjectType):
eventType = graphene.NonNull(GrapheneRunlessAssetEventType)
assetKey = graphene.NonNull(GrapheneAssetKeyInput)
partitionKeys = graphene.InputField(graphene.List(graphene.String))
description = graphene.String()

class Meta:
name = "ReportRunlessAssetEventsParams"


class GrapheneSensorSelector(graphene.InputObjectType):
repositoryName = graphene.NonNull(graphene.String)
repositoryLocationName = graphene.NonNull(graphene.String)
Expand Down Expand Up @@ -348,4 +378,5 @@ class Meta:
GrapheneStepExecution,
GrapheneStepOutputHandle,
GrapheneInputTag,
GrapheneReportRunlessAssetEventsParams,
]
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from ...implementation.execution import (
delete_pipeline_run,
report_runless_asset_events,
terminate_pipeline_execution,
terminate_pipeline_execution_for_runs,
wipe_assets,
Expand Down Expand Up @@ -61,6 +62,7 @@
GrapheneExecutionParams,
GrapheneLaunchBackfillParams,
GrapheneReexecutionParams,
GrapheneReportRunlessAssetEventsParams,
GrapheneRepositorySelector,
)
from ..partition_sets import GrapheneAddDynamicPartitionResult
Expand Down Expand Up @@ -673,6 +675,57 @@ def mutate(self, graphene_info: ResolveInfo, assetKeys: Sequence[GrapheneAssetKe
)


class GrapheneReportRunlessAssetEventsSuccess(graphene.ObjectType):
"""Output indicating that runless asset events were reported."""

assetKey = graphene.NonNull(GrapheneAssetKey)

class Meta:
name = "ReportRunlessAssetEventsSuccess"


class GrapheneReportRunlessAssetEventsResult(graphene.Union):
"""The output from reporting runless events."""

class Meta:
types = (
GrapheneUnauthorizedError,
GraphenePythonError,
GrapheneReportRunlessAssetEventsSuccess,
)
name = "ReportRunlessAssetEventsResult"


class GrapheneReportRunlessAssetEventsMutation(graphene.Mutation):
"""Reports runless events for an asset or a subset of its partitions."""

Output = graphene.NonNull(GrapheneReportRunlessAssetEventsResult)

class Arguments:
eventParams = graphene.Argument(graphene.NonNull(GrapheneReportRunlessAssetEventsParams))

class Meta:
name = "ReportRunlessAssetEventsMutation"

@capture_error
@check_permission(Permissions.REPORT_RUNLESS_ASSET_EVENTS)
def mutate(
self, graphene_info: ResolveInfo, eventParams: GrapheneReportRunlessAssetEventsParams
):
event_type = eventParams["eventType"].to_dagster_event_type()
asset_key = AssetKey.from_graphql_input(eventParams["assetKey"])
partition_keys = eventParams.get("partitionKeys", None)
description = eventParams.get("description", None)

return report_runless_asset_events(
graphene_info,
event_type=event_type,
asset_key=asset_key,
partition_keys=partition_keys,
description=description,
)


class GrapheneLogTelemetrySuccess(graphene.ObjectType):
"""Output indicating that telemetry was logged."""

Expand Down Expand Up @@ -818,6 +871,7 @@ class Meta:
reloadWorkspace = GrapheneReloadWorkspaceMutation.Field()
shutdownRepositoryLocation = GrapheneShutdownRepositoryLocationMutation.Field()
wipeAssets = GrapheneAssetWipeMutation.Field()
reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field()
launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field()
resumePartitionBackfill = GrapheneResumeBackfillMutation.Field()
cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field()
Expand Down
Loading

1 comment on commit 2567e2d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-9m15f1g1a-elementl.vercel.app

Built with commit 2567e2d.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.