[external-assets] Add AssetGraph subsetting, make build_asset_job use AssetGraph #20405
… in AssetGraph (#20435)
## Summary & Motivation
There is a bug in the asset graph that I surfaced in an upstack PR, which is that the `AssetsDefinition` for an asset check isn't available from the asset graph if an `AssetsDefinition` has been subsetted to only a check with no assets. This refactors `AssetGraph` to support this niche case, which opens the door to using `AssetGraph` as the basis for `AssetLayer`.
## How I Tested These Changes
Existing test suite. The "new" functionality is tested upstack by #20405
This is glorious, ofc. Enough changes to do a cycle.
python_modules/dagster/dagster/_core/definitions/asset_graph.py
executable_assets_defs, raw_loadable_assets_defs = subset_assets_defs(
    self.assets_defs, executable_asset_keys, asset_check_keys
)
loadable_assets_defs = [
    unexecutable_ad
    for ad in raw_loadable_assets_defs
    for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad)
]
The naming is a bit odd, and it is hard to understand what is going on.
What is the difference between "raw_loadable" and "loadable"?
I added this comment:
# subset_assets_defs returns two lists of AssetsDefinitions-- those included and those
# excluded by the selection. These collections retain their original execution type. We need
# to convert the excluded assets to unexecutable external assets.
and changed raw_loadable_assets_defs to excluded_assets_defs.
# ignore check keys that don't correspond to an AssetChecksDefinition
asset_checks_defs = list(
    {
        acd
        for key, acd in self._asset_check_compute_defs_by_key.items()
        if key in (asset_check_keys or []) and isinstance(acd, AssetChecksDefinition)
    }
)
we should probably wait for your diff to kill AssetChecksDefinition
that's actually stacked on this one so it's easier to land this first
executable_assets_defs = [asset for asset in included_assets if asset.is_executable]
unexecutable_assets_defs = [
    unexecutable_ad
    for ad in (
        *(asset for asset in included_assets if not asset.is_executable),
        *excluded_assets,
    )
    for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad)
How would you feel about encoding this in the type system with a new type?
ExecutableAssetDefs = NewType("ExecutableAssetDefs", List[AssetsDefinition])
I think this pattern would be nice for being more disciplined about lists of assets defs separated by "kind".
I'd prefer not to do this because there is no enforcement that the AssetsDefinitions in the list would actually be executable, which could be pretty confusing.
we would have a conversion function that checks that invariant and then returns the new type.
Something like:
from typing import Iterable, List, NewType

from dagster import AssetsDefinition

UnexecutableAssetsDefinitions = NewType("UnexecutableAssetsDefinitions", List[AssetsDefinition])


def to_unexecutable_assets_definitions(
    assets_definitions: Iterable[AssetsDefinition],
) -> UnexecutableAssetsDefinitions:
    return UnexecutableAssetsDefinitions([ad for ad in assets_definitions if not ad.is_executable])
Interesting. IMO if we did this I think we'd want to use the singular, so you'd have to_unexecutable_assets_definition and List[UnexecutableAssetsDefinition]. I think we should assess when the smoke clears from this PR, the AssetChecksDefinition removal, and the hollowing out of AssetLayer.
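For reference, the singular variant discussed above could look roughly like the sketch below. It is not code from this PR: the `AssetsDefinition` class here is a hypothetical stand-in dataclass so the snippet runs without dagster, and the conversion function raises on violation (an assumption about what "checks that invariant" would mean) instead of filtering.

```python
from dataclasses import dataclass
from typing import Iterable, List, NewType


@dataclass(frozen=True)
class AssetsDefinition:
    """Hypothetical stand-in for dagster's AssetsDefinition (illustration only)."""

    key: str
    is_executable: bool


# Singular NewType: each element is individually marked as having been checked.
UnexecutableAssetsDefinition = NewType("UnexecutableAssetsDefinition", AssetsDefinition)


def to_unexecutable_assets_definition(
    assets_def: AssetsDefinition,
) -> UnexecutableAssetsDefinition:
    # The invariant is enforced at the one place where the new type is minted.
    if assets_def.is_executable:
        raise ValueError(f"Assets definition {assets_def.key!r} is executable")
    return UnexecutableAssetsDefinition(assets_def)


def to_unexecutable_assets_definitions(
    assets_defs: Iterable[AssetsDefinition],
) -> List[UnexecutableAssetsDefinition]:
    return [to_unexecutable_assets_definition(ad) for ad in assets_defs]
```

At runtime `NewType` is an identity function, so the guarantee only holds if every value of the new type is created through the conversion function.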
Singular makes sense!
And yes no need to do it here.
executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys),
loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys),
asset_checks=asset_graph.asset_checks_defs,
# For now, to preserve behavior keep all asset checks in all base jobs.
Can you elaborate further here? What is the ideal behavior? Why is this "For now"?
Checks should probably just be in the base job with their target asset, and I'm almost certain that will need to be the case when they support partitions. I've added a comment to that effect.
@@ -155,70 +146,38 @@ def asset2(asset1):
    """
    from dagster._core.execution.build_resources import wrap_resources_for_execution

    check.str_param(name, "name")
This codeblock forced me to switch to graphite to find an appropriate meme.
def create_unexecutable_external_assets_from_assets_def(
    assets_def: AssetsDefinition,
) -> Sequence[AssetsDefinition]:
    if not assets_def.is_executable:
        return [assets_def]
    else:
        return [create_external_asset_from_source_asset(sa) for sa in assets_def.to_source_assets()]
I'm struggling to connect this name to the behavior.
When can an assets_def that returns True from is_executable contain source assets?
All assets defs can be converted to source assets-- none of them "contain" source assets. This behavior has been used to make assets available for loading to jobs that don't materialize them. We're just doing the same thing here but using an unexecutable assets def for the loadable representation instead of a source asset.
This is important because the assets defs are the source of truth for what assets are being materialized by a job. Previously it was roughly "if there is an assets def for it, it's being materialized". That logic no longer applies because loadable assets are now also represented by assets defs-- so it has to be "if there is a materializable assets def for it, it's being materialized". So when we want to make an existing materializable assets def available exclusively for loading in a job, we convert it to unexecutable.
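A toy model of that rule, for readers following along. Everything here is hypothetical (`ToyAssetsDef`, `as_loadable_only`, and `materialized_keys` are illustration names, not dagster APIs); it only shows how "has an assets def" and "is materialized by the job" come apart once loadable assets are also represented by assets defs.

```python
from dataclasses import dataclass, replace
from typing import Iterable, Set


@dataclass(frozen=True)
class ToyAssetsDef:
    """Hypothetical stand-in for an assets definition with a single key."""

    key: str
    is_executable: bool


def as_loadable_only(assets_def: ToyAssetsDef) -> ToyAssetsDef:
    # Unexecutable defs pass through unchanged; executable ones are converted.
    if not assets_def.is_executable:
        return assets_def
    return replace(assets_def, is_executable=False)


def materialized_keys(assets_defs: Iterable[ToyAssetsDef]) -> Set[str]:
    # New rule: only *executable* defs count as materialized by the job.
    return {ad.key for ad in assets_defs if ad.is_executable}


upstream = ToyAssetsDef(key="upstream", is_executable=True)
downstream = ToyAssetsDef(key="downstream", is_executable=True)

# A job that materializes only `downstream` but still needs to load `upstream`.
job_defs = [as_loadable_only(upstream), downstream]
```

Under the old rule both keys would have counted as materialized, since both have a definition in the job; under the new one only `downstream` does.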
That all makes sense.
When the dust settles from all this you should write a document describing the new ontology. Will be a super useful reference.
Could create_unexecutable_representation_of_assets_def, or a name like it, be more clear?
It's kind of a tough one to name, but I prefer the current name because:
- it's consistent with the other create_external_asset... names in this module
- "unexecutable representation of assets def" sounds kind of like it could be creating something that isn't an assets def. Also it would maybe need to be "representations" since we are returning multiple assets defs
Yeah nothing is totally satisfying. This name plus an explanatory comment suffices.
added comment
check.invariant(
    not invalid_executable_keys,
    "Provided executable asset keys must be a subset of existing executable asset keys."
    f" Invalid provided keys: {invalid_executable_keys}",
Recommend building this string only conditionally, so it is not constructed on every invocation.
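A minimal sketch of that suggestion in plain Python, assuming the keys are sets of strings. `validate_executable_keys` is a hypothetical helper name and `ValueError` stands in for whatever the check utility raises. The f-string sits inside the `if`, so it is only formatted when the check actually fails.

```python
from typing import AbstractSet


def validate_executable_keys(
    requested: AbstractSet[str], existing: AbstractSet[str]
) -> None:
    invalid_executable_keys = requested - existing
    # The message is formatted only on the failure path.
    if invalid_executable_keys:
        raise ValueError(
            "Provided executable asset keys must be a subset of existing executable asset keys."
            f" Invalid provided keys: {sorted(invalid_executable_keys)}"
        )
```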
… AssetGraph (#20405)
## Summary & Motivation
Cluster of related changes that allow for using `AssetGraph` as our general representation of "collection of assets", which allows it to be used as the source of truth for an asset job.
- Add `AssetGraph.get_subset`. This returns a new `AssetGraph`. You pass a set of executable asset keys and asset check keys to get a subset-- the result will automatically include any parent assets as unexecutable.
- Add a `create_unexecutable_external_assets_from_assets_def` function. This fulfills the same role as `AssetsDefinition.to_source_assets`. It would be cleaner to create a single unexecutable `AssetsDefinition` from the passed-in one, but for now we are returning multiple `AssetsDefinition` objects (one for each key) since this can be implemented in terms of the existing `.to_source_assets`.
- Change `build_assets_job` to accept an `AssetGraph` instead of separate lists of executable and loadable assets definitions. Modify the two callsites to pass an `AssetGraph`.
The next step is composing this with `AssetLayer`.
## How I Tested These Changes
Existing test suite.
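The subsetting behavior described in the summary can be pictured with a toy graph. This is only a sketch under assumptions, not the `AssetGraph.get_subset` implementation: the graph is a plain mapping from asset key to parent keys, asset checks are left out, and "parent assets" is read as direct parents only.

```python
from typing import AbstractSet, Dict, Mapping


def get_subset(
    parents_by_key: Mapping[str, AbstractSet[str]],
    executable_keys: AbstractSet[str],
) -> Dict[str, bool]:
    """Return {asset_key: is_executable} for the subset (toy model)."""
    # Every selected key is executable in the subset.
    subset = {key: True for key in executable_keys}
    for key in executable_keys:
        for parent in parents_by_key.get(key, set()):
            # Parents that were not selected come along as unexecutable;
            # parents that were selected keep their executable status.
            subset.setdefault(parent, False)
    return subset


graph = {"a": set(), "b": {"a"}, "c": {"b"}}
```

With this graph, selecting only `c` yields `c` as executable and `b` as an unexecutable (loadable) parent, while `a` is left out entirely.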