diff --git a/docs/content/concepts/assets/asset-checks.mdx b/docs/content/concepts/assets/asset-checks.mdx index 77be5d719e008..11d150d8401ba 100644 --- a/docs/content/concepts/assets/asset-checks.mdx +++ b/docs/content/concepts/assets/asset-checks.mdx @@ -158,6 +158,59 @@ defs = Definitions( There are a variety of types supported via the class. You can view the metadata on the **Checks** tab of the **Asset details** page. +### Asset check factories + +If you want to define many checks that are similar, you can use the factory pattern. Here's an example factory that accepts a list of sql statements and turns them in to asset checks. + +```python file=/concepts/assets/asset_checks/factory.py +from typing import Any, Mapping, Sequence + +from dagster import AssetCheckResult, Definitions, asset, asset_check + + +@asset +def orders(): + ... + + +@asset +def items(): + ... + + +def make_checks(check_blobs: Sequence[Mapping[str, str]]): + checks = [] + for check_blob in check_blobs: + + @asset_check(name=check_blob["name"], asset=check_blob["asset"]) + def _check(context): + db_connection = ... + rows = db_connection.execute(check_blob["sql"]) + return AssetCheckResult( + passed=len(rows) == 0, metadata={"num_rows": len(rows)} + ) + + checks.append(_check) + + return checks + + +check_blobs = [ + { + "name": "orders_id_has_no_nulls", + "asset": "orders", + "sql": "select * from orders where order_id is null", + }, + { + "name": "items_id_has_no_nulls", + "asset": "items", + "sql": "select * from items where item_id is null", + }, +] + +defs = Definitions(assets=[orders, items], asset_checks=make_checks(check_blobs)) +``` + --- ## Executing checks diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/asset_checks/factory.py b/examples/docs_snippets/docs_snippets/concepts/assets/asset_checks/factory.py new file mode 100644 index 0000000000000..4a0704d1b00d7 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/assets/asset_checks/factory.py @@ -0,0 +1,46 @@ +from typing import Any, Mapping, Sequence + +from dagster import AssetCheckResult, Definitions, asset, asset_check + + +@asset +def orders(): + ... + + +@asset +def items(): + ... + + +def make_checks(check_blobs: Sequence[Mapping[str, str]]): + checks = [] + for check_blob in check_blobs: + + @asset_check(name=check_blob["name"], asset=check_blob["asset"]) + def _check(context): + db_connection = ... + rows = db_connection.execute(check_blob["sql"]) + return AssetCheckResult( + passed=len(rows) == 0, metadata={"num_rows": len(rows)} + ) + + checks.append(_check) + + return checks + + +check_blobs = [ + { + "name": "orders_id_has_no_nulls", + "asset": "orders", + "sql": "select * from orders where order_id is null", + }, + { + "name": "items_id_has_no_nulls", + "asset": "items", + "sql": "select * from items where item_id is null", + }, +] + +defs = Definitions(assets=[orders, items], asset_checks=make_checks(check_blobs)) diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_asset_checks.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_asset_checks.py index 4a5735d1132d5..2200a583602ec 100644 --- a/examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_asset_checks.py +++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_asset_checks.py @@ -1,17 +1,34 @@ import pytest +from dagster._core.definitions.asset_check_spec import AssetCheckKey +from dagster._core.definitions.events import AssetKey from docs_snippets.concepts.assets.asset_checks.asset_with_check import ( defs as asset_with_check_defs, ) +from docs_snippets.concepts.assets.asset_checks.factory import check_blobs, make_checks from docs_snippets.concepts.assets.asset_checks.metadata import defs as metadata_defs from docs_snippets.concepts.assets.asset_checks.orders_check import defs as orders_defs from docs_snippets.concepts.assets.asset_checks.severity import defs as severity_defs @pytest.mark.parametrize( - "defs", [orders_defs, asset_with_check_defs, severity_defs, metadata_defs] + "defs", + [orders_defs, asset_with_check_defs, severity_defs, metadata_defs], ) def test_execute(defs): job_def = defs.get_implicit_global_asset_job_def() result = job_def.execute_in_process() assert result.success + + +def test_factory(): + assert [c.spec.key for c in make_checks(check_blobs)] == [ + AssetCheckKey( + AssetKey(["orders"]), + "orders_id_has_no_nulls", + ), + AssetCheckKey( + AssetKey(["items"]), + "items_id_has_no_nulls", + ), + ]