[7/n subset refactor] Use new asset backfill data serialization format #17929
Conversation
> We could force the daemon to migrate these objects mid-backfill, but that value add is pretty low.

I definitely agree
("requested_subset", AssetGraphSubset), | ||
("failed_and_downstream_subset", AssetGraphSubset), | ||
("backfill_start_time", datetime), | ||
("partitions_def_ids_by_asset_key", Optional[Mapping[AssetKey, str]]), |
Why exactly do we need this? Because we want to fail the backfill if the partitions def changes during its execution?
Is this definitely still necessary now?
> Because we want to fail the backfill if the partitions def changes during its execution?

Yes. The alternative would be checking whether each targeted partition still exists in the partitions def.
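For context, a minimal sketch of how the stored IDs could drive that check; `get_partitions_def` and `get_serializable_unique_identifier` are assumed helper names here, not necessarily the exact Dagster APIs:

```python
# Hedged sketch: fail the backfill when a partitions def changed or was
# removed mid-run, by comparing the unique IDs stored at launch time with
# the current ones. Helper names below are assumptions.
def partitions_defs_changed(
    backfill_data: "AssetBackfillData", asset_graph: "AssetGraph"
) -> bool:
    stored_ids = backfill_data.partitions_def_ids_by_asset_key
    if stored_ids is None:
        return False  # old-format backfill: nothing stored to compare against
    for asset_key, stored_id in stored_ids.items():
        partitions_def = asset_graph.get_partitions_def(asset_key)
        current_id = (
            partitions_def.get_serializable_unique_identifier()
            if partitions_def is not None
            else None  # partitions def was removed entirely
        )
        if current_id != stored_id:
            return True  # changed or removed: stop execution
    return False
```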
```python
    self.serialized_asset_backfill_data is not None or self.asset_backfill_data is not None
)

def get_asset_backfill_data(self, asset_graph: AssetGraph) -> Optional[AssetBackfillData]:
```
I find the behavior of this function a little unexpected with respect to its signature. Just based on the signature, I would expect it to return None in the case where there is no backfill data, not in the case where there's a deserialization issue. Thoughts on still handling the error inside the caller instead of here?

Seems like we then might be able to use this at [1] instead of having to directly call `from_serialized` there?
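For illustration, a sketch of that contract, where `None` strictly means "no asset backfill data" and deserialization errors propagate to the caller:

```python
def get_asset_backfill_data(self, asset_graph: AssetGraph) -> Optional[AssetBackfillData]:
    # Returns None only when this backfill carries no asset backfill data.
    if self.serialized_asset_backfill_data is not None:
        # May raise on a corrupt/stale blob; the caller decides how to
        # handle that (e.g. log a warning and skip the backfill).
        return AssetBackfillData.from_serialized(
            self.serialized_asset_backfill_data, asset_graph, self.backfill_timestamp
        )
    return self.asset_backfill_data
```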
```python
if asset_backfill.serialized_asset_backfill_data:
    ...
    self._logger.warning(
        f"Not considering assets in backfill {asset_backfill.backfill_id} since its"
        " data could not be deserialized"
    )
```
[1]
```python
if backfill.serialized_asset_backfill_data:
    previous_asset_backfill_data = AssetBackfillData.from_serialized(
        backfill.serialized_asset_backfill_data, asset_graph, backfill.backfill_timestamp
    )
```
Could it make sense to consolidate parts of this implementation with `get_asset_backfill_data`?
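For instance, assuming the deserialization error handling moves into the caller as suggested above, the call site could presumably collapse to:

```python
# Hypothetical consolidated call site; get_asset_backfill_data would pick
# the right storage format (serialized blob vs. structured field) internally.
previous_asset_backfill_data = backfill.get_asset_backfill_data(asset_graph)
```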
@sryza This PR has been updated to address PR feedback above. The main update is removing
One last comment. Otherwise looks good to go!
```python
else:
    # Check that all target partitions still exist. If so, the backfill can continue.
    for target_key in target_partitions_subset.get_partition_keys():
        if not partitions_def.has_partition_key(
```
Will this be O(n^2)? Is there a way to make it O(n)?
Assuming that N is the number of partitions on the partitions def: it's not O(N^2). Each partitions def has its own `has_partition_key` method that is optimized to not require fetching all partitions.
For `StaticPartitionsDefinition`, my read is that `has_partition_key` looks inside `self._partitions`, which is a sequence and thus O(# partitions) to check whether it contains a value. Is that off base?

And for dynamic partitions, will this not result in O(target_partitions_subset.get_partition_keys()) calls to the database to get all partition keys?
Ah, yeah. Looks like for static partitions defs the call will be O(N^2).

> And for dynamic partitions, will this not result in O(target_partitions_subset.get_partition_keys()) calls to the database to get all partition keys?

And yes, this is true.
I've amended the behavior to instead use the `AllPartitionsSubset` to build a subset of all valid partitions, then compare against that subset. This should be O(N) now.
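In plain-set terms, the linear version looks roughly like the sketch below (the actual change uses `AllPartitionsSubset` rather than a raw set; that substitution is mine):

```python
# Build the set of currently valid partition keys once (a single O(N)
# partitions fetch / database call), then do O(1) membership tests per
# target key instead of an O(N) has_partition_key call inside the loop.
valid_keys = set(partitions_def.get_partition_keys())
missing_keys = [
    target_key
    for target_key in target_partitions_subset.get_partition_keys()
    if target_key not in valid_keys
]
if missing_keys:
    # Some targeted partitions no longer exist, so the backfill must stop.
    ...
```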
Ship it!
At long last, we have arrived at the final part of this stack. This PR migrates `PartitionBackfill` logic to use the new serialization format of `AssetBackfillData`.

When new backfills are created from now on, the UI will be able to display the asset backfill page even if the partitions defs are changed/removed. For old backfills, the UI will continue to show the "partitions def has changed" message, and the asset backfill page will be blank.

Description of changes:

- Adds an additional `asset_backfill_data` field to `PartitionBackfill` alongside `serialized_asset_backfill_data`. We could force the daemon to migrate these objects mid-backfill, but that value add is pretty low. It also improves debuggability by forcing old backfills to use the old serialization, and new backfills to use the new serialization.
- Serializes the unique ID of each partitions def in a field on `AssetBackfillData`. Adds a new method in asset backfill execution that uses the unique ID to check if partitions defs have changed, in which case we should stop execution.
- … `AssetGraphSubset`, but was unfortunately duplicated across each subset type (materialized, in-progress, failed)
- Adds test cases to cover this new surface area