Skip to content

Commit

Permalink
[external-assets] Allow mixed asset jobs
Browse files Browse the repository at this point in the history
[INTERNAL_BRANCH=sean/external-assets-asset-graph-nodes]
  • Loading branch information
smackesey committed Mar 5, 2024
1 parent 6596798 commit ec160de
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,6 @@ def resolve(
"""Resolve this UnresolvedAssetJobDefinition into a JobDefinition."""
assets = asset_graph.assets_defs
selected_asset_keys = self.selection.resolve(asset_graph)

if (
len(selected_asset_keys & asset_graph.materializable_asset_keys) > 0
and len(selected_asset_keys & asset_graph.external_asset_keys) > 0
):
raise DagsterInvalidDefinitionError(
f"Asset selection for job '{self.name}' specified both regular assets and external "
"assets. This is not currently supported. Selections must be all regular "
"assets or all source assets.",
)

selected_asset_checks = self.selection.resolve_checks(asset_graph)

asset_keys_by_partitions_def = defaultdict(set)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_selection import AssetSelection, CoercibleToAssetSelection
from dagster._core.definitions.assets_job import get_base_asset_jobs
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.dependency import NodeHandle, NodeInvocation
from dagster._core.definitions.executor_definition import in_process_executor
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset
Expand Down Expand Up @@ -2876,3 +2877,36 @@ def ghost():
).get_job_def("foo_job")

assert foo_job.execute_in_process().success


def test_mixed_asset_job():
with disable_dagster_warnings():

class MyIOManager(IOManager):
def handle_output(self, context, obj):
pass

def load_input(self, context):
return 5

@observable_source_asset
def foo():
return DataVersion("alpha")

@asset
def bar(foo):
return foo + 1

defs = Definitions(
assets=[foo, bar],
jobs=[define_asset_job("mixed_assets_job", [foo, bar])],
resources={"io_manager": MyIOManager()},
)

job_def = defs.get_job_def("mixed_assets_job")
result = job_def.execute_in_process()
assert result.success
assert len(result.asset_materializations_for_node("foo")) == 0
assert len(result.asset_observations_for_node("foo")) == 1
assert len(result.asset_materializations_for_node("bar")) == 1
assert len(result.asset_observations_for_node("bar")) == 0
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,6 @@ def baz():
)


def test_mixed_source_asset_observation_job():
@observable_source_asset
def foo(_context) -> DataVersion:
return DataVersion("alpha")

@asset(deps=["foo"])
def bar(context):
return 1

with pytest.raises(
DagsterInvalidDefinitionError, match=r"specified both regular assets and external assets"
):
Definitions(
assets=[foo, bar],
jobs=[define_asset_job("mixed_job", [foo, bar])],
).get_all_job_defs()


@pytest.mark.parametrize(
"is_valid,resource_defs",
[(True, {"bar": ResourceDefinition.hardcoded_resource("bar")}), (False, {})],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
AssetsDefinition,
DagsterInvalidDefinitionError,
DailyPartitionsDefinition,
Definitions,
GraphDefinition,
IOManager,
JobDefinition,
Expand Down Expand Up @@ -1482,23 +1481,3 @@ def repo():
AutoMaterializeSensorDefinition("a", asset_selection=[asset1]),
AutoMaterializeSensorDefinition("b", asset_selection=[asset1, asset2]),
]


def test_invalid_asset_selection():
source_asset = SourceAsset("source_asset")

@asset
def asset1(): ...

@sensor(asset_selection=[source_asset, asset1])
def sensor1(): ...

Definitions(assets=[source_asset, asset1], sensors=[sensor1])

with pytest.raises(
DagsterInvalidDefinitionError, match="specified both regular assets and external"
):
Definitions(
assets=[source_asset, asset1],
jobs=[define_asset_job("foo", selection=[source_asset, asset1])],
).get_all_job_defs()

0 comments on commit ec160de

Please sign in to comment.