diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index d1b60b5842d05..5d55fe572013e 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -2,7 +2,6 @@ AbstractSet, Any, Callable, - Dict, Iterable, Mapping, Optional, @@ -16,8 +15,7 @@ import dagster._check as check from dagster._annotations import deprecated_param, experimental_param -from dagster._builtins import Nothing -from dagster._config import UserConfigSchema +from dagster._config.config_schema import UserConfigSchema from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.config import ConfigMapping @@ -40,12 +38,12 @@ from ..asset_in import AssetIn from ..asset_out import AssetOut from ..asset_spec import AssetSpec -from ..assets import ASSET_SUBSET_INPUT_PREFIX, AssetsDefinition, get_partition_mappings_from_deps +from ..assets import AssetsDefinition, get_partition_mappings_from_deps from ..backfill_policy import BackfillPolicy, BackfillPolicyType from ..decorators.graph_decorator import graph from ..decorators.op_decorator import _Op from ..events import AssetKey, CoercibleToAssetKey, CoercibleToAssetKeyPrefix -from ..input import GraphIn, In +from ..input import GraphIn from ..output import GraphOut, Out from ..partition import PartitionsDefinition from ..policy import RetryPolicy @@ -56,7 +54,7 @@ NoValueSentinel, validate_tags_strict, ) -from .assets_definition_factory import build_asset_ins +from .assets_definition_factory import build_asset_ins, build_asset_outs @overload @@ -680,49 +678,15 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: fn=fn, ) else: - asset_ins = build_asset_ins( - fn, - ins or {}, - deps=( - {dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set() - ), - ) - output_tuples_by_asset_key = build_asset_outs(asset_out_map) - - # validate that the asset_ins are a subset of the upstream asset_deps. - upstream_internal_asset_keys = set().union(*asset_deps.values()) - asset_in_keys = set(asset_ins.keys()) - if asset_deps and not asset_in_keys.issubset(upstream_internal_asset_keys): - invalid_asset_in_keys = asset_in_keys - upstream_internal_asset_keys - check.failed( - f"Invalid asset dependencies: `{invalid_asset_in_keys}` specified as asset" - " inputs, but are not specified in `internal_asset_deps`. Asset inputs must" - " be associated with an output produced by the asset." - ) - - # validate that the asset_deps make sense - valid_asset_deps = asset_in_keys | set(output_tuples_by_asset_key.keys()) - for out_name, asset_keys in asset_deps.items(): - if asset_out_map and out_name not in asset_out_map: - check.failed( - f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument" - f" for multi-asset {op_name}. Must be one of the outs for this multi-asset" - f" {list(asset_out_map.keys())[:20]}.", - ) - invalid_asset_deps = asset_keys.difference(valid_asset_deps) - check.invariant( - not invalid_asset_deps, - f"Invalid asset dependencies: {invalid_asset_deps} specified in" - f" `internal_asset_deps` argument for multi-asset '{op_name}' on key" - f" '{out_name}'. Each specified asset key must be associated with an input to" - " the asset or produced by this asset. Valid keys:" - f" {list(valid_asset_deps)[:20]}", - ) - - in_out_mapper = InOutMapper.from_asset_ins_and_asset_outs( - asset_ins=asset_ins, - asset_outs=output_tuples_by_asset_key, + in_out_mapper = InOutMapper.non_spec_based_create( + asset_out_map=asset_out_map, + ins=ins or {}, + fn=fn, check_specs=check_specs or [], + op_name=op_name, + asset_deps=asset_deps, + upstream_asset_deps=upstream_asset_deps or [], + can_subset=can_subset, ) arg_resource_keys = {arg.name for arg in get_resource_args(fn)} @@ -737,35 +701,13 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: f"Check output names overlap with asset output names: {in_out_mapper.overlapping_output_names}", ) - if specs: - internal_deps = { - spec.key: {dep.asset_key for dep in spec.deps} - for spec in specs - if spec.deps is not None - } - else: - internal_deps = { - in_out_mapper.keys_by_output_name[name]: asset_deps[name] for name in asset_deps - } - - # when a subsettable multi-asset is defined, it is possible that it will need to be - # broken into two separate parts, one which depends on the other. in order to represent - # this in the op execution graph, inputs need to be added to connect these possible deps - if can_subset and internal_deps: - asset_ins = { - **asset_ins, - **build_subsettable_asset_ins( - asset_ins, output_tuples_by_asset_key, internal_deps.values() - ), - } - with disable_dagster_warnings(): op_required_resource_keys = required_resource_keys - arg_resource_keys op = _Op( name=op_name, description=description, - ins=dict(asset_ins.values()), + ins=in_out_mapper.asset_ins_by_input_names, out=in_out_mapper.combined_outs_by_output_name, required_resource_keys=op_required_resource_keys, tags={ @@ -777,11 +719,8 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: code_version=code_version, )(fn) - keys_by_input_name = { - input_name: asset_key for asset_key, (input_name, _) in asset_ins.items() - } partition_mappings = { - keys_by_input_name[input_name]: asset_in.partition_mapping + in_out_mapper.keys_by_input_name[input_name]: asset_in.partition_mapping for input_name, asset_in in (ins or {}).items() if asset_in.partition_mapping is not None } @@ -797,7 +736,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: resolved_specs = [] input_deps_by_key = { key: AssetDep(asset=key, partition_mapping=partition_mappings.get(key)) - for key in keys_by_input_name.values() + for key in in_out_mapper.keys_by_input_name.values() } input_deps = list(input_deps_by_key.values()) for output_name, asset_out in asset_out_map.items(): @@ -824,7 +763,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: resolved_specs = [spec._replace(group_name=group_name) for spec in resolved_specs] return AssetsDefinition.dagster_internal_init( - keys_by_input_name=keys_by_input_name, + keys_by_input_name=in_out_mapper.keys_by_input_name, keys_by_output_name=in_out_mapper.keys_by_output_name, node_def=op, partitions_def=partitions_def, @@ -841,28 +780,6 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: return inner -def build_subsettable_asset_ins( - asset_ins: Mapping[AssetKey, Tuple[str, In]], - asset_outs: Mapping[AssetKey, Tuple[str, Out]], - internal_upstream_deps: Iterable[AbstractSet[AssetKey]], -) -> Mapping[AssetKey, Tuple[str, In]]: - """Creates a mapping from AssetKey to (name of input, In object) for any asset key that is not - currently an input, but may become one if this asset is subset. - - For example, if a subsettable multi-asset produces both A and C, where C depends on both A and - some other asset B, there are some situations where executing just A and C without B will result - in these assets being generated by different steps within the same job. In this case, we need - a separate input to represent the fact that C depends on A. - """ - # set of asset keys which are upstream of another asset, and are not currently inputs - potential_deps = set().union(*internal_upstream_deps).difference(set(asset_ins.keys())) - return { - key: (f"{ASSET_SUBSET_INPUT_PREFIX}{name}", In(Nothing)) - for key, (name, _) in asset_outs.items() - if key in potential_deps - } - - @overload def graph_asset( compose_fn: Callable[..., Any], @@ -1234,20 +1151,6 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: return inner -def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, Tuple[str, Out]]: - """Creates a mapping from AssetKey to (name of output, Out object).""" - outs_by_asset_key: Dict[AssetKey, Tuple[str, Out]] = {} - for output_name, asset_out in asset_outs.items(): - out = asset_out.to_out() - asset_key = asset_out.key or AssetKey( - list(filter(None, [*(asset_out.key_prefix or []), output_name])) - ) - - outs_by_asset_key[asset_key] = (output_name.replace("-", "_"), out) - - return outs_by_asset_key - - def _deps_and_non_argument_deps_to_asset_deps( deps: Optional[Iterable[CoercibleToAssetDep]], non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]], diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py b/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py index 165a740b8ac07..ee284f1d9d11d 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py @@ -6,6 +6,7 @@ Any, Callable, Dict, + Iterable, List, Mapping, NamedTuple, @@ -18,9 +19,12 @@ import dagster._check as check from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations +from dagster._core.definitions.asset_dep import AssetDep from dagster._core.definitions.asset_in import AssetIn from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.asset_out import AssetOut from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.assets import ASSET_SUBSET_INPUT_PREFIX from dagster._core.definitions.input import In from dagster._core.definitions.output import Out from dagster._core.definitions.resource_annotation import ( @@ -123,6 +127,42 @@ def build_asset_ins( return ins_by_asset_key +def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, Tuple[str, Out]]: + """Creates a mapping from AssetKey to (name of output, Out object).""" + outs_by_asset_key: Dict[AssetKey, Tuple[str, Out]] = {} + for output_name, asset_out in asset_outs.items(): + out = asset_out.to_out() + asset_key = asset_out.key or AssetKey( + list(filter(None, [*(asset_out.key_prefix or []), output_name])) + ) + + outs_by_asset_key[asset_key] = (output_name.replace("-", "_"), out) + + return outs_by_asset_key + + +def build_subsettable_asset_ins( + asset_ins: Mapping[AssetKey, Tuple[str, In]], + asset_outs: Mapping[AssetKey, Tuple[str, Out]], + internal_upstream_deps: Iterable[AbstractSet[AssetKey]], +) -> Mapping[AssetKey, Tuple[str, In]]: + """Creates a mapping from AssetKey to (name of input, In object) for any asset key that is not + currently an input, but may become one if this asset is subset. + + For example, if a subsettable multi-asset produces both A and C, where C depends on both A and + some other asset B, there are some situations where executing just A and C without B will result + in these assets being generated by different steps within the same job. In this case, we need + a separate input to represent the fact that C depends on A. + """ + # set of asset keys which are upstream of another asset, and are not currently inputs + potential_deps = set().union(*internal_upstream_deps).difference(set(asset_ins.keys())) + return { + key: (f"{ASSET_SUBSET_INPUT_PREFIX}{name}", In(Nothing)) + for key, (name, _) in asset_outs.items() + if key in potential_deps + } + + class InMapping(NamedTuple): input_name: str input: In @@ -133,16 +173,27 @@ class OutMapping(NamedTuple): output: Out +def make_keys_by_output_name( + asset_outs: Mapping[AssetKey, Tuple[str, Out]], +) -> Mapping[str, AssetKey]: + return {output_name: asset_key for asset_key, (output_name, _) in asset_outs.items()} + + class InOutMapper: def __init__( self, - in_mappings: Mapping[AssetKey, InMapping], - out_mappings: Mapping[AssetKey, OutMapping], + *, + asset_ins: Mapping[AssetKey, Tuple[str, In]], + asset_outs: Mapping[AssetKey, Tuple[str, Out]], check_specs: Sequence[AssetCheckSpec], + internal_deps: Mapping[AssetKey, Set[AssetKey]], + can_subset: bool, ) -> None: - self.in_mappings = in_mappings - self.out_mappings = out_mappings + self._passed_asset_ins = asset_ins + self.asset_outs = asset_outs self.check_specs = check_specs + self.internal_deps = internal_deps + self.can_subset = can_subset @staticmethod def from_specs( @@ -190,30 +241,121 @@ def from_specs( ) remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams} asset_ins = build_asset_ins(fn, explicit_ins, deps=remaining_upstream_keys) - return InOutMapper.from_asset_ins_and_asset_outs( - asset_ins, output_tuples_by_asset_key, check_specs + + internal_deps = { + spec.key: {dep.asset_key for dep in spec.deps} + for spec in specs + if spec.deps is not None + } + + return InOutMapper( + asset_ins=asset_ins, + asset_outs=output_tuples_by_asset_key, + check_specs=check_specs, + internal_deps=internal_deps, + can_subset=can_subset, ) @staticmethod - def from_asset_ins_and_asset_outs( - asset_ins: Mapping[AssetKey, Tuple[str, In]], - asset_outs: Mapping[AssetKey, Tuple[str, Out]], + def non_spec_based_create( + *, + asset_out_map: Mapping[str, AssetOut], + asset_deps: Mapping[str, Set[AssetKey]], + upstream_asset_deps: Iterable[AssetDep], + ins: Mapping[str, AssetIn], + op_name: str, + fn: Callable[..., Any], check_specs: Sequence[AssetCheckSpec], + can_subset: bool, ): - in_mappings = { + asset_ins = build_asset_ins( + fn, + ins or {}, + deps=({dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set()), + ) + output_tuples_by_asset_key = build_asset_outs(asset_out_map) + + # validate that the asset_ins are a subset of the upstream asset_deps. + upstream_internal_asset_keys = set().union(*asset_deps.values()) + asset_in_keys = set(asset_ins.keys()) + if asset_deps and not asset_in_keys.issubset(upstream_internal_asset_keys): + invalid_asset_in_keys = asset_in_keys - upstream_internal_asset_keys + check.failed( + f"Invalid asset dependencies: `{invalid_asset_in_keys}` specified as asset" + " inputs, but are not specified in `internal_asset_deps`. Asset inputs must" + " be associated with an output produced by the asset." + ) + + # validate that the asset_deps make sense + valid_asset_deps = asset_in_keys | set(output_tuples_by_asset_key.keys()) + for out_name, asset_keys in asset_deps.items(): + if asset_out_map and out_name not in asset_out_map: + check.failed( + f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument" + f" for multi-asset {op_name}. Must be one of the outs for this multi-asset" + f" {list(asset_out_map.keys())[:20]}.", + ) + invalid_asset_deps = asset_keys.difference(valid_asset_deps) + check.invariant( + not invalid_asset_deps, + f"Invalid asset dependencies: {invalid_asset_deps} specified in" + f" `internal_asset_deps` argument for multi-asset '{op_name}' on key" + f" '{out_name}'. Each specified asset key must be associated with an input to" + " the asset or produced by this asset. Valid keys:" + f" {list(valid_asset_deps)[:20]}", + ) + + keys_by_output_name = make_keys_by_output_name(output_tuples_by_asset_key) + internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps} + + return InOutMapper( + asset_ins=asset_ins, + asset_outs=output_tuples_by_asset_key, + check_specs=check_specs or [], + internal_deps=internal_deps, + can_subset=can_subset, + ) + + @cached_property + def asset_ins(self) -> Mapping[AssetKey, Tuple[str, In]]: + if self.can_subset and self.internal_deps: + return { + **self._passed_asset_ins, + **build_subsettable_asset_ins( + self._passed_asset_ins, self.asset_outs, self.internal_deps.values() + ), + } + else: + return self._passed_asset_ins + + @cached_property + def in_mappings(self) -> Mapping[AssetKey, InMapping]: + return { asset_key: InMapping(input_name, in_) - for asset_key, (input_name, in_) in asset_ins.items() + for asset_key, (input_name, in_) in self.asset_ins.items() } - out_mappings = { + + @cached_property + def out_mappings(self) -> Mapping[AssetKey, OutMapping]: + return { asset_key: OutMapping(output_name, out_) - for asset_key, (output_name, out_) in asset_outs.items() + for asset_key, (output_name, out_) in self.asset_outs.items() } - return InOutMapper(in_mappings, out_mappings, check_specs) @cached_property def asset_outs_by_output_name(self) -> Mapping[str, Out]: return dict(self.out_mappings.values()) + @cached_property + def asset_ins_by_input_names(self) -> Mapping[str, In]: + return dict(self.in_mappings.values()) + + @cached_property + def keys_by_input_name(self) -> Mapping[str, AssetKey]: + return { + in_mapping.input_name: asset_key for asset_key, in_mapping in self.in_mappings.items() + } + @cached_property def keys_by_output_name(self) -> Mapping[str, AssetKey]: return {