Skip to content

Commit

Permalink
[external assets] report_asset_check endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Oct 11, 2023
1 parent f465ec0 commit cceaf17
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 31 deletions.
1 change: 1 addition & 0 deletions docs/sphinx/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
sections/api/apidocs/config
sections/api/apidocs/errors
sections/api/apidocs/execution
sections/api/apidocs/external-assets
sections/api/apidocs/graphs
sections/api/apidocs/hooks
sections/api/apidocs/internals
Expand Down
90 changes: 90 additions & 0 deletions docs/sphinx/sections/api/apidocs/external-assets.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
External Assets (Experimental)
==============================

Instance API
------------

External asset events can be recorded using :py:func:`DagsterInstance.report_runless_asset_event` on :py:class:`DagsterInstance`.

*Example*
.. code-block:: python
from dagster import DagsterInstance, AssetMaterialization, AssetKey
instance = DagsterInstance.get()
instance.report_runless_asset_event(AssetMaterialization(AssetKey('my_asset')))
REST API
--------

The `dagster-webserver` exposes HTTP endpoints for reporting asset events.

`/report_asset_materialization/`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A `POST` request made to this endpoint with the required information will result in an `AssetMaterialization` event being recorded. Parameters can be passed in multiple ways, listed in precedence order below.

**Params**
* `asset_key` (required)
* URL: the asset key can be specified as path components after `/report_asset_materialization/`, where each `/` delimits parts of a multipart :py:class:`AssetKey`.
* JSON Body `asset_key`: value is passed to the :py:class:`AssetKey` constructor.
* Query Param `asset_key`: accepts string or json encoded array for multipart keys.
* `metadata` (optional)
* JSON Body `metadata`: value is passed to the :py:class:`AssetMaterialization` constructor.
* Query Param `metadata`: accepts json encoded object.
* `data_version` (optional)
* JSON Body `data_version`: value is passed to the :py:class:`AssetMaterialization` constructor.
* Query Param `data_version`: value is passed to the :py:class:`AssetMaterialization` constructor.
* `description` (optional)
* JSON Body `description`: value is passed to the :py:class:`AssetMaterialization` constructor.
* Query Param `description`: value is passed to the :py:class:`AssetMaterialization` constructor.
* `partition` (optional)
* JSON Body `partition`: value is passed to the :py:class:`AssetMaterialization` constructor.
* Query Param `partition`: value is passed to the :py:class:`AssetMaterialization` constructor.


*Examples*

.. code-block:: bash
curl -X POST localhost:3000/report_asset_materialization/my_asset
.. code-block:: bash
curl --request POST \
--url https://org.dagster.cloud/deployment/report_asset_materialization/ \
--header 'Content-Type: application/json' \
--header 'Dagster-Cloud-Api-Token: token' \
--data '{
"asset_key": "my_asset",
"metadata": {
"rows": 10
}
}'
.. code-block:: python
import requests
url = f"{dagster_ui_host}/report_asset_materialization/my_asset"
response = requests.request("POST", url)
response.raise_for_status()
.. code-block:: python
import requests
url = "https://org.dagster.cloud/deployment/report_asset_materialization/"
payload = {
"asset_key": "my_asset",
"metadata": {"rows": 10},
}
headers = {
"Content-Type": "application/json",
"Dagster-Cloud-Api-Token": "token"
}
response = requests.request("POST", url, json=payload, headers=headers)
response.raise_for_status()
154 changes: 125 additions & 29 deletions python_modules/dagster-webserver/dagster_webserver/external_assets.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from typing import Any

import dagster._check as check
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.data_version import (
DATA_VERSION_IS_USER_PROVIDED_TAG,
DATA_VERSION_TAG,
Expand All @@ -11,6 +16,28 @@
)


def _asset_key_from_request(key: str, request: Request, json_body):
    """Resolve the target asset key for a report endpoint request.

    Sources are consulted in precedence order: URL path parameter, JSON body,
    then query string. The first source that supplies a value wins.

    Args:
        key: Name of the parameter carrying the asset key; must be "asset_key".
        request: Incoming request whose path and query parameters are inspected.
        json_body: Parsed JSON body of the request (callers pass ``{}`` when the
            request has no body).

    Returns:
        The key produced by ``AssetKey.from_user_string`` (path),
        ``AssetKey(...)`` (JSON body) or ``AssetKey.from_db_string`` (query
        string), or None when no source provides an asset key.
    """
    # Guard against a caller passing a differently named parameter: the path
    # parameter registered on the routes is named "asset_key".
    check.invariant(key == "asset_key")

    path_value = request.path_params.get(key)
    if path_value:
        # use from_user_string to treat / as multipart key separator
        return AssetKey.from_user_string(path_value)
    if key in json_body:
        return AssetKey(json_body[key])
    if key in request.query_params:
        return AssetKey.from_db_string(request.query_params[key])

    return None


def _value_from_body_or_params(key: str, request: Request, json_body) -> Any:
    """Look up ``key`` in the JSON body first, then in the query string.

    Args:
        key: Parameter name to look up.
        request: Incoming request whose query parameters act as the fallback.
        json_body: Parsed JSON body of the request.

    Returns:
        The value from the JSON body if present, otherwise the query parameter
        value, otherwise None.
    """
    # The JSON body takes precedence over the query string.
    if key in json_body:
        return json_body[key]

    query = request.query_params
    return query[key] if key in query else None


async def handle_report_asset_materialization_request(
context: BaseWorkspaceRequestContext,
request: Request,
Expand All @@ -35,15 +62,7 @@ async def handle_report_asset_materialization_request(
status_code=400,
)

asset_key = None
if request.path_params.get(ReportAssetMatParam.asset_key):
# use from_user_string to treat / as multipart key separator
asset_key = AssetKey.from_user_string(request.path_params["asset_key"])
elif ReportAssetMatParam.asset_key in json_body:
asset_key = AssetKey(json_body[ReportAssetMatParam.asset_key])
elif ReportAssetMatParam.asset_key in request.query_params:
asset_key = AssetKey.from_db_string(request.query_params["asset_key"])

asset_key = _asset_key_from_request(ReportAssetMatParam.asset_key, request, json_body)
if asset_key is None:
return JSONResponse(
{
Expand All @@ -56,28 +75,15 @@ async def handle_report_asset_materialization_request(
)

tags = None
if ReportAssetMatParam.data_version in json_body:
tags = {
DATA_VERSION_TAG: json_body[ReportAssetMatParam.data_version],
DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
}
elif ReportAssetMatParam.data_version in request.query_params:
data_version = _value_from_body_or_params(ReportAssetMatParam.data_version, request, json_body)
if data_version is not None:
tags = {
DATA_VERSION_TAG: request.query_params[ReportAssetMatParam.data_version],
DATA_VERSION_TAG: data_version,
DATA_VERSION_IS_USER_PROVIDED_TAG: "true",
}

partition = None
if ReportAssetMatParam.partition in json_body:
partition = json_body[ReportAssetMatParam.partition]
elif ReportAssetMatParam.partition in request.query_params:
partition = request.query_params[ReportAssetMatParam.partition]

description = None
if ReportAssetMatParam.description in json_body:
description = json_body[ReportAssetMatParam.description]
elif ReportAssetMatParam.description in request.query_params:
description = request.query_params[ReportAssetMatParam.description]
partition = _value_from_body_or_params(ReportAssetMatParam.partition, request, json_body)
description = _value_from_body_or_params(ReportAssetMatParam.description, request, json_body)

metadata = None
if ReportAssetMatParam.metadata in json_body:
Expand Down Expand Up @@ -114,15 +120,105 @@ async def handle_report_asset_materialization_request(
return JSONResponse({})


async def handle_report_asset_check_request(
    context: BaseWorkspaceRequestContext,
    request: Request,
) -> JSONResponse:
    """Record a runless asset check evaluation event described by an HTTP request.

    The asset key is resolved by ``_asset_key_from_request`` (URL path, then
    JSON body, then query param). The remaining properties (``check_name``,
    ``passed``, ``severity``, ``metadata``) can be passed in the JSON post body
    or as query params, with that order of precedence.

    Args:
        context: Request context whose ``instance`` records the event.
        request: Incoming request carrying the check evaluation properties.

    Returns:
        An empty JSON object on success, or a JSON object with an ``error``
        message and status code 400 when the request cannot be turned into an
        ``AssetCheckEvaluation``.
    """
    body_content_type = request.headers.get("content-type")
    if body_content_type is None:
        json_body = {}
    # Compare only the media type so that parameters such as "; charset=utf-8"
    # do not cause a valid JSON request to be rejected.
    elif body_content_type.split(";", 1)[0].strip().lower() == "application/json":
        try:
            json_body = await request.json()
        except ValueError as exc:
            # A malformed body is a client error, not a server failure.
            return JSONResponse(
                {
                    "error": f"Error parsing request body json: {exc}",
                },
                status_code=400,
            )
    else:
        return JSONResponse(
            {
                "error": (
                    f"Unhandled content type {body_content_type}, expect no body or"
                    " application/json"
                ),
            },
            status_code=400,
        )

    asset_key = _asset_key_from_request(ReportAssetCheckEvalParam.asset_key, request, json_body)
    if asset_key is None:
        return JSONResponse(
            {
                "error": (
                    "Empty asset key, must provide asset key as url path after"
                    " /report_asset_check/, json body asset_key or query param asset_key."
                ),
            },
            status_code=400,
        )

    passed = _value_from_body_or_params(ReportAssetCheckEvalParam.passed, request, json_body)
    if isinstance(passed, str):
        # Query params always arrive as strings; map the textual booleans onto
        # real ones. Any other string is left untouched so that the
        # AssetCheckEvaluation constructor decides whether to accept it.
        normalized_passed = passed.strip().lower()
        if normalized_passed in ("true", "false"):
            passed = normalized_passed == "true"

    check_name = _value_from_body_or_params(
        ReportAssetCheckEvalParam.check_name, request, json_body
    )
    severity = _value_from_body_or_params(ReportAssetCheckEvalParam.severity, request, json_body)
    if severity is None:
        severity = "ERROR"  # default

    metadata = {}
    if ReportAssetCheckEvalParam.metadata in json_body:
        metadata = json_body[ReportAssetCheckEvalParam.metadata]
    elif ReportAssetCheckEvalParam.metadata in request.query_params:
        # Query param metadata is a json encoded object.
        try:
            metadata = json.loads(request.query_params[ReportAssetCheckEvalParam.metadata])
        except Exception as exc:
            return JSONResponse(
                {
                    "error": f"Error parsing metadata json: {exc}",
                },
                status_code=400,
            )

    # Any validation failure (including an unknown severity value) is reported
    # back to the client as a 400 rather than surfacing as a server error.
    try:
        evaluation = AssetCheckEvaluation(
            check_name=check_name,
            passed=passed,
            asset_key=asset_key,
            metadata=metadata,
            severity=AssetCheckSeverity(severity),
        )
    except Exception as exc:
        return JSONResponse(
            {
                "error": f"Error constructing AssetCheckEvaluation: {exc}",
            },
            status_code=400,
        )

    context.instance.report_runless_asset_event(evaluation)

    return JSONResponse({})


# note: Enum not used to avoid value type problems X(str, Enum) doesn't work as partition conflicts with keyword
class ReportAssetMatParam:
    """Class to collect all supported args by report_asset_materialization endpoint
    to ensure consistency with related APIs.

    note: a plain class is used instead of an Enum to avoid value type problems;
    X(str, Enum) doesn't work because a member named `partition` would clash with
    the built-in `str.partition` method.
    """

    asset_key = "asset_key"
    data_version = "data_version"
    metadata = "metadata"
    description = "description"
    partition = "partition"


class ReportAssetCheckEvalParam:
    """Class to collect all supported args by the report_asset_check endpoint
    to ensure consistency with related APIs.

    Each attribute is the parameter name looked up in the request's JSON body
    or query params.
    """

    asset_key = "asset_key"
    check_name = "check_name"
    metadata = "metadata"
    # The request handler falls back to "ERROR" when severity is not supplied.
    severity = "severity"
    passed = "passed"
14 changes: 13 additions & 1 deletion python_modules/dagster-webserver/dagster_webserver/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
from starlette.routing import Mount, Route, WebSocketRoute
from starlette.types import Message

from .external_assets import handle_report_asset_materialization_request
from .external_assets import (
handle_report_asset_check_request,
handle_report_asset_materialization_request,
)
from .graphql import GraphQLServer
from .version import __version__

Expand Down Expand Up @@ -202,6 +205,10 @@ async def report_asset_materialization_endpoint(self, request: Request) -> JSONR
context = self.make_request_context(request)
return await handle_report_asset_materialization_request(context, request)

async def report_asset_check_endpoint(self, request: Request) -> JSONResponse:
    """Build a request context and delegate to the asset check report handler."""
    return await handle_report_asset_check_request(
        self.make_request_context(request),
        request,
    )

def index_html_endpoint(self, request: Request):
"""Serves root html."""
index_path = self.relative_path("webapp/build/index.html")
Expand Down Expand Up @@ -321,6 +328,11 @@ def build_routes(self):
self.report_asset_materialization_endpoint,
methods=["POST"],
),
Route(
"/report_asset_check/{asset_key:path}",
self.report_asset_check_endpoint,
methods=["POST"],
),
Route("/{path:path}", self.index_html_endpoint),
Route("/", self.index_html_endpoint),
]
Expand Down
Loading

0 comments on commit cceaf17

Please sign in to comment.