Skip to content

Commit

Permalink
[exploration] output-free multi_asset
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Aug 21, 2023
1 parent ed7d3ce commit d99d0c5
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 50 deletions.
104 changes: 104 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from typing import AbstractSet, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.definitions.backfill_policy import BackfillPolicy

from .auto_materialize_policy import AutoMaterializePolicy
from .events import (
AssetKey,
CoercibleToAssetKey,
)
from .freshness_policy import FreshnessPolicy
from .metadata import MetadataUserInput


class AssetSpec(
NamedTuple(
"_AssetSpec",
[
("asset_key", PublicAttr[AssetKey]),
("deps", PublicAttr[AbstractSet[AssetKey]]),
("description", PublicAttr[Optional[str]]),
("metadata", PublicAttr[Optional[Mapping[str, Any]]]),
("group_name", PublicAttr[Optional[str]]),
("code_version", PublicAttr[Optional[str]]),
("freshness_policy", PublicAttr[Optional[FreshnessPolicy]]),
("auto_materialize_policy", PublicAttr[Optional[AutoMaterializePolicy]]),
("backfill_policy", PublicAttr[Optional[BackfillPolicy]]),
],
)
):
"""This object specifies the attributes of an asset. This object is attached to the decorated
function that defines how it materialized.
Attributes:
asset_key (AssetKey): The unique identifier for this asset.
deps (Optional[AbstractSet[AssetKey]]): The asset keys for the upstream assets that
materializing this asset depends on.
description (Optional[str]): Human-readable description of this asset.
metadata (Optional[Dict[str, Any]]): A dict of static metadata for this asset.
For example, users can provide information about the database table this
asset corresponds to.
group_name (Optional[str]): A string name used to organize multiple assets into groups. If
not provided, the name "default" is used.
code_version (Optional[str]): The version of the code that generates this asset.
freshness_policy (Optional[FreshnessPolicy]): A policy which indicates how up to date this
asset is intended to be.
auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to
the specified asset.
backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified asset.
"""

def __new__(
cls,
asset_key: CoercibleToAssetKey,
deps: Optional[
Iterable[
Union[
CoercibleToAssetKey,
"AssetSpec",
# AssetsDefinition, if only one-key trick
]
]
] = None,
description: Optional[str] = None,
metadata: Optional[MetadataUserInput] = None,
group_name: Optional[str] = None,
code_version: Optional[str] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
backfill_policy: Optional[BackfillPolicy] = None,
):
dep_set = set()
if deps:
for dep in deps:
if isinstance(dep, AssetSpec):
dep_set.add(dep.asset_key)
else:
dep_set.add(AssetKey.from_coercible(asset_key))

return super().__new__(
cls,
asset_key=AssetKey.from_coercible(asset_key),
deps=dep_set,
description=check.opt_str_param(description, "description"),
metadata=check.opt_mapping_param(metadata, "metadata", key_type=str),
group_name=check.opt_str_param(group_name, "group_name"),
code_version=check.opt_str_param(code_version, "code_version"),
freshness_policy=check.opt_inst_param(
freshness_policy,
"freshness_policy",
FreshnessPolicy,
),
auto_materialize_policy=check.opt_inst_param(
auto_materialize_policy,
"auto_materialize_policy",
AutoMaterializePolicy,
),
backfill_policy=check.opt_inst_param(
backfill_policy,
"backfill_policy",
BackfillPolicy,
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)

from ..asset_in import AssetIn
from ..asset_node import AssetSpec
from ..asset_out import AssetOut
from ..assets import AssetsDefinition
from ..backfill_policy import BackfillPolicy, BackfillPolicyType
Expand Down Expand Up @@ -438,7 +439,8 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
)
def multi_asset(
*,
outs: Mapping[str, AssetOut],
outs: Optional[Mapping[str, AssetOut]] = None,
specs: Optional[Sequence[AssetSpec]] = None,
name: Optional[str] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
deps: Optional[Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]] = None,
Expand Down Expand Up @@ -562,10 +564,28 @@ def my_function():
resource_defs_keys = set(resource_defs.keys())
required_resource_keys = bare_required_resource_keys | resource_defs_keys

asset_out_map: Mapping[str, AssetOut] = {} if outs is None else outs

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
op_name = name or fn.__name__
asset_ins = build_asset_ins(fn, ins or {}, deps=_make_asset_keys(upstream_asset_deps))
asset_outs = build_asset_outs(outs)

if asset_out_map and specs:
raise DagsterInvalidDefinitionError("Must specify only outs or assets but not both.")
elif specs:
outs_by_asset_key = {}
for asset_node in specs:
# output names are asset keys joined with _
output_name = "_".join(asset_node.asset_key.path)
outs_by_asset_key[asset_node.asset_key] = (
output_name,
Out(
Nothing,
is_required=not can_subset,
),
)
else:
outs_by_asset_key = build_asset_outs(asset_out_map)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
Expand All @@ -575,14 +595,14 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
)

# validate that the asset_deps make sense
valid_asset_deps = set(asset_ins.keys()) | set(asset_outs.keys())
valid_asset_deps = set(asset_ins.keys()) | set(outs_by_asset_key.keys())
for out_name, asset_keys in asset_deps.items():
check.invariant(
out_name in outs,
f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument for"
f" multi-asset {op_name}. Must be one of the outs for this multi-asset"
f" {list(outs.keys())[:20]}.",
)
if asset_out_map and out_name not in asset_out_map:
check.failed(
f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument for"
f" multi-asset {op_name}. Must be one of the outs for this multi-asset"
f" {list(asset_out_map.keys())[:20]}.",
)
invalid_asset_deps = asset_keys.difference(valid_asset_deps)
check.invariant(
not invalid_asset_deps,
Expand All @@ -599,7 +619,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
name=op_name,
description=description,
ins=dict(asset_ins.values()),
out=dict(asset_outs.values()),
out=dict(outs_by_asset_key.values()),
required_resource_keys=op_required_resource_keys,
tags={
**({"kind": compute_kind} if compute_kind else {}),
Expand All @@ -614,53 +634,79 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
keys_by_output_name = {
output_name: asset_key for asset_key, (output_name, _) in asset_outs.items()
output_name: asset_key for asset_key, (output_name, _) in outs_by_asset_key.items()
}

# source group names from the AssetOuts (if any)
group_names_by_key = {
keys_by_output_name[output_name]: out.group_name
for output_name, out in outs.items()
if out.group_name is not None
}
if group_name:
check.invariant(
not group_names_by_key,
"Cannot set group_name parameter on multi_asset if one or more of the AssetOuts"
" supplied to this multi_asset have a group_name defined.",
)
# build by key structures via outputs
if specs:
partition_mappings = {
# node.asset_key: node.partition_mapping for node in assets
}
group_names_by_key = {
asset_key: group_name for asset_key in keys_by_output_name.values()
node.asset_key: node.group_name for node in specs if node.group_name is not None
}
# if group_name: # handle this case

# source freshness policies from the AssetOuts (if any)
freshness_policies_by_key = {
keys_by_output_name[output_name]: out.freshness_policy
for output_name, out in outs.items()
if out.freshness_policy is not None
}
auto_materialize_policies_by_key = {
keys_by_output_name[output_name]: out.auto_materialize_policy
for output_name, out in outs.items()
if out.auto_materialize_policy is not None
}
freshness_policies_by_key = {
node.asset_key: node.freshness_policy
for node in specs
if node.freshness_policy is not None
}
auto_materialize_policies_by_key = {
node.asset_key: node.auto_materialize_policy
for node in specs
if node.auto_materialize_policy is not None
}
metadata_by_key = {
node.asset_key: node.metadata for node in specs if node.metadata is not None
}
internal_deps = {node.asset_key: node.deps for node in specs if node.deps is not None}
else:
# source group names from the AssetOuts (if any)
group_names_by_key = {
keys_by_output_name[output_name]: out.group_name
for output_name, out in asset_out_map.items()
if out.group_name is not None
}
if group_name:
check.invariant(
not group_names_by_key,
"Cannot set group_name parameter on multi_asset if one or more of the AssetOuts"
" supplied to this multi_asset have a group_name defined.",
)
group_names_by_key = {
asset_key: group_name for asset_key in keys_by_output_name.values()
}

partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping is not None
}
metadata_by_key = {
keys_by_output_name[output_name]: out.metadata
for output_name, out in outs.items()
if out.metadata is not None
}
# source freshness policies from the AssetOuts (if any)
freshness_policies_by_key = {
keys_by_output_name[output_name]: out.freshness_policy
for output_name, out in asset_out_map.items()
if out.freshness_policy is not None
}
auto_materialize_policies_by_key = {
keys_by_output_name[output_name]: out.auto_materialize_policy
for output_name, out in asset_out_map.items()
if out.auto_materialize_policy is not None
}

partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping is not None
}
metadata_by_key = {
keys_by_output_name[output_name]: out.metadata
for output_name, out in asset_out_map.items()
if out.metadata is not None
}
internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps}

return AssetsDefinition.dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_output_name=keys_by_output_name,
node_def=op,
asset_deps={keys_by_output_name[name]: asset_deps[name] for name in asset_deps},
asset_deps=internal_deps,
partitions_def=partitions_def,
partition_mappings=partition_mappings if partition_mappings else None,
can_subset=can_subset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Output,
)
from .output import DynamicOutputDefinition
from .result import MaterializeResult

if TYPE_CHECKING:
from ..execution.context.invocation import BoundOpExecutionContext
Expand Down Expand Up @@ -390,7 +391,7 @@ def type_check_gen(gen):
for event in gen:
if isinstance(
event,
(AssetMaterialization, AssetObservation, ExpectationResult),
(AssetMaterialization, AssetObservation, ExpectationResult, MaterializeResult),
):
yield event
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
AssetKey,
AssetMaterialization,
AssetObservation,
CoercibleToAssetKey,
ExpectationResult,
UserEvent,
)
Expand Down
Loading

0 comments on commit d99d0c5

Please sign in to comment.