diff --git a/docs/sphinx/index.rst b/docs/sphinx/index.rst index 2c998e25db4e2..fdef973e69503 100644 --- a/docs/sphinx/index.rst +++ b/docs/sphinx/index.rst @@ -9,6 +9,7 @@ sections/api/apidocs/config sections/api/apidocs/errors sections/api/apidocs/execution + sections/api/apidocs/external-assets sections/api/apidocs/graphs sections/api/apidocs/hooks sections/api/apidocs/internals diff --git a/docs/sphinx/sections/api/apidocs/external-assets.rst b/docs/sphinx/sections/api/apidocs/external-assets.rst new file mode 100644 index 0000000000000..abf1197111099 --- /dev/null +++ b/docs/sphinx/sections/api/apidocs/external-assets.rst @@ -0,0 +1,90 @@ +External Assets (Experimental) +============================== + +Instance API +------------ + +External asset events can be recorded using :py:func:`DagsterInstance.report_runless_asset_event` on :py:class:`DagsterInstance`. + +*Example* +.. code-block:: python + + from dagster import DagsterInstance, AssetMaterialization, AssetKey + + instance = DagsterInstance.get() + instance.report_runless_asset_event(AssetMaterialization(AssetKey('my_asset'))) + +Rest API +-------- + +The `dagster-webserver` makes available endpoints for reporting asset events. + +`/report_asset_materialization/` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +A `POST` request made to this endpoint with the required information will result in an `AssetMaterialization` event being recorded. Parameters can be passed in multiple ways, listed in precedence order below. + +**Params** +* `asset_key` (required) + * URL: the asset key can be specified as path components after `/report_asset_materialization/`, where each `/` delimits parts of a multipart :py:class:`AssetKey`. + * JSON Body `asset_key`: value is passed to the :py:class:`AssetKey` constructor. + * Query Param `asset_key`: accepts string or json encoded array for multipart keys. +* `metadata` (optional) + * JSON Body `metadata`: value is passed to the :py:class:`AssetMaterialization` constructor. 
+ * Query Param `metadata`: accepts json encoded object. +* `data_version` (optional) + * JSON Body `data_version`: value is passed to the :py:class:`AssetMaterialization` constructor. + * Query Param `data_version`: value is passed to the :py:class:`AssetMaterialization` constructor. +* `description` (optional) + * JSON Body `description`: value is passed to the :py:class:`AssetMaterialization` constructor. + * Query Param `description`: value is passed to the :py:class:`AssetMaterialization` constructor. +* `partition` (optional) + * JSON Body `partition`: value is passed to the :py:class:`AssetMaterialization` constructor. + * Query Param `partition`: value is passed to the :py:class:`AssetMaterialization` constructor. + + +*Examples* + +.. code-block:: bash + + curl -X POST localhost:3000/report_asset_materialization/my_asset + +.. code-block:: bash + + curl --request POST \ + --url https://org.dagster.cloud/deployment/report_asset_materialization/ \ + --header 'Content-Type: application/json' \ + --header 'Dagster-Cloud-Api-Token: token' \ + --data '{ + "asset_key": "my_asset", + "metadata": { + "rows": 10 + } + }' + +.. code-block:: python + + import requests + + url = f"{dagster_ui_host}/report_asset_materialization/my_asset" + response = requests.request("POST", url) + response.raise_for_status() + + +.. 
code-block:: python + + import requests + + url = "https://org.dagster.cloud/deployment/report_asset_materialization/" + + payload = { + "asset_key": "my_asset", + "metadata": {"rows": 10}, + } + headers = { + "Content-Type": "application/json", + "Dagster-Cloud-Api-Token": "token" + } + + response = requests.request("POST", url, json=payload, headers=headers) + response.raise_for_status() diff --git a/python_modules/dagster-webserver/dagster_webserver/external_assets.py b/python_modules/dagster-webserver/dagster_webserver/external_assets.py index d4dc478bec63d..a03d1c3c98a0c 100644 --- a/python_modules/dagster-webserver/dagster_webserver/external_assets.py +++ b/python_modules/dagster-webserver/dagster_webserver/external_assets.py @@ -1,3 +1,8 @@ +from typing import Any + +import dagster._check as check +from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation +from dagster._core.definitions.asset_check_spec import AssetCheckSeverity from dagster._core.definitions.data_version import ( DATA_VERSION_IS_USER_PROVIDED_TAG, DATA_VERSION_TAG, @@ -11,6 +16,28 @@ ) +def _asset_key_from_request(key: str, request: Request, json_body): + check.invariant(key == "asset_key") # + + if request.path_params.get(key): + # use from_user_string to treat / as multipart key separator + return AssetKey.from_user_string(request.path_params["asset_key"]) + elif ReportAssetMatParam.asset_key in json_body: + return AssetKey(json_body[ReportAssetMatParam.asset_key]) + elif ReportAssetMatParam.asset_key in request.query_params: + return AssetKey.from_db_string(request.query_params["asset_key"]) + + return None + + +def _value_from_body_or_params(key: str, request: Request, json_body) -> Any: + if key in json_body: + return json_body[key] + elif key in request.query_params: + return request.query_params[key] + return None + + async def handle_report_asset_materialization_request( context: BaseWorkspaceRequestContext, request: Request, @@ -35,15 +62,7 @@ async def 
handle_report_asset_materialization_request( status_code=400, ) - asset_key = None - if request.path_params.get(ReportAssetMatParam.asset_key): - # use from_user_string to treat / as multipart key separator - asset_key = AssetKey.from_user_string(request.path_params["asset_key"]) - elif ReportAssetMatParam.asset_key in json_body: - asset_key = AssetKey(json_body[ReportAssetMatParam.asset_key]) - elif ReportAssetMatParam.asset_key in request.query_params: - asset_key = AssetKey.from_db_string(request.query_params["asset_key"]) - + asset_key = _asset_key_from_request(ReportAssetMatParam.asset_key, request, json_body) if asset_key is None: return JSONResponse( { @@ -56,28 +75,15 @@ async def handle_report_asset_materialization_request( ) tags = None - if ReportAssetMatParam.data_version in json_body: - tags = { - DATA_VERSION_TAG: json_body[ReportAssetMatParam.data_version], - DATA_VERSION_IS_USER_PROVIDED_TAG: "true", - } - elif ReportAssetMatParam.data_version in request.query_params: + data_version = _value_from_body_or_params(ReportAssetMatParam.data_version, request, json_body) + if data_version is not None: tags = { - DATA_VERSION_TAG: request.query_params[ReportAssetMatParam.data_version], + DATA_VERSION_TAG: data_version, DATA_VERSION_IS_USER_PROVIDED_TAG: "true", } - partition = None - if ReportAssetMatParam.partition in json_body: - partition = json_body[ReportAssetMatParam.partition] - elif ReportAssetMatParam.partition in request.query_params: - partition = request.query_params[ReportAssetMatParam.partition] - - description = None - if ReportAssetMatParam.description in json_body: - description = json_body[ReportAssetMatParam.description] - elif ReportAssetMatParam.description in request.query_params: - description = request.query_params[ReportAssetMatParam.description] + partition = _value_from_body_or_params(ReportAssetMatParam.partition, request, json_body) + description = _value_from_body_or_params(ReportAssetMatParam.description, request, json_body) 
metadata = None if ReportAssetMatParam.metadata in json_body: @@ -114,11 +120,89 @@ async def handle_report_asset_materialization_request( return JSONResponse({}) +async def handle_report_asset_check_request( + context: BaseWorkspaceRequestContext, + request: Request, +) -> JSONResponse: + # Record a runless asset check evaluation event. + # The asset key is passed as url path with / delimiting parts, in the json post body, or as a query param. + # Properties can be passed as json post body or query params, with that order of precedence. + + body_content_type = request.headers.get("content-type") + if body_content_type is None: + json_body = {} + elif body_content_type == "application/json": + json_body = await request.json() + else: + return JSONResponse( + { + "error": ( + f"Unhandled content type {body_content_type}, expect no body or" + " application/json" + ), + }, + status_code=400, + ) + + asset_key = _asset_key_from_request(ReportAssetCheckEvalParam.asset_key, request, json_body) + if asset_key is None: + return JSONResponse( + { + "error": ( + "Empty asset key, must provide asset key as url path after" + " /report_asset_check/ or query param asset_key." 
+ ), + }, + status_code=400, + ) + + passed = _value_from_body_or_params(ReportAssetCheckEvalParam.passed, request, json_body) + check_name = _value_from_body_or_params( + ReportAssetCheckEvalParam.check_name, request, json_body + ) + severity = _value_from_body_or_params(ReportAssetCheckEvalParam.severity, request, json_body) + if severity is None: + severity = "ERROR" # default + + metadata = {} + if ReportAssetCheckEvalParam.metadata in json_body: + metadata = json_body[ReportAssetCheckEvalParam.metadata] + elif ReportAssetCheckEvalParam.metadata in request.query_params: + try: + metadata = json.loads(request.query_params[ReportAssetCheckEvalParam.metadata]) + except Exception as exc: + return JSONResponse( + { + "error": f"Error parsing metadata json: {exc}", + }, + status_code=400, + ) + + try: + evaluation = AssetCheckEvaluation( + check_name=check_name, + passed=passed, + asset_key=asset_key, + metadata=metadata, + severity=AssetCheckSeverity(severity), + ) + except Exception as exc: + return JSONResponse( + { + "error": f"Error constructing AssetCheckEvaluation: {exc}", + }, + status_code=400, + ) + + context.instance.report_runless_asset_event(evaluation) + + return JSONResponse({}) + + +# note: Enum not used to avoid value type problems X(str, Enum) doesn't work as partition conflicts with keyword class ReportAssetMatParam: """Class to collect all supported args by report_asset_materialization endpoint to ensure consistency with related APIs. - - note: Enum not used to avoid value type problems X(str, Enum) doesn't work as partition conflicts with keyword """ asset_key = "asset_key" @@ -126,3 +210,15 @@ class ReportAssetMatParam: metadata = "metadata" description = "description" partition = "partition" + + +class ReportAssetCheckEvalParam: + """Class to collect all supported args by report_asset_check_evaluation endpoint + to ensure consistency with related APIs. 
+ """ + + asset_key = "asset_key" + check_name = "check_name" + metadata = "metadata" + severity = "severity" + passed = "passed" diff --git a/python_modules/dagster-webserver/dagster_webserver/webserver.py b/python_modules/dagster-webserver/dagster_webserver/webserver.py index 35f1cbbd1c546..3968335dfa1f0 100644 --- a/python_modules/dagster-webserver/dagster_webserver/webserver.py +++ b/python_modules/dagster-webserver/dagster_webserver/webserver.py @@ -32,7 +32,10 @@ from starlette.routing import Mount, Route, WebSocketRoute from starlette.types import Message -from .external_assets import handle_report_asset_materialization_request +from .external_assets import ( + handle_report_asset_check_request, + handle_report_asset_materialization_request, +) from .graphql import GraphQLServer from .version import __version__ @@ -202,6 +205,10 @@ async def report_asset_materialization_endpoint(self, request: Request) -> JSONR context = self.make_request_context(request) return await handle_report_asset_materialization_request(context, request) + async def report_asset_check_endpoint(self, request: Request) -> JSONResponse: + context = self.make_request_context(request) + return await handle_report_asset_check_request(context, request) + def index_html_endpoint(self, request: Request): """Serves root html.""" index_path = self.relative_path("webapp/build/index.html") @@ -321,6 +328,11 @@ def build_routes(self): self.report_asset_materialization_endpoint, methods=["POST"], ), + Route( + "/report_asset_check/{asset_key:path}", + self.report_asset_check_endpoint, + methods=["POST"], + ), Route("/{path:path}", self.index_html_endpoint), Route("/", self.index_html_endpoint), ] diff --git a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_asset_events.py b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_asset_events.py index 2d91c53db4ade..af4ebf07bb1e3 100644 --- 
a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_asset_events.py +++ b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_asset_events.py @@ -3,6 +3,7 @@ from dagster import ( DagsterInstance, ) +from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.data_version import ( DATA_VERSION_IS_USER_PROVIDED_TAG, DATA_VERSION_TAG, @@ -10,7 +11,7 @@ from dagster._core.definitions.events import AssetKey, AssetMaterialization from dagster._seven import json from dagster_pipes import PipesContext -from dagster_webserver.external_assets import ReportAssetMatParam +from dagster_webserver.external_assets import ReportAssetCheckEvalParam, ReportAssetMatParam from starlette.testclient import TestClient @@ -178,3 +179,57 @@ def test_report_asset_materialization_apis_consistent( KNOWN_DIFF = {"partition", "description"} assert set(sample_payload.keys()).difference(set(params)) == KNOWN_DIFF + + +def test_report_asset_check_evaluation_apis_consistent( + instance: DagsterInstance, test_client: TestClient +): + # ensure the ext report_asset_check and the API endpoint have the same capabilities + sample_payload = { + "asset_key": "sample_key", + "check_name": "sample_check", + "metadata": {"meta": "data"}, + "severity": "WARN", + "passed": False, + } + + # sample has entry for all supported params (banking on usage of enum) + assert set(sample_payload.keys()) == set( + {v for k, v in vars(ReportAssetCheckEvalParam).items() if not k.startswith("__")} + ) + + response = test_client.post("/report_asset_check/", json=sample_payload) + assert response.status_code == 200, response.json() + check_key = AssetCheckKey(name="sample_check", asset_key=AssetKey("sample_key")) + results = instance.event_log_storage.get_latest_asset_check_execution_by_key([check_key]) + assert results + record = results[check_key] + assert record + evt = record.event.dagster_event + assert evt + evaluation = 
evt.asset_check_evaluation_data + + for k, v in sample_payload.items(): + if k == "check_name": + assert evaluation.check_name == v + elif k == "asset_key": + assert evaluation.asset_key == AssetKey(v) + elif k == "metadata": + assert evaluation.metadata.keys() == v.keys() + elif k == "passed": + assert evaluation.passed == v + elif k == "severity": + assert evaluation.severity.value == v + else: + assert ( + False + ), "need to add validation that sample payload content was written successfully" + + # every sample payload key should be an ext report_asset_check kwarg + sig = inspect.signature(PipesContext.report_asset_check) + skip_set = {"self"} + params = [p for p in sig.parameters if p not in skip_set] + + KNOWN_DIFF = set() + + assert set(sample_payload.keys()).difference(set(params)) == KNOWN_DIFF