Skip to content

Commit

Permalink
InOutMapper.non_spec_based_create into its own codepath
Browse files: browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 2, 2024
1 parent 95255f9 commit c6c023f
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
AbstractSet,
Any,
Callable,
Dict,
Iterable,
Mapping,
Optional,
Expand All @@ -16,8 +15,7 @@

import dagster._check as check
from dagster._annotations import deprecated_param, experimental_param
from dagster._builtins import Nothing
from dagster._config import UserConfigSchema
from dagster._config.config_schema import UserConfigSchema
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.config import ConfigMapping
Expand All @@ -40,12 +38,12 @@
from ..asset_in import AssetIn
from ..asset_out import AssetOut
from ..asset_spec import AssetSpec
from ..assets import ASSET_SUBSET_INPUT_PREFIX, AssetsDefinition, get_partition_mappings_from_deps
from ..assets import AssetsDefinition, get_partition_mappings_from_deps
from ..backfill_policy import BackfillPolicy, BackfillPolicyType
from ..decorators.graph_decorator import graph
from ..decorators.op_decorator import _Op
from ..events import AssetKey, CoercibleToAssetKey, CoercibleToAssetKeyPrefix
from ..input import GraphIn, In
from ..input import GraphIn
from ..output import GraphOut, Out
from ..partition import PartitionsDefinition
from ..policy import RetryPolicy
Expand All @@ -56,7 +54,7 @@
NoValueSentinel,
validate_tags_strict,
)
from .assets_definition_factory import build_asset_ins
from .assets_definition_factory import build_asset_ins, build_asset_outs


@overload
Expand Down Expand Up @@ -680,49 +678,15 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
fn=fn,
)
else:
asset_ins = build_asset_ins(
fn,
ins or {},
deps=(
{dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set()
),
)
output_tuples_by_asset_key = build_asset_outs(asset_out_map)

# validate that the asset_ins are a subset of the upstream asset_deps.
upstream_internal_asset_keys = set().union(*asset_deps.values())
asset_in_keys = set(asset_ins.keys())
if asset_deps and not asset_in_keys.issubset(upstream_internal_asset_keys):
invalid_asset_in_keys = asset_in_keys - upstream_internal_asset_keys
check.failed(
f"Invalid asset dependencies: `{invalid_asset_in_keys}` specified as asset"
" inputs, but are not specified in `internal_asset_deps`. Asset inputs must"
" be associated with an output produced by the asset."
)

# validate that the asset_deps make sense
valid_asset_deps = asset_in_keys | set(output_tuples_by_asset_key.keys())
for out_name, asset_keys in asset_deps.items():
if asset_out_map and out_name not in asset_out_map:
check.failed(
f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument"
f" for multi-asset {op_name}. Must be one of the outs for this multi-asset"
f" {list(asset_out_map.keys())[:20]}.",
)
invalid_asset_deps = asset_keys.difference(valid_asset_deps)
check.invariant(
not invalid_asset_deps,
f"Invalid asset dependencies: {invalid_asset_deps} specified in"
f" `internal_asset_deps` argument for multi-asset '{op_name}' on key"
f" '{out_name}'. Each specified asset key must be associated with an input to"
" the asset or produced by this asset. Valid keys:"
f" {list(valid_asset_deps)[:20]}",
)

in_out_mapper = InOutMapper.from_asset_ins_and_asset_outs(
asset_ins=asset_ins,
asset_outs=output_tuples_by_asset_key,
in_out_mapper = InOutMapper.non_spec_based_create(
asset_out_map=asset_out_map,
ins=ins or {},
fn=fn,
check_specs=check_specs or [],
op_name=op_name,
asset_deps=asset_deps,
upstream_asset_deps=upstream_asset_deps or [],
can_subset=can_subset,
)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
Expand All @@ -737,35 +701,13 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
f"Check output names overlap with asset output names: {in_out_mapper.overlapping_output_names}",
)

if specs:
internal_deps = {
spec.key: {dep.asset_key for dep in spec.deps}
for spec in specs
if spec.deps is not None
}
else:
internal_deps = {
in_out_mapper.keys_by_output_name[name]: asset_deps[name] for name in asset_deps
}

# when a subsettable multi-asset is defined, it is possible that it will need to be
# broken into two separate parts, one which depends on the other. in order to represent
# this in the op execution graph, inputs need to be added to connect these possible deps
if can_subset and internal_deps:
asset_ins = {
**asset_ins,
**build_subsettable_asset_ins(
asset_ins, output_tuples_by_asset_key, internal_deps.values()
),
}

with disable_dagster_warnings():
op_required_resource_keys = required_resource_keys - arg_resource_keys

op = _Op(
name=op_name,
description=description,
ins=dict(asset_ins.values()),
ins=in_out_mapper.asset_ins_by_input_names,
out=in_out_mapper.combined_outs_by_output_name,
required_resource_keys=op_required_resource_keys,
tags={
Expand All @@ -777,11 +719,8 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
code_version=code_version,
)(fn)

keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
in_out_mapper.keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping is not None
}
Expand All @@ -797,7 +736,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
resolved_specs = []
input_deps_by_key = {
key: AssetDep(asset=key, partition_mapping=partition_mappings.get(key))
for key in keys_by_input_name.values()
for key in in_out_mapper.keys_by_input_name.values()
}
input_deps = list(input_deps_by_key.values())
for output_name, asset_out in asset_out_map.items():
Expand All @@ -824,7 +763,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
resolved_specs = [spec._replace(group_name=group_name) for spec in resolved_specs]

return AssetsDefinition.dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_input_name=in_out_mapper.keys_by_input_name,
keys_by_output_name=in_out_mapper.keys_by_output_name,
node_def=op,
partitions_def=partitions_def,
Expand All @@ -841,28 +780,6 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
return inner


def build_subsettable_asset_ins(
    asset_ins: Mapping[AssetKey, Tuple[str, In]],
    asset_outs: Mapping[AssetKey, Tuple[str, Out]],
    internal_upstream_deps: Iterable[AbstractSet[AssetKey]],
) -> Mapping[AssetKey, Tuple[str, In]]:
    """Build the extra inputs a subsettable multi-asset may need when it is subset.

    Returns a mapping from AssetKey to (input name, In object) for every asset key that
    is produced by this asset, is listed as upstream of some asset in
    ``internal_upstream_deps``, and is not already present in ``asset_ins``.

    Example: a subsettable multi-asset produces both A and C, and C depends on A as well as
    on some other asset B. Executing just A and C without B can result in A and C being
    generated by different steps within the same job, so a separate input is required to
    represent the fact that C depends on A.

    Args:
        asset_ins: Existing inputs, keyed by asset key.
        asset_outs: Outputs of the asset, keyed by asset key, as (output name, Out).
        internal_upstream_deps: One set of upstream asset keys per dependency entry.

    Returns:
        Mapping of asset key to (prefixed input name, ``In(Nothing)``), in the iteration
        order of ``asset_outs``.
    """
    # Flatten every upstream set into a single collection of keys.
    upstream_keys = set()
    for dep_keys in internal_upstream_deps:
        upstream_keys.update(dep_keys)

    subset_ins = {}
    for asset_key, (output_name, _out) in asset_outs.items():
        # Keys that are already inputs, or that nothing depends on, need no extra input.
        if asset_key in asset_ins or asset_key not in upstream_keys:
            continue
        subset_ins[asset_key] = (f"{ASSET_SUBSET_INPUT_PREFIX}{output_name}", In(Nothing))
    return subset_ins


@overload
def graph_asset(
compose_fn: Callable[..., Any],
Expand Down Expand Up @@ -1234,20 +1151,6 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
return inner


def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, Tuple[str, Out]]:
    """Index the given outputs by asset key.

    Args:
        asset_outs: Mapping of output name to its AssetOut.

    Returns:
        Mapping of AssetKey to (output name with "-" replaced by "_", Out object). The key
        is the AssetOut's own ``key`` when that is truthy; otherwise it is built from the
        non-empty parts of ``key_prefix`` followed by the output name.
    """
    result: Dict[AssetKey, Tuple[str, Out]] = {}
    for name, asset_out in asset_outs.items():
        out_def = asset_out.to_out()

        explicit_key = asset_out.key
        if explicit_key:
            resolved_key = explicit_key
        else:
            # Drop empty path components before building the key.
            path = [part for part in (*(asset_out.key_prefix or []), name) if part]
            resolved_key = AssetKey(path)

        sanitized_name = name.replace("-", "_")
        result[resolved_key] = (sanitized_name, out_def)

    return result


def _deps_and_non_argument_deps_to_asset_deps(
deps: Optional[Iterable[CoercibleToAssetDep]],
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]],
Expand Down
Loading

0 comments on commit c6c023f

Please sign in to comment.