Skip to content

Commit

Permalink
fix more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 8, 2023
1 parent ebb715c commit e5bd107
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from dagster._serdes import (
whitelist_for_serdes,
)
from dagster._serdes.serdes import FieldSerializer, deserialize_value, serialize_value
from dagster._serdes.serdes import (
FieldSerializer,
NamedTupleSerializer,
deserialize_value,
serialize_value,
)
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
Expand Down Expand Up @@ -1871,10 +1876,22 @@ def unpack(
).to_time_window_partitions_def()


class TimeWindowPartitionsSubsetSerializer(NamedTupleSerializer):
    # A TimeWindowPartitionsSubset can carry None in its stored num_partitions field, deferring
    # the count until it is needed (a performance optimization). The serialized form should
    # contain the actual number of partitions, so the calculation is forced before packing.
    def before_pack(self, value: "TimeWindowPartitionsSubset") -> "TimeWindowPartitionsSubset":
        # Look at the raw stored field through _asdict() instead of the attribute;
        # presumably `value.num_partitions` is the accessor that computes the count on demand.
        stored_fields = value._asdict()
        if stored_fields["num_partitions"] is not None:
            # Count already materialized: pack the value untouched.
            return value
        # Return a copy whose stored field holds the computed count.
        return value._replace(num_partitions=value.num_partitions)


@whitelist_for_serdes(
field_serializers={"partitions_def": TimeWindowPartitionsDefinitionSerializer}
field_serializers={"partitions_def": TimeWindowPartitionsDefinitionSerializer},
serializer=TimeWindowPartitionsSubsetSerializer,
)
class TimeWindowPartitionsSubset(
BaseTimeWindowPartitionsSubset,
NamedTuple(
"_TimeWindowPartitionsSubset",
[
Expand All @@ -1883,7 +1900,6 @@ class TimeWindowPartitionsSubset(
("included_time_windows", Sequence[TimeWindow]),
],
),
BaseTimeWindowPartitionsSubset,
):
def __new__(
cls,
Expand Down
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_serdes/serdes.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def pack(
) -> Dict[str, JsonSerializableValue]:
packed: Dict[str, JsonSerializableValue] = {}
packed["__class__"] = self.get_storage_name()
for key, inner_value in value._asdict().items():
for key, inner_value in self.before_pack(value)._asdict().items():
if key in self.skip_when_empty_fields and inner_value in EMPTY_VALUES_TO_SKIP:
continue
storage_key = self.storage_field_names.get(key, key)
Expand All @@ -531,6 +531,10 @@ def pack(
packed = self.after_pack(**packed)
return packed

# Hook: Adjust the named tuple before its fields are packed. pack() iterates over the
# _asdict() of whatever this returns. The default implementation returns the value unchanged;
# serializer subclasses override it to substitute a modified tuple.
def before_pack(self, value: T_NamedTuple) -> T_NamedTuple:
    return value

# Hook: Modify the contents of the packed, json-serializable dict before it is converted to a
# string.
def after_pack(self, **packed_dict: JsonSerializableValue) -> Dict[str, JsonSerializableValue]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,23 @@ def test_time_window_partitions_subset_serialization_deserialization(
)
assert deserialized == subset
assert deserialized.get_partition_keys() == ["2023-01-01"]


def test_time_window_partitions_subset_num_partitions_serialization():
    # A subset constructed with num_partitions=None must come back from a
    # serialize/deserialize round trip with a non-None num_partitions stored on the tuple.
    daily_def = DailyPartitionsDefinition("2023-01-01")
    partitions_def = TimeWindowPartitionsDefinition(
        start=daily_def.start,
        end=daily_def.end,
        cron_schedule="0 0 * * *",
        fmt="%Y-%m-%d",
        timezone=daily_def.timezone,
        end_offset=daily_def.end_offset,
    )

    window = partitions_def.time_window_for_partition_key("2023-01-01")

    # Explicitly leave the stored count empty so serialization has to fill it in.
    subset = TimeWindowPartitionsSubset(
        partitions_def, num_partitions=None, included_time_windows=[window]
    )

    serialized = serialize_value(subset)
    round_tripped = deserialize_value(serialized, TimeWindowPartitionsSubset)

    # Check the raw stored field (via _asdict()), not the attribute.
    stored_fields = round_tripped._asdict()
    assert stored_fields["num_partitions"] is not None

0 comments on commit e5bd107

Please sign in to comment.