Skip to content

Commit

Permalink
[exploration] unexecutable assets not backed by jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Sep 20, 2023
1 parent 8fe8cdf commit 47435a4
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 50 deletions.
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,12 @@ def is_asset_executable(self, asset_key: AssetKey) -> bool:
self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
)

def is_executable(self) -> bool:
    """Return whether every asset key in this definition is executable.

    Delegates to ``is_asset_executable`` for each key in ``self.keys`` and
    stops at the first key that is not executable. A definition with no
    keys is reported as executable.
    """
    return all(self.is_asset_executable(key) for key in self.keys)

def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])

Expand Down
28 changes: 17 additions & 11 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,45 @@ def is_base_asset_job_name(name: str) -> bool:


def get_base_asset_jobs(
assets: Sequence[AssetsDefinition],
assets_defs: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
asset_checks: Sequence[AssetChecksDefinition],
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
# bucket the passed in AssetsDefinitions that are executable by partition.
# un-executable assets are intentionally omitted.
exe_assets_by_partitions_def: Dict[Optional[PartitionsDefinition], List[AssetsDefinition]] = (
defaultdict(list)
)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)
for assets_def in assets_defs:
if assets_def.is_executable():
exe_assets_by_partitions_def[assets_def.partitions_def].append(assets_def)

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
if observable.partitions_def not in exe_assets_by_partitions_def:
exe_assets_by_partitions_def[observable.partitions_def] = []

if len(exe_assets_by_partitions_def.keys()) == 0 or exe_assets_by_partitions_def.keys() == {
None
}:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
assets=exe_assets_by_partitions_def.get(None, []),
asset_checks=asset_checks,
source_assets=source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
unpartitioned_assets = exe_assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
k: v for k, v in exe_assets_by_partitions_def.items() if k is not None
}
jobs = []

Expand All @@ -102,7 +108,7 @@ def get_base_asset_jobs(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
source_assets=[*source_assets, *assets_defs],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]):
)

@multi_asset(specs=new_specs)
def an_asset() -> None:
def an_asset():
raise DagsterInvariantViolationError(
f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ def __init__(
sensors, "sensors", key_type=str, value_type=(SensorDefinition, FunctionType)
)
check.mapping_param(
source_assets_by_key, "source_assets_by_key", key_type=AssetKey, value_type=SourceAsset
source_assets_by_key,
"source_assets_by_key",
key_type=AssetKey, # value_type=SourceAsset
)
check.mapping_param(
assets_defs_by_key, "assets_defs_by_key", key_type=AssetKey, value_type=AssetsDefinition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def build_caching_repository_data_from_list(

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
assets_defs=assets_defs,
source_assets=source_assets,
executor_def=default_executor_def,
resource_defs=top_level_resources,
Expand All @@ -202,6 +202,11 @@ def build_caching_repository_data_from_list(
jobs[job_def.name] = job_def

source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets}
for assets_def in assets_defs:
if not assets_def.is_executable():
for key in assets_def.keys:
source_assets_by_key[key] = assets_def

assets_defs_by_key = {key: asset for asset in assets_defs for key in asset.keys}
else:
source_assets_by_key = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
Expand Down Expand Up @@ -1461,7 +1462,7 @@ def external_asset_checks_from_defs(

def external_asset_graph_from_defs(
job_defs: Sequence[JobDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
source_assets_by_key: Mapping[AssetKey, Union[SourceAsset, AssetsDefinition]],
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[AssetKey, List[Tuple[NodeOutputHandle, JobDefinition]]] = (
defaultdict(list)
Expand Down Expand Up @@ -1551,46 +1552,64 @@ def external_asset_graph_from_defs(
for asset_key in asset_keys_without_definitions
]

for source_asset in source_assets_by_key.values():
if source_asset.key not in node_defs_by_asset_key:
job_names = (
[
job_def.name
for job_def in job_defs
if source_asset.key in job_def.asset_layer.source_assets_by_key
and (
# explicit source-asset observation job
not job_def.asset_layer.has_assets_defs
# "base asset job" will have both source and materializable assets
or is_base_asset_job_name(job_def.name)
for key, source_asset in source_assets_by_key.items():
if isinstance(source_asset, SourceAsset):
if source_asset.key not in node_defs_by_asset_key:
job_names = (
[
job_def.name
for job_def in job_defs
if source_asset.key in job_def.asset_layer.source_assets_by_key
and (
source_asset.partitions_def is None
or source_asset.partitions_def == job_def.partitions_def
# explicit source-asset observation job
not job_def.asset_layer.has_assets_defs
# "base asset job" will have both source and materializable assets
or is_base_asset_job_name(job_def.name)
and (
source_asset.partitions_def is None
or source_asset.partitions_def == job_def.partitions_def
)
)
]
if source_asset.node_def is not None
else []
)
asset_nodes.append(
ExternalAssetNode(
asset_key=source_asset.key,
dependencies=list(deps[source_asset.key].values()),
depended_by=list(dep_by[source_asset.key].values()),
job_names=job_names,
op_description=source_asset.description,
metadata=source_asset.metadata,
group_name=source_asset.group_name,
is_source=True,
is_observable=source_asset.is_observable,
auto_observe_interval_minutes=source_asset.auto_observe_interval_minutes,
partitions_def_data=(
external_partitions_definition_from_def(source_asset.partitions_def)
if source_asset.partitions_def
else None
),
)
]
if source_asset.node_def is not None
else []
)
)
elif isinstance(source_asset, AssetsDefinition):
check.invariant(not source_asset.is_executable())
asset_nodes.append(
ExternalAssetNode(
asset_key=source_asset.key,
dependencies=list(deps[source_asset.key].values()),
depended_by=list(dep_by[source_asset.key].values()),
job_names=job_names,
op_description=source_asset.description,
metadata=source_asset.metadata,
group_name=source_asset.group_name,
is_source=True,
is_observable=source_asset.is_observable,
auto_observe_interval_minutes=source_asset.auto_observe_interval_minutes,
partitions_def_data=(
external_partitions_definition_from_def(source_asset.partitions_def)
if source_asset.partitions_def
else None
),
asset_key=key,
# need to build up these dep structures correctly
dependencies=list(deps[key].values()),
depended_by=list(dep_by[key].values()),
# should be enough to trigger
metadata=source_asset.metadata_by_key[key],
group_name=DEFAULT_GROUP_NAME,
# is_source=True,
# is_observable=source_asset.is_observable,
)
)
else:
check.failed(f'unexpected "source asset" {source_asset}')

for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,7 @@ def hourly_asset(): ...
def unpartitioned_asset(): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
daily_asset,
daily_asset2,
daily_asset_different_start_date,
Expand Down Expand Up @@ -2004,7 +2004,7 @@ def asset_b(): ...
def asset_x(asset_b: B): ...

jobs = get_base_asset_jobs(
assets=[
assets_defs=[
asset_x,
],
source_assets=[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
AssetKey,
AssetsDefinition,
AutoMaterializePolicy,
Definitions,
_check as check,
asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.errors import DagsterInvalidSubsetError
from dagster._core.host_representation.external_data import external_asset_graph_from_defs


def test_observable_asset_basic_creation() -> None:
Expand Down Expand Up @@ -72,3 +75,35 @@ def test_observable_asset_creation_with_deps() -> None:
assert assets_def.asset_deps[expected_key] == {
AssetKey(["observable_asset_two"]),
}


def test_execute_job_that_includes_non_executable_asset() -> None:
    """Check that an unexecutable asset is left out of the implicit asset job.

    The implicit global asset job must still run successfully when the
    definitions contain an unexecutable upstream asset, explicitly selecting
    that asset must fail, and the external asset graph must still expose a
    node for it when it is passed in via the source-assets mapping.
    """
    upstream_asset = create_unexecutable_observable_assets_def(
        specs=[AssetSpec("upstream_asset")],
    )

    @asset(deps=[upstream_asset])
    def downstream_asset() -> None: ...

    defs = Definitions(assets=[upstream_asset, downstream_asset])

    assert defs.get_implicit_global_asset_job_def().execute_in_process().success

    # ensure that explicit selection fails
    with pytest.raises(
        DagsterInvalidSubsetError,
        match=(
            r'Assets provided in asset_selection argument \["upstream_asset"\] do not exist in'
            r" parent asset group or job"
        ),
    ):
        defs.get_implicit_global_asset_job_def().execute_in_process(
            asset_selection=[AssetKey("upstream_asset")]
        )

    # No jobs are supplied, so the only node should be the one built for the
    # unexecutable definition passed in the source-assets mapping.
    external_asset_nodes = external_asset_graph_from_defs(
        [],
        {upstream_asset.key: upstream_asset},
    )
    assert len(external_asset_nodes) == 1
    assert external_asset_nodes[0].asset_key == upstream_asset.key

0 comments on commit 47435a4

Please sign in to comment.