Skip to content

Commit

Permalink
Hide non_argument_deps
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 22, 2024
1 parent c2bf0d9 commit 6056f43
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 34 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
166 changes: 145 additions & 21 deletions python_modules/dagster/dagster/_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ def is_public(obj: Annotatable) -> bool:
@dataclass
class DeprecatedInfo:
breaking_version: str
additional_warn_text: Optional[str] = None
subject: Optional[str] = None
hidden: bool
additional_warn_text: Optional[str]
subject: Optional[str]


@overload
Expand Down Expand Up @@ -124,7 +125,6 @@ def deprecated(
emit_runtime_warning (bool): Whether to emit a warning when the function is called.
Usage:
.. code-block:: python
@deprecated(breaking_version="2.0", additional_warn_text="Use my_new_function instead")
Expand All @@ -140,6 +140,7 @@ def not_deprecated_function():
...
some_deprecated_function()
...
"""
if __obj is None:
return lambda obj: deprecated(
Expand All @@ -154,7 +155,12 @@ def not_deprecated_function():
setattr(
target,
_DEPRECATED_ATTR_NAME,
DeprecatedInfo(breaking_version, additional_warn_text, subject),
DeprecatedInfo(
breaking_version=breaking_version,
additional_warn_text=additional_warn_text,
subject=subject,
hidden=False,
),
)

if emit_runtime_warning:
Expand Down Expand Up @@ -233,6 +239,14 @@ def deprecated_param(
additional_warn_text (str): Additional text to display after the deprecation warning.
Typically this should suggest a newer API.
emit_runtime_warning (bool): Whether to emit a warning when the function is called.
hidden (bool): Whether or not this is a hidden parameters. Hidden parameters are only
passed via kwargs and are hidden from the type signature. This makes it so
that this hidden parameter does not appear in typeaheads. In order to provide
high quality error messages we also provide the helper function
only_allow_hidden_params_in_kwargs to ensure there are high quality
error messages if the user passes an unsupported keyword argument.
"""
if __obj is None:
return lambda obj: deprecated_param( # type: ignore
Expand All @@ -243,29 +257,125 @@ def deprecated_param(
emit_runtime_warning=emit_runtime_warning,
)
else:
return attach_deprecation_info_and_wrap(
__obj,
param=param,
breaking_version=breaking_version,
additional_warn_text=additional_warn_text,
emit_runtime_warning=emit_runtime_warning,
hidden=False,
)


def attach_deprecation_info_and_wrap(
obj: T_Annotatable,
param: str,
breaking_version: str,
additional_warn_text: Optional[str] = None,
emit_runtime_warning: bool = True,
hidden: bool = False,
) -> T_Annotatable:
if not hidden:
check.invariant(
_annotatable_has_param(__obj, param),
_annotatable_has_param(obj, param),
f"Attempted to mark undefined parameter `{param}` deprecated.",
)
target = _get_annotation_target(__obj)
if not hasattr(target, _DEPRECATED_PARAM_ATTR_NAME):
setattr(target, _DEPRECATED_PARAM_ATTR_NAME, {})
getattr(target, _DEPRECATED_PARAM_ATTR_NAME)[param] = DeprecatedInfo(
target = _get_annotation_target(obj)
if not hasattr(target, _DEPRECATED_PARAM_ATTR_NAME):
setattr(target, _DEPRECATED_PARAM_ATTR_NAME, {})
getattr(target, _DEPRECATED_PARAM_ATTR_NAME)[param] = DeprecatedInfo(
breaking_version=breaking_version,
additional_warn_text=additional_warn_text,
hidden=hidden,
subject=None,
)

if not emit_runtime_warning:
return obj

condition = lambda *_, **kwargs: kwargs.get(param) is not None
warning_fn = lambda: deprecation_warning(
_get_subject(obj, param=param),
breaking_version=breaking_version,
additional_warn_text=additional_warn_text,
stacklevel=4,
)
return apply_pre_call_decorator(obj, warning_fn, condition=condition)


@overload
def hidden_param(
__obj: T_Annotatable,
*,
param: str,
breaking_version: str,
additional_warn_text: Optional[str] = ...,
emit_runtime_warning: bool = ...,
) -> T_Annotatable: ...


@overload
def hidden_param(
__obj: None = ...,
*,
param: str,
breaking_version: str,
additional_warn_text: Optional[str] = ...,
emit_runtime_warning: bool = ...,
) -> Callable[[T_Annotatable], T_Annotatable]: ...


def hidden_param(
__obj: Optional[T_Annotatable] = None,
*,
param: str,
breaking_version: str,
additional_warn_text: Optional[str] = None,
emit_runtime_warning: bool = True,
) -> T_Annotatable:
"""Hidden parameters are only passed via kwargs and are hidden from the
type signature. This makes it so that this hidden parameter does not
appear in typeaheads. In order to provide high quality error messages
we also provide the helper function only_allow_hidden_params_in_kwargs
to ensure there are high quality error messages if the user passes
an unsupported keyword argument.
Args:
breaking_version (str): The version at which the deprecated function will be removed.
additional_warn_text (Optional[str]): Additional text to display after the deprecation warning.
Typically this should suggest a newer API.
subject (Optional[str]): The subject of the deprecation warning. Defaults to a string
representation of the decorated object. This is useful when marking usage of
a deprecated API inside an otherwise non-deprecated function, so
that it can be easily cleaned up later. It should only be used with
`emit_runtime_warning=False`, as we don't want to warn users when a
deprecated API is used internally.
emit_runtime_warning (bool): Whether to emit a warning when the function is called.
Usage:
.. code-block:: python
@hidden_param(breaking_version="2.0", additional_warn_text="Use my_new_function instead")
def func_with_hidden_args(**kwargs):
only_allow_hidden_params_in_kwargs(func_with_hidden_args, kwargs)
"""
if __obj is None:
return lambda obj: hidden_param( # type: ignore
obj,
param=param,
breaking_version=breaking_version,
additional_warn_text=additional_warn_text,
emit_runtime_warning=emit_runtime_warning,
)
else:
return attach_deprecation_info_and_wrap(
__obj,
param=param,
breaking_version=breaking_version,
additional_warn_text=additional_warn_text,
emit_runtime_warning=emit_runtime_warning,
hidden=True,
)

if emit_runtime_warning:
condition = lambda *_, **kwargs: kwargs.get(param) is not None
warning_fn = lambda: deprecation_warning(
_get_subject(__obj, param=param),
breaking_version=breaking_version,
additional_warn_text=additional_warn_text,
stacklevel=4,
)
return apply_pre_call_decorator(__obj, warning_fn, condition=condition)
else:
return __obj


def has_deprecated_params(obj: Annotatable) -> bool:
Expand Down Expand Up @@ -575,3 +685,17 @@ def _get_warning_stacklevel(obj: Annotatable):
def _annotatable_has_param(obj: Annotatable, param: str) -> bool:
target_fn = get_decorator_target(obj)
return param in inspect.signature(target_fn).parameters


def only_allow_hidden_params_in_kwargs(annotatable: Annotatable, kwargs: Mapping[str, Any]) -> None:
deprecated_params = (
get_deprecated_params(annotatable) if has_deprecated_params(annotatable) else {}
)
for param in kwargs:
if param not in deprecated_params:
raise TypeError(f"{annotatable.__name__} got an unexpected keyword argument '{param}'")

check.invariant(
deprecated_params[param].hidden,
f"Unexpected non-hidden deprecated parameter '{param}' in kwargs. Should never get here.",
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
)

import dagster._check as check
from dagster._annotations import deprecated_param, experimental_param
from dagster._annotations import (
experimental_param,
hidden_param,
only_allow_hidden_params_in_kwargs,
)
from dagster._config.config_schema import UserConfigSchema
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
Expand Down Expand Up @@ -55,6 +59,7 @@
@overload
def asset(
compute_fn: Callable[..., Any],
**kwargs,
) -> AssetsDefinition: ...


Expand Down Expand Up @@ -85,20 +90,43 @@ def asset(
retry_policy: Optional[RetryPolicy] = ...,
code_version: Optional[str] = ...,
key: Optional[CoercibleToAssetKey] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
check_specs: Optional[Sequence[AssetCheckSpec]] = ...,
owners: Optional[Sequence[str]] = ...,
**kwargs,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...


def _validate_hidden_non_argument_dep_param(
non_argument_deps: Any,
) -> Optional[Union[Set[AssetKey], Set[str]]]:
if non_argument_deps is None:
return non_argument_deps

if not isinstance(non_argument_deps, set):
check.failed("non_arguments_deps must be a set if not None")

assert isinstance(non_argument_deps, set)

check.set_param(non_argument_deps, "non_argument_deps", of_type=(str, AssetKey))

check.invariant(
all(isinstance(dep, str) for dep in non_argument_deps)
or all(isinstance(dep, AssetKey) for dep in non_argument_deps),
)

return non_argument_deps


@experimental_param(param="resource_defs")
@experimental_param(param="io_manager_def")
@experimental_param(param="auto_materialize_policy")
@experimental_param(param="backfill_policy")
@experimental_param(param="owners")
@experimental_param(param="tags")
@deprecated_param(
param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead."
@hidden_param(
param="non_argument_deps",
breaking_version="2.0.0",
additional_warn_text="use `deps` instead.",
)
def asset(
compute_fn: Optional[Callable[..., Any]] = None,
Expand Down Expand Up @@ -127,9 +155,9 @@ def asset(
retry_policy: Optional[RetryPolicy] = None,
code_version: Optional[str] = None,
key: Optional[CoercibleToAssetKey] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
owners: Optional[Sequence[str]] = None,
**kwargs,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -203,12 +231,13 @@ def asset(
output when given the same inputs.
check_specs (Optional[Sequence[AssetCheckSpec]]): (Experimental) Specs for asset checks that
execute in the decorated function after materializing the asset.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
Set of asset keys that are upstream dependencies, but do not pass an input to the asset.
key (Optional[CoeercibleToAssetKey]): The key for this asset. If provided, cannot specify key_prefix or name.
owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each
string can be a user's email address, or a team name prefixed with `team:`,
e.g. `team:finops`.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
Set of asset keys that are upstream dependencies, but do not pass an input to the asset.
Hidden parameter not exposed in the decorator signature, but passed in kwargs.
Examples:
.. code-block:: python
Expand All @@ -220,10 +249,13 @@ def my_asset(my_upstream_asset: int) -> int:
compute_kind = check.opt_str_param(compute_kind, "compute_kind")
required_resource_keys = check.opt_set_param(required_resource_keys, "required_resource_keys")
upstream_asset_deps = _deps_and_non_argument_deps_to_asset_deps(
deps=deps, non_argument_deps=non_argument_deps
deps=deps,
non_argument_deps=_validate_hidden_non_argument_dep_param(kwargs.get("non_argument_deps")),
)
resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs"))

only_allow_hidden_params_in_kwargs(asset, kwargs)

args = AssetDecoratorArgs(
name=name,
key_prefix=key_prefix,
Expand Down Expand Up @@ -462,8 +494,10 @@ def create_assets_def_from_fn_and_decorator_args(


@experimental_param(param="resource_defs")
@deprecated_param(
param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead."
@hidden_param(
param="non_argument_deps",
breaking_version="2.0.0",
additional_warn_text="use `deps` instead.",
)
def multi_asset(
*,
Expand All @@ -486,8 +520,7 @@ def multi_asset(
code_version: Optional[str] = None,
specs: Optional[Sequence[AssetSpec]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
# deprecated
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
**kwargs: Mapping[str, Any],
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a combined definition of multiple assets that are computed using the same op and same
upstream assets.
Expand Down Expand Up @@ -576,14 +609,19 @@ def my_function(asset0):
"""
from dagster._core.execution.build_resources import wrap_resources_for_execution

only_allow_hidden_params_in_kwargs(multi_asset, kwargs)

args = DecoratorAssetsDefinitionBuilderArgs(
name=name,
op_description=description,
specs=check.opt_list_param(specs, "specs", of_type=AssetSpec),
check_specs_by_output_name=create_check_specs_by_output_name(check_specs),
asset_out_map=check.opt_mapping_param(outs, "outs", key_type=str, value_type=AssetOut),
upstream_asset_deps=_deps_and_non_argument_deps_to_asset_deps(
deps=deps, non_argument_deps=non_argument_deps
deps=deps,
non_argument_deps=_validate_hidden_non_argument_dep_param(
kwargs.get("non_argument_deps")
),
),
asset_deps=check.opt_mapping_param(
internal_asset_deps, "internal_asset_deps", key_type=str, value_type=set
Expand Down
Loading

0 comments on commit 6056f43

Please sign in to comment.