Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use builder.create_op_definition in @multi_asset_check #22415

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
from dagster._core.definitions.asset_in import AssetIn
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.output import Out
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.utils import DEFAULT_OUTPUT
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.execution.build_resources import wrap_resources_for_execution
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster._utils.warnings import disable_dagster_warnings

from .asset_decorator import make_asset_deps
Expand All @@ -29,7 +27,6 @@
create_check_specs_by_output_name,
get_function_params_without_context_or_config_or_resources,
)
from .op_decorator import _Op

AssetCheckFunctionReturn: TypeAlias = AssetCheckResult
AssetCheckFunction: TypeAlias = Callable[..., AssetCheckFunctionReturn]
Expand Down Expand Up @@ -322,9 +319,6 @@ def checks():
def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
op_name = name or fn.__name__

outs = {
spec.get_python_identifier(): Out(None, is_required=not can_subset) for spec in specs
}
named_ins_by_asset_key = build_named_ins(
fn=fn,
asset_ins={},
Expand All @@ -335,14 +329,13 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
builder_args = DecoratorAssetsDefinitionBuilderArgs(
decorator_name="@multi_asset_check",
name=name,
# @asset_check previous behavior is to not set description on underlying op
op_description=None,
op_description=description,
required_resource_keys=required_resource_keys or set(),
config_schema=config_schema,
retry_policy=retry_policy,
specs=[],
check_specs_by_output_name=create_check_specs_by_output_name(specs),
can_subset=False,
can_subset=can_subset,
compute_kind=compute_kind,
op_tags=op_tags,
op_def_resource_defs=resource_defs,
Expand All @@ -369,19 +362,7 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
)

with disable_dagster_warnings():
op_def = _Op(
name=op_name,
description=description,
ins=dict(named_ins_by_asset_key.values()),
out=outs,
required_resource_keys=builder.required_resource_keys,
tags={
**({COMPUTE_KIND_TAG: compute_kind} if compute_kind else {}),
**(op_tags or {}),
},
config_schema=config_schema,
retry_policy=retry_policy,
)(fn)
op_def = builder.create_op_definition()

return AssetChecksDefinition.create(
node_def=op_def,
Expand Down