Skip to content

Commit

Permalink
On multi-asset concept page, use AssetSpec, not AssetOut
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Aug 7, 2024
1 parent 9a9f482 commit d976b9f
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 96 deletions.
95 changes: 41 additions & 54 deletions docs/content/concepts/assets/multi-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -28,109 +28,96 @@ The function responsible for computing the contents of any asset is an [op](/con

### A basic multi-asset

The easiest way to create a multi-asset is with the <PyObject object="multi_asset" decorator /> decorator. This decorator functions similarly to the <PyObject object="asset" decorator /> decorator, but requires an `outs` parameter specifying each output asset of the function.
The easiest way to create a multi-asset is with the <PyObject object="multi_asset" decorator /> decorator. This decorator functions similarly to the <PyObject object="asset" decorator /> decorator, but requires a `specs` parameter specifying each asset that the function materializes.

```python file=/concepts/assets/multi_assets.py startafter=start_basic_multi_asset endbefore=end_basic_multi_asset
from dagster import AssetOut, multi_asset, AssetExecutionContext
from dagster import AssetSpec, multi_asset


@multi_asset(
outs={
"my_string_asset": AssetOut(),
"my_int_asset": AssetOut(),
}
)
@multi_asset(specs=[AssetSpec("users"), AssetSpec("orders")])
def my_function():
return "abc", 123
# some code that writes out data to the users table and the orders table
...
```

By default, the names of the outputs will be used to form the asset keys of the multi-asset. The decorated function will be used to create the op for these assets and must emit an output for each of them. In this case, we can emit multiple outputs by returning a tuple of values, one for each asset.

### Conditional materialization

In some cases, an asset may not need to be updated in storage each time the decorated function is executed. You can use the `is_required` parameter along with `yield` syntax to implement this behavior.
In some cases, an asset may not need to be updated in storage each time the decorated function is executed. You can use the `skippable` parameter along with `yield` syntax and <PyObject object="MaterializeResult" /> to implement this behavior.

If the `skippable` parameter is set to `True` on an <PyObject object="AssetSpec" />, and your function does not `yield` a <PyObject object="MaterializeResult" /> object for that asset, then:

If the `is_required` parameter is set to `False` on an output, and your function does not `yield` an <PyObject object="Output" /> object for that output, then no asset materialization event will be created, the I/O manager will not be invoked, downstream assets will not be materialized, and asset sensors monitoring the asset will not trigger.
- No asset materialization event will be created
- Downstream assets in the same run will not be materialized
- Asset sensors monitoring the asset will not trigger

```python file=/concepts/assets/multi_asset_conditional_materialization.py
import random

from dagster import AssetOut, Output, asset, multi_asset
from dagster import AssetSpec, MaterializeResult, asset, multi_asset


@multi_asset(
outs={"asset1": AssetOut(is_required=False), "asset2": AssetOut(is_required=False)}
specs=[AssetSpec("asset1", skippable=True), AssetSpec("asset2", skippable=True)]
)
def assets_1_and_2():
if random.randint(1, 10) < 5:
yield Output([1, 2, 3, 4], output_name="asset1")
yield MaterializeResult(asset_key="asset1")

if random.randint(1, 10) < 5:
yield Output([6, 7, 8, 9], output_name="asset2")
yield MaterializeResult(asset_key="asset2")


@asset
def downstream1(asset1):
# will not run when assets_1_and_2 doesn't materialize the asset1
return asset1 + [5]
@asset(deps=["asset1"])
def downstream1():
"""Will not run when assets_1_and_2 doesn't materialize asset1."""


@asset
def downstream2(asset2):
# will not run when assets_1_and_2 doesn't materialize the asset2
return asset2 + [10]
@asset(deps=["asset2"])
def downstream2():
"""Will not run when assets_1_and_2 doesn't materialize asset2."""
```

### Subsetting multi-assets

By default, it is assumed that the computation inside of a multi-asset will always produce the contents all of the associated assets. This means that attempting to execute a set of assets that produces some, but not all, of the assets defined by a given multi-asset will result in an error.

Sometimes, the underlying computation is sufficiently flexible to allow for computing an arbitrary subset of the assets associated with it. In these cases, set the `is_required` attribute of the outputs to `False`, and set the `can_subset` parameter of the decorator to `True`.
Sometimes, the underlying computation is sufficiently flexible to allow for computing an arbitrary subset of the assets associated with it. In these cases, set the `skippable` attribute of the asset specs to `True`, and set the `can_subset` parameter of the decorator to `True`.

Inside the body of the function, we can use `context.selected_output_names` or `context.selected_asset_keys` to find out which computations should be run.
Inside the body of the function, we can use `context.selected_asset_keys` to find out which assets should be materialized.

```python file=/concepts/assets/multi_assets.py startafter=start_subsettable_multi_asset endbefore=end_subsettable_multi_asset
from dagster import AssetOut, Output, multi_asset
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset


@multi_asset(
outs={
"a": AssetOut(is_required=False),
"b": AssetOut(is_required=False),
},
specs=[AssetSpec("asset1", skippable=True), AssetSpec("asset2", skippable=True)],
can_subset=True,
)
def split_actions(context: AssetExecutionContext):
if "a" in context.op_execution_context.selected_output_names:
yield Output(value=123, output_name="a")
if "b" in context.op_execution_context.selected_output_names:
yield Output(value=456, output_name="b")
if "asset1" in context.op_execution_context.selected_asset_keys:
yield MaterializeResult(asset_key="asset1")
if "asset2" in context.op_execution_context.selected_asset_keys:
yield MaterializeResult(asset_key="asset2")
```

Because our outputs are now optional, we can no longer rely on returning a tuple of values, as we don't know in advance which values will be computed. Instead, we explicitly `yield` each output that we're expected to create.

### Dependencies inside multi-assets

When a multi-asset is created, it is assumed that each output asset depends on all of the input assets. This may not always be the case, however.

In these situations, you may optionally provide a mapping from each output asset to the set of <PyObject object="AssetKey" />s that it depends on. This information is used to display lineage information in the Dagster UI and for parsing selections over your asset graph.
Assets defined within multi-assets can have dependencies on upstream assets. These dependencies can be expressed using the `deps` attribute on <PyObject object="AssetSpec" />.

```python file=/concepts/assets/multi_assets.py startafter=start_asset_deps_multi_asset endbefore=end_asset_deps_multi_asset
from dagster import AssetKey, AssetOut, Output, multi_asset
from dagster import AssetKey, AssetSpec, asset, multi_asset


@multi_asset(
outs={"c": AssetOut(), "d": AssetOut()},
internal_asset_deps={
"c": {AssetKey("a")},
"d": {AssetKey("b")},
},
)
def my_complex_assets(a, b):
# c only depends on a
yield Output(value=a + 1, output_name="c")
# d only depends on b
yield Output(value=b + 1, output_name="d")
@asset
def a(): ...


@asset
def b(): ...


@multi_asset(specs=[AssetSpec("c", deps=["b"]), AssetSpec("d", deps=["a"])])
def my_complex_assets(): ...
```

### Multi-asset code versions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
import random

from dagster import AssetOut, Output, asset, multi_asset
from dagster import AssetSpec, MaterializeResult, asset, multi_asset


@multi_asset(
outs={"asset1": AssetOut(is_required=False), "asset2": AssetOut(is_required=False)}
specs=[AssetSpec("asset1", skippable=True), AssetSpec("asset2", skippable=True)]
)
def assets_1_and_2():
if random.randint(1, 10) < 5:
yield Output([1, 2, 3, 4], output_name="asset1")
yield MaterializeResult(asset_key="asset1")

if random.randint(1, 10) < 5:
yield Output([6, 7, 8, 9], output_name="asset2")
yield MaterializeResult(asset_key="asset2")


@asset
def downstream1(asset1):
# will not run when assets_1_and_2 doesn't materialize the asset1
return asset1 + [5]
@asset(deps=["asset1"])
def downstream1():
"""Will not run when assets_1_and_2 doesn't materialize asset1."""


@asset
def downstream2(asset2):
# will not run when assets_1_and_2 doesn't materialize the asset2
return asset2 + [10]
@asset(deps=["asset2"])
def downstream2():
"""Will not run when assets_1_and_2 doesn't materialize asset2."""
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@


# start_basic_multi_asset
from dagster import AssetOut, multi_asset, AssetExecutionContext
from dagster import AssetSpec, multi_asset


@multi_asset(
outs={
"my_string_asset": AssetOut(),
"my_int_asset": AssetOut(),
}
)
@multi_asset(specs=[AssetSpec("users"), AssetSpec("orders")])
def my_function():
return "abc", 123
# some code that writes out data to the users table and the orders table
...


# end_basic_multi_asset
Expand All @@ -34,41 +30,36 @@ def my_assets():
# end_io_manager_multi_asset

# start_subsettable_multi_asset
from dagster import AssetOut, Output, multi_asset
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset


@multi_asset(
outs={
"a": AssetOut(is_required=False),
"b": AssetOut(is_required=False),
},
specs=[AssetSpec("asset1", skippable=True), AssetSpec("asset2", skippable=True)],
can_subset=True,
)
def split_actions(context: AssetExecutionContext):
if "a" in context.op_execution_context.selected_output_names:
yield Output(value=123, output_name="a")
if "b" in context.op_execution_context.selected_output_names:
yield Output(value=456, output_name="b")
if "asset1" in context.op_execution_context.selected_asset_keys:
yield MaterializeResult(asset_key="asset1")
if "asset2" in context.op_execution_context.selected_asset_keys:
yield MaterializeResult(asset_key="asset2")


# end_subsettable_multi_asset

# start_asset_deps_multi_asset
from dagster import AssetKey, AssetOut, Output, multi_asset
from dagster import AssetKey, AssetSpec, asset, multi_asset


@multi_asset(
outs={"c": AssetOut(), "d": AssetOut()},
internal_asset_deps={
"c": {AssetKey("a")},
"d": {AssetKey("b")},
},
)
def my_complex_assets(a, b):
# c only depends on a
yield Output(value=a + 1, output_name="c")
# d only depends on b
yield Output(value=b + 1, output_name="d")
@asset
def a(): ...


@asset
def b(): ...


@multi_asset(specs=[AssetSpec("c", deps=["b"]), AssetSpec("d", deps=["a"])])
def my_complex_assets(): ...


# end_asset_deps_multi_asset

0 comments on commit d976b9f

Please sign in to comment.