Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[external-assets] Allow asset jobs to combine materializations and observations #19667

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,6 @@ def resolve(
"""Resolve this UnresolvedAssetJobDefinition into a JobDefinition."""
assets = asset_graph.assets_defs
selected_asset_keys = self.selection.resolve(asset_graph)

if (
len(selected_asset_keys & asset_graph.materializable_asset_keys) > 0
and len(selected_asset_keys & asset_graph.external_asset_keys) > 0
):
raise DagsterInvalidDefinitionError(
f"Asset selection for job '{self.name}' specified both regular assets and external "
"assets. This is not currently supported. Selections must be all regular "
"assets or all source assets.",
)

selected_asset_checks = self.selection.resolve_checks(asset_graph)

asset_keys_by_partitions_def = defaultdict(set)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_selection import AssetSelection, CoercibleToAssetSelection
from dagster._core.definitions.assets_job import get_base_asset_jobs
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.dependency import NodeHandle, NodeInvocation
from dagster._core.definitions.executor_definition import in_process_executor
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset
Expand Down Expand Up @@ -2876,3 +2877,36 @@ def ghost():
).get_job_def("foo_job")

assert foo_job.execute_in_process().success


def test_mixed_asset_job():
with disable_dagster_warnings():

class MyIOManager(IOManager):
def handle_output(self, context, obj):
pass

def load_input(self, context):
return 5

@observable_source_asset
def foo():
return DataVersion("alpha")

@asset
def bar(foo):
return foo + 1

defs = Definitions(
assets=[foo, bar],
jobs=[define_asset_job("mixed_assets_job", [foo, bar])],
resources={"io_manager": MyIOManager()},
)

job_def = defs.get_job_def("mixed_assets_job")
result = job_def.execute_in_process()
assert result.success
assert len(result.asset_materializations_for_node("foo")) == 0
assert len(result.asset_observations_for_node("foo")) == 1
assert len(result.asset_materializations_for_node("bar")) == 1
assert len(result.asset_observations_for_node("bar")) == 0
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,6 @@ def baz():
)


def test_mixed_source_asset_observation_job():
@observable_source_asset
def foo(_context) -> DataVersion:
return DataVersion("alpha")

@asset(deps=["foo"])
def bar(context):
return 1

with pytest.raises(
DagsterInvalidDefinitionError, match=r"specified both regular assets and external assets"
):
Definitions(
assets=[foo, bar],
jobs=[define_asset_job("mixed_job", [foo, bar])],
).get_all_job_defs()


@pytest.mark.parametrize(
"is_valid,resource_defs",
[(True, {"bar": ResourceDefinition.hardcoded_resource("bar")}), (False, {})],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
AssetsDefinition,
DagsterInvalidDefinitionError,
DailyPartitionsDefinition,
Definitions,
GraphDefinition,
IOManager,
JobDefinition,
Expand Down Expand Up @@ -1482,23 +1481,3 @@ def repo():
AutoMaterializeSensorDefinition("a", asset_selection=[asset1]),
AutoMaterializeSensorDefinition("b", asset_selection=[asset1, asset2]),
]


def test_invalid_asset_selection():
source_asset = SourceAsset("source_asset")

@asset
def asset1(): ...

@sensor(asset_selection=[source_asset, asset1])
def sensor1(): ...

Definitions(assets=[source_asset, asset1], sensors=[sensor1])

with pytest.raises(
DagsterInvalidDefinitionError, match="specified both regular assets and external"
):
Definitions(
assets=[source_asset, asset1],
jobs=[define_asset_job("foo", selection=[source_asset, asset1])],
).get_all_job_defs()
Loading