Skip to content

Commit

Permalink
Use builder.create_op_definition in @multi_asset_check
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 13, 2024
1 parent 352d985 commit c867666
Showing 1 changed file with 3 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
from dagster._core.definitions.asset_in import AssetIn
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.output import Out
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.utils import DEFAULT_OUTPUT
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.execution.build_resources import wrap_resources_for_execution
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster._utils.warnings import disable_dagster_warnings

from .asset_decorator import make_asset_deps
Expand All @@ -29,7 +27,6 @@
create_check_specs_by_output_name,
get_function_params_without_context_or_config_or_resources,
)
from .op_decorator import _Op

AssetCheckFunctionReturn: TypeAlias = AssetCheckResult
AssetCheckFunction: TypeAlias = Callable[..., AssetCheckFunctionReturn]
Expand Down Expand Up @@ -321,9 +318,6 @@ def checks():
def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
op_name = name or fn.__name__

outs = {
spec.get_python_identifier(): Out(None, is_required=not can_subset) for spec in specs
}
named_ins_by_asset_key = build_named_ins(
fn=fn,
asset_ins={},
Expand All @@ -334,14 +328,13 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
builder_args = DecoratorAssetsDefinitionBuilderArgs(
decorator_name="@multi_asset_check",
name=name,
# @asset_check previous behavior is to not set description on underlying op
op_description=None,
op_description=description,
required_resource_keys=required_resource_keys or set(),
config_schema=config_schema,
retry_policy=retry_policy,
specs=[],
check_specs_by_output_name=create_check_specs_by_output_name(specs),
can_subset=False,
can_subset=can_subset,
compute_kind=compute_kind,
op_tags=op_tags,
op_def_resource_defs=resource_defs,
Expand All @@ -368,19 +361,7 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
)

with disable_dagster_warnings():
op_def = _Op(
name=op_name,
description=description,
ins=dict(named_ins_by_asset_key.values()),
out=outs,
required_resource_keys=builder.required_resource_keys,
tags={
**({COMPUTE_KIND_TAG: compute_kind} if compute_kind else {}),
**(op_tags or {}),
},
config_schema=config_schema,
retry_policy=retry_policy,
)(fn)
op_def = builder.create_op_definition()

return AssetChecksDefinition.create(
node_def=op_def,
Expand Down

0 comments on commit c867666

Please sign in to comment.