Skip to content

Commit

Permalink
add AssetSpec and specs on @multi_asset (#15674)
Browse files Browse the repository at this point in the history
Introduces an `AssetSpec` class for declaring the assets that are
materialized instead of using `AssetOut` . This mirrors the newly added
setup of `AssetCheckSpec` and `check_specs` from
#15928 and
#15948.

This is motivated by providing cleaner API surface for users who do not
need to leverage Dagsters IO management, and therefor are better served
with APIs that are conceptually "output-free".

## How I Tested These Changes
added tests
  • Loading branch information
alangenfeld authored Aug 25, 2023
1 parent 737429e commit 41045f2
Show file tree
Hide file tree
Showing 4 changed files with 409 additions and 90 deletions.
110 changes: 110 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from typing import AbstractSet, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.source_asset import SourceAsset

from .auto_materialize_policy import AutoMaterializePolicy
from .events import (
AssetKey,
CoercibleToAssetKey,
)
from .freshness_policy import FreshnessPolicy
from .metadata import MetadataUserInput


@experimental
class AssetSpec(
NamedTuple(
"_AssetSpec",
[
("asset_key", PublicAttr[AssetKey]),
("deps", PublicAttr[AbstractSet[AssetKey]]),
("description", PublicAttr[Optional[str]]),
("metadata", PublicAttr[Optional[Mapping[str, Any]]]),
("group_name", PublicAttr[Optional[str]]),
("skippable", PublicAttr[bool]),
("code_version", PublicAttr[Optional[str]]),
("freshness_policy", PublicAttr[Optional[FreshnessPolicy]]),
("auto_materialize_policy", PublicAttr[Optional[AutoMaterializePolicy]]),
],
)
):
"""Specifies the core attributes of an asset. This object is attached to the decorated
function that defines how it materialized.
Attributes:
asset_key (AssetKey): The unique identifier for this asset.
deps (Optional[AbstractSet[AssetKey]]): The asset keys for the upstream assets that
materializing this asset depends on.
description (Optional[str]): Human-readable description of this asset.
metadata (Optional[Dict[str, Any]]): A dict of static metadata for this asset.
For example, users can provide information about the database table this
asset corresponds to.
skippable (bool): Whether this asset can be omitted during materialization, causing downstream
dependencies to skip.
group_name (Optional[str]): A string name used to organize multiple assets into groups. If
not provided, the name "default" is used.
code_version (Optional[str]): The version of the code for this specific asset,
overriding the code version of the materialization function
freshness_policy (Optional[FreshnessPolicy]): A policy which indicates how up to date this
asset is intended to be.
auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to
the specified asset.
backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified asset.
"""

def __new__(
cls,
asset_key: CoercibleToAssetKey,
deps: Optional[
Iterable[
Union[
CoercibleToAssetKey,
"AssetSpec",
AssetsDefinition,
SourceAsset,
]
]
] = None,
description: Optional[str] = None,
metadata: Optional[MetadataUserInput] = None,
skippable: bool = False,
group_name: Optional[str] = None,
code_version: Optional[str] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
):
from .decorators.asset_decorator import (
asset_key_from_coercible_or_definition,
)

dep_set = set()
if deps:
for dep in deps:
if isinstance(dep, AssetSpec):
dep_set.add(dep.asset_key)
else:
dep_set.add(asset_key_from_coercible_or_definition(dep))

return super().__new__(
cls,
asset_key=AssetKey.from_coercible(asset_key),
deps=dep_set,
description=check.opt_str_param(description, "description"),
metadata=check.opt_mapping_param(metadata, "metadata", key_type=str),
skippable=check.bool_param(skippable, "skippable"),
group_name=check.opt_str_param(group_name, "group_name"),
code_version=check.opt_str_param(code_version, "code_version"),
freshness_policy=check.opt_inst_param(
freshness_policy,
"freshness_policy",
FreshnessPolicy,
),
auto_materialize_policy=check.opt_inst_param(
auto_materialize_policy,
"auto_materialize_policy",
AutoMaterializePolicy,
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from ..asset_check_spec import AssetCheckSpec
from ..asset_in import AssetIn
from ..asset_out import AssetOut
from ..asset_spec import AssetSpec
from ..assets import AssetsDefinition
from ..backfill_policy import BackfillPolicy, BackfillPolicyType
from ..decorators.graph_decorator import graph
Expand Down Expand Up @@ -457,7 +458,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
)
def multi_asset(
*,
outs: Mapping[str, AssetOut],
outs: Optional[Mapping[str, AssetOut]] = None,
name: Optional[str] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
deps: Optional[Iterable[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]] = None,
Expand All @@ -474,8 +475,10 @@ def multi_asset(
group_name: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None,
code_version: Optional[str] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
specs: Optional[Sequence[AssetSpec]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
# deprecated
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a combined definition of multiple assets that are computed using the same op and same
upstream assets.
Expand All @@ -489,7 +492,9 @@ def multi_asset(
Args:
name (Optional[str]): The name of the op.
outs: (Optional[Dict[str, AssetOut]]): The AssetOuts representing the produced assets.
outs: (Optional[Dict[str, AssetOut]]): The AssetOuts representing the assets materialized by
this function. AssetOuts detail the output, IO management, and core asset properties.
This argument is required except when AssetSpecs are used.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information
about the input.
deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, AssetKey, str]]]):
Expand Down Expand Up @@ -524,6 +529,8 @@ def multi_asset(
retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.
code_version (Optional[str]): (Experimental) Version of the code encapsulated by the multi-asset. If set,
this is used as a default code version for all defined assets.
specs (Optional[Sequence[AssetSpec]]): (Experimental) The specifications for the assets materialized
by this function.
check_specs (Optional[Sequence[AssetCheckSpec]]): (Experimental) Specs for asset checks that
execute in the decorated function after materializing the assets.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead. Set of asset keys that are upstream
Expand Down Expand Up @@ -560,6 +567,8 @@ def my_function():
"""
from dagster._core.execution.build_resources import wrap_resources_for_execution

specs = check.opt_list_param(specs, "specs", of_type=AssetSpec)

upstream_asset_deps = _type_check_deps_and_non_argument_deps(
deps=deps, non_argument_deps=non_argument_deps
)
Expand All @@ -584,10 +593,74 @@ def my_function():
resource_defs_keys = set(resource_defs.keys())
required_resource_keys = bare_required_resource_keys | resource_defs_keys

asset_out_map: Mapping[str, AssetOut] = {} if outs is None else outs

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
op_name = name or fn.__name__
asset_ins = build_asset_ins(fn, ins or {}, deps=_make_asset_keys(upstream_asset_deps))
output_tuples_by_asset_key = build_asset_outs(outs)

if asset_out_map and specs:
raise DagsterInvalidDefinitionError("Must specify only outs or assets but not both.")
elif specs:
output_tuples_by_asset_key = {}
for asset_spec in specs:
# output names are asset keys joined with _
output_name = "_".join(asset_spec.asset_key.path)
output_tuples_by_asset_key[asset_spec.asset_key] = (
output_name,
Out(
Nothing,
is_required=not (can_subset or asset_spec.skippable),
),
)
if upstream_asset_deps:
raise DagsterInvalidDefinitionError(
"Can not pass deps and specs to @multi_asset, specify deps on the AssetSpecs"
" directly."
)
if internal_asset_deps:
raise DagsterInvalidDefinitionError(
"Can not pass internal_asset_deps and specs to @multi_asset, specify deps on"
" the AssetSpecs directly."
)

upstream_keys = {
dep for spec in specs for dep in spec.deps if dep not in output_tuples_by_asset_key
}

explicit_ins = ins or {}
# get which asset keys have inputs set
loaded_upstreams = build_asset_ins(fn, explicit_ins, deps=set())
unexpected_upstreams = {
key for key in loaded_upstreams.keys() if key not in upstream_keys
}
if unexpected_upstreams:
raise DagsterInvalidDefinitionError(
f"Asset inputs {unexpected_upstreams} do not have dependencies on the passed"
" AssetSpec(s). Set the deps on the appropriate AssetSpec(s)."
)
remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams}
asset_ins = build_asset_ins(fn, explicit_ins, deps=remaining_upstream_keys)
else:
asset_ins = build_asset_ins(fn, ins or {}, deps=_make_asset_keys(upstream_asset_deps))
output_tuples_by_asset_key = build_asset_outs(asset_out_map)
# validate that the asset_deps make sense
valid_asset_deps = set(asset_ins.keys()) | set(output_tuples_by_asset_key.keys())
for out_name, asset_keys in asset_deps.items():
if asset_out_map and out_name not in asset_out_map:
check.failed(
f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument"
f" for multi-asset {op_name}. Must be one of the outs for this multi-asset"
f" {list(asset_out_map.keys())[:20]}.",
)
invalid_asset_deps = asset_keys.difference(valid_asset_deps)
check.invariant(
not invalid_asset_deps,
f"Invalid asset dependencies: {invalid_asset_deps} specified in"
f" `internal_asset_deps` argument for multi-asset '{op_name}' on key"
f" '{out_name}'. Each specified asset key must be associated with an input to"
" the asset or produced by this asset. Valid keys:"
f" {list(valid_asset_deps)[:20]}",
)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
Expand All @@ -596,25 +669,6 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
" arguments to the decorated function",
)

# validate that the asset_deps make sense
valid_asset_deps = set(asset_ins.keys()) | set(output_tuples_by_asset_key.keys())
for out_name, asset_keys in asset_deps.items():
check.invariant(
out_name in outs,
f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument for"
f" multi-asset {op_name}. Must be one of the outs for this multi-asset"
f" {list(outs.keys())[:20]}.",
)
invalid_asset_deps = asset_keys.difference(valid_asset_deps)
check.invariant(
not invalid_asset_deps,
f"Invalid asset dependencies: {invalid_asset_deps} specified in"
f" `internal_asset_deps` argument for multi-asset '{op_name}' on key"
f" '{out_name}'. Each specified asset key must be associated with an input to"
" the asset or produced by this asset. Valid keys:"
f" {list(valid_asset_deps)[:20]}",
)

asset_outs_by_output_name: Mapping[str, Out] = dict(output_tuples_by_asset_key.values())

check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs(check_specs)
Expand Down Expand Up @@ -658,51 +712,59 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
output_name: asset_key
for asset_key, (output_name, _) in output_tuples_by_asset_key.items()
}
partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping is not None
}

# source group names from the AssetOuts (if any)
if specs:
internal_deps = {spec.asset_key: spec.deps for spec in specs if spec.deps is not None}
props_by_asset_key: Mapping[AssetKey, Union[AssetSpec, AssetOut]] = {
spec.asset_key: spec for spec in specs
}
else:
internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps}
props_by_asset_key = {
keys_by_output_name[output_name]: asset_out
for output_name, asset_out in asset_out_map.items()
}

# handle properties defined ons AssetSpecs or AssetOuts
group_names_by_key = {
keys_by_output_name[output_name]: out.group_name
for output_name, out in outs.items()
if out.group_name is not None
asset_key: props.group_name
for asset_key, props in props_by_asset_key.items()
if props.group_name is not None
}
if group_name:
check.invariant(
not group_names_by_key,
"Cannot set group_name parameter on multi_asset if one or more of the AssetOuts"
" supplied to this multi_asset have a group_name defined.",
"Cannot set group_name parameter on multi_asset if one or more of the"
" AssetSpecs/AssetOuts supplied to this multi_asset have a group_name defined.",
)
group_names_by_key = {
asset_key: group_name for asset_key in keys_by_output_name.values()
}
group_names_by_key = {asset_key: group_name for asset_key in props_by_asset_key}

# source freshness policies from the AssetOuts (if any)
freshness_policies_by_key = {
keys_by_output_name[output_name]: out.freshness_policy
for output_name, out in outs.items()
if out.freshness_policy is not None
asset_key: props.freshness_policy
for asset_key, props in props_by_asset_key.items()
if props.freshness_policy is not None
}
auto_materialize_policies_by_key = {
keys_by_output_name[output_name]: out.auto_materialize_policy
for output_name, out in outs.items()
if out.auto_materialize_policy is not None
}

partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping is not None
asset_key: props.auto_materialize_policy
for asset_key, props in props_by_asset_key.items()
if props.auto_materialize_policy is not None
}
metadata_by_key = {
keys_by_output_name[output_name]: out.metadata
for output_name, out in outs.items()
if out.metadata is not None
asset_key: props.metadata
for asset_key, props in props_by_asset_key.items()
if props.metadata is not None
}

return AssetsDefinition.dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_output_name=keys_by_output_name,
node_def=op,
asset_deps={keys_by_output_name[name]: asset_deps[name] for name in asset_deps},
asset_deps=internal_deps,
partitions_def=partitions_def,
partition_mappings=partition_mappings if partition_mappings else None,
can_subset=can_subset,
Expand Down Expand Up @@ -786,7 +848,11 @@ def build_asset_ins(

for asset_key in deps:
stringified_asset_key = "_".join(asset_key.path).replace("-", "_")
# mypy doesn't realize that Nothing is a valid type here
if asset_key in ins_by_asset_key:
raise DagsterInvalidDefinitionError(
f"deps value {asset_key} also declared as input/AssetIn"
)
# mypy doesn't realize that Nothing is a valid type here
ins_by_asset_key[asset_key] = (stringified_asset_key, In(cast(type, Nothing)))

return ins_by_asset_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,7 @@ def the_upstream_asset():

with pytest.raises(
DagsterInvalidDefinitionError,
# this is a known bad error experience. TODO - update the error to be helpful
match=(
"@op 'depends_on_upstream_asset' decorated function has parameter 'the_upstream_asset'"
" that is one of the input_defs of type 'Nothing' which should not be included since no"
" data will be passed for it."
),
match=r"deps value .* also declared as input/AssetIn",
):

@asset(deps=[the_upstream_asset])
Expand Down
Loading

0 comments on commit 41045f2

Please sign in to comment.