Skip to content

Commit

Permalink
[external-assets] ObserveResult
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 5, 2024
1 parent 7274fb7 commit 1670c1f
Show file tree
Hide file tree
Showing 11 changed files with 715 additions and 76 deletions.
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@
make_values_resource as make_values_resource,
resource as resource,
)
from dagster._core.definitions.result import MaterializeResult as MaterializeResult
from dagster._core.definitions.result import (
MaterializeResult as MaterializeResult,
)
from dagster._core.definitions.run_config import RunConfig as RunConfig
from dagster._core.definitions.run_request import (
AddDynamicPartitionsRequest as AddDynamicPartitionsRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.events import Output
from dagster._core.definitions.source_asset import (
SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION,
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
)
Expand Down Expand Up @@ -137,12 +139,6 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
" should be None",
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {}
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
Expand Down Expand Up @@ -173,10 +169,11 @@ def _shim_assets_def(context: AssetExecutionContext):
op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return_value = op_function.decorated_fn(context)
check.invariant(
return_value is None,
"The wrapped decorated_fn should return a value. If this changes, this code path must"
" changed to process the events appopriately.",
isinstance(return_value, Output)
and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata,
"The wrapped decorated_fn should return an Output with a special metadata key.",
)
return return_value

check.invariant(isinstance(_shim_assets_def, AssetsDefinition))
assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from .inference import infer_output_props
from .input import In, InputDefinition
from .output import Out, OutputDefinition
from .result import MaterializeResult
from .result import MaterializeResult, ObserveResult

if TYPE_CHECKING:
from dagster._core.definitions.asset_layer import AssetLayer
Expand Down Expand Up @@ -574,4 +574,4 @@ def _validate_context_type_hint(fn):

def _is_result_object_type(ttype):
    """Return whether ``ttype`` is one of the special result object types.

    The recognized result types are ``MaterializeResult``, ``ObserveResult``
    and ``AssetCheckResult``.

    Args:
        ttype: The type to test (compared by membership in the tuple of
            result types).

    Returns:
        bool: True if ``ttype`` is one of the result object types.
    """
    # Single membership test covering every result type, including
    # ObserveResult; an earlier return without it would leave this unreachable.
    return ttype in (MaterializeResult, ObserveResult, AssetCheckResult)
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
Output,
)
from .output import DynamicOutputDefinition, OutputDefinition
from .result import MaterializeResult
from .result import AssetResult

if TYPE_CHECKING:
from ..execution.context.compute import OpExecutionContext
Expand Down Expand Up @@ -344,7 +344,7 @@ def _resolve_inputs(
return input_dict


def _key_for_result(result: MaterializeResult, context: "BaseDirectExecutionContext") -> AssetKey:
def _key_for_result(result: AssetResult, context: "BaseDirectExecutionContext") -> AssetKey:
if not context.per_invocation_properties.assets_def:
raise DagsterInvariantViolationError(
f"Op {context.per_invocation_properties.alias} does not have an assets definition."
Expand All @@ -359,13 +359,13 @@ def _key_for_result(result: MaterializeResult, context: "BaseDirectExecutionCont
return next(iter(context.per_invocation_properties.assets_def.keys))

raise DagsterInvariantViolationError(
"MaterializeResult did not include asset_key and it can not be inferred. Specify which"
f"{result.__class__.__name__} did not include asset_key and it can not be inferred. Specify which"
f" asset_key, options are: {context.per_invocation_properties.assets_def.keys}"
)


def _output_name_for_result_obj(
event: MaterializeResult,
event: AssetResult,
context: "BaseDirectExecutionContext",
):
if not context.per_invocation_properties.assets_def:
Expand All @@ -388,7 +388,7 @@ def _handle_gen_event(
(AssetMaterialization, AssetObservation, ExpectationResult),
):
return event
elif isinstance(event, MaterializeResult):
elif isinstance(event, AssetResult):
output_name = _output_name_for_result_obj(event, context)
outputs_seen.add(output_name)
return event
Expand Down Expand Up @@ -516,7 +516,7 @@ def _type_check_function_output(
for event in validate_and_coerce_op_result_to_iterator(result, op_context, op_def.output_defs):
if isinstance(event, (Output, DynamicOutput)):
_type_check_output(output_defs_by_name[event.output_name], event, context)
elif isinstance(event, (MaterializeResult)):
elif isinstance(event, AssetResult):
# ensure result objects are contextually valid
_output_name_for_result_obj(event, context)

Expand Down
46 changes: 35 additions & 11 deletions python_modules/dagster/dagster/_core/definitions/result.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import NamedTuple, Optional, Sequence

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.data_version import DataVersion

Expand All @@ -12,9 +12,9 @@
from .metadata import MetadataUserInput


class MaterializeResult(
class AssetResult(
NamedTuple(
"_MaterializeResult",
"_AssetResult",
[
("asset_key", PublicAttr[Optional[AssetKey]]),
("metadata", PublicAttr[Optional[MetadataUserInput]]),
Expand All @@ -23,14 +23,7 @@ class MaterializeResult(
],
)
):
"""An object representing a successful materialization of an asset. These can be returned from
@asset and @multi_asset decorated functions to pass metadata or specify specific assets were
materialized.
Attributes:
asset_key (Optional[AssetKey]): Optional in @asset, required in @multi_asset to discern which asset this refers to.
metadata (Optional[MetadataUserInput]): Metadata to record with the corresponding AssetMaterialization event.
"""
"""Base class for MaterializeResult and ObserveResult."""

def __new__(
cls,
Expand Down Expand Up @@ -62,3 +55,34 @@ def check_result_named(self, check_name: str) -> AssetCheckResult:
return check_result

check.failed(f"Could not find check result named {check_name}")


class MaterializeResult(AssetResult):
    """An object representing a successful materialization of an asset. These can be returned from
    @asset and @multi_asset decorated functions to pass metadata or specify which specific assets
    were materialized.

    Attributes:
        asset_key (Optional[AssetKey]): Optional in @asset, required in @multi_asset to discern
            which asset this refers to.
        metadata (Optional[MetadataUserInput]): Metadata to record with the corresponding
            AssetMaterialization event.
        check_results (Optional[Sequence[AssetCheckResult]]): Check results to record with the
            corresponding AssetMaterialization event.
        data_version (Optional[DataVersion]): The data version of the asset that was materialized.
    """


@experimental
class ObserveResult(AssetResult):
    """An object representing a successful observation of an asset. These can be returned from
    @asset and @multi_asset decorated functions to pass metadata or specify that specific assets were
    observed. The @asset or @multi_asset must specify
    "dagster/asset_execution_type": "OBSERVATION" in its metadata for this to
    work.

    Attributes:
        asset_key (Optional[AssetKey]): Optional in @asset, required in @multi_asset to discern
            which asset this refers to.
        metadata (Optional[MetadataUserInput]): Metadata to record with the corresponding
            AssetObservation event.
        check_results (Optional[Sequence[AssetCheckResult]]): Check results to record with the
            corresponding AssetObservation event.
        data_version (Optional[DataVersion]): The data version of the asset that was observed.
    """
11 changes: 9 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/source_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
DataVersion,
DataVersionsByPartition,
)
from dagster._core.definitions.events import AssetKey, AssetObservation, CoercibleToAssetKey
from dagster._core.definitions.events import AssetKey, AssetObservation, CoercibleToAssetKey, Output
from dagster._core.definitions.metadata import (
ArbitraryMetadataMapping,
MetadataMapping,
Expand Down Expand Up @@ -59,6 +59,12 @@
# Going with this catch-all for the time-being to permit pythonic resources
SourceAssetObserveFunction: TypeAlias = Callable[..., Any]

# This is a private key that is attached to the Output emitted from a source asset observation
# function and used to prevent observations from being auto-generated from it. This is a workaround
# because we cannot currently auto-convert the observation function to use `ObserveResult`. It can
# be removed when that conversion is completed.
SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION = "__source_asset_observation__"


def wrap_source_asset_observe_fn_in_op_compute_fn(
source_asset: "SourceAsset",
Expand All @@ -78,7 +84,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn(

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext) -> None:
def fn(context: OpExecutionContext) -> Output[None]:
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
Expand Down Expand Up @@ -124,6 +130,7 @@ def fn(context: OpExecutionContext) -> None:
" DataVersionsByPartition, but returned a value of type"
f" {type(observe_fn_return_value)}"
)
return Output(None, metadata={SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION: True})

return DecoratedOpFunction(fn)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_layer import AssetLayer
from dagster._core.definitions.op_definition import OpComputeFunction
from dagster._core.definitions.result import MaterializeResult
from dagster._core.definitions.result import AssetResult, MaterializeResult, ObserveResult
from dagster._core.errors import (
DagsterExecutionStepExecutionError,
DagsterInvariantViolationError,
Expand Down Expand Up @@ -58,6 +58,7 @@
AssetCheckEvaluation,
AssetCheckResult,
MaterializeResult,
ObserveResult,
]


Expand Down Expand Up @@ -114,6 +115,7 @@ def _validate_event(event: Any, step_context: StepExecutionContext) -> OpOutputU
AssetCheckResult,
AssetCheckEvaluation,
MaterializeResult,
ObserveResult,
),
):
raise DagsterInvariantViolationError(
Expand Down Expand Up @@ -213,7 +215,7 @@ def execute_core_compute(
yield step_output
if isinstance(step_output, (DynamicOutput, Output)):
emitted_result_names.add(step_output.output_name)
elif isinstance(step_output, MaterializeResult):
elif isinstance(step_output, AssetResult):
asset_key = (
step_output.asset_key
or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction
from dagster._core.definitions.input import InputDefinition
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.result import MaterializeResult
from dagster._core.definitions.result import AssetResult, ObserveResult
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterTypeKind, is_generic_output_annotation
from dagster._utils import is_named_tuple_instance
Expand Down Expand Up @@ -168,10 +168,10 @@ def _filter_expected_output_defs(
result_tuple = (
(result,) if not isinstance(result, tuple) or is_named_tuple_instance(result) else result
)
materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)]
asset_results = [x for x in result_tuple if isinstance(x, AssetResult)]
remove_outputs = [
r.get_spec_python_identifier(asset_key=x.asset_key or context.asset_key)
for x in materialize_results
for x in asset_results
for r in x.check_results or []
]
return [out for out in output_defs if out.name not in remove_outputs]
Expand Down Expand Up @@ -257,7 +257,8 @@ def validate_and_coerce_op_result_to_iterator(
"value. Check out the docs on logging events here: "
"https://docs.dagster.io/concepts/ops-jobs-graphs/op-events#op-events-and-exceptions"
)
elif isinstance(result, AssetCheckResult):
# These don't correspond to output defs so pass them through
elif isinstance(result, (AssetCheckResult, ObserveResult)):
yield result
elif result is not None and not output_defs:
raise DagsterInvariantViolationError(
Expand Down Expand Up @@ -310,7 +311,7 @@ def validate_and_coerce_op_result_to_iterator(
mapping_key=dynamic_output.mapping_key,
metadata=dynamic_output.metadata,
)
elif isinstance(element, MaterializeResult):
elif isinstance(element, AssetResult):
yield element # coerced in to Output in outer iterator
elif isinstance(element, Output):
if annotation != inspect.Parameter.empty and not is_generic_output_annotation(
Expand Down
Loading

0 comments on commit 1670c1f

Please sign in to comment.