diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py index 83b9dc5b05be8..96f2c0b99d41d 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py @@ -1,15 +1,4 @@ -from typing import ( - AbstractSet, - Any, - Callable, - Iterable, - Mapping, - Optional, - Sequence, - Set, - Tuple, - Union, -) +from typing import AbstractSet, Any, Callable, Iterable, Mapping, Optional, Sequence, Set, Union from typing_extensions import TypeAlias @@ -30,9 +19,11 @@ from dagster._core.storage.tags import COMPUTE_KIND_TAG from dagster._utils.warnings import disable_dagster_warnings -from ..input import In from .asset_decorator import make_asset_deps from .decorator_assets_definition_builder import ( + DecoratorAssetsDefinitionBuilder, + DecoratorAssetsDefinitionBuilderArgs, + NamedIn, build_named_ins, compute_required_resource_keys, get_function_params_without_context_or_config_or_resources, @@ -43,13 +34,13 @@ AssetCheckFunction: TypeAlias = Callable[..., AssetCheckFunctionReturn] -def _build_asset_check_input( +def _build_asset_check_named_ins( name: str, asset_key: AssetKey, fn: Callable[..., Any], additional_ins: Mapping[str, AssetIn], additional_deps: Optional[AbstractSet[AssetKey]], -) -> Mapping[AssetKey, Tuple[str, In]]: +) -> Mapping[AssetKey, NamedIn]: fn_params = get_function_params_without_context_or_config_or_resources(fn) if asset_key in (additional_deps or []): @@ -192,7 +183,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition: asset_key = AssetKey.from_coercible_or_definition(asset) additional_dep_keys = set([dep.asset_key for dep in make_asset_deps(additional_deps) or []]) - input_tuples_by_asset_key = _build_asset_check_input( + named_in_by_asset_key = _build_asset_check_named_ins( resolved_name, asset_key, fn, @@ -202,7 +193,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition: # additional_deps on AssetCheckSpec holds the keys passed to additional_deps and # additional_ins. We don't want to include the primary asset key in this set. - additional_ins_and_deps = input_tuples_by_asset_key.keys() - {asset_key} + additional_ins_and_deps = named_in_by_asset_key.keys() - {asset_key} spec = AssetCheckSpec( name=resolved_name, @@ -215,18 +206,48 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition: resource_defs_for_execution = wrap_resources_for_execution(resource_defs) - op_required_resource_keys = compute_required_resource_keys( - required_resource_keys or set(), - resource_defs_for_execution, - fn=fn, + builder_args = DecoratorAssetsDefinitionBuilderArgs( decorator_name="@asset_check", + name=name, + description=description, + required_resource_keys=required_resource_keys or set(), + config_schema=config_schema, + retry_policy=retry_policy, + specs=[], + check_specs=[spec], + can_subset=False, + compute_kind=compute_kind, + op_tags=op_tags, + op_def_resource_defs=resource_defs_for_execution, + assets_def_resource_defs=resource_defs_for_execution, + upstream_asset_deps=[], + # unsupported capabiltiies in asset checks + partitions_def=None, + code_version=None, + backfill_policy=None, + group_name=None, + # non-sensical args in this context + asset_deps={}, + asset_in_map={}, + asset_out_map={}, ) + builder = DecoratorAssetsDefinitionBuilder( + named_ins_by_asset_key=named_in_by_asset_key, + named_outs_by_asset_key={}, + internal_deps={}, + op_name=spec.get_python_identifier(), + args=builder_args, + fn=fn, + ) + + op_required_resource_keys = builder.required_resource_keys + out = Out(dagster_type=None) op_def = _Op( name=spec.get_python_identifier(), - ins=dict(input_tuples_by_asset_key.values()), + ins=dict(named_in_by_asset_key.values()), out=out, # Any resource requirements specified as arguments will be identified as # part of the Op definition instantiation @@ -242,7 +263,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition: return AssetChecksDefinition.create( keys_by_input_name={ input_tuple[0]: asset_key - for asset_key, input_tuple in input_tuples_by_asset_key.items() + for asset_key, input_tuple in named_in_by_asset_key.items() }, node_def=op_def, resource_defs=resource_defs_for_execution, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py index 77a153955da9c..57442ed46901f 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py @@ -507,18 +507,22 @@ def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: asset_name=self.op_name, ) + @cached_property + def required_resource_keys(self) -> AbstractSet[str]: + return compute_required_resource_keys( + required_resource_keys=self.args.required_resource_keys, + resource_defs=self.args.assets_def_resource_defs, + fn=self.fn, + decorator_name=self.args.decorator_name, + ) + def _create_op_definition(self) -> OpDefinition: return _Op( name=self.op_name, description=self.args.description, ins=self.ins_by_input_names, out=self.combined_outs_by_output_name, - required_resource_keys=compute_required_resource_keys( - required_resource_keys=self.args.required_resource_keys, - resource_defs=self.args.op_def_resource_defs, - fn=self.fn, - decorator_name=self.args.decorator_name, - ), + required_resource_keys=self.required_resource_keys, tags={ **({COMPUTE_KIND_TAG: self.args.compute_kind} if self.args.compute_kind else {}), **(self.args.op_tags or {}),