-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[external-assets] Implement AssetGraph with AssetNode and RemoteAssetNode #20114
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @smackesey and the rest of your teammates on Graphite |
38dcce2
to
25c8694
Compare
25c8694
to
d491e2b
Compare
6a12f5b
to
83c7c4f
Compare
d491e2b
to
6f52933
Compare
83c7c4f
to
aa004b0
Compare
6f52933
to
25bea97
Compare
25bea97
to
ff8d5c8
Compare
42cda64
to
bb6c63c
Compare
ff8d5c8
to
f29fe01
Compare
f0065f1
to
abb5af5
Compare
f29fe01
to
dfc5cb1
Compare
abb5af5
to
ac8a9c6
Compare
dfc5cb1
to
d7f3f4b
Compare
d7f3f4b
to
0529c67
Compare
ee4e10a
to
e7f3967
Compare
0529c67
to
287503a
Compare
e7f3967
to
b052dac
Compare
287503a
to
cf187b8
Compare
b052dac
to
59f3d5f
Compare
cf187b8
to
dc8fb63
Compare
162284f
to
2fed007
Compare
bd0a85c
to
bc5235f
Compare
class BaseAssetNode(ABC): | ||
key: AssetKey | ||
_children: Optional[AbstractSet[Self]] | ||
_parents: Optional[AbstractSet[Self]] | ||
|
||
# Since both parent and child asset nodes contain refereneces to each other, it is impossible to | ||
# construct a graph of all asset nodes with single-step construction. The nodes must first be | ||
# constructed and then `set_neighbors` must be called to bind the references. | ||
|
||
@property | ||
def children(self) -> AbstractSet[Self]: | ||
if self._children is None: | ||
self._neighbors_unbound_error("child", "children") | ||
return self._children | ||
|
||
@property | ||
def child_keys(self) -> AbstractSet[AssetKey]: | ||
if self._children is None: | ||
self._neighbors_unbound_error("child", "children") | ||
return {child.key for child in self._children} | ||
|
||
def set_children(self, children: AbstractSet[Self]) -> None: | ||
self._children = children |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hard no on this one.
You have two options here:
- Change BaseAssetNode to have a reference to the AssetGraph from whence it came. This would allow you to navigate up and down the tree
- Have BaseAssetNode know only about its upstream deps (like AssetsDefinition does). Downstream deps have to come through other abstractions, like the AssetGraph or AssetGraphView.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend option 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated this to now store child_keys
and parent_keys
instead of direct node references (children
and parents
). Child and parent nodes can be resolved with `AssetGraph.get_{children,parents}(node).
I explored retaining direct references under option (2) but it doesn't work because RemoteAssetGraph
needs to be cycle tolerant, so any 1-stage construction with direct references is a no-go.
@property | ||
@cached_method | ||
def group_name(self) -> Optional[str]: | ||
return self._priority_node.group_name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cached_method
is about 20x more expensive than bare property access, so we need to make sure that calculating it is worth. This very well may be slower.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, removed
bc5235f
to
580f7fe
Compare
[INTERNAL_BRANCH=sean/external-assets-asset-graph-nodes-1]
b03c797
to
b3ba639
Compare
@@ -89,7 +89,7 @@ def asset3(asset1, asset2): ... | |||
assert asset_graph.is_partitioned(asset1.key) | |||
assert asset_graph.have_same_partitioning(asset1.key, asset2.key) | |||
assert not asset_graph.have_same_partitioning(asset1.key, asset3.key) | |||
assert asset_graph.get_children(asset0.key) == {asset1.key, asset2.key} | |||
assert asset_graph.get_child_asset_keys(asset0.key) == {asset1.key, asset2.key} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_children
callsites needed to be changed because get_children
now returns nodes instead of keys
b3ba639
to
d58d492
Compare
d58d492
to
3a653f2
Compare
@cached_property | ||
def _observable_node(self) -> "ExternalAssetNode": | ||
return next((node for node in self._external_asset_nodes if node.is_observable)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm shouldn't this be Optional
? I don't see how this is guaranteed to return a value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's intended to error if no observable node is defined (as is _materializable_node
). It's a private implementation detail used only when we know something is observable
# Build an index of execution units by key. An execution unit is a set of assets and checks | ||
# that must be executed together. ExternalAssetNodes and ExternalAssetChecks already have an | ||
# optional execution_set_id set. A null execution_set_id indicates that the node or check | ||
# can be executed independently. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
execution set
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(in comments still says unit)
def all_job_names(self) -> AbstractSet[str]: | ||
return {job_name for node in self.asset_nodes for job_name in node.job_names} | ||
def external_asset_nodes_by_key(self) -> Mapping[AssetKey, "ExternalAssetNode"]: | ||
# This exists to support existing callsites but it should be removed ASAP. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment as to why it need to be removed
def get_materialization_job_names(self, asset_key: AssetKey) -> Sequence[str]: | ||
"""Returns the names of jobs that materialize this asset.""" | ||
return self.get_asset_node(asset_key).job_names | ||
# This is a poorly named method because it will expose observation job names for assets with | ||
# a defined observation but no materialization. | ||
return self.get(asset_key).job_names |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be good to rename in follow up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A great step forward. Please heed my final comments.
Great makes sense
…On Thu, Mar 7 2024 at 9:19 AM, Sean Mackesey < ***@***.*** > wrote:
***@***.**** commented on this pull request.
In python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py (
#20114 (comment) ) :
> + @cached_property
+ def _observable_node(self) ->
"ExternalAssetNode":
+ return next((node for node in
self._external_asset_nodes if node.is_observable))
It's intended to error if no observable node is defined (as is _materializable_node
). It's a private implementation detail used only when we know something
is observable
—
Reply to this email directly, view it on GitHub (
#20114 (comment) ) ,
or unsubscribe (
https://github.com/notifications/unsubscribe-auth/AG3IK6PBGN7XBQPJ5KFJZHLYXBZPRAVCNFSM6AAAAABD5BA3JCVHI2DSMVQWIX3LMV43YUDVNRWFEZLROVSXG5CSMV3GSZLXHMYTSMRSGU3DENRYGE
).
You are receiving this because your review was requested. Message ID: <dagster-io/dagster/pull/20114/review/1922562681
@ github. com>
|
Re: the erroring little method I pointed out, it would be nice to add a |
3a653f2
to
f60e7cb
Compare
[INTERNAL_BRANCH=sean/external-assets-asset-graph-nodes-1]
f60e7cb
to
cbccceb
Compare
…Node (#20114) ## Summary & Motivation Internal companion PR: dagster-io/internal#8537 Initial implementation of asset nodes for the `AssetGraph`. - `BaseAssetGraph` is now generic in a new `BaseAssetNode` class that exposes the metadata for an asset. - The node class for `AssetGraph` is `AssetNode`. It wraps an `AssetsDefinition`. - The node class for `RemoteAssetGraph` is `RemoteAssetNode`. It wraps a list of `ExternalAssetNode` (to be renamed upstack) objects sourced from one or more code locations. - Moving to nodes with a common interface allows many property accessor methods to be deleted on `BaseAssetGraph` and exposed on `BaseAssetNode` instead. The use of a common interface on the two kinds of nodes allows other method impls to be hoisted to the base `AssetGraph` class. - To reduce noise in this PR, I have not changed callsites (with a few exceptions), and instead just swapped out property accessor method impls. Callsites are changed in an upstack PR, where e.g. `asset_graph.get(<key>).auto_materialize_policy` is used. ## How I Tested These Changes Existing test suite.
Summary & Motivation
Internal companion PR: https://github.com/dagster-io/internal/pull/8537
Initial implementation of asset nodes for the
AssetGraph
.BaseAssetGraph
is now generic in a newBaseAssetNode
class that exposes the metadata for an asset.AssetGraph
isAssetNode
. It wraps anAssetsDefinition
.RemoteAssetGraph
isRemoteAssetNode
. It wraps a list ofExternalAssetNode
(to be renamed upstack) objects sourced from one or more code locations.BaseAssetGraph
and exposed onBaseAssetNode
instead. The use of a common interface on the two kinds of nodes allows other method impls to be hoisted to the baseAssetGraph
class.asset_graph.get(<key>).auto_materialize_policy
is used.How I Tested These Changes
Existing test suite.