Construction of filenames for partitioned writes #453
Conversation
@jqin61 Nice! Thanks for working on this. It is getting late here, but this is on my list for tomorrow 👍
Force-pushed from 6e16690 to 7fcf75a
@dataclass(frozen=True)
class PartitionKey:
    raw_partition_field_values: List[PartitionFieldValue]
Spark builds a row accessor that takes an arrow table row and converts it to key values. The accessor seems a little unnecessary here since a partition field cannot be nested or a map/list, so this class just uses a naive list of field-value pairs. Willing to change it if this is inappropriate.
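For illustration, a minimal sketch of the naive field-value-pair approach; the shape of PartitionFieldValue is assumed from the diff above, not copied from the PR code:

from dataclasses import dataclass
from typing import Any, List

from pyiceberg.partitioning import PartitionField


@dataclass(frozen=True)
class PartitionFieldValue:
    field: PartitionField  # the partition spec field this raw value belongs to
    value: Any             # the raw, untransformed value taken from the arrow row


@dataclass(frozen=True)
class PartitionKey:
    raw_partition_field_values: List[PartitionFieldValue]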
[False],
Record(boolean_field=False),
"boolean_field=False",
# pyiceberg writes False while spark writes false, so justification (comparing the expected value with spark behavior) would fail.
Skip justification (set spark_create_table_sql_for_justification and spark_data_insert_sql_for_justification to None) since it would fail: spark writes the hive partition path as 'false' while pyiceberg writes it as 'False'. Shall we align with spark?
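A small plain-Python illustration of where the capitalization difference comes from:

# Python's str() capitalizes booleans, which leaks into the partition path.
assert str(False) == "False"          # what pyiceberg currently writes
assert str(False).lower() == "false"  # what spark writes in the hive partition path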
"float_field=3.14", | ||
# spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) | ||
# so justification (compare expected value with spark behavior) would fail. | ||
None, |
For a partitioned column with a float/double value of 3.14, spark-iceberg records the partition in the manifest entry as Record[float_field=3.140000104904175], while pyiceberg records it as Record[float_field=3.14].
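The extra digits are what you get from single-precision rounding; a standard-library snippet illustrating the assumption that spark is round-tripping the value through a 32-bit float:

import struct

# Round-tripping 3.14 through a 32-bit float yields the value spark records in the manifest.
as_float32 = struct.unpack("f", struct.pack("f", 3.14))[0]
print(as_float32)  # 3.140000104904175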
[PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")],
[b'example'],
Record(binary_field=b'example'),
"binary_field=ZXhhbXBsZQ%3D%3D",
spark-iceberg replaces '=' with '%3D', ':' with '%3A' (and applies other URL replacements) in the hive partition path. To conform to this, the PR currently applies urllib.parse.quote() after to_human_string().
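For example, the expected value above can be reproduced with the standard library; whether to_human_string produces exactly this base64 form is inferred from the test data above:

import base64
from urllib.parse import quote

human = base64.b64encode(b"example").decode()  # 'ZXhhbXBsZQ==', the human-readable form for binary
print(quote(human))                            # 'ZXhhbXBsZQ%3D%3D' -- '=' escaped as %3D, matching spark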
"timestamp_field=2023-01-01T12%3A00%3A00", | ||
# spark writes differently as pyiceberg, Record[timestamp_field=1672574400000000] path:timestamp_field=2023-01-01T12%3A00Z (the Z is the difference) | ||
# so justification (compare expected value with spark behavior) would fail. | ||
None, |
spark-iceberg writes the hive partition path for a timestamp so that it ends with 'Z', while pyiceberg currently writes it without the 'Z'.
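A quick illustration of the difference, using the isoformat-based formatting quoted later in this thread:

from datetime import datetime, timedelta

EPOCH_TIMESTAMP = datetime(1970, 1, 1)
micros = 1_672_574_400_000_000  # 2023-01-01T12:00:00 UTC, matching the Record value above

# Current pyiceberg-style formatting: no trailing 'Z'
print((EPOCH_TIMESTAMP + timedelta(microseconds=micros)).isoformat())        # 2023-01-01T12:00:00

# spark-iceberg ends the path value with 'Z' (shown here before URL escaping)
print((EPOCH_TIMESTAMP + timedelta(microseconds=micros)).isoformat() + "Z")  # 2023-01-01T12:00:00Z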
spark_path_for_justification = (
    snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path
)
assert spark_partition_for_justification == expected_partition_record
This is to justify that the expected path and expected partition come from existing spark behaviors.
First of all, thanks for working on this. Secondly, I appreciate the nicely constructed PR description with the nice summary of questions:
For boolean type partitions, spark writes the hive partition part of the path as "field=false/true" while Pyiceberg (from the current underlying utilities) writes it as "field=False/True". This difference comes from Python booleans being capitalized.
I think it is better to lowercase the true/false in this instance. Another good source of information on how to handle these things is in the Literal class, where we ignore the casing when converting a string to a boolean.
Spark writes the path conforming to URL format, meaning that in the value part after 'field=', any '=' is replaced by "%3D", ':' by "%3A", etc. Shall we apply urllib.parse.quote to conform to spark behavior?
I think that's a good idea! 👍 I'm not sure what the complete background is, but I don't think we want to pass everything unescaped into the path:
python3
Python 3.11.7 (main, Dec 4 2023, 18:10:11) [Clang 15.0.0 (clang-1500.1.0.2.5)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from urllib import parse
>>> parse.quote("😊")
'%F0%9F%98%8A'
For timestamp(tz) type, spark writes the hive partition part of the path as "2023-01-01T12%3A00%3A01Z", with %3A representing the ':', the timestamp ends with Z while existing Pyiceberg utilities use
(EPOCH_TIMESTAMP + timedelta(microseconds=timestamp_micros)).isoformat()
to write, which does not have 'Z'
I would expect the timestamptz to be stored with a Z, and timestamp without a Z. Without the Z would mean local time, see https://en.wikipedia.org/wiki/ISO_8601.
I did a quick check, and this seems to be the case here as well. We must follow this on the PyIceberg side as well.
For float and double
A partitioned float field with a value of 3.14 would end up in the manifest entry in the manifest file as Record[double_field=3.140000104904175]. So far for Pyiceberg, we are doing it as Record[double_field=3.14], which I think is better.
I agree that 3.14 is better. For the evaluator itself, we should have proper tests around that, but that's outside of the scope of this PR.
@jqin61 I wanted to do a second round, but I think you forgot to push? :)
Hi Fokko, sorry for the delayed push of the fixes. It took a little time to think through how to use the literal function. I made the changes according to your last comment except for the literal one. I think the literal function and Literal class might not solve the issue I am encountering in the PartitionKey class - I need some utilities to convert a python datetime to micros and a python date to days. The literal() function can only take in python primitive types, and it seems wrong to extend it to take datetime/date and return TimestampLiteral and DateLiteral.

Also, thanks for pointing out that timestamp in iceberg corresponds to timestamp_ntz in spark. I added tests for it and discovered some new discrepancies. I am not sure whether the path generation behavior is part of iceberg's spec and whether we should work towards making it exactly the same between spark-iceberg and Pyiceberg - it seems that as long as the partition in the manifest entry is correct, query planning could leverage the partition info to do pruning.
pyiceberg/partitioning.py (outdated)
@singledispatch
def _to_iceberg_type(type: IcebergType, value: Any) -> Any:
I think the name of this function, _to_iceberg_type, and the variable iceberg_typed_value are causing us a bit of confusion. It looks like what we are trying to do is convert a date or datetime value to its respective epoch value (days from epoch, or microseconds from epoch), so that it can be used as an integer in this line:
transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
Should we call this variable epoch (instead of iceberg_typed_value) and change this function name to _to_epoch? We can keep the conversion functions as we currently have them.
For types other than date and datetime, will it seem weird to call _to_epoch on them?
The int is the internal value that we use to store a datetime/date/time; another one is the uuid, where we accept a UUID and convert it to a string (I believe, I can check with Trino quickly).
Let me add uuid to the function dispatching. Also let me rename the function to _to_iceberg_internal_representation()?
I made the changes in commit 1a48d83
Some final small comments, but apart from that it looks good 👍
pyiceberg/partitioning.py (outdated)
field_strs = []
value_strs = []
for pos, value in enumerate(data.record_fields()):
    partition_field = self.fields[pos]  # partition field
Suggested change:
-    partition_field = self.fields[pos]  # partition field
+    partition_field = self.fields[pos]
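For context, a rough sketch of how such a loop could assemble the hive-style path segment; the real partition_to_path takes the partition Record and the schema (as discussed further down), so the signature and helper here are illustrative only:

from typing import List
from urllib.parse import quote


def partition_to_path(field_names: List[str], human_values: List[str]) -> str:
    # Pair each partition field name with its human-readable value and
    # URL-escape both sides so '=', ':' etc. match spark's path encoding.
    field_strs = [quote(name, safe="") for name in field_names]
    value_strs = [quote(value, safe="") for value in human_values]
    return "/".join(f"{f}={v}" for f, v in zip(field_strs, value_strs))


# e.g. partition_to_path(["binary_field"], ["ZXhhbXBsZQ=="]) -> 'binary_field=ZXhhbXBsZQ%3D%3D'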
pyiceberg/partitioning.py (outdated)
@singledispatch
def _to_iceberg_internal_representation(type: IcebergType, value: Any) -> Any:
To avoid confusion later on, can we change this name to _to_partition_representation? The internal representation of a UUID is bytes and not str.
definitely
I checked this line after a spark write into iceberg for a table partitioned on a uuid-type column:
snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition
and got
spark_partition_for_justification=Record[uuid_field='f47ac10b-58cc-4372-a567-0e02b2c3d479']
so it looks like it is a string representation in data_file.partition?
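To make the dispatching concrete, a minimal sketch of what such a conversion function could look like. This is a sketch only: it assumes dispatch on the IcebergType instance as in the diff above, follows the UUID-to-str observation just made, and treats naive timestamps as UTC for simplicity.

from datetime import date, datetime, timezone
from functools import singledispatch
from typing import Any
from uuid import UUID

from pyiceberg.types import DateType, IcebergType, TimestampType, TimestamptzType, UUIDType

EPOCH_DATETIME = datetime(1970, 1, 1, tzinfo=timezone.utc)
EPOCH_DATE = date(1970, 1, 1)


@singledispatch
def _to_partition_representation(iceberg_type: IcebergType, value: Any) -> Any:
    # Primitives (int, str, bool, Decimal, ...) already match their partition representation.
    return value


@_to_partition_representation.register(TimestampType)
@_to_partition_representation.register(TimestamptzType)
def _(iceberg_type: IcebergType, value: datetime) -> int:
    # Timestamps are carried as microseconds from epoch before the transform is applied.
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)  # simplification for this sketch
    delta = value - EPOCH_DATETIME
    return delta.days * 86_400_000_000 + delta.seconds * 1_000_000 + delta.microseconds


@_to_partition_representation.register(DateType)
def _(iceberg_type: IcebergType, value: date) -> int:
    # Dates are carried as days from epoch.
    return (value - EPOCH_DATE).days


@_to_partition_representation.register(UUIDType)
def _(iceberg_type: IcebergType, value: UUID) -> str:
    # Matches the string form observed in data_file.partition for the spark-written table.
    return str(value)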
iceberg_typed_value = _to_iceberg_internal_representation(iceberg_type, raw_partition_field_value.value)
transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
iceberg_typed_key_values[partition_field.name] = transformed_value
return Record(**iceberg_typed_key_values)
We're now getting into the realm of premature optimization, but ideally you don't need to set the names of the keys. The concept of a Record is that it only contains the data. Just below, in self.partition_spec.partition_to_path(self.partition, self.schema), you can see that you pass in both the partition and the schema itself. The positions in the schema should match the data.
Hi @Fokko, thanks for the guidance! My intention in adding the keys is that this PartitionKey.partition is not only used for generating the file path but also used to initialize DataFile.partition in io.pyarrow.write_file(). As the integration test shows,
snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition
prints
Record(timestamp_field=1672574401000000)
so I assume this data_file.partition is a Record with keys.
Let me know what you think about it, thank you!
Force-pushed from 1a48d83 to 1034823
rebased; removed the comment; renamed the ambiguous function name
Let's move this forward, thanks for working on this 👍
* PartitionKey Class And Tests
* fix linting; add decimal input transform test
* fix bool to path lower case; fix timestamptz tests; other pr comments
* clean up
* add uuid partition type
* clean up; rename ambiguous function name
Scope
Add a PartitionKey class which computes the transformed partition Record from the raw partition field values and generates the hive-style partition path used when constructing data file names.
In terms of how PartitionKey is used, please check the PR for [partitioned write support](#353). I separated this PR out of the partitioned write to make the latter more manageable, but am willing to combine the two if suggested.
Tests
Object Under Test:
To compare against spark, the expected path and expected partition are justified against two counterpart spark SQL statements that create the partitioned table and insert the data.
With such justifications, we found these discrepancies between the underlying utility functions in Pyiceberg and the existing spark behavior:
For boolean type partitions, spark writes the hive partition part of the path as "field=false/true" while Pyiceberg (from the current underlying utilities) writes it as "field=False/True".
Spark writes the path conforming to URL format, meaning that in the value part after 'field=', any '=' is replaced by "%3D", ':' by "%3A", etc.
For timestamp(tz) type, spark writes the hive partition part of the path ending with 'Z', while the existing Pyiceberg utilities use
(EPOCH_TIMESTAMP + timedelta(microseconds=timestamp_micros)).isoformat()
to write, which does not have 'Z'.
For float and double, a partitioned float field with a value of 3.14 would end up in the manifest entry in the manifest file as Record[double_field=3.140000104904175]. So far for Pyiceberg, we are doing it as Record[double_field=3.14], which I think is better.
For these discrepancies, should we conform to spark's behaviors?