From d976b9f77708ff2cf48fdfcb039a66362028bd12 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 19 Jul 2024 09:08:04 -0700 Subject: [PATCH] On multi-asset concept page, use AssetSpec, not AssetOut --- docs/content/concepts/assets/multi-assets.mdx | 95 ++++++++----------- ...multi_asset_conditional_materialization.py | 22 ++--- .../concepts/assets/multi_assets.py | 51 ++++------ 3 files changed, 72 insertions(+), 96 deletions(-) diff --git a/docs/content/concepts/assets/multi-assets.mdx b/docs/content/concepts/assets/multi-assets.mdx index e76e68760eada..0ff6b303e803e 100644 --- a/docs/content/concepts/assets/multi-assets.mdx +++ b/docs/content/concepts/assets/multi-assets.mdx @@ -28,109 +28,96 @@ The function responsible for computing the contents of any asset is an [op](/con ### A basic multi-asset -The easiest way to create a multi-asset is with the decorator. This decorator functions similarly to the decorator, but requires an `outs` parameter specifying each output asset of the function. +The easiest way to create a multi-asset is with the decorator. This decorator functions similarly to the decorator, but requires a `specs` parameter specifying each asset that the function materializes. ```python file=/concepts/assets/multi_assets.py startafter=start_basic_multi_asset endbefore=end_basic_multi_asset -from dagster import AssetOut, multi_asset, AssetExecutionContext +from dagster import AssetSpec, multi_asset -@multi_asset( - outs={ - "my_string_asset": AssetOut(), - "my_int_asset": AssetOut(), - } -) +@multi_asset(specs=[AssetSpec("users"), AssetSpec("orders")]) def my_function(): - return "abc", 123 + # some code that writes out data to the users table and the orders table + ... ``` -By default, the names of the outputs will be used to form the asset keys of the multi-asset. The decorated function will be used to create the op for these assets and must emit an output for each of them. In this case, we can emit multiple outputs by returning a tuple of values, one for each asset. - ### Conditional materialization -In some cases, an asset may not need to be updated in storage each time the decorated function is executed. You can use the `is_required` parameter along with `yield` syntax to implement this behavior. +In some cases, an asset may not need to be updated in storage each time the decorated function is executed. You can use the `skippable` parameter along with `yield` syntax and to implement this behavior. + +If the `skippable` parameter is set to `True` on an , and your function does not `yield` a object for that asset, then: -If the `is_required` parameter is set to `False` on an output, and your function does not `yield` an object for that output, then no asset materialization event will be created, the I/O manager will not be invoked, downstream assets will not be materialized, and asset sensors monitoring the asset will not trigger. +- No asset materialization event will be created +- Downstream assets in the same run will not be materialized +- Asset sensors monitoring the asset will not trigger ```python file=/concepts/assets/multi_asset_conditional_materialization.py import random -from dagster import AssetOut, Output, asset, multi_asset +from dagster import AssetSpec, MaterializeResult, asset, multi_asset @multi_asset( - outs={"asset1": AssetOut(is_required=False), "asset2": AssetOut(is_required=False)} + specs=[AssetSpec("asset1", skippable=True), AssetSpec("asset2", skippable=True)] ) def assets_1_and_2(): if random.randint(1, 10) < 5: - yield Output([1, 2, 3, 4], output_name="asset1") + yield MaterializeResult(asset_key="asset1") if random.randint(1, 10) < 5: - yield Output([6, 7, 8, 9], output_name="asset2") + yield MaterializeResult(asset_key="asset2") -@asset -def downstream1(asset1): - # will not run when assets_1_and_2 doesn't materialize the asset1 - return asset1 + [5] +@asset(deps=["asset1"]) +def downstream1(): + """Will not run when assets_1_and_2 doesn't materialize asset1.""" -@asset -def downstream2(asset2): - # will not run when assets_1_and_2 doesn't materialize the asset2 - return asset2 + [10] +@asset(deps=["asset2"]) +def downstream2(): + """Will not run when assets_1_and_2 doesn't materialize asset2.""" ``` ### Subsetting multi-assets By default, it is assumed that the computation inside of a multi-asset will always produce the contents all of the associated assets. This means that attempting to execute a set of assets that produces some, but not all, of the assets defined by a given multi-asset will result in an error. -Sometimes, the underlying computation is sufficiently flexible to allow for computing an arbitrary subset of the assets associated with it. In these cases, set the `is_required` attribute of the outputs to `False`, and set the `can_subset` parameter of the decorator to `True`. +Sometimes, the underlying computation is sufficiently flexible to allow for computing an arbitrary subset of the assets associated with it. In these cases, set the `skippable` attribute of the asset specs to `True`, and set the `can_subset` parameter of the decorator to `True`. -Inside the body of the function, we can use `context.selected_output_names` or `context.selected_asset_keys` to find out which computations should be run. +Inside the body of the function, we can use `context.selected_asset_keys` to find out which assets should be materialized. ```python file=/concepts/assets/multi_assets.py startafter=start_subsettable_multi_asset endbefore=end_subsettable_multi_asset -from dagster import AssetOut, Output, multi_asset +from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset @multi_asset( - outs={ - "a": AssetOut(is_required=False), - "b": AssetOut(is_required=False), - }, + specs=[AssetSpec("asset1", skippable=True), AssetSpec("asset2", skippable=True)], can_subset=True, ) def split_actions(context: AssetExecutionContext): - if "a" in context.op_execution_context.selected_output_names: - yield Output(value=123, output_name="a") - if "b" in context.op_execution_context.selected_output_names: - yield Output(value=456, output_name="b") + if "asset1" in context.op_execution_context.selected_asset_keys: + yield MaterializeResult(asset_key="asset1") + if "asset2" in context.op_execution_context.selected_asset_keys: + yield MaterializeResult(asset_key="asset2") ``` -Because our outputs are now optional, we can no longer rely on returning a tuple of values, as we don't know in advance which values will be computed. Instead, we explicitly `yield` each output that we're expected to create. - ### Dependencies inside multi-assets -When a multi-asset is created, it is assumed that each output asset depends on all of the input assets. This may not always be the case, however. - -In these situations, you may optionally provide a mapping from each output asset to the set of s that it depends on. This information is used to display lineage information in the Dagster UI and for parsing selections over your asset graph. +Assets defined within multi-assets can have dependencies on upstream assets. These dependencies can be expressed using the `deps` attribute on . ```python file=/concepts/assets/multi_assets.py startafter=start_asset_deps_multi_asset endbefore=end_asset_deps_multi_asset -from dagster import AssetKey, AssetOut, Output, multi_asset +from dagster import AssetKey, AssetSpec, asset, multi_asset -@multi_asset( - outs={"c": AssetOut(), "d": AssetOut()}, - internal_asset_deps={ - "c": {AssetKey("a")}, - "d": {AssetKey("b")}, - }, -) -def my_complex_assets(a, b): - # c only depends on a - yield Output(value=a + 1, output_name="c") - # d only depends on b - yield Output(value=b + 1, output_name="d") +@asset +def a(): ... + + +@asset +def b(): ... + + +@multi_asset(specs=[AssetSpec("c", deps=["b"]), AssetSpec("d", deps=["a"])]) +def my_complex_assets(): ... ``` ### Multi-asset code versions diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/multi_asset_conditional_materialization.py b/examples/docs_snippets/docs_snippets/concepts/assets/multi_asset_conditional_materialization.py index b6c5949da7522..6bf0a6fa98df5 100644 --- a/examples/docs_snippets/docs_snippets/concepts/assets/multi_asset_conditional_materialization.py +++ b/examples/docs_snippets/docs_snippets/concepts/assets/multi_asset_conditional_materialization.py @@ -1,26 +1,24 @@ import random -from dagster import AssetOut, Output, asset, multi_asset +from dagster import AssetSpec, MaterializeResult, asset, multi_asset @multi_asset( - outs={"asset1": AssetOut(is_required=False), "asset2": AssetOut(is_required=False)} + specs=[AssetSpec("asset1", skippable=True), AssetSpec("asset2", skippable=True)] ) def assets_1_and_2(): if random.randint(1, 10) < 5: - yield Output([1, 2, 3, 4], output_name="asset1") + yield MaterializeResult(asset_key="asset1") if random.randint(1, 10) < 5: - yield Output([6, 7, 8, 9], output_name="asset2") + yield MaterializeResult(asset_key="asset2") -@asset -def downstream1(asset1): - # will not run when assets_1_and_2 doesn't materialize the asset1 - return asset1 + [5] +@asset(deps=["asset1"]) +def downstream1(): + """Will not run when assets_1_and_2 doesn't materialize asset1.""" -@asset -def downstream2(asset2): - # will not run when assets_1_and_2 doesn't materialize the asset2 - return asset2 + [10] +@asset(deps=["asset2"]) +def downstream2(): + """Will not run when assets_1_and_2 doesn't materialize asset2.""" diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/multi_assets.py b/examples/docs_snippets/docs_snippets/concepts/assets/multi_assets.py index b0318fb59eb61..96b6451584a49 100644 --- a/examples/docs_snippets/docs_snippets/concepts/assets/multi_assets.py +++ b/examples/docs_snippets/docs_snippets/concepts/assets/multi_assets.py @@ -2,17 +2,13 @@ # start_basic_multi_asset -from dagster import AssetOut, multi_asset, AssetExecutionContext +from dagster import AssetSpec, multi_asset -@multi_asset( - outs={ - "my_string_asset": AssetOut(), - "my_int_asset": AssetOut(), - } -) +@multi_asset(specs=[AssetSpec("users"), AssetSpec("orders")]) def my_function(): - return "abc", 123 + # some code that writes out data to the users table and the orders table + ... # end_basic_multi_asset @@ -34,41 +30,36 @@ def my_assets(): # end_io_manager_multi_asset # start_subsettable_multi_asset -from dagster import AssetOut, Output, multi_asset +from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset @multi_asset( - outs={ - "a": AssetOut(is_required=False), - "b": AssetOut(is_required=False), - }, + specs=[AssetSpec("asset1", skippable=True), AssetSpec("asset2", skippable=True)], can_subset=True, ) def split_actions(context: AssetExecutionContext): - if "a" in context.op_execution_context.selected_output_names: - yield Output(value=123, output_name="a") - if "b" in context.op_execution_context.selected_output_names: - yield Output(value=456, output_name="b") + if "asset1" in context.op_execution_context.selected_asset_keys: + yield MaterializeResult(asset_key="asset1") + if "asset2" in context.op_execution_context.selected_asset_keys: + yield MaterializeResult(asset_key="asset2") # end_subsettable_multi_asset # start_asset_deps_multi_asset -from dagster import AssetKey, AssetOut, Output, multi_asset +from dagster import AssetKey, AssetSpec, asset, multi_asset -@multi_asset( - outs={"c": AssetOut(), "d": AssetOut()}, - internal_asset_deps={ - "c": {AssetKey("a")}, - "d": {AssetKey("b")}, - }, -) -def my_complex_assets(a, b): - # c only depends on a - yield Output(value=a + 1, output_name="c") - # d only depends on b - yield Output(value=b + 1, output_name="d") +@asset +def a(): ... + + +@asset +def b(): ... + + +@multi_asset(specs=[AssetSpec("c", deps=["b"]), AssetSpec("d", deps=["a"])]) +def my_complex_assets(): ... # end_asset_deps_multi_asset