Skip to content

Commit

Permalink
Add a new AssetExecutionType (Observable) and use it to omit material…
Browse files Browse the repository at this point in the history
…izations for observable source asset wrapping (#16621)

## Summary & Motivation

This adds a new execution type (Observation) which does not
automatically produce a materialization on execution. This allows us to
drive observation from a common execution pathway, rather than a
completely parallel system.

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored Oct 9, 2023
1 parent 38218ed commit 03584cc
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@


class AssetExecutionType(Enum):
OBSERVATION = "OBSERVATION"
UNEXECUTABLE = "UNEXECUTABLE"
MATERIALIZATION = "MATERIALIZATION"

@staticmethod
def is_executable(varietal_str: Optional[str]) -> bool:
return AssetExecutionType.str_to_enum(varietal_str) in {AssetExecutionType.MATERIALIZATION}
return AssetExecutionType.str_to_enum(varietal_str) in {
AssetExecutionType.MATERIALIZATION,
AssetExecutionType.OBSERVATION,
}

@staticmethod
def str_to_enum(varietal_str: Optional[str]) -> "AssetExecutionType":
Expand Down
11 changes: 11 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dagster._annotations import experimental_param, public
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
from dagster._core.definitions.freshness_policy import FreshnessPolicy
Expand Down Expand Up @@ -905,6 +906,16 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool:
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType:
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)

return AssetExecutionType.str_to_enum(
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
else {}
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value}
)

kwargs = {
"key": source_asset.key,
"metadata": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
TypeCheck,
)
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.data_version import (
CODE_VERSION_TAG,
DATA_VERSION_IS_USER_PROVIDED_TAG,
Expand Down Expand Up @@ -779,15 +780,38 @@ def _gen_fn():

asset_key, partitions = _asset_key_and_partitions_for_output(output_context)
if asset_key:
for materialization in _get_output_asset_materializations(
asset_key,
partitions,
output,
output_def,
manager_metadata,
step_context,
):
yield DagsterEvent.asset_materialization(step_context, materialization)
asset_layer = step_context.job_def.asset_layer
execution_type = (
asset_layer.assets_def_for_asset(asset_key).asset_execution_type_for_asset(asset_key)
if asset_layer.has_assets_def_for_asset(asset_key)
else AssetExecutionType.MATERIALIZATION
)

check.invariant(
execution_type != AssetExecutionType.UNEXECUTABLE,
"There should never be unexecutable assets here",
)

check.invariant(
execution_type in {AssetExecutionType.MATERIALIZATION, AssetExecutionType.OBSERVATION},
f"Unexpected asset execution type {execution_type}",
)

yield from (
(
DagsterEvent.asset_materialization(step_context, materialization)
for materialization in _get_output_asset_materializations(
asset_key,
partitions,
output,
output_def,
manager_metadata,
step_context,
)
)
if execution_type == AssetExecutionType.MATERIALIZATION
else ()
)

yield DagsterEvent.handled_output(
step_context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def test_observable_source_asset_decorator() -> None:
def an_observable_source_asset() -> DataVersion:
return DataVersion("foo")

defs = Definitions(assets=[create_external_asset_from_source_asset(an_observable_source_asset)])
assets_def = create_external_asset_from_source_asset(an_observable_source_asset)
assert assets_def.is_asset_executable(an_observable_source_asset.key)
defs = Definitions(assets=[assets_def])

instance = DagsterInstance.ephemeral()
result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance)
Expand All @@ -225,6 +227,4 @@ def an_observable_source_asset() -> DataVersion:
assert observation_event.asset_observation_data.asset_observation.data_version == "foo"

all_materializations = result.get_asset_materialization_events()
# Note this does not make sense. We need to make framework changes to allow for the omission of
# a materialzation event
assert len(all_materializations) == 1
assert len(all_materializations) == 0

0 comments on commit 03584cc

Please sign in to comment.