Skip to content

Commit

Permalink
[external-assets] ObserveResult
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Nov 22, 2023
1 parent a933627 commit 5503f7f
Show file tree
Hide file tree
Showing 12 changed files with 743 additions and 65 deletions.
5 changes: 4 additions & 1 deletion python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,10 @@
make_values_resource as make_values_resource,
resource as resource,
)
from dagster._core.definitions.result import MaterializeResult as MaterializeResult
from dagster._core.definitions.result import (
MaterializeResult as MaterializeResult,
ObserveResult as ObserveResult,
)
from dagster._core.definitions.run_config import RunConfig as RunConfig
from dagster._core.definitions.run_request import (
AddDynamicPartitionsRequest as AddDynamicPartitionsRequest,
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def dagster_internal_init(
is_subset=is_subset,
)

def __call__(self, *args: object, **kwargs: object) -> object:
def __call__(self, *args: object, **kwargs: object) -> Any:
from .composition import is_in_composition
from .graph_definition import GraphDefinition

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.events import Output
from dagster._core.definitions.source_asset import (
SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION,
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
)
Expand Down Expand Up @@ -137,12 +139,6 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
" should be None",
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {}
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
Expand Down Expand Up @@ -173,10 +169,11 @@ def _shim_assets_def(context: AssetExecutionContext):
op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return_value = op_function.decorated_fn(context)
check.invariant(
return_value is None,
"The wrapped decorated_fn should return a value. If this changes, this code path must"
" changed to process the events appopriately.",
isinstance(return_value, Output)
and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata,
"The wrapped decorated_fn should return an Output with a special metadata key.",
)
return return_value

check.invariant(isinstance(_shim_assets_def, AssetsDefinition))
assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from .inference import infer_output_props
from .input import In, InputDefinition
from .output import Out, OutputDefinition
from .result import MaterializeResult
from .result import MaterializeResult, ObserveResult

if TYPE_CHECKING:
from dagster._core.definitions.asset_layer import AssetLayer
Expand Down Expand Up @@ -574,4 +574,4 @@ def _validate_context_type_hint(fn):

def _is_result_object_type(ttype):
# Is this type special result object type
return ttype in (MaterializeResult, AssetCheckResult)
return ttype in (MaterializeResult, ObserveResult, AssetCheckResult)
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
Output,
)
from .output import DynamicOutputDefinition, OutputDefinition
from .result import MaterializeResult
from .result import MaterializeResult, ObserveResult

if TYPE_CHECKING:
from ..execution.context.invocation import BoundOpExecutionContext
Expand Down Expand Up @@ -326,21 +326,23 @@ def _resolve_inputs(
return input_dict


def _key_for_result(result: MaterializeResult, context: "BoundOpExecutionContext") -> AssetKey:
def _key_for_result(
result: Union[MaterializeResult, ObserveResult], context: "BoundOpExecutionContext"
) -> AssetKey:
if result.asset_key:
return result.asset_key

if len(context.assets_def.keys) == 1:
return next(iter(context.assets_def.keys))

raise DagsterInvariantViolationError(
"MaterializeResult did not include asset_key and it can not be inferred. Specify which"
f"{result.__class__.__name__} did not include asset_key and it can not be inferred. Specify which"
f" asset_key, options are: {context.assets_def.keys}"
)


def _output_name_for_result_obj(
event: MaterializeResult,
event: Union[MaterializeResult, ObserveResult],
context: "BoundOpExecutionContext",
):
asset_key = _key_for_result(event, context)
Expand All @@ -359,7 +361,7 @@ def _handle_gen_event(
(AssetMaterialization, AssetObservation, ExpectationResult),
):
return event
elif isinstance(event, MaterializeResult):
elif isinstance(event, (MaterializeResult, ObserveResult)):
output_name = _output_name_for_result_obj(event, context)
outputs_seen.add(output_name)
return event
Expand Down Expand Up @@ -466,7 +468,7 @@ def _type_check_function_output(
for event in validate_and_coerce_op_result_to_iterator(result, context, op_def.output_defs):
if isinstance(event, (Output, DynamicOutput)):
_type_check_output(output_defs_by_name[event.output_name], event, context)
elif isinstance(event, (MaterializeResult)):
elif isinstance(event, (MaterializeResult, ObserveResult)):
# ensure result objects are contextually valid
_output_name_for_result_obj(event, context)

Expand Down
59 changes: 59 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,65 @@ class MaterializeResult(
Attributes:
asset_key (Optional[AssetKey]): Optional in @asset, required in @multi_asset to discern which asset this refers to.
metadata (Optional[MetadataUserInput]): Metadata to record with the corresponding AssetMaterialization event.
check_results (Optional[Sequence[AssetCheckResult]]): Check results to record with the
corresponding AssetMaterialization event.
data_version (Optional[DataVersion]): The data version of the asset that was observed.
"""

def __new__(
cls,
*, # enforce kwargs
asset_key: Optional[CoercibleToAssetKey] = None,
metadata: Optional[MetadataUserInput] = None,
check_results: Optional[Sequence[AssetCheckResult]] = None,
data_version: Optional[DataVersion] = None,
):
asset_key = AssetKey.from_coercible(asset_key) if asset_key else None

return super().__new__(
cls,
asset_key=asset_key,
metadata=check.opt_nullable_mapping_param(
metadata,
"metadata",
key_type=str,
),
check_results=check.opt_sequence_param(
check_results, "check_results", of_type=AssetCheckResult
),
data_version=check.opt_inst_param(data_version, "data_version", DataVersion),
)

def check_result_named(self, check_name: str) -> AssetCheckResult:
for check_result in self.check_results:
if check_result.check_name == check_name:
return check_result

check.failed(f"Could not find check result named {check_name}")


@experimental
class ObserveResult(
NamedTuple(
"_ObserveResult",
[
("asset_key", PublicAttr[Optional[AssetKey]]),
("metadata", PublicAttr[Optional[MetadataUserInput]]),
("check_results", PublicAttr[Sequence[AssetCheckResult]]),
("data_version", PublicAttr[Optional[DataVersion]]),
],
)
):
"""An object representing a successful observation of an asset. These can be returned from
@asset and @multi_asset decorated functions to pass metadata or specify that specific assets were
observed.
Attributes:
asset_key (Optional[AssetKey]): Optional in @asset, required in @multi_asset to discern which asset this refers to.
metadata (Optional[MetadataUserInput]): Metadata to record with the corresponding AssetMaterialization event.
check_results (Optional[Sequence[AssetCheckResult]]): Check results to record with the
corresponding AssetObservation event.
data_version (Optional[DataVersion]): The data version of the asset that was observed.
"""

def __new__(
Expand Down
11 changes: 9 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/source_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
DataVersion,
DataVersionsByPartition,
)
from dagster._core.definitions.events import AssetKey, AssetObservation, CoercibleToAssetKey
from dagster._core.definitions.events import AssetKey, AssetObservation, CoercibleToAssetKey, Output
from dagster._core.definitions.metadata import (
ArbitraryMetadataMapping,
MetadataMapping,
Expand Down Expand Up @@ -59,6 +59,12 @@
# Going with this catch-all for the time-being to permit pythonic resources
SourceAssetObserveFunction: TypeAlias = Callable[..., Any]

# This is a private key that is attached to the Output emitted from a source asset observation
# function and used to prevent observations from being auto-generated from it. This is a workaround
# because we cannot currently auto-convert the observation function to use `ObserveResult`. It can
# be removed when that conversion is completed.
SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION = "__source_asset_observation__"


def wrap_source_asset_observe_fn_in_op_compute_fn(
source_asset: "SourceAsset",
Expand All @@ -78,7 +84,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn(

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext) -> None:
def fn(context: OpExecutionContext) -> Output[None]:
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
Expand Down Expand Up @@ -124,6 +130,7 @@ def fn(context: OpExecutionContext) -> None:
" DataVersionsByPartition, but returned a value of type"
f" {type(observe_fn_return_value)}"
)
return Output(None, metadata={SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION: True})

return DecoratedOpFunction(fn)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_layer import AssetLayer
from dagster._core.definitions.op_definition import OpComputeFunction
from dagster._core.definitions.result import MaterializeResult
from dagster._core.definitions.result import MaterializeResult, ObserveResult
from dagster._core.errors import (
DagsterExecutionStepExecutionError,
DagsterInvariantViolationError,
Expand Down Expand Up @@ -58,6 +58,7 @@
AssetCheckEvaluation,
AssetCheckResult,
MaterializeResult,
ObserveResult,
]


Expand Down Expand Up @@ -114,6 +115,7 @@ def _validate_event(event: Any, step_context: StepExecutionContext) -> OpOutputU
AssetCheckResult,
AssetCheckEvaluation,
MaterializeResult,
ObserveResult,
),
):
raise DagsterInvariantViolationError(
Expand Down Expand Up @@ -213,7 +215,7 @@ def execute_core_compute(
yield step_output
if isinstance(step_output, (DynamicOutput, Output)):
emitted_result_names.add(step_output.output_name)
elif isinstance(step_output, MaterializeResult):
elif isinstance(step_output, (MaterializeResult, ObserveResult)):
asset_key = (
step_output.asset_key
or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction
from dagster._core.definitions.input import InputDefinition
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.result import MaterializeResult
from dagster._core.definitions.result import MaterializeResult, ObserveResult
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterTypeKind, is_generic_output_annotation
from dagster._utils import is_named_tuple_instance
Expand Down Expand Up @@ -168,10 +168,12 @@ def _filter_expected_output_defs(
result_tuple = (
(result,) if not isinstance(result, tuple) or is_named_tuple_instance(result) else result
)
materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)]
materialize_or_observe_results = [
x for x in result_tuple if isinstance(x, (MaterializeResult, ObserveResult))
]
remove_outputs = [
r.get_spec_python_identifier(asset_key=x.asset_key or context.asset_key)
for x in materialize_results
for x in materialize_or_observe_results
for r in x.check_results or []
]
return [out for out in output_defs if out.name not in remove_outputs]
Expand Down Expand Up @@ -257,7 +259,7 @@ def validate_and_coerce_op_result_to_iterator(
"value. Check out the docs on logging events here: "
"https://docs.dagster.io/concepts/ops-jobs-graphs/op-events#op-events-and-exceptions"
)
elif isinstance(result, AssetCheckResult):
elif isinstance(result, (AssetCheckResult, ObserveResult)):
yield result
elif result is not None and not output_defs:
raise DagsterInvariantViolationError(
Expand Down Expand Up @@ -310,7 +312,7 @@ def validate_and_coerce_op_result_to_iterator(
mapping_key=dynamic_output.mapping_key,
metadata=dynamic_output.metadata,
)
elif isinstance(element, MaterializeResult):
elif isinstance(element, (MaterializeResult, ObserveResult)):
yield element # coerced in to Output in outer iterator
elif isinstance(element, Output):
if annotation != inspect.Parameter.empty and not is_generic_output_annotation(
Expand Down
Loading

0 comments on commit 5503f7f

Please sign in to comment.