Skip to content

Commit

Permalink
fix failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 12, 2024
1 parent 97c2d41 commit 08255d1
Show file tree
Hide file tree
Showing 21 changed files with 155 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def empty_dataframe_from_column_schema(column_schema: TableSchema) -> DataFrame:

class SmokeIOManager(InMemoryIOManager):
def load_input(self, context):
if context.asset_key not in context.step_context.job_def.asset_layer.asset_keys:
if context.asset_key not in context.step_context.job_def.asset_layer.target_asset_keys:
column_schema = context.upstream_output.metadata["column_schema"]
return empty_dataframe_from_column_schema(column_schema)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,11 +1052,9 @@ def test_asset_node_is_executable(self, graphql_context: WorkspaceRequestContext
assert result.data
assert result.data["assetNodes"]

assert len(result.data["assetNodes"]) == 2
assert len(result.data["assetNodes"]) == 1
exec_asset_node = result.data["assetNodes"][0]
assert exec_asset_node["isExecutable"] is True
unexec_asset_node = result.data["assetNodes"][1]
assert unexec_asset_node["isExecutable"] is False

def test_asset_partitions_in_pipeline(self, graphql_context: WorkspaceRequestContext):
selector = infer_job_selector(graphql_context, "two_assets_job")
Expand Down
37 changes: 19 additions & 18 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
execution_types_by_key: Mapping[AssetKey, Sequence[AssetExecutionType]],
):
self._asset_dep_graph = asset_dep_graph
self._source_asset_keys = source_asset_keys
Expand Down Expand Up @@ -118,34 +118,30 @@ def all_asset_keys(self) -> AbstractSet[AssetKey]:

@property
def executable_asset_keys(self) -> AbstractSet[AssetKey]:
return {
k
for k, v in self._execution_types_by_key.items()
if v != AssetExecutionType.UNEXECUTABLE
}
return {k for k, v in self._execution_types_by_key.items() if len(v) > 0}

@property
def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return {
k
for k, v in self._execution_types_by_key.items()
if v == AssetExecutionType.MATERIALIZATION
if AssetExecutionType.MATERIALIZATION in v
}

@property
def observable_asset_keys(self) -> AbstractSet[AssetKey]:
return {
k
for k, v in self._execution_types_by_key.items()
if v == AssetExecutionType.OBSERVATION
if AssetExecutionType.OBSERVATION in v
}

@property
def external_asset_keys(self) -> AbstractSet[AssetKey]:
return {
k
for k, v in self._execution_types_by_key.items()
if v != AssetExecutionType.MATERIALIZATION
if AssetExecutionType.MATERIALIZATION not in v
}

@property
Expand All @@ -160,7 +156,7 @@ def root_asset_keys(self) -> AbstractSet[AssetKey]:
return AssetSelection.keys(*self.materializable_asset_keys).roots().resolve(self)

@functools.cached_property
def root_materializable_or_observable_asset_keys(self) -> AbstractSet[AssetKey]:
def root_executable_asset_keys(self) -> AbstractSet[AssetKey]:
"""Materializable or observable source asset keys that have no parents which are
materializable or observable.
"""
Expand All @@ -181,7 +177,7 @@ def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy]
return self._backfill_policies_by_key

@property
def execution_types_by_key(self) -> Mapping[AssetKey, AssetExecutionType]:
def execution_types_by_key(self) -> Mapping[AssetKey, Sequence[AssetExecutionType]]:
return self._execution_types_by_key

def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]:
Expand All @@ -203,7 +199,7 @@ def from_assets(
AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]
] = {}
group_names_by_key: Dict[AssetKey, Optional[str]] = {}
execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {}
execution_types_by_key: Dict[AssetKey, List[AssetExecutionType]] = {}
freshness_policies_by_key: Dict[AssetKey, Optional[FreshnessPolicy]] = {}
auto_materialize_policies_by_key: Dict[AssetKey, Optional[AutoMaterializePolicy]] = {}
backfill_policies_by_key: Dict[AssetKey, Optional[BackfillPolicy]] = {}
Expand All @@ -221,7 +217,9 @@ def from_assets(
auto_observe_interval_minutes_by_key[
asset.key
] = asset.auto_observe_interval_minutes
execution_types_by_key[asset.key] = AssetExecutionType.OBSERVATION if asset.is_observable else AssetExecutionType.UNEXECUTABLE
execution_types_by_key[asset.key] = (
[AssetExecutionType.OBSERVATION] if asset.is_observable else []
)
else: # AssetsDefinition
assets_defs.append(asset)
partition_mappings_by_key.update(
Expand All @@ -234,8 +232,11 @@ def from_assets(
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)
for key in asset.keys:
execution_types_by_key[key] = asset.execution_type

execution_types_by_key[key] = (
[]
if asset.execution_type == AssetExecutionType.UNEXECUTABLE
else [asset.execution_type]
)

# Set auto_observe_interval_minutes for external observable assets
# This can be removed when/if we have a a solution for mapping
Expand Down Expand Up @@ -324,10 +325,10 @@ def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool:
)

def is_observable(self, asset_key: AssetKey) -> bool:
return self._execution_types_by_key.get(asset_key) == AssetExecutionType.OBSERVATION
return AssetExecutionType.OBSERVATION in self._execution_types_by_key.get(asset_key, [])

def is_materializable(self, asset_key: AssetKey) -> bool:
return self._execution_types_by_key.get(asset_key) == AssetExecutionType.MATERIALIZATION
return AssetExecutionType.MATERIALIZATION in self._execution_types_by_key.get(asset_key, [])

def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""Returns all assets that depend on the given asset."""
Expand Down Expand Up @@ -834,7 +835,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
execution_types_by_key: Mapping[AssetKey, Sequence[AssetExecutionType]],
):
super().__init__(
asset_dep_graph=asset_dep_graph,
Expand Down
30 changes: 16 additions & 14 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ class AssetLayer(NamedTuple):
keys for each asset key produced by this job.
"""

asset_keys_to_execute: AbstractSet[AssetKey]
target_asset_keys: AbstractSet[AssetKey]
assets_defs_by_key: Mapping[AssetKey, "AssetsDefinition"]
assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"]
asset_keys_by_node_input_handle: Mapping[NodeInputHandle, AssetKey]
Expand Down Expand Up @@ -584,13 +584,13 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
},
}

asset_keys_to_execute = {
target_asset_keys = {
key
for assets_def in assets_to_execute_by_node_handle.values()
for key in assets_def.keys
}
return AssetLayer(
asset_keys_to_execute=asset_keys_to_execute,
target_asset_keys=target_asset_keys,
asset_keys_by_node_input_handle=asset_key_by_input,
asset_info_by_node_output_handle=asset_info_by_output,
check_key_by_node_output_handle=check_key_by_output,
Expand All @@ -617,21 +617,15 @@ def downstream_assets_for_asset(self, asset_key: AssetKey) -> AbstractSet[AssetK
return {k for k, v in self.asset_deps.items() if asset_key in v}

@property
def target_asset_keys(self) -> Iterable[AssetKey]:
return self.asset_keys_to_execute
# return set(self.dependency_node_handles_by_asset_key.keys()) & self.executable_asset_keys

@property
def asset_keys(self) -> Iterable[AssetKey]:
# return self.dependency_node_handles_by_asset_key.keys()
def all_asset_keys(self) -> Iterable[AssetKey]:
return self.assets_defs_by_key.keys()

@property
def observable_asset_keys(self) -> Iterable[AssetKey]:
def executable_asset_keys(self) -> Iterable[AssetKey]:
return {
asset_key
for asset_key, assets_def in self.assets_defs_by_key.items()
if assets_def.is_observable
if assets_def.is_executable
}

@property
Expand All @@ -643,11 +637,19 @@ def materializable_asset_keys(self) -> Iterable[AssetKey]:
}

@property
def executable_asset_keys(self) -> Iterable[AssetKey]:
def observable_asset_keys(self) -> Iterable[AssetKey]:
return {
asset_key
for asset_key, assets_def in self.assets_defs_by_key.items()
if assets_def.is_executable
if assets_def.is_observable
}

@property
def external_asset_keys(self) -> AbstractSet[AssetKey]:
return {
asset_key
for asset_key, assets_def in self.assets_defs_by_key.items()
if assets_def.is_external
}

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
if len(selection) == 0:
return selection
all_upstream = _fetch_all_upstream(selection, asset_graph, self.depth, self.include_self)
return {key for key in all_upstream if key not in asset_graph.source_asset_keys}
return {key for key in all_upstream if key in asset_graph.materializable_asset_keys}

def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection":
return self.replace(child=self.child.to_serializable_asset_selection(asset_graph))
Expand Down Expand Up @@ -825,7 +825,7 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
if len(selection) == 0:
return selection
all_upstream = _fetch_all_upstream(selection, asset_graph)
return {key for key in all_upstream if key in asset_graph.source_asset_keys}
return {key for key in all_upstream if key in asset_graph.external_asset_keys}

def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection":
return self.replace(child=self.child.to_serializable_asset_selection(asset_graph))
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ def asset2(asset1):
*(asset.node_def for asset in assets_to_execute),
*(asset_check.node_def for asset_check in asset_checks),
]
for node_def in node_defs:
check.invariant(node_def.is_executable, f"Node {node_def.name} is not executable.")

graph = GraphDefinition(
name=name,
node_defs=node_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ def evaluate_for_asset(
"""
from .asset_condition import AssetConditionResult

if context.asset_key in context.asset_graph.root_materializable_or_observable_asset_keys:
if context.asset_key in context.asset_graph.root_executable_asset_keys:
handled_subset = self.get_handled_subset(context)
unhandled_candidates = (
context.candidate_subset
Expand Down
23 changes: 18 additions & 5 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.storage.tags import UNEXECUTABLE_NODE_TAG
from dagster._utils.warnings import disable_dagster_warnings


Expand Down Expand Up @@ -145,20 +146,32 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
else {}
),
}
execution_type = (
AssetExecutionType.UNEXECUTABLE
if source_asset.observe_fn is None
else AssetExecutionType.OBSERVATION
)
op_tags = (
{UNEXECUTABLE_NODE_TAG: "true"}
if execution_type == AssetExecutionType.UNEXECUTABLE
else None
)

with disable_dagster_warnings():
_execution_type = (
AssetExecutionType.UNEXECUTABLE
if source_asset.observe_fn is None
else AssetExecutionType.OBSERVATION
)

@asset(
key=source_asset.key,
metadata=metadata,
group_name=source_asset.group_name,
description=source_asset.description,
partitions_def=source_asset.partitions_def,
_execution_type=(
AssetExecutionType.UNEXECUTABLE
if source_asset.observe_fn is None
else AssetExecutionType.OBSERVATION
),
_execution_type=execution_type,
op_tags=op_tags,
io_manager_key=source_asset.io_manager_key,
# We don't pass the `io_manager_def` because it will already be present in
# `resource_defs` (it is added during `SourceAsset` initialization).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
execution_types_by_key: Mapping[AssetKey, Sequence[AssetExecutionType]],
):
super().__init__(
asset_dep_graph=asset_dep_graph,
Expand Down Expand Up @@ -150,13 +150,11 @@ def from_repository_handles_and_external_asset_nodes(
if not node.is_source
}

execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {}
execution_types_by_key: Dict[AssetKey, List[AssetExecutionType]] = {}
for _, node in repo_handle_external_asset_nodes:
execution_types_by_key[node.asset_key] = (
_merge_execution_types(execution_types_by_key[node.asset_key], node.execution_type)
if node.asset_key in execution_types_by_key
else node.execution_type
)
execution_types = execution_types_by_key.setdefault(node.asset_key, [])
if node.execution_type != AssetExecutionType.UNEXECUTABLE:
execution_types.append(node.execution_type)

all_non_source_keys = {
node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source
Expand Down Expand Up @@ -321,19 +319,3 @@ def split_asset_keys_by_repository(
asset_key
)
return list(asset_keys_by_repo.values())


# Rank execution types by (descending) priority. When an asset is present in two code locations with
# different execution types, the canonical execution type in the `ExternalAssetGraph` will be the
# highest-priority type of the two.
_EXECUTION_TYPE_RANKING = [
AssetExecutionType.MATERIALIZATION,
AssetExecutionType.OBSERVATION,
AssetExecutionType.UNEXECUTABLE,
]


def _merge_execution_types(
type_1: AssetExecutionType, type_2: AssetExecutionType
) -> AssetExecutionType:
return min(type_1, type_2, key=_EXECUTION_TYPE_RANKING.index)
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ def _get_job_def_for_asset_selection(
check.opt_set_param(asset_check_selection, "asset_check_selection", AssetCheckKey)

nonexistent_assets = [
asset for asset in asset_selection if asset not in self.asset_layer.asset_keys
asset for asset in asset_selection if asset not in self.asset_layer.target_asset_keys
]
nonexistent_asset_strings = [
asset_str
Expand Down Expand Up @@ -1254,7 +1254,7 @@ def _infer_asset_layer_from_source_asset_deps(job_graph_def: GraphDefinition) ->
for source_asset in source_assets_list
}
return AssetLayer(
asset_keys_to_execute=set(),
target_asset_keys=set(),
assets_defs_by_node_handle={},
asset_keys_by_node_input_handle=asset_keys_by_node_input_handle,
asset_info_by_node_output_handle={},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dagster._core.definitions.configurable import NamedConfigurableDefinition
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.storage.tags import UNEXECUTABLE_NODE_TAG

from .hook_definition import HookDefinition
from .utils import check_valid_name, validate_tags
Expand Down Expand Up @@ -134,6 +135,10 @@ def output_defs(self) -> Sequence["OutputDefinition"]:
def output_dict(self) -> Mapping[str, "OutputDefinition"]:
return self._output_dict

@property
def is_executable(self) -> bool:
return UNEXECUTABLE_NODE_TAG not in self.tags

def has_input(self, name: str) -> bool:
check.str_param(name, "name")
return name in self._input_dict
Expand Down
Loading

0 comments on commit 08255d1

Please sign in to comment.