-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[3/n subset refactor] Add whitelist_for_serdes to TimeWindowPartitionsSubset #17702
[3/n subset refactor] Add whitelist_for_serdes to TimeWindowPartitionsSubset #17702
Conversation
Current dependencies on/for this PR:
This stack of pull requests is managed by Graphite. |
dbaca7d
to
a138c9f
Compare
).get_last_partition_window(current_time=current_time) | ||
|
||
if not first_tw or not last_tw: | ||
check.failed("No partitions found") | ||
|
||
if len(self.included_time_windows) == 0: | ||
if len(self.get_included_time_windows()) == 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be cleaner in this method if there was a single included_time_windows = self.get_included_time_windows()
up here, rather than invoking get_included_time_windows()
a bunch below
from abc import ABC, abstractproperty
from typing import NamedTuple
class Superclass(ABC):
@abstractproperty
def foo(self) -> str:
...
class Subclass(Superclass, NamedTuple("_Subclass", [("foo", str)])):
@property
def foo(self):
return self._asdict()["foo"]
bar = Subclass("fdsjkfld")
print(bar.foo) |
I think reversing the order of the parent classes also helps: class Subclass(NamedTuple("_Subclass", [("foo", str)]), Superclass):
... |
d264af4
to
f320752
Compare
5335af4
to
8ccd9d2
Compare
Ahhh this is great!! Such subtle arts.... |
e5bd107
to
f667fd8
Compare
f320752
to
0b5fa15
Compare
f667fd8
to
41b7804
Compare
0b5fa15
to
d629763
Compare
2cdad9c
to
e97b5b2
Compare
7e1cb7a
to
f72f939
Compare
f72f939
to
927eb63
Compare
python_modules/dagster/dagster/_core/definitions/time_window_partitions.py
Outdated
Show resolved
Hide resolved
# is needed to improve performance. When serializing, we want to serialize the number of | ||
# partitions, so we force calculatation. | ||
def before_pack(self, value: "TimeWindowPartitionsSubset") -> "TimeWindowPartitionsSubset": | ||
if value._asdict()["num_partitions"] is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value.num_partitions
doesn't work here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value.num_partitions
will calculate the # partitions if the field in the tuple is None
What we really want is to check if the field value is None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see - I think either worth including a comment or exposing a boolean property that tells whether it's computed.
python_modules/dagster/dagster/_core/definitions/time_window_partitions.py
Outdated
Show resolved
Hide resolved
@property | ||
def included_time_windows(self) -> Sequence[TimeWindow]: | ||
return self._included_time_windows | ||
return _num_partitions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah this is a local variable set to self._asdict()["num_partitions"]
included_time_windows, "included_time_windows", of_type=TimeWindow | ||
check.sequence_param(included_time_windows, "included_time_windows", of_type=TimeWindow) | ||
|
||
time_windows_with_timezone = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why exactly is this necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this change, the time windows will remain in UTC upon deserialization (since that is how the DatetimeFieldSerializer
deserializes).
We currently have logic in from_serialized
to convert these time windows to the timezone of the partitions def, so I added these lines to ensure that the timezone of the time windows is correct.
I'm not sure that it's strictly necessary (since all the tests were passing), but I figured this would probably be a beneficial change regardless
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this change, the time windows will remain in UTC upon deserialization (since that is how the DatetimeFieldSerializer deserializes).
Could we make DatetimeFieldSerializer
preserve the timezone? It seems risky that the deserialized datetime could have a different datetime than the pre-serialized datetime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good call -- I added a change that serializes with timezone
9d11a25
to
1aadacf
Compare
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit 1aadacf. |
1aadacf
to
f88b440
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note comments, otherwise great!
|
||
@cached_property | ||
def num_partitions(self) -> int: | ||
if self._num_partitions is None: | ||
_num_partitions = self._asdict()["num_partitions"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the underscore notation here is a bit weird, this could just be "num_partitions"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or num_partitions_
if you want to disambiguate from the function name
f88b440
to
cd6e384
Compare
@@ -54,24 +63,59 @@ | |||
from .partition_key_range import PartitionKeyRange | |||
|
|||
|
|||
# UTCTimestampWithTimezone is used to preserve timezone information when serializing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To elaborate on this a bit more, datetime.isoformat()
will just store the UTC offset of the datetime. When this value is unpacked, adding timedeltas (or similar) is inexact because the IANA timezone now longer exists on the object. (More details on stack overflow)
In order to prevent any lossy serialization, this implementation serializes both the datetime float and the IANA timezone so that deserialization yields the exact datetime before serialization.
# We can't store datetime.isoformat() because it only preserves UTC offsets, which vary depending on | ||
# daylight savings time. | ||
@whitelist_for_serdes | ||
class UTCTimestampWithTimezone(NamedTuple): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming nitpicks:
- I think this could be just called
TimestampWithTimezone
, because timestamps (unlike datetimes) are usually assumed to be in UTC. datetime_float
->timestamp
. And worth including a comment that this refers to seconds since the Unix epoch.
continue time window partitions subset changes asset backfill serialization partition mapping update continue refactor fix more tests more test fixes fix partition mapping tests adjust test fix more tests add tests
Co-authored-by: Sandy Ryza <[email protected]>
cd6e384
to
0e3a4fe
Compare
This PR converts
TimeWindowPartitionsSubset
to a named tuple decorated with@whitelist_for_serdes
. This is mostly field renames (i.e.self._included_time_windows -> self.included_time_windows
).There is a logic change to add a
before_pack
hook inwhitelist_for_serdes
. This enables mutating the named tuple before it is serialized, which is used in this PR to force calculating the # partitions in theTimeWindowPartitionsSubset
. This has been added because there is perf logic to delay calculating the # partitions inTimeWindowPartitionsSubset
until necessary.