Skip to content

Commit

Permalink
Wrap observable source assets in AssetsDefinition
Browse files Browse the repository at this point in the history
cp

explicit use ephem instance
  • Loading branch information
schrockn committed Sep 22, 2023
1 parent b824aa3 commit b5aeae1
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 16 deletions.
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, deprecated, experimental_param, public
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX, SYSTEM_TAG_PREFIX
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import NamedTupleSerializer
Expand Down Expand Up @@ -481,6 +481,10 @@ def __new__(
def label(self) -> str:
return " ".join(self.asset_key.path)

@property
def data_version(self) -> Optional[str]:
    """Look up the ``DATA_VERSION_TAG`` entry of ``self.tags``.

    Returns:
        Whatever ``self.tags.get`` yields for ``DATA_VERSION_TAG``.
    """
    event_tags = self.tags
    return event_tags.get(DATA_VERSION_TAG)


UNDEFINED_ASSET_KEY_PATH = ["__undefined__"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
AssetSpec,
)
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.source_asset import (
SourceAsset,
wrap_source_asset_observe_fn_in_op_compute_fn,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import OpExecutionContext


def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
Expand Down Expand Up @@ -53,21 +57,24 @@ def an_asset() -> None:
return an_asset


def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
def create_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
"Schedulable observable source assets not supported yet: auto_observe_interval_minutes"
" should be None",
)

injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {}
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
**injected_metadata,
},
"group_name": source_asset.group_name,
"description": source_asset.description,
Expand All @@ -79,8 +86,22 @@ def create_unexecutable_observable_assets_def_from_source_asset(source_asset: So
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

kwargs["partitions_def"] = source_asset.partitions_def

if source_asset.observe_fn:
kwargs["resource_defs"] = source_asset.resource_defs

@asset(**kwargs)
def shim_asset() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")
def shim_asset(context: OpExecutionContext):
    """Compute function of the asset that wraps ``source_asset``.

    When the source asset has an ``observe_fn``, the op compute function built by
    ``wrap_source_asset_observe_fn_in_op_compute_fn`` is invoked with ``context``
    and its result is required to be ``None``.

    Raises:
        NotImplementedError: if ``source_asset.observe_fn`` is not set.
    """
    if not source_asset.observe_fn:
        raise NotImplementedError(f"Asset {source_asset.key} is not executable")

    op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
    return_value = op_function.decorated_fn(context)
    # The invariant requires a None result, so the failure message must say so.
    check.invariant(
        return_value is None,
        "The wrapped decorated_fn should return None. If this changes, this code path must"
        " change to process the events appropriately.",
    )

return shim_asset
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn(

observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

def fn(context: OpExecutionContext):
def fn(context: OpExecutionContext) -> None:
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def asset_observations_for_node(self, node_name: str) -> Sequence[AssetObservati
def get_asset_materialization_events(self) -> Sequence[DagsterEvent]:
    """Return, in order, the events of ``self.all_events`` flagged ``is_step_materialization``."""
    return list(
        filter(lambda candidate: candidate.is_step_materialization, self.all_events)
    )

def get_asset_observation_events(self) -> Sequence[DagsterEvent]:
    """Return, in order, the events of ``self.all_events`` flagged ``is_asset_observation``."""
    return list(
        filter(lambda candidate: candidate.is_asset_observation, self.all_events)
    )

def get_asset_check_evaluations(self) -> Sequence[AssetCheckEvaluation]:
return [
cast(AssetCheckEvaluation, event.event_specific_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
DataVersion,
Definitions,
IOManager,
JobDefinition,
SourceAsset,
_check as check,
asset,
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import (
create_assets_def_from_source_asset,
create_unexecutable_observable_assets_def,
create_unexecutable_observable_assets_def_from_source_asset,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition

Expand Down Expand Up @@ -112,7 +114,7 @@ def an_asset(source_asset: str) -> str:
assert result_one.output_for_node("an_asset") == "hardcoded-computed"

defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
assets=[create_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)
Expand Down Expand Up @@ -175,9 +177,9 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:
assert result_one.success
assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02"

shimmed_source_asset = create_unexecutable_observable_assets_def_from_source_asset(source_asset)
shimmed_source_asset = create_assets_def_from_source_asset(source_asset)
defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
assets=[create_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)
Expand All @@ -193,3 +195,27 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed-2021-01-03"


def test_observable_source_asset_decorator() -> None:
    """Run an observable source asset wrapped by ``create_assets_def_from_source_asset``.

    Executes the implicit global asset job in process against an ephemeral instance and
    checks that the run succeeds, the node output is ``None``, exactly one observation
    event carries data version ``"foo"``, and exactly one materialization event exists.
    """

    @observable_source_asset
    def an_observable_source_asset() -> DataVersion:
        return DataVersion("foo")

    wrapped_assets_def = create_assets_def_from_source_asset(an_observable_source_asset)
    definitions = Definitions(assets=[wrapped_assets_def])

    ephemeral_instance = DagsterInstance.ephemeral()
    implicit_job = definitions.get_implicit_global_asset_job_def()
    execution = implicit_job.execute_in_process(instance=ephemeral_instance)

    assert execution.success
    assert execution.output_for_node("an_observable_source_asset") is None

    observation_events = execution.get_asset_observation_events()
    assert len(observation_events) == 1
    observation = observation_events[0].asset_observation_data.asset_observation
    assert observation.data_version == "foo"

    materialization_events = execution.get_asset_materialization_events()
    # Note this does not make sense. We need to make framework changes to allow for the omission of
    # a materialization event
    assert len(materialization_events) == 1

0 comments on commit b5aeae1

Please sign in to comment.