Skip to content

Commit

Permalink
[external-assets] remove cruft
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 12, 2024
1 parent da95774 commit 5e77ea6
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 187 deletions.
2 changes: 0 additions & 2 deletions python_modules/dagster/dagster/_cli/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str,

asset_keys = parse_asset_selection(
assets_defs=list(repo_def.assets_defs_by_key.values()),
source_assets=list(repo_def.source_assets_by_key.values()),
asset_selection=kwargs["select"].split(","),
)

Expand Down Expand Up @@ -97,7 +96,6 @@ def asset_list_command(**kwargs):
if select is not None:
asset_keys = parse_asset_selection(
assets_defs=list(repo_def.assets_defs_by_key.values()),
source_assets=list(repo_def.source_assets_by_key.values()),
asset_selection=select.split(","),
raise_on_clause_has_no_matches=False,
)
Expand Down
105 changes: 37 additions & 68 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from .asset_check_spec import AssetCheckKey
from .asset_checks import AssetChecksDefinition
from .assets import AssetsDefinition
from .assets import AssetsDefinition, normalize_assets
from .backfill_policy import BackfillPolicy
from .events import AssetKey, AssetKeyPartitionKey
from .freshness_policy import FreshnessPolicy
Expand Down Expand Up @@ -76,7 +76,6 @@ class AssetGraph:
def __init__(
self,
asset_dep_graph: DependencyGraph[AssetKey],
source_asset_keys: AbstractSet[AssetKey],
partitions_defs_by_key: Mapping[AssetKey, Optional[PartitionsDefinition]],
partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]],
group_names_by_key: Mapping[AssetKey, Optional[str]],
Expand All @@ -91,7 +90,6 @@ def __init__(
execution_types_by_key: Mapping[AssetKey, Sequence[AssetExecutionType]],
):
self._asset_dep_graph = asset_dep_graph
self._source_asset_keys = source_asset_keys
self._partitions_defs_by_key = partitions_defs_by_key
self._partition_mappings_by_key = partition_mappings_by_key
self._group_names_by_key = group_names_by_key
Expand All @@ -100,7 +98,6 @@ def __init__(
self._backfill_policies_by_key = backfill_policies_by_key
self._code_versions_by_key = code_versions_by_key
self._auto_observe_interval_minutes_by_key = auto_observe_interval_minutes_by_key
# source assets keys can sometimes appear in the upstream dict
self._required_assets_and_checks_by_key = required_assets_and_checks_by_key
self._execution_types_by_key = execution_types_by_key

Expand Down Expand Up @@ -144,10 +141,6 @@ def external_asset_keys(self) -> AbstractSet[AssetKey]:
if AssetExecutionType.MATERIALIZATION not in v
}

@property
def source_asset_keys(self) -> AbstractSet[AssetKey]:
return self._source_asset_keys

@functools.cached_property
def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]:
"""Materializable asset keys that have no materializable parents."""
Expand Down Expand Up @@ -192,8 +185,9 @@ def from_assets(
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES,
)

all_assets = normalize_assets(all_assets)

assets_defs: List[AssetsDefinition] = []
source_assets: List[SourceAsset] = []
partitions_defs_by_key: Dict[AssetKey, Optional[PartitionsDefinition]] = {}
partition_mappings_by_key: Dict[
AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]
Expand All @@ -210,59 +204,43 @@ def from_assets(
] = {}

for asset in all_assets:
if isinstance(asset, SourceAsset):
source_assets.append(asset)
partitions_defs_by_key[asset.key] = asset.partitions_def
group_names_by_key[asset.key] = asset.group_name
auto_observe_interval_minutes_by_key[
asset.key
] = asset.auto_observe_interval_minutes
execution_types_by_key[asset.key] = (
[AssetExecutionType.OBSERVATION] if asset.is_observable else []
)
else: # AssetsDefinition
assets_defs.append(asset)
partition_mappings_by_key.update(
{key: asset.partition_mappings for key in asset.keys}
assets_defs.append(asset)
partition_mappings_by_key.update({key: asset.partition_mappings for key in asset.keys})
partitions_defs_by_key.update({key: asset.partitions_def for key in asset.keys})
group_names_by_key.update(asset.group_names_by_key)
freshness_policies_by_key.update(asset.freshness_policies_by_key)
auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key)
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)
for key in asset.keys:
execution_types_by_key[key] = (
[]
if asset.execution_type == AssetExecutionType.UNEXECUTABLE
else [asset.execution_type]
)
partitions_defs_by_key.update({key: asset.partitions_def for key in asset.keys})
group_names_by_key.update(asset.group_names_by_key)
freshness_policies_by_key.update(asset.freshness_policies_by_key)
auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key)
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)
for key in asset.keys:
execution_types_by_key[key] = (
[]
if asset.execution_type == AssetExecutionType.UNEXECUTABLE
else [asset.execution_type]
)

# Set auto_observe_interval_minutes for external observable assets
# This can be removed when/if we have a solution for mapping
# `auto_observe_interval_minutes` to an AutoMaterializePolicy
first_key = next(iter(asset.keys), None)
if (
first_key
and SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES
in asset.metadata_by_key[first_key]
):
interval = asset.metadata_by_key[first_key][
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES
]
auto_observe_interval_minutes_by_key.update(
{key: interval for key in asset.keys}
)

if not asset.can_subset:
all_required_keys = {*asset.check_keys, *asset.keys}
if len(all_required_keys) > 1:
for key in all_required_keys:
required_assets_and_checks_by_key[key] = all_required_keys

# Set auto_observe_interval_minutes for external observable assets
# This can be removed when/if we have a solution for mapping
# `auto_observe_interval_minutes` to an AutoMaterializePolicy
first_key = next(iter(asset.keys), None)
if (
first_key
and SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES
in asset.metadata_by_key[first_key]
):
interval = asset.metadata_by_key[first_key][
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES
]
auto_observe_interval_minutes_by_key.update({key: interval for key in asset.keys})

if not asset.can_subset:
all_required_keys = {*asset.check_keys, *asset.keys}
if len(all_required_keys) > 1:
for key in all_required_keys:
required_assets_and_checks_by_key[key] = all_required_keys

return InternalAssetGraph(
asset_dep_graph=generate_asset_dep_graph(assets_defs, source_assets),
source_asset_keys={source_asset.key for source_asset in source_assets},
asset_dep_graph=generate_asset_dep_graph(assets_defs),
partitions_defs_by_key=partitions_defs_by_key,
partition_mappings_by_key=partition_mappings_by_key,
group_names_by_key=group_names_by_key,
Expand All @@ -271,7 +249,6 @@ def from_assets(
backfill_policies_by_key=backfill_policies_by_key,
assets=assets_defs,
asset_checks=asset_checks or [],
source_assets=source_assets,
code_versions_by_key=code_versions_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
Expand Down Expand Up @@ -820,15 +797,13 @@ class InternalAssetGraph(AssetGraph):
def __init__(
self,
asset_dep_graph: DependencyGraph[AssetKey],
source_asset_keys: AbstractSet[AssetKey],
partitions_defs_by_key: Mapping[AssetKey, Optional[PartitionsDefinition]],
partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]],
group_names_by_key: Mapping[AssetKey, Optional[str]],
freshness_policies_by_key: Mapping[AssetKey, Optional[FreshnessPolicy]],
auto_materialize_policies_by_key: Mapping[AssetKey, Optional[AutoMaterializePolicy]],
backfill_policies_by_key: Mapping[AssetKey, Optional[BackfillPolicy]],
assets: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
asset_checks: Sequence[AssetChecksDefinition],
code_versions_by_key: Mapping[AssetKey, Optional[str]],
auto_observe_interval_minutes_by_key: Mapping[AssetKey, Optional[float]],
Expand All @@ -839,7 +814,6 @@ def __init__(
):
super().__init__(
asset_dep_graph=asset_dep_graph,
source_asset_keys=source_asset_keys,
partitions_defs_by_key=partitions_defs_by_key,
partition_mappings_by_key=partition_mappings_by_key,
group_names_by_key=group_names_by_key,
Expand All @@ -852,7 +826,6 @@ def __init__(
execution_types_by_key=execution_types_by_key,
)
self._assets = assets
self._source_assets = source_assets
self._asset_checks = asset_checks

asset_check_keys = set()
Expand All @@ -870,10 +843,6 @@ def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
def assets(self) -> Sequence[AssetsDefinition]:
return self._assets

@property
def source_assets(self) -> Sequence[SourceAsset]:
return self._source_assets

@property
def asset_checks(self) -> Sequence[AssetChecksDefinition]:
return self._asset_checks
Expand Down
7 changes: 7 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1556,3 +1556,10 @@ def get_self_dep_time_window_partition_mapping(

return time_partition_mapping.partition_mapping
return None


def normalize_assets(
    assets: Iterable[Union[AssetsDefinition, SourceAsset]],
) -> Sequence[AssetsDefinition]:
    """Return the given assets as a list of AssetsDefinition objects.

    Each SourceAsset in the input is replaced by the result of calling its
    ``to_assets_def()`` method; every other element is passed through
    unchanged. The order of the input is preserved.
    """
    normalized = []
    for candidate in assets:
        if isinstance(candidate, SourceAsset):
            # Source assets are converted to their AssetsDefinition equivalent.
            normalized.append(candidate.to_assets_def())
        else:
            normalized.append(candidate)
    return normalized
52 changes: 19 additions & 33 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from .asset_checks import AssetChecksDefinition
from .asset_layer import AssetLayer
from .assets import AssetsDefinition
from .assets import AssetsDefinition, normalize_assets
from .config import ConfigMapping
from .dependency import (
BlockingAssetChecksDependencyDefinition,
Expand Down Expand Up @@ -140,11 +140,10 @@ def build_assets_job(
Args:
name (str): The name of the job.
assets (List[AssetsDefinition]): A list of assets or
multi-assets - usually constructed using the :py:func:`@asset` or :py:func:`@multi_asset`
decorator.
source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of
assets that are not materialized by this job, but that assets in this job depend on.
assets_to_execute (Sequence[Union[AssetsDefinition, SourceAsset]]): The assets that will be
materialized or observed by the job. Passed `SourceAsset` objects must be observable.
other_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): Assets that will
not be materialized or observed, but that can be loaded as inputs for executed assets.
resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in
this job.
description (Optional[str]): A description of the job.
Expand All @@ -167,13 +166,16 @@ def asset2(asset1):
Returns:
JobDefinition: A job that materializes the given assets.
"""
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset
from dagster._core.execution.build_resources import wrap_resources_for_execution

check.str_param(name, "name")
check.iterable_param(assets_to_execute, "assets", of_type=(AssetsDefinition, SourceAsset))
other_assets = check.opt_sequence_param(
other_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
assets_to_execute = normalize_assets(
check.iterable_param(assets_to_execute, "assets", of_type=(AssetsDefinition, SourceAsset))
)
other_assets = normalize_assets(
check.opt_sequence_param(
other_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
)
)
asset_checks = check.opt_sequence_param(
asset_checks, "asset_checks", of_type=AssetChecksDefinition
Expand All @@ -188,16 +190,6 @@ def asset2(asset1):
resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs)
wrapped_resource_defs = wrap_resources_for_execution(resource_defs)

# normalize SourceAssets to AssetsDefinition (external assets)
assets_to_execute = [
create_external_asset_from_source_asset(a) if isinstance(a, SourceAsset) else a
for a in assets_to_execute
]
other_assets = [
create_external_asset_from_source_asset(a) if isinstance(a, SourceAsset) else a
for a in other_assets or []
]

all_assets = [*assets_to_execute, *other_assets]

resolved_asset_deps = ResolvedAssetDependencies(
Expand Down Expand Up @@ -245,7 +237,7 @@ def asset2(asset1):
resolved_asset_deps=resolved_asset_deps,
)

all_resource_defs = get_all_resource_defs(all_assets, asset_checks, [], wrapped_resource_defs)
all_resource_defs = get_all_resource_defs(all_assets, asset_checks, wrapped_resource_defs)

if _asset_selection_data:
original_job = _asset_selection_data.parent_job_def
Expand Down Expand Up @@ -506,7 +498,7 @@ def _attempt_resolve_cycles(
"""
from dagster._core.selector.subset_selector import generate_asset_dep_graph

asset_deps = generate_asset_dep_graph([*assets_to_execute, *other_assets], [])
asset_deps = generate_asset_dep_graph([*assets_to_execute, *other_assets])
assets_to_execute_by_asset_key = {k: ad for ad in assets_to_execute for k in ad.keys}

# color for each asset
Expand Down Expand Up @@ -562,13 +554,11 @@ def _dfs(key, cur_color):

def _ensure_resources_dont_conflict(
assets: Iterable[AssetsDefinition],
source_assets: Sequence[SourceAsset],
resource_defs: Mapping[str, ResourceDefinition],
) -> None:
"""Ensures that resources between assets, source assets, and provided resource dictionary do not conflict."""
resource_defs_from_assets = {}
all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets]
for asset in all_assets:
for asset in assets:
for resource_key, resource_def in asset.resource_defs.items():
if resource_key not in resource_defs_from_assets:
resource_defs_from_assets[resource_key] = resource_def
Expand All @@ -595,18 +585,16 @@ def _ensure_resources_dont_conflict(

def check_resources_satisfy_requirements(
assets: Iterable[AssetsDefinition],
source_assets: Sequence[SourceAsset],
asset_checks: Iterable[AssetChecksDefinition],
resource_defs: Mapping[str, ResourceDefinition],
) -> None:
"""Ensures that between the provided resources on an asset and the resource_defs mapping, that all resource requirements are satisfied.
Note that resources provided on assets cannot satisfy resource requirements provided on other assets.
"""
_ensure_resources_dont_conflict(assets, source_assets, resource_defs)
_ensure_resources_dont_conflict(assets, resource_defs)

all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets]
for asset in all_assets:
for asset in assets:
ensure_requirements_satisfied(
merge_dicts(resource_defs, asset.resource_defs), list(asset.get_resource_requirements())
)
Expand All @@ -620,15 +608,13 @@ def check_resources_satisfy_requirements(
def get_all_resource_defs(
assets: Iterable[AssetsDefinition],
asset_checks: Iterable[AssetChecksDefinition],
source_assets: Sequence[SourceAsset],
resource_defs: Mapping[str, ResourceDefinition],
) -> Mapping[str, ResourceDefinition]:
# Ensures that no resource keys conflict, and each asset has its resource requirements satisfied.
check_resources_satisfy_requirements(assets, source_assets, asset_checks, resource_defs)
check_resources_satisfy_requirements(assets, asset_checks, resource_defs)

all_resource_defs = dict(resource_defs)
all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets]
for asset in all_assets:
for asset in assets:
all_resource_defs = merge_dicts(all_resource_defs, asset.resource_defs)

for asset_check in asset_checks:
Expand Down
Loading

0 comments on commit 5e77ea6

Please sign in to comment.