Skip to content

Commit

Permalink
snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 21, 2024
1 parent 78e4dc5 commit 72747d0
Show file tree
Hide file tree
Showing 9 changed files with 2,711 additions and 3,270 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4826,94 +4826,6 @@
}),
]),
}),
dict({
'__typename': 'UsedSolid',
'definition': dict({
'name': 'unexecutable_asset',
}),
'invocations': list([
dict({
'pipeline': dict({
'name': '__ASSET_JOB_0',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_1',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_2',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_3',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_4',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_5',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_6',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_7',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_8',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_9',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
]),
}),
dict({
'__typename': 'UsedSolid',
'definition': dict({
Expand Down
19 changes: 8 additions & 11 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,24 +894,21 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

included_observables = [asset for asset in included_source_assets if asset.is_observable]
final_assets = [*included_assets, *included_observables]
final_asset_checks = included_checks_defs
final_source_assets = [
*(
source_asset
for source_asset in source_assets
if source_asset not in included_observables
),
all_included_assets = [*included_assets, *included_source_assets]
executable_assets = [asset for asset in all_included_assets if asset.is_executable]
loadable_assets = [
*(asset for asset in all_included_assets if not asset.is_executable),
*(asset for asset in source_assets if asset not in included_source_assets),
*excluded_assets,
]
final_asset_checks = included_checks_defs

return build_assets_job(
name=name,
executable_assets=final_assets,
executable_assets=executable_assets,
asset_checks=final_asset_checks,
config=config,
loadable_assets=final_source_assets,
loadable_assets=loadable_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand Down
6 changes: 1 addition & 5 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,7 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]:
def execution_type(self) -> AssetExecutionType:
value = self._get_external_asset_metadata_value(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
if value is None:
return (
AssetExecutionType.UNEXECUTABLE
if len(self.keys) == 0
else AssetExecutionType.MATERIALIZATION
)
return AssetExecutionType.MATERIALIZATION
elif isinstance(value, str):
return AssetExecutionType[value]
else:
Expand Down
25 changes: 16 additions & 9 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ def asset2(asset1):

check.str_param(name, "name")
check.iterable_param(executable_assets, "assets", of_type=(AssetsDefinition, SourceAsset))
for asset in executable_assets:
if not asset.is_executable:
keys = [asset.key] if isinstance(asset, SourceAsset) else asset.keys
check.failed(f"Passed unexecutable keys to executable_assets: {keys}")

loadable_assets = check.opt_sequence_param(
loadable_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
)
Expand All @@ -194,25 +199,27 @@ def asset2(asset1):
wrapped_resource_defs = wrap_resources_for_execution(resource_defs)

assets = [asset for asset in executable_assets if isinstance(asset, AssetsDefinition)]
source_assets = [asset for asset in executable_assets if isinstance(asset, SourceAsset)]
resolved_source_assets = [
asset for asset in executable_assets if isinstance(asset, SourceAsset)
]
for asset in loadable_assets or []:
if isinstance(asset, AssetsDefinition):
source_assets += asset.to_source_assets()
resolved_source_assets += asset.to_source_assets()
elif isinstance(asset, SourceAsset):
source_assets.append(asset)
resolved_source_assets.append(asset)

# figure out what partitions (if any) exist for this job
partitions_def = partitions_def or build_job_partitions_from_assets(assets)

resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets)
deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps(
assets, asset_checks, resolved_asset_deps
)

# attempt to resolve cycles using multi-asset subsetting
if _has_cycles(deps):
assets = _attempt_resolve_cycles(assets, source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets)
assets = _attempt_resolve_cycles(assets, resolved_source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets)

deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps(
assets, asset_checks, resolved_asset_deps
Expand All @@ -227,7 +234,7 @@ def asset2(asset1):
else:
node_defs = []
observable_source_assets_by_node_handle: Mapping[NodeHandle, SourceAsset] = {}
for asset in source_assets:
for asset in resolved_source_assets:
if (
isinstance(asset, SourceAsset)
and asset.is_observable
Expand All @@ -250,14 +257,14 @@ def asset2(asset1):
asset_layer = AssetLayer.from_graph_and_assets_node_mapping(
graph_def=graph,
asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle,
source_assets=source_assets,
source_assets=resolved_source_assets,
resolved_asset_deps=resolved_asset_deps,
assets_defs_by_outer_node_handle=assets_defs_by_node_handle,
observable_source_assets_by_node_handle=observable_source_assets_by_node_handle,
)

all_resource_defs = get_all_resource_defs(
assets, asset_checks, source_assets, wrapped_resource_defs
assets, asset_checks, resolved_source_assets, wrapped_resource_defs
)

if _asset_selection_data:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,7 @@ def external_repository_data_from_def(
resource_datas = repository_def.get_top_level_resources()
asset_graph = external_asset_nodes_from_defs(
jobs,
assets_defs_by_key=repository_def.assets_defs_by_key,
source_assets_by_key=repository_def.source_assets_by_key,
)

Expand Down Expand Up @@ -1555,6 +1556,7 @@ def external_asset_checks_from_defs(

def external_asset_nodes_from_defs(
job_defs: Sequence[JobDefinition],
assets_defs_by_key: Mapping[AssetKey, AssetsDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[
Expand Down Expand Up @@ -1781,6 +1783,31 @@ def external_asset_nodes_from_defs(
)
)

# Ensure any external assets that have no nodes in any job are included in the asset graph
for asset in assets_defs_by_key.values():
for key in [key for key in asset.keys if key not in node_defs_by_asset_key]:
asset_nodes.append(
ExternalAssetNode(
asset_key=key,
dependencies=list(deps[key].values()),
depended_by=list(dep_by[key].values()),
execution_type=asset.execution_type,
job_names=[],
op_description=asset.descriptions_by_key.get(key),
metadata=asset.metadata_by_key.get(key),
group_name=asset.group_names_by_key.get(key),
is_source=True,
is_observable=asset.is_observable,
auto_observe_interval_minutes=asset.auto_observe_interval_minutes,
partitions_def_data=(
external_partitions_definition_from_def(asset.partitions_def)
if asset.partitions_def
else None
),
freshness_policy=asset.freshness_policies_by_key.get(key),
)
)

defined = set()
for node in asset_nodes:
if node.asset_key in defined:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def repo():
return assets + (asset_checks or [])

external_asset_nodes = external_asset_nodes_from_defs(
repo.get_all_jobs(), source_assets_by_key={}
repo.get_all_jobs(), repo.assets_defs_by_key, source_assets_by_key={}
)
return ExternalAssetGraph.from_repository_handles_and_external_asset_nodes(
[(MagicMock(), asset_node) for asset_node in external_asset_nodes],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,9 @@ def external_asset_graph_from_assets_by_repo_name(
repo = Definitions(assets=assets).get_repository_def()

external_asset_nodes = external_asset_nodes_from_defs(
repo.get_all_jobs(), source_assets_by_key=repo.source_assets_by_key
repo.get_all_jobs(),
repo.assets_defs_by_key,
source_assets_by_key=repo.source_assets_by_key,
)
repo_handle = MagicMock(repository_name=repo_name)
from_repository_handles_and_external_asset_nodes.extend(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _get_external_asset_nodes_from_definitions(
) -> Sequence[ExternalAssetNode]:
repo = defs.get_repository_def()
return external_asset_nodes_from_defs(
repo.get_all_jobs(), source_assets_by_key=repo.source_assets_by_key
repo.get_all_jobs(), repo.assets_defs_by_key, source_assets_by_key=repo.source_assets_by_key
)


Expand Down

0 comments on commit 72747d0

Please sign in to comment.