Skip to content

Commit

Permalink
[exploration] output-free multi_asset
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Aug 23, 2023
1 parent 36d9651 commit 1893f3e
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 68 deletions.
117 changes: 117 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from typing import AbstractSet, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.source_asset import SourceAsset

from .auto_materialize_policy import AutoMaterializePolicy
from .events import (
AssetKey,
CoercibleToAssetKey,
)
from .freshness_policy import FreshnessPolicy
from .metadata import MetadataUserInput


class AssetSpec(
NamedTuple(
"_AssetSpec",
[
("asset_key", PublicAttr[AssetKey]),
("deps", PublicAttr[AbstractSet[AssetKey]]),
("description", PublicAttr[Optional[str]]),
("metadata", PublicAttr[Optional[Mapping[str, Any]]]),
("group_name", PublicAttr[Optional[str]]),
("skippable", PublicAttr[bool]),
("code_version", PublicAttr[Optional[str]]),
("freshness_policy", PublicAttr[Optional[FreshnessPolicy]]),
("auto_materialize_policy", PublicAttr[Optional[AutoMaterializePolicy]]),
("backfill_policy", PublicAttr[Optional[BackfillPolicy]]),
],
)
):
"""Specifies the core attributes of an asset. This object is attached to the decorated
function that defines how it materialized.
Attributes:
asset_key (AssetKey): The unique identifier for this asset.
deps (Optional[AbstractSet[AssetKey]]): The asset keys for the upstream assets that
materializing this asset depends on.
description (Optional[str]): Human-readable description of this asset.
metadata (Optional[Dict[str, Any]]): A dict of static metadata for this asset.
For example, users can provide information about the database table this
asset corresponds to.
skippable (bool): Whether this asset can be omitted during materialization, causing downstream
dependencies to skip.
group_name (Optional[str]): A string name used to organize multiple assets into groups. If
not provided, the name "default" is used.
code_version (Optional[str]): The version of the code for this specific asset,
overriding the code version of the materialization function
freshness_policy (Optional[FreshnessPolicy]): A policy which indicates how up to date this
asset is intended to be.
auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to
the specified asset.
backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified asset.
"""

def __new__(
cls,
asset_key: CoercibleToAssetKey,
deps: Optional[
Iterable[
Union[
CoercibleToAssetKey,
"AssetSpec",
AssetsDefinition,
SourceAsset,
]
]
] = None,
description: Optional[str] = None,
metadata: Optional[MetadataUserInput] = None,
skippable: bool = False,
group_name: Optional[str] = None,
code_version: Optional[str] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
backfill_policy: Optional[BackfillPolicy] = None,
):
from .decorators.asset_decorator import (
asset_key_from_coercible_or_definition,
)

dep_set = set()
if deps:
for dep in deps:
if isinstance(dep, AssetSpec):
dep_set.add(dep.asset_key)
else:
dep_set.add(asset_key_from_coercible_or_definition(dep))

return super().__new__(
cls,
asset_key=AssetKey.from_coercible(asset_key),
deps=dep_set,
description=check.opt_str_param(description, "description"),
metadata=check.opt_mapping_param(metadata, "metadata", key_type=str),
skippable=check.bool_param(skippable, "skippable"),
group_name=check.opt_str_param(group_name, "group_name"),
code_version=check.opt_str_param(code_version, "code_version"),
freshness_policy=check.opt_inst_param(
freshness_policy,
"freshness_policy",
FreshnessPolicy,
),
auto_materialize_policy=check.opt_inst_param(
auto_materialize_policy,
"auto_materialize_policy",
AutoMaterializePolicy,
),
backfill_policy=check.opt_inst_param(
backfill_policy,
"backfill_policy",
BackfillPolicy,
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from ..asset_check_spec import AssetCheckSpec
from ..asset_in import AssetIn
from ..asset_out import AssetOut
from ..asset_spec import AssetSpec
from ..assets import AssetsDefinition
from ..backfill_policy import BackfillPolicy, BackfillPolicyType
from ..decorators.graph_decorator import graph
Expand Down Expand Up @@ -457,7 +458,8 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
)
def multi_asset(
*,
outs: Mapping[str, AssetOut],
specs: Optional[Sequence[AssetSpec]] = None,
outs: Optional[Mapping[str, AssetOut]] = None,
name: Optional[str] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
deps: Optional[Iterable[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]] = None,
Expand Down Expand Up @@ -489,7 +491,12 @@ def multi_asset(
Args:
name (Optional[str]): The name of the op.
outs: (Optional[Dict[str, AssetOut]]): The AssetOuts representing the produced assets.
specs (Optional[Sequence[AssetSpec]]): The specifications for the assets always materialized by this function.
conditional_specs (Optional[Sequence[AssetSpec]]): The specifications for the assets that are conditionally
materialized by this function.
outs (Optional[Dict[str, AssetOut]]): Used instead of AssetSpecs when the materialized asset values are
output by this function and persisted by IO managers. AssetOuts represent output, IO management,
and core asset properties.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information
about the input.
deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, AssetKey, str]]]):
Expand Down Expand Up @@ -560,6 +567,8 @@ def my_function():
"""
from dagster._core.execution.build_resources import wrap_resources_for_execution

specs = check.opt_list_param(specs, "specs", of_type=AssetSpec)

upstream_asset_deps = _type_check_deps_and_non_argument_deps(
deps=deps, non_argument_deps=non_argument_deps
)
Expand All @@ -584,10 +593,28 @@ def my_function():
resource_defs_keys = set(resource_defs.keys())
required_resource_keys = bare_required_resource_keys | resource_defs_keys

asset_out_map: Mapping[str, AssetOut] = {} if outs is None else outs

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
op_name = name or fn.__name__
asset_ins = build_asset_ins(fn, ins or {}, deps=_make_asset_keys(upstream_asset_deps))
output_tuples_by_asset_key = build_asset_outs(outs)

if asset_out_map and specs:
raise DagsterInvalidDefinitionError("Must specify only outs or assets but not both.")
elif specs:
output_tuples_by_asset_key = {}
for asset_spec in specs:
# output names are asset keys joined with _
output_name = "_".join(asset_spec.asset_key.path)
output_tuples_by_asset_key[asset_spec.asset_key] = (
output_name,
Out(
Nothing,
is_required=not (can_subset or asset_spec.skippable),
),
)
else:
output_tuples_by_asset_key = build_asset_outs(asset_out_map)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
Expand All @@ -598,13 +625,14 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:

# validate that the asset_deps make sense
valid_asset_deps = set(asset_ins.keys()) | set(output_tuples_by_asset_key.keys())

for out_name, asset_keys in asset_deps.items():
check.invariant(
out_name in outs,
f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument for"
f" multi-asset {op_name}. Must be one of the outs for this multi-asset"
f" {list(outs.keys())[:20]}.",
)
if asset_out_map and out_name not in asset_out_map:
check.failed(
f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument for"
f" multi-asset {op_name}. Must be one of the outs for this multi-asset"
f" {list(asset_out_map.keys())[:20]}.",
)
invalid_asset_deps = asset_keys.difference(valid_asset_deps)
check.invariant(
not invalid_asset_deps,
Expand Down Expand Up @@ -658,51 +686,59 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
output_name: asset_key
for asset_key, (output_name, _) in output_tuples_by_asset_key.items()
}
partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping is not None
}

# source group names from the AssetOuts (if any)
if specs:
internal_deps = {spec.asset_key: spec.deps for spec in specs if spec.deps is not None}
props_by_asset_key: Mapping[AssetKey, Union[AssetSpec, AssetOut]] = {
spec.asset_key: spec for spec in specs
}
else:
internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps}
props_by_asset_key = {
keys_by_output_name[output_name]: asset_out
for output_name, asset_out in asset_out_map.items()
}

# handle properties defined ons AssetSpecs or AssetOuts
group_names_by_key = {
keys_by_output_name[output_name]: out.group_name
for output_name, out in outs.items()
if out.group_name is not None
asset_key: props.group_name
for asset_key, props in props_by_asset_key.items()
if props.group_name is not None
}
if group_name:
check.invariant(
not group_names_by_key,
"Cannot set group_name parameter on multi_asset if one or more of the AssetOuts"
" supplied to this multi_asset have a group_name defined.",
"Cannot set group_name parameter on multi_asset if one or more of the"
" AssetSpecs/AssetOuts supplied to this multi_asset have a group_name defined.",
)
group_names_by_key = {
asset_key: group_name for asset_key in keys_by_output_name.values()
}
group_names_by_key = {asset_key: group_name for asset_key in props_by_asset_key}

# source freshness policies from the AssetOuts (if any)
freshness_policies_by_key = {
keys_by_output_name[output_name]: out.freshness_policy
for output_name, out in outs.items()
if out.freshness_policy is not None
asset_key: props.freshness_policy
for asset_key, props in props_by_asset_key.items()
if props.freshness_policy is not None
}
auto_materialize_policies_by_key = {
keys_by_output_name[output_name]: out.auto_materialize_policy
for output_name, out in outs.items()
if out.auto_materialize_policy is not None
}

partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping is not None
asset_key: props.auto_materialize_policy
for asset_key, props in props_by_asset_key.items()
if props.auto_materialize_policy is not None
}
metadata_by_key = {
keys_by_output_name[output_name]: out.metadata
for output_name, out in outs.items()
if out.metadata is not None
asset_key: props.metadata
for asset_key, props in props_by_asset_key.items()
if props.metadata is not None
}

return AssetsDefinition.dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_output_name=keys_by_output_name,
node_def=op,
asset_deps={keys_by_output_name[name]: asset_deps[name] for name in asset_deps},
asset_deps=internal_deps,
partitions_def=partitions_def,
partition_mappings=partition_mappings if partition_mappings else None,
can_subset=can_subset,
Expand Down
Loading

0 comments on commit 1893f3e

Please sign in to comment.