diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py b/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py index 836db9d382901..58d2803a505f6 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py @@ -74,7 +74,7 @@ def build_asset_ins( fn: Callable[..., Any], asset_ins: Mapping[str, AssetIn], deps: Optional[AbstractSet[AssetKey]], -) -> Mapping[AssetKey, Tuple[str, In]]: +) -> Mapping[AssetKey, "NamedIn"]: """Creates a mapping from AssetKey to (name of input, In object).""" deps = check.opt_set_param(deps, "deps", AssetKey) @@ -98,7 +98,7 @@ def build_asset_ins( "of the arguments to the decorated function" ) - ins_by_asset_key: Dict[AssetKey, Tuple[str, In]] = {} + ins_by_asset_key: Dict[AssetKey, NamedIn] = {} for input_name in all_input_names: asset_key = None @@ -116,7 +116,7 @@ def build_asset_ins( asset_key = asset_key or AssetKey(list(filter(None, [*(key_prefix or []), input_name]))) - ins_by_asset_key[asset_key] = ( + ins_by_asset_key[asset_key] = NamedIn( input_name.replace("-", "_"), In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type), ) @@ -127,7 +127,7 @@ def build_asset_ins( f"deps value {asset_key} also declared as input/AssetIn" ) # mypy doesn't realize that Nothing is a valid type here - ins_by_asset_key[asset_key] = ( + ins_by_asset_key[asset_key] = NamedIn( stringify_asset_key_to_input_name(asset_key), In(cast(type, Nothing)), ) @@ -135,25 +135,25 @@ def build_asset_ins( return ins_by_asset_key -def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, Tuple[str, Out]]: +def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, "NamedOut"]: """Creates a mapping from AssetKey to (name of output, Out object).""" - outs_by_asset_key: Dict[AssetKey, Tuple[str, Out]] = {} + named_outs_by_asset_key: Dict[AssetKey, NamedOut] = {} for output_name, asset_out in asset_outs.items(): out = asset_out.to_out() asset_key = asset_out.key or AssetKey( list(filter(None, [*(asset_out.key_prefix or []), output_name])) ) - outs_by_asset_key[asset_key] = (output_name.replace("-", "_"), out) + named_outs_by_asset_key[asset_key] = NamedOut(output_name.replace("-", "_"), out) - return outs_by_asset_key + return named_outs_by_asset_key def build_subsettable_asset_ins( asset_ins: Mapping[AssetKey, Tuple[str, In]], asset_outs: Mapping[AssetKey, Tuple[str, Out]], internal_upstream_deps: Iterable[AbstractSet[AssetKey]], -) -> Mapping[AssetKey, Tuple[str, In]]: +) -> Mapping[AssetKey, "NamedIn"]: """Creates a mapping from AssetKey to (name of input, In object) for any asset key that is not currently an input, but may become one if this asset is subset. @@ -165,18 +165,18 @@ def build_subsettable_asset_ins( # set of asset keys which are upstream of another asset, and are not currently inputs potential_deps = set().union(*internal_upstream_deps).difference(set(asset_ins.keys())) return { - key: (f"{ASSET_SUBSET_INPUT_PREFIX}{name}", In(Nothing)) + key: NamedIn(f"{ASSET_SUBSET_INPUT_PREFIX}{name}", In(Nothing)) for key, (name, _) in asset_outs.items() if key in potential_deps } -class InMapping(NamedTuple): +class NamedIn(NamedTuple): input_name: str input: In -class OutMapping(NamedTuple): +class NamedOut(NamedTuple): output_name: str output: Out @@ -234,20 +234,34 @@ class AssetsDefinitionBuilder: def __init__( self, *, - input_tuples_by_asset_key: Mapping[AssetKey, Tuple[str, In]], - output_tables_by_asset_key: Mapping[AssetKey, Tuple[str, Out]], + named_ins_by_asset_key: Mapping[AssetKey, NamedIn], + named_outs_by_asset_key: Mapping[AssetKey, NamedOut], internal_deps: Mapping[AssetKey, Set[AssetKey]], op_name: str, multi_asset_args: AssetsDefinitionBuilderArgs, fn: Callable[..., Any], ) -> None: - self._passed_input_tuples_by_asset_key = input_tuples_by_asset_key - self.output_tuples_by_asset_key = output_tables_by_asset_key + self.named_outs_by_asset_key = named_outs_by_asset_key self.internal_deps = internal_deps self.op_name = op_name self.args = multi_asset_args self.fn = fn + self.named_ins_by_asset_key = ( + ( + { + **named_ins_by_asset_key, + **build_subsettable_asset_ins( + named_ins_by_asset_key, + named_outs_by_asset_key, + self.internal_deps.values(), + ), + } + ) + if self.args.can_subset and self.internal_deps + else named_ins_by_asset_key + ) + @staticmethod def from_args(*, fn: Callable[..., Any], args: AssetsDefinitionBuilderArgs): op_name = args.name or fn.__name__ @@ -279,10 +293,10 @@ def from_specs( ): check.param_invariant(args.specs, "args", "Must use specs in this codepath") - output_tuples_by_asset_key = {} + named_outs_by_asset_key: Mapping[AssetKey, NamedOut] = {} for asset_spec in args.specs: output_name = asset_spec.key.to_python_identifier() - output_tuples_by_asset_key[asset_spec.key] = ( + named_outs_by_asset_key[asset_spec.key] = NamedOut( output_name, Out( Nothing, @@ -296,12 +310,9 @@ def from_specs( upstream_keys = set() for spec in args.specs: for dep in spec.deps: - if dep.asset_key not in output_tuples_by_asset_key: + if dep.asset_key not in named_outs_by_asset_key: upstream_keys.add(dep.asset_key) - if ( - dep.asset_key in output_tuples_by_asset_key - and dep.partition_mapping is not None - ): + if dep.asset_key in named_outs_by_asset_key and dep.partition_mapping is not None: # self-dependent asset also needs to be considered an upstream_key upstream_keys.add(dep.asset_key) @@ -324,8 +335,8 @@ def from_specs( } return AssetsDefinitionBuilder( - input_tuples_by_asset_key=input_tuples_by_asset_key, - output_tables_by_asset_key=output_tuples_by_asset_key, + named_ins_by_asset_key=input_tuples_by_asset_key, + named_outs_by_asset_key=named_outs_by_asset_key, internal_deps=internal_deps, op_name=op_name, multi_asset_args=args, @@ -388,8 +399,8 @@ def from_asset_outs( internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps} return AssetsDefinitionBuilder( - input_tuples_by_asset_key=inputs_tuples_by_asset_key, - output_tables_by_asset_key=output_tuples_by_asset_key, + named_ins_by_asset_key=inputs_tuples_by_asset_key, + named_outs_by_asset_key=output_tuples_by_asset_key, internal_deps=internal_deps, op_name=op_name, multi_asset_args=args, @@ -401,53 +412,26 @@ def group_name(self) -> Optional[str]: return self.args.group_name @cached_property - def input_tuples_by_asset_key(self) -> Mapping[AssetKey, Tuple[str, In]]: - if self.args.can_subset and self.internal_deps: - return { - **self._passed_input_tuples_by_asset_key, - **build_subsettable_asset_ins( - self._passed_input_tuples_by_asset_key, - self.output_tuples_by_asset_key, - self.internal_deps.values(), - ), - } - else: - return self._passed_input_tuples_by_asset_key - - @cached_property - def in_mappings(self) -> Mapping[AssetKey, InMapping]: - return { - asset_key: InMapping(input_name, in_) - for asset_key, (input_name, in_) in self.input_tuples_by_asset_key.items() - } - - @cached_property - def out_mappings(self) -> Mapping[AssetKey, OutMapping]: - return { - asset_key: OutMapping(output_name, out_) - for asset_key, (output_name, out_) in self.output_tuples_by_asset_key.items() - } - - @cached_property - def asset_outs_by_output_name(self) -> Mapping[str, Out]: - return dict(self.out_mappings.values()) + def outs_by_output_name(self) -> Mapping[str, Out]: + return dict(self.named_outs_by_asset_key.values()) @cached_property def asset_keys_by_input_name(self) -> Mapping[str, AssetKey]: return { - in_mapping.input_name: asset_key for asset_key, in_mapping in self.in_mappings.items() + in_mapping.input_name: asset_key + for asset_key, in_mapping in self.named_ins_by_asset_key.items() } @cached_property def asset_keys_by_output_name(self) -> Mapping[str, AssetKey]: return { out_mapping.output_name: asset_key - for asset_key, out_mapping in self.out_mappings.items() + for asset_key, out_mapping in self.named_outs_by_asset_key.items() } @cached_property def asset_keys(self) -> Set[AssetKey]: - return set(self.out_mappings.keys()) + return set(self.named_outs_by_asset_key.keys()) @cached_property def check_specs_by_output_name(self) -> Mapping[str, AssetCheckSpec]: @@ -465,24 +449,23 @@ def check_outs_by_output_name(self) -> Mapping[str, Out]: @cached_property def combined_outs_by_output_name(self) -> Mapping[str, Out]: return { - **self.asset_outs_by_output_name, + **self.outs_by_output_name, **self.check_outs_by_output_name, } @cached_property def overlapping_output_names(self) -> Set[str]: - return set(self.asset_outs_by_output_name.keys()) & set( - self.check_outs_by_output_name.keys() - ) + return set(self.outs_by_output_name.keys()) & set(self.check_outs_by_output_name.keys()) @cached_property - def asset_ins_by_input_names(self) -> Mapping[str, In]: - return {in_name: in_obj for in_name, in_obj in self.in_mappings.values()} + def ins_by_input_names(self) -> Mapping[str, In]: + return {in_name: in_obj for in_name, in_obj in self.named_ins_by_asset_key.values()} @cached_property def asset_keys_by_input_names(self) -> Mapping[str, AssetKey]: return { - in_mapping.input_name: asset_key for asset_key, in_mapping in self.in_mappings.items() + in_mapping.input_name: asset_key + for asset_key, in_mapping in self.named_ins_by_asset_key.items() } @cached_property @@ -506,7 +489,7 @@ def _create_op_definition(self) -> OpDefinition: return _Op( name=self.op_name, description=self.args.description, - ins=self.asset_ins_by_input_names, + ins=self.ins_by_input_names, out=self.combined_outs_by_output_name, required_resource_keys=compute_required_resource_keys( self.args.required_resource_keys,