Skip to content

Commit

Permalink
Revert "Extract _get_op_def_compute_fn into wrap_source_asset_observe…
Browse files Browse the repository at this point in the history
…_fn_in_op_compute_fn (#16618)" (#16688)

## Summary & Motivation

This caused bugs caught in manual testing:

```
Could not load location dagster_test.toys.repo to check for sensors due to the following error: TypeError: 'staticmethod' object is not callable

Stack Trace:
  File "/Users/johann/dagster/python_modules/dagster/dagster/_grpc/server.py", line 295, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
  File "/Users/johann/dagster/python_modules/dagster/dagster/_grpc/server.py", line 139, in __init__
    loadable_targets = get_loadable_targets(
  File "/Users/johann/dagster/python_modules/dagster/dagster/_grpc/utils.py", line 47, in get_loadable_targets
    else loadable_targets_from_python_module(module_name, working_directory)
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/workspace/autodiscovery.py", line 35, in loadable_targets_from_python_module
    module = load_python_module(
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/code_pointer.py", line 135, in load_python_module
    return importlib.import_module(module_name)
  File "/Users/johann/.pyenv/versions/3.9.10/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/Users/johann/dagster/python_modules/dagster-test/dagster_test/toys/repo.py", line 201, in <module>
    def data_versions_repository():
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/definitions/decorators/repository_decorator.py", line 405, in repository
    return _Repository()(definitions_fn)
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/definitions/decorators/repository_decorator.py", line 161, in __call__
    else CachingRepositoryData.from_list(
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data.py", line 346, in from_list
    return build_caching_repository_data_from_list(
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py", line 195, in build_caching_repository_data_from_list
    for job_def in get_base_asset_jobs(
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/definitions/assets_job.py", line 76, in get_base_asset_jobs
    for observable in [sa for sa in source_assets if sa.is_observable]:
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/definitions/assets_job.py", line 76, in <listcomp>
    for observable in [sa for sa in source_assets if sa.is_observable]:
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/definitions/source_asset.py", line 256, in is_observable
    return self.node_def is not None
  File "/Users/johann/dagster/python_modules/dagster/dagster/_core/definitions/source_asset.py", line 270, in node_def
    compute_fn=wrap_source_asset_observe_fn_in_op_compute_fn(self),
```

Reverting for now


## How I Tested These Changes

BK
  • Loading branch information
schrockn authored Sep 21, 2023
1 parent e6d199d commit e4337b9
Showing 1 changed file with 61 additions and 76 deletions.
137 changes: 61 additions & 76 deletions python_modules/dagster/dagster/_core/definitions/source_asset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Expand Down Expand Up @@ -47,11 +46,6 @@
DagsterInvalidInvocationError,
DagsterInvalidObservationError,
)

if TYPE_CHECKING:
from dagster._core.definitions.decorators.op_decorator import (
DecoratedOpFunction,
)
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import disable_dagster_warnings
Expand All @@ -60,75 +54,6 @@
SourceAssetObserveFunction: TypeAlias = Callable[..., Any]


@staticmethod
def wrap_source_asset_observe_fn_in_op_compute_fn(
source_asset: "SourceAsset",
) -> "DecoratedOpFunction":
from dagster._core.definitions.decorators.op_decorator import (
DecoratedOpFunction,
is_context_provided,
)
from dagster._core.execution.context.compute import (
OpExecutionContext,
)

check.not_none(source_asset.observe_fn, "Must be an observable source asset")
assert source_asset.observe_fn # for type checker

observe_fn = source_asset.observe_fn

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext):
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
observe_fn(context, **resource_kwargs)
if observe_fn_has_context
else observe_fn(**resource_kwargs)
)

if isinstance(observe_fn_return_value, DataVersion):
if source_asset.partitions_def is not None:
raise DagsterInvalidObservationError(
f"{source_asset.key} is partitioned, so its observe function should return a"
" DataVersionsByPartition, not a DataVersion"
)

context.log_event(
AssetObservation(
asset_key=source_asset.key,
tags={DATA_VERSION_TAG: observe_fn_return_value.value},
)
)
elif isinstance(observe_fn_return_value, DataVersionsByPartition):
if source_asset.partitions_def is None:
raise DagsterInvalidObservationError(
f"{source_asset.key} is not partitioned, so its observe function should return"
" a DataVersion, not a DataVersionsByPartition"
)

for (
partition_key,
data_version,
) in observe_fn_return_value.data_versions_by_partition.items():
context.log_event(
AssetObservation(
asset_key=source_asset.key,
tags={DATA_VERSION_TAG: data_version.value},
partition=partition_key,
)
)
else:
raise DagsterInvalidObservationError(
f"Observe function for {source_asset.key} must return a DataVersion or"
" DataVersionsByPartition, but returned a value of type"
f" {type(observe_fn_return_value)}"
)

return DecoratedOpFunction(fn)


@experimental_param(param="resource_defs")
@experimental_param(param="io_manager_def")
class SourceAsset(ResourceAddable):
Expand Down Expand Up @@ -255,6 +180,66 @@ def is_observable(self) -> bool:
"""bool: Whether the asset is observable."""
return self.node_def is not None

def _get_op_def_compute_fn(self, observe_fn: SourceAssetObserveFunction):
from dagster._core.definitions.decorators.op_decorator import (
DecoratedOpFunction,
is_context_provided,
)
from dagster._core.execution.context.compute import (
OpExecutionContext,
)

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext):
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
observe_fn(context, **resource_kwargs)
if observe_fn_has_context
else observe_fn(**resource_kwargs)
)

if isinstance(observe_fn_return_value, DataVersion):
if self.partitions_def is not None:
raise DagsterInvalidObservationError(
f"{self.key} is partitioned, so its observe function should return a"
" DataVersionsByPartition, not a DataVersion"
)

context.log_event(
AssetObservation(
asset_key=self.key,
tags={DATA_VERSION_TAG: observe_fn_return_value.value},
)
)
elif isinstance(observe_fn_return_value, DataVersionsByPartition):
if self.partitions_def is None:
raise DagsterInvalidObservationError(
f"{self.key} is not partitioned, so its observe function should return a"
" DataVersion, not a DataVersionsByPartition"
)

for (
partition_key,
data_version,
) in observe_fn_return_value.data_versions_by_partition.items():
context.log_event(
AssetObservation(
asset_key=self.key,
tags={DATA_VERSION_TAG: data_version.value},
partition=partition_key,
)
)
else:
raise DagsterInvalidObservationError(
f"Observe function for {self.key} must return a DataVersion or"
" DataVersionsByPartition, but returned a value of type"
f" {type(observe_fn_return_value)}"
)

return DecoratedOpFunction(fn)

@property
def required_resource_keys(self) -> AbstractSet[str]:
return {requirement.key for requirement in self.get_resource_requirements()}
Expand All @@ -267,7 +252,7 @@ def node_def(self) -> Optional[OpDefinition]:

if self._node_def is None:
self._node_def = OpDefinition(
compute_fn=wrap_source_asset_observe_fn_in_op_compute_fn(self),
compute_fn=self._get_op_def_compute_fn(self.observe_fn),
name=self.key.to_python_identifier(),
description=self.description,
required_resource_keys=self._required_resource_keys,
Expand Down

0 comments on commit e4337b9

Please sign in to comment.