Skip to content

Commit

Permalink
handle graph snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jan 15, 2025
1 parent 70385a7 commit b3eaae4
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2571,6 +2571,9 @@
"mapped_solid_name": "never_runs_op"
}
],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -33745,6 +33748,9 @@
"name": "simple_graph",
"output_def_snaps": [],
"output_mapping_snaps": [],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -33784,7 +33790,7 @@
'''
# ---
# name: test_all_snapshot_ids[17]
'2f4a7e8dac16272a9e008a61e766b43bd473eb7e'
'cfb767a61be6a485474582ec75fc6834cd314c08'
# ---
# name: test_all_snapshot_ids[18]
'''
Expand Down Expand Up @@ -34920,6 +34926,9 @@
"mapped_solid_name": "adder_2"
}
],
"pools": {
"__set__": []
},
"tags": {}
},
{
Expand Down Expand Up @@ -35002,6 +35011,9 @@
"mapped_solid_name": "adder_2"
}
],
"pools": {
"__set__": []
},
"tags": {}
},
{
Expand Down Expand Up @@ -35084,6 +35096,9 @@
"mapped_solid_name": "div_2"
}
],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -35165,10 +35180,10 @@
'''
# ---
# name: test_all_snapshot_ids[19]
'f726acb538de96f43dfa0335f7bc79d1b52394c1'
'b336066bf8dc411b162f9869dd08da0b18057896'
# ---
# name: test_all_snapshot_ids[1]
'6c2e7891d74bdc0ff647f100a13f4a630081c2ba'
'fb54440831226acbda6ad87d0d7599c4f3424168'
# ---
# name: test_all_snapshot_ids[20]
'''
Expand Down Expand Up @@ -54872,6 +54887,9 @@
"mapped_solid_name": "never_runs_op"
}
],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -55018,7 +55036,7 @@
'''
# ---
# name: test_all_snapshot_ids[51]
'709d40d76a7309b11219cb331842fb349e161eb0'
'200c1bc946100875b19e0c6b7f26fbb9edab070b'
# ---
# name: test_all_snapshot_ids[52]
'''
Expand Down Expand Up @@ -80111,6 +80129,9 @@
"mapped_solid_name": "plus_one"
}
],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -80254,7 +80275,7 @@
'''
# ---
# name: test_all_snapshot_ids[99]
'cf140114fda778cc12c11ba13f7e26cd0fbccb3c'
'1959e3612cf7d53844928fff25df04128d1b15ff'
# ---
# name: test_all_snapshot_ids[9]
'cf67ad5622d23845499daf2adba4a11f6b23f9eb'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,13 @@ def tags(self) -> Mapping[str, str]:
"""The tags associated with the graph."""
return super().tags

@property
def pools(self) -> set[str]:
pools = set()
for node_def in self.node_defs:
pools.update(node_def.pools)
return pools

@public
def alias(self, name: str) -> "PendingNodeInvocation":
"""Aliases the graph with a new name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,7 @@ def get_op_handles(self, parent: "NodeHandle") -> AbstractSet["NodeHandle"]: ...
def get_op_output_handles(
self, parent: Optional["NodeHandle"]
) -> AbstractSet["NodeOutputHandle"]: ...

@property
@abstractmethod
def pools(self) -> set[str]: ...
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
)
from dagster._core.types.dagster_type import DagsterType, DagsterTypeKind
from dagster._utils import IHasInternalInit
from dagster._utils.warnings import normalize_renamed_param
from dagster._utils.warnings import normalize_renamed_param, preview_warning

if TYPE_CHECKING:
from dagster._core.definitions.asset_layer import AssetLayer
Expand Down Expand Up @@ -298,6 +298,11 @@ def pool(self) -> Optional[str]:
"""Optional[str]: The concurrency group for this op."""
return self._pool

@property
def pools(self) -> set[str]:
"""Optional[str]: The concurrency group for this op."""
return {self._pool} if self._pool else set()

def is_from_decorator(self) -> bool:
from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction

Expand Down Expand Up @@ -605,6 +610,7 @@ def _validate_pool(pool, tags):
)

if pool:
preview_warning("Pools")
return pool

if tag_concurrency_key:
Expand Down
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/_core/snap/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class GraphDefSnap:
dep_structure_snapshot: DependencyStructureSnapshot
input_mapping_snaps: Sequence[InputMappingSnap]
output_mapping_snaps: Sequence[OutputMappingSnap]
pools: set[str]

@cached_property
def input_def_map(self) -> Mapping[str, InputDefSnap]:
Expand Down Expand Up @@ -281,6 +282,7 @@ def build_graph_def_snap(graph_def: GraphDefinition) -> GraphDefSnap:
dep_structure_snapshot=build_dep_structure_snapshot_from_graph_def(graph_def),
input_mapping_snaps=list(map(build_input_mapping_snap, graph_def.input_mappings)),
output_mapping_snaps=list(map(build_output_mapping_snap, graph_def.output_mappings)),
pools=graph_def.pools,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@
},
"step_output_versions": []
},
"pipeline_snapshot_id": "3057df527127ce93bc08075d3dc3149b5054065a",
"pipeline_snapshot_id": "b57cc7e00dc45fd4927a082265100e665a21f23d",
"snapshot_version": 1,
"step_keys_to_execute": [
"comp_1.return_one",
Expand Down

0 comments on commit b3eaae4

Please sign in to comment.