Skip to content

Commit

Permalink
Source Asset -> Unexecutable Asset Adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Sep 19, 2023
1 parent 373510e commit dee2615
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.logger_definition import LoggerDefinition
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.build_resources import wrap_resources_for_execution
from dagster._core.execution.with_resources import with_resources
from dagster._core.executor.base import Executor
Expand Down Expand Up @@ -540,6 +541,14 @@ def get_implicit_job_def_for_assets(
) -> Optional[JobDefinition]:
return self.get_repository_def().get_implicit_job_def_for_assets(asset_keys)

def get_assets_def(self, key: CoercibleToAssetKey) -> AssetsDefinition:
asset_key = AssetKey.from_coercible(key)
for assets_def in self.get_asset_graph().assets:
if asset_key in assets_def.keys:
return assets_def

raise DagsterInvariantViolationError(f"Could not find asset {asset_key}")

@cached_method
def get_repository_def(self) -> RepositoryDefinition:
"""Definitions is implemented by wrapping RepositoryDefinition. Get that underlying object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
AssetSpec,
AssetVarietal,
)
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterInvariantViolationError


Expand Down Expand Up @@ -46,3 +47,39 @@ def an_asset() -> None:
)

return an_asset


def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset):
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
check.invariant(
source_asset.partitions_def is None,
"Observable source assets not supported yet: partitions_def should be None",
)
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_VARIETAL: AssetVarietal.UNEXECUTABLE.value},
},
"group_name": source_asset.group_name,
"description": source_asset.description,
}

if source_asset.io_manager_def:
kwargs["io_manager_def"] = source_asset.io_manager_def
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def shim_asset() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

return shim_asset
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
AssetKey,
AssetsDefinition,
AutoMaterializePolicy,
DagsterInstance,
Definitions,
IOManager,
SourceAsset,
_check as check,
asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.definitions.observable_asset import (
create_unexecutable_observable_assets_def,
create_unexecutable_observable_assets_def_from_source_asset,
)


def test_observable_asset_basic_creation() -> None:
Expand Down Expand Up @@ -72,3 +79,46 @@ def test_observable_asset_creation_with_deps() -> None:
assert assets_def.asset_deps[expected_key] == {
AssetKey(["observable_asset_two"]),
}


def test_how_source_assets_are_backwards_compatible() -> None:
class DummyIOManager(IOManager):
def handle_output(self, context, obj) -> None:
pass

def load_input(self, context) -> str:
return "hardcoded"

source_asset = SourceAsset(key="source_asset", io_manager_def=DummyIOManager())

@asset
def an_asset(source_asset: str) -> str:
return "hardcoded" + "-computed"

defs_with_source = Definitions(assets=[source_asset, an_asset])

instance = DagsterInstance.ephemeral()

result_one = defs_with_source.get_implicit_global_asset_job_def().execute_in_process(
instance=instance
)

assert result_one.success
assert result_one.output_for_node("an_asset") == "hardcoded-computed"

defs_with_shim = Definitions(
assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset]
)

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)

result_two = defs_with_shim.get_implicit_global_asset_job_def().execute_in_process(
instance=instance,
# currently we have to explicitly select the asset to exclude the source from execution
asset_selection=[
AssetKey("an_asset")
],
)

assert result_two.success
assert result_two.output_for_node("an_asset") == "hardcoded-computed"

0 comments on commit dee2615

Please sign in to comment.