[6/n subset refactor] Serialize AssetGraphSubset and AssetBackfillData with whitelist_for_serdes #17844
NamedTuple(
    "_AssetGraphSubset",
    [
        ("partitions_subsets_by_serialized_asset_key", Mapping[str, PartitionsSubset]),
Unfortunately json cannot serialize dictionaries that aren't keyed by a primitive or a string, so we key by the serialized asset key instead
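To illustrate the constraint, here is a minimal sketch that uses only the standard `json` module. The tuple key is a stand-in for an `AssetKey`, and `serialize_key` is a hypothetical helper invented for this example, not part of Dagster's serialization layer.

```python
import json

# A tuple stands in for an AssetKey (assumption: any non-primitive key hits the same limit).
subsets_by_key = {("db", "table"): ["2023-01-01", "2023-01-02"]}

try:
    json.dumps(subsets_by_key)
    error = None
except TypeError as exc:
    # json only accepts str, int, float, bool, or None as dictionary keys.
    error = exc


def serialize_key(key: tuple) -> str:
    """Hypothetical helper: encode the key itself as a JSON string."""
    return json.dumps(list(key))


# Keying by the serialized form makes the mapping JSON-compatible.
subsets_by_serialized_key = {serialize_key(k): v for k, v in subsets_by_key.items()}
encoded = json.dumps(subsets_by_serialized_key)

# Reading it back requires decoding each key again.
decoded = {tuple(json.loads(k)): v for k, v in json.loads(encoded).items()}
```

The cost of this approach is visible in the last line: every reader of the mapping has to decode the keys, which is the leak into the data model discussed in this thread.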
Could it make sense to handle that in the custom serializer so it doesn't need to leak into the data model for the class?
Also, don't want to increase scope too much, but it could make sense to try to address this at the serialization layer if it's not too big a lift.
> Also, don't want to increase scope too much, but it could make sense to try to address this at the serialization layer if it's not too big a lift.

I did take a stab at implementing it this way.
- The properly serialized asset key value, `pack_value(asset_key....)`, is a dictionary, which cannot be used as a dictionary key
- We could add custom logic in the serialization layer to convert an asset key to a string (i.e. `asset_key.to_user_string()`), but this seems like a pain to deal with if we did decide to add additional fields to the AssetKey class
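The first point can be reproduced in plain Python. In this sketch, `packed_key` is a made-up dictionary standing in for a packed asset key (the real packed form may look different), and joining the path with `/` is only an illustrative string conversion.

```python
# Hypothetical packed form of an asset key: a plain dict (an assumption for illustration).
packed_key = {"__class__": "AssetKey", "path": ["db", "table"]}

try:
    mapping = {packed_key: "subset"}
    error = None
except TypeError as exc:
    # Dicts are unhashable, so they cannot be used as dictionary keys.
    error = exc

# A string form is hashable and can key the mapping instead.
string_key = "/".join(packed_key["path"])
mapping = {string_key: "subset"}
```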
> Could it make sense to handle that in the custom serializer so it doesn't need to leak into the data model for the class?

My initial version actually used this approach. It looked like this:
dagster/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py
Lines 42 to 69 in 79652f9
class PartitionsSubsetByAssetKeySerializer(FieldSerializer):
    """Packs and unpacks a mapping from AssetKey to PartitionsSubset.

    In JSON, a key must be a str, int, float, bool, or None. This serializer packs the AssetKey
    into a str, and unpacks it back into an AssetKey.

    It also converts PartitionKeysTimeWindowPartitionsSubset into serializable TimeWindowPartitionsSubsets.
    """

    def pack(self, mapping: Mapping[AssetKey, Any], **_kwargs) -> Mapping[str, Any]:
        return {
            serialize_value(key): serialize_value(
                value.to_time_window_partitions_subset()
                if isinstance(value, PartitionKeysTimeWindowPartitionsSubset)
                else value
            )
            for key, value in mapping.items()
        }

    def unpack(
        self,
        mapping: Mapping[str, Any],
        **_kwargs,
    ) -> Mapping[AssetKey, Any]:
        return {
            deserialize_value(key, AssetKey): deserialize_value(value, TimeWindowPartitionsSubset)
            for key, value in mapping.items()
        }
I moved away from this implementation because it felt like a duplicate of the existing logic, with the exception of converting the asset key to a string, so maybe it would be better to reduce code surface area by just converting the asset keys to serialized form.
I feel mixed on this though -- I can see either making sense. It certainly is cleaner to not have to convert to/from serialized asset keys.
> but this seems like a pain to deal with if we did decide to add additional fields to the AssetKey class
I think that we can be confident that we are not going to do this. I suspect a lot of other places would break as well.
python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py
if partitions_def is None:
    check.failed("Can only call get_partitions_subset on a partitioned asset")
def get_partitions_subset(
    self, asset_key: AssetKey, asset_graph: Optional[AssetGraph] = None
I pass in the asset graph because we'd like to get an empty subset instead of None where possible. The `__oper__` callsite does not have access to the asset graph, so this is an optional param.
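A rough sketch of that behavior, using stand-in types rather than the real Dagster classes: `FakeAssetGraph` and this free-function version of `get_partitions_subset` are hypothetical, and strings and frozensets stand in for asset keys and partitions subsets.

```python
from typing import Dict, FrozenSet, Optional


class FakeAssetGraph:
    """Hypothetical stand-in that only records which asset keys are partitioned."""

    def __init__(self, partitioned_keys: FrozenSet[str]):
        self.partitioned_keys = partitioned_keys


def get_partitions_subset(
    subsets_by_key: Dict[str, FrozenSet[str]],
    asset_key: str,
    asset_graph: Optional[FakeAssetGraph] = None,
) -> Optional[FrozenSet[str]]:
    subset = subsets_by_key.get(asset_key)
    if subset is not None:
        return subset
    # With a graph available, a missing entry can become an empty subset.
    if asset_graph is not None and asset_key in asset_graph.partitioned_keys:
        return frozenset()
    # Without a graph (e.g. inside an operator method), None is all we can return.
    return None
```

Callers that hold an asset graph get a usable empty subset, while operator methods that lack one still have to handle `None`.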
def before_pack(self, value: "AssetGraphSubset") -> "AssetGraphSubset":
    converted_partitions_subsets_by_serialized_asset_key = {}
    for k, v in value.partitions_subsets_by_serialized_asset_key.items():
        if isinstance(v, PartitionKeysTimeWindowPartitionsSubset):
Would it make sense to put the custom serializer on `PartitionKeysTimeWindowPartitionsSubset` itself?
It would be nice to not have to convert these subset objects everywhere they're used, but I think on principle objects that don't cross a serialization boundary should not be decorated with `@whitelist_for_serdes`.

Ideally we have some parallel entity that means "convert to Y, then serialize/deserialize Y". Implementing something like that adds additional scope I'd prefer to avoid at this time.
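For reference, the "convert to Y, then serialize/deserialize Y" idea could look roughly like the sketch below. All names here (`InMemorySubset`, `SerializableSubset`, `pack`, `unpack`) are hypothetical, and plain `json` stands in for the real serdes layer.

```python
import json
from dataclasses import asdict, dataclass
from typing import FrozenSet, List, Union


@dataclass(frozen=True)
class SerializableSubset:
    """Hypothetical "Y": the only form that crosses the serialization boundary."""

    sorted_keys: List[str]


@dataclass(frozen=True)
class InMemorySubset:
    """Hypothetical "X": convenient in memory, never serialized directly."""

    keys: FrozenSet[str]

    def to_serializable(self) -> SerializableSubset:
        return SerializableSubset(sorted_keys=sorted(self.keys))


def pack(value: Union[InMemorySubset, SerializableSubset]) -> str:
    # Convert X to Y first, then serialize Y.
    if isinstance(value, InMemorySubset):
        value = value.to_serializable()
    return json.dumps(asdict(value))


def unpack(payload: str) -> SerializableSubset:
    # Deserialization always yields Y; callers convert back if they need X.
    return SerializableSubset(**json.loads(payload))


restored = unpack(pack(InMemorySubset(frozenset({"2023-01-02", "2023-01-01"}))))
```

In this arrangement only the "Y" type would need to be registered with the serialization layer, which matches the principle stated above.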
@sryza This PR has been updated to directly serialize |
    self._asset_graph = asset_graph
    self._partitions_subsets_by_asset_key = partitions_subsets_by_asset_key or {}
    self._non_partitioned_asset_keys = non_partitioned_asset_keys or set()
class PartitionsSubsetMappingNamedTupleSerializer(NamedTupleSerializer):
@sryza I've refactored this serializer to be generalizable to other named tuples that contain partitions subsets mappings. Unfortunately serializers are instantiated on demand rather than at definition time, so we can't easily provide field names to apply this custom logic.
Instead, I've added logic to detect when partitions subsets exist and to convert those accordingly
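As a rough illustration of detecting partitions-subset mappings by value type rather than by field name, consider the sketch below. The classes and helpers (`KeySubset`, `WindowSubset`, `is_subset_mapping`, `convert_fields`, `Backfill`) are hypothetical stand-ins and do not reflect the actual serializer code.

```python
from typing import Any, Dict, FrozenSet, Mapping, NamedTuple, Tuple


class KeySubset(NamedTuple):
    """Hypothetical in-memory subset that should be converted before packing."""

    partition_keys: FrozenSet[str]


class WindowSubset(NamedTuple):
    """Hypothetical serializable counterpart."""

    sorted_keys: Tuple[str, ...]


def is_subset_mapping(value: Any) -> bool:
    # Detect by value type, since field names are not known up front.
    return (
        isinstance(value, Mapping)
        and len(value) > 0
        and all(isinstance(v, (KeySubset, WindowSubset)) for v in value.values())
    )


def convert_fields(record: Any) -> Dict[str, Any]:
    """Return the record's fields with any subset mappings converted."""
    converted = {}
    for name, value in record._asdict().items():
        if is_subset_mapping(value):
            value = {
                k: WindowSubset(tuple(sorted(v.partition_keys))) if isinstance(v, KeySubset) else v
                for k, v in value.items()
            }
        converted[name] = value
    return converted


class Backfill(NamedTuple):
    """Hypothetical named tuple that happens to contain a subset mapping."""

    target: Mapping[str, Any]
    note: str
```

Because detection depends only on the values, the same logic applies to any named tuple that carries such a mapping, at the cost of inspecting every field.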
That makes sense
This could use a short docstring.
A few small remaining comments. Otherwise LGTM!
def non_partitioned_asset_keys(self) -> AbstractSet[AssetKey]:
    return self._non_partitioned_asset_keys
@whitelist_for_serdes(serializer=PartitionsSubsetMappingNamedTupleSerializer)
class AssetGraphSubset(
Could this just be

@whitelist_for_serdes(...)
class AssetGraphSubset(NamedTuple):
    partitions_subsets_by_asset_key: Mapping[AssetKey, PartitionsSubset]
    non_partitioned_asset_keys: AbstractSet[AssetKey]

with no `__new__`?
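For comparison, this is how the class-syntax form of `typing.NamedTuple` behaves on its own, without a custom `__new__`. The `Subset` class and its string-keyed fields are simplified stand-ins for this sketch, and whether `@whitelist_for_serdes` accepts this form is not shown here.

```python
from typing import AbstractSet, Mapping, NamedTuple


class Subset(NamedTuple):
    """Hypothetical simplified stand-in for AssetGraphSubset."""

    partitions_by_key: Mapping[str, AbstractSet[str]]
    non_partitioned_keys: AbstractSet[str]


# Fields, positional order, and keyword construction all come from the annotations.
s = Subset(partitions_by_key={"a": {"2023-01-01"}}, non_partitioned_keys={"b"})
fields = s._fields
```

The trade-off is that, without `__new__`, there is no hook for argument validation or for `or {}` style defaults at construction time.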
This PR makes `AssetGraphSubset` and `AssetBackfillData` serializable via `whitelist_for_serdes`.

This involves the following changes:
- Converting `AssetGraphSubset` and `AssetBackfillData` into `NamedTuples`
- Converting `PartitionKeysTimeWindowPartitionsSubset` to `TimeWindowPartitionsSubset`
- Removing the asset graph from `AssetGraphSubset`, as it is not serializable. This has a cascading effect:
  - [...] `asset_graph_subset.asset_graph` must instead have an asset graph passed in
  - [...] `AssetGraphSubset` (i.e. within `__or__`). This logic now must be updated to handle cases where a partitions subset is currently `None`
  - `AssetGraphSubset` or, and, and sub (`|`, `&`, `-`) operations cannot operate directly against sets of `AssetKeyPartitionKey`s, since the asset graph is required to build subsets from these `AssetKeyPartitionKey`s
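The last two points can be sketched with a simplified stand-in. `Subset` below is hypothetical: strings stand in for asset keys and frozensets for partitions subsets, and a missing entry plays the role of a subset that is currently `None`.

```python
from typing import Dict, FrozenSet, NamedTuple


class Subset(NamedTuple):
    """Hypothetical simplified stand-in for AssetGraphSubset."""

    partitions_by_key: Dict[str, FrozenSet[str]]

    def __or__(self, other):
        # Only subset-to-subset unions are supported; without an asset graph,
        # raw sets of (asset key, partition key) pairs cannot be converted here.
        if not isinstance(other, Subset):
            return NotImplemented
        merged = dict(self.partitions_by_key)
        for key, parts in other.partitions_by_key.items():
            # An asset missing on one side is treated as an empty subset.
            merged[key] = merged.get(key, frozenset()) | parts
        return Subset(merged)


a = Subset({"x": frozenset({"p1"})})
b = Subset({"x": frozenset({"p2"}), "y": frozenset({"p1"})})
union = a | b
```

Callers that previously passed a raw set to the operator would first need to build a subset from it using an asset graph, then apply `|`, `&`, or `-`.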