Skip to content

Commit

Permalink
first stab
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Oct 5, 2023
1 parent 4c084f3 commit bd830a1
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
import dagster._check as check
from dagster._annotations import deprecated
from dagster._core.definitions.events import AssetKey
from dagster._core.events import EngineEventData
from dagster._core.events import (
EngineEventData,
DagsterEventType,
AssetMaterialization,
AssetObservation,
)
from dagster._core.instance import (
DagsterInstance,
)
Expand Down Expand Up @@ -46,6 +51,7 @@
from ...schema.errors import GrapheneRunNotFoundError
from ...schema.roots.mutation import (
GrapheneAssetWipeSuccess,
GrapheneReportRunlessAssetEventsSuccess,
GrapheneDeletePipelineRunSuccess,
GrapheneTerminateRunFailure,
GrapheneTerminateRunsResult,
Expand Down Expand Up @@ -370,3 +376,43 @@ def wipe_assets(
instance = graphene_info.context.instance
instance.wipe_assets(asset_keys)
return GrapheneAssetWipeSuccess(assetKeys=asset_keys)


def create_asset_event(
event_type: DagsterEventType,
asset_key: AssetKey,
partition_key: Optional[str],
description: Optional[str],
) -> Union[AssetMaterialization, AssetObservation]:
if event_type == DagsterEventType.ASSET_MATERIALIZATION:
return AssetMaterialization(
asset_key=asset_key, partition=partition_key, description=description
)
elif event_type == DagsterEventType.ASSET_OBSERVATION:
return AssetObservation(
asset_key=asset_key, partition=partition_key, description=description
)
else:
check.failed(f"Unexpected event type {event_type}")


def report_runless_asset_events(
graphene_info: "ResolveInfo",
event_type: DagsterEventType,
asset_key: AssetKey,
partition_keys: Optional[Sequence[str]] = None,
description: Optional[str] = None,
) -> "GrapheneAssetWipeSuccess":
instance = graphene_info.context.instance

if partition_keys is not None:
for partition_key in partition_keys:
instance.report_runless_asset_event(
create_asset_event(event_type, asset_key, partition_key, description)
)
else:
instance.report_runless_asset_event(
create_asset_event(event_type, asset_key, None, description)
)

return GrapheneReportRunlessAssetEventsSuccess(asset_key=asset_key)
30 changes: 30 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import graphene
import pendulum
from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
from dagster._core.events import DagsterEventType

from .pipelines.status import GrapheneRunStatus
from .runs import GrapheneRunConfigData
Expand Down Expand Up @@ -188,6 +189,34 @@ class Meta:
name = "LaunchBackfillParams"


class GrapheneAssetEventType(graphene.Enum):
"""The event type of an asset event."""

ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION"
ASSET_OBSERVATION = "ASSET_OBSERVATION"

class Meta:
name = "AssetEventType"

def to_dagster_event_type(self):
if self == GrapheneAssetEventType.ASSET_MATERIALIZATION:
return DagsterEventType.ASSET_MATERIALIZATION
elif self == GrapheneAssetEventType.ASSET_OBSERVATION:
return DagsterEventType.ASSET_OBSERVATION
else:
check.assert_never(self)


class GrapheneReportRunlessAssetEventsParams(graphene.InputObjectType):
eventType = graphene.NonNull(GrapheneAssetEventType)
assetKey = graphene.NonNull(GrapheneAssetKeyInput)
partitionNames = graphene.InputField(graphene.List(graphene.String))
description = graphene.String()

class Meta:
name = "ReportRunlessAssetEventsParams"


class GrapheneSensorSelector(graphene.InputObjectType):
repositoryName = graphene.NonNull(graphene.String)
repositoryLocationName = graphene.NonNull(graphene.String)
Expand Down Expand Up @@ -348,4 +377,5 @@ class Meta:
GrapheneStepExecution,
GrapheneStepOutputHandle,
GrapheneInputTag,
GrapheneReportRunlessAssetEventsParams,
]
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
terminate_pipeline_execution,
terminate_pipeline_execution_for_runs,
wipe_assets,
report_runless_asset_events,
)
from ...implementation.external import fetch_workspace, get_full_external_job_or_raise
from ...implementation.telemetry import log_ui_telemetry_event
Expand Down Expand Up @@ -58,6 +59,7 @@
from ..external import GrapheneWorkspace, GrapheneWorkspaceLocationEntry
from ..inputs import (
GrapheneAssetKeyInput,
GrapheneReportRunlessAssetEventsParams,
GrapheneExecutionParams,
GrapheneLaunchBackfillParams,
GrapheneReexecutionParams,
Expand Down Expand Up @@ -673,6 +675,50 @@ def mutate(self, graphene_info: ResolveInfo, assetKeys: Sequence[GrapheneAssetKe
)


class GrapheneReportRunlessAssetEventsSuccess(graphene.ObjectType):
"""Output indicating that runless asset events were reported."""

assetKey = GrapheneAssetKey

class Meta:
name = "ReportRunlessAssetEventsSuccess"


class GrapheneReportRunlessAssetEventsResult(graphene.Union):
"""The output from reporting runless events."""

class Meta:
types = (
GrapheneAssetNotFoundError,
GrapheneUnauthorizedError,
GraphenePythonError,
GrapheneReportRunlessAssetEventsSuccess,
)
name = "ReportRunlessAssetEventsResult"


class GrapheneReportRunlessAssetEventsMutation(graphene.Mutation):
"""Reports runless events for an asset or a subset of its partitions."""

Output = graphene.NonNull(GrapheneReportRunlessAssetEventsResult)

class Arguments:
eventParams = graphene.Argument(GrapheneReportRunlessAssetEventsParams)

class Meta:
name = "ReportRunlessAssetEventsMutation"

@capture_error
@check_permission(Permissions.REPORT_RUNLESS_ASSET_EVENTS)
def mutate(
self, graphene_info: ResolveInfo, eventParams: GrapheneReportRunlessAssetEventsParams
):
event_type = eventParams.eventType.to_dagster_event_type()
asset_key = AssetKey.from_graphql_input(eventParams["assetKey"])

return report_runless_asset_events(graphene_info, AssetKey.from_graphql_input(eventParams))


class GrapheneLogTelemetrySuccess(graphene.ObjectType):
"""Output indicating that telemetry was logged."""

Expand Down Expand Up @@ -818,6 +864,7 @@ class Meta:
reloadWorkspace = GrapheneReloadWorkspaceMutation.Field()
shutdownRepositoryLocation = GrapheneShutdownRepositoryLocationMutation.Field()
wipeAssets = GrapheneAssetWipeMutation.Field()
reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field()
launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field()
resumePartitionBackfill = GrapheneResumeBackfillMutation.Field()
cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field()
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/_core/workspace/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Permissions(str, Enum):
RELOAD_REPOSITORY_LOCATION = "reload_repository_location"
RELOAD_WORKSPACE = "reload_workspace"
WIPE_ASSETS = "wipe_assets"
REPORT_RUNLESS_ASSET_EVENTS = "report_runless_asset_events"
LAUNCH_PARTITION_BACKFILL = "launch_partition_backfill"
CANCEL_PARTITION_BACKFILL = "cancel_partition_backfill"
EDIT_DYNAMIC_PARTITIONS = "edit_dynamic_partitions"
Expand All @@ -37,6 +38,7 @@ def __str__(self) -> str:
Permissions.RELOAD_REPOSITORY_LOCATION: False,
Permissions.RELOAD_WORKSPACE: False,
Permissions.WIPE_ASSETS: False,
Permissions.REPORT_RUNLESS_ASSET_EVENTS: False,
Permissions.LAUNCH_PARTITION_BACKFILL: False,
Permissions.CANCEL_PARTITION_BACKFILL: False,
Permissions.EDIT_DYNAMIC_PARTITIONS: False,
Expand All @@ -56,6 +58,7 @@ def __str__(self) -> str:
Permissions.RELOAD_REPOSITORY_LOCATION: True,
Permissions.RELOAD_WORKSPACE: True,
Permissions.WIPE_ASSETS: True,
Permissions.REPORT_RUNLESS_ASSET_EVENTS: True,
Permissions.LAUNCH_PARTITION_BACKFILL: True,
Permissions.CANCEL_PARTITION_BACKFILL: True,
Permissions.EDIT_DYNAMIC_PARTITIONS: True,
Expand Down

0 comments on commit bd830a1

Please sign in to comment.