partitioned write support #353

Draft · wants to merge 18 commits into base: main
Makefile · 1 change: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ test-integration:
	docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
	poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}

+
test-integration-rebuild:
	docker-compose -f dev/docker-compose-integration.yml kill
	docker-compose -f dev/docker-compose-integration.yml rm -f
pyiceberg/io/pyarrow.py · 90 changes: 40 additions & 50 deletions
@@ -128,7 +128,7 @@
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties, Record
+from pyiceberg.typedef import EMPTY_DICT, Properties
from pyiceberg.types import (
    BinaryType,
    BooleanType,
@@ -1697,15 +1697,13 @@ def fill_parquet_file_metadata(

    lower_bounds = {}
    upper_bounds = {}
-
    for k, agg in col_aggs.items():
        _min = agg.min_as_bytes()
        if _min is not None:
            lower_bounds[k] = _min
        _max = agg.max_as_bytes()
        if _max is not None:
            upper_bounds[k] = _max
-
    for field_id in invalidate_col:
        del lower_bounds[field_id]
        del upper_bounds[field_id]
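
Aside: a standalone sketch of the bounds bookkeeping in the hunk above. A bound is only recorded when the aggregator actually produced one, and bounds for invalidated columns are dropped afterwards. MinMaxAgg and the field ids below are illustrative stand-ins, not pyiceberg classes:

from typing import Dict, Optional

class MinMaxAgg:
    """Toy stand-in for pyiceberg's per-column statistics aggregator."""

    def __init__(self) -> None:
        self._min: Optional[bytes] = None
        self._max: Optional[bytes] = None

    def update(self, value: bytes) -> None:
        self._min = value if self._min is None else min(self._min, value)
        self._max = value if self._max is None else max(self._max, value)

    def min_as_bytes(self) -> Optional[bytes]:
        return self._min

    def max_as_bytes(self) -> Optional[bytes]:
        return self._max

col_aggs: Dict[int, MinMaxAgg] = {1: MinMaxAgg(), 2: MinMaxAgg()}
col_aggs[1].update(b"apple")
col_aggs[1].update(b"pear")

lower_bounds: Dict[int, bytes] = {}
upper_bounds: Dict[int, bytes] = {}
for field_id, agg in col_aggs.items():
    _min = agg.min_as_bytes()
    if _min is not None:  # field 2 never saw a value, so it gets no bounds
        lower_bounds[field_id] = _min
    _max = agg.max_as_bytes()
    if _max is not None:
        upper_bounds[field_id] = _max

invalidate_col = {1}  # pretend field 1's stats are unreliable
for field_id in invalidate_col:
    lower_bounds.pop(field_id, None)  # the diff uses del; pop tolerates missing keys
    upper_bounds.pop(field_id, None)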
@@ -1722,54 +1720,46 @@


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
-
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
-
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
-
-    file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
-    schema = table_metadata.schema()
-    arrow_file_schema = schema_to_pyarrow(schema)
-
-    fo = io.new_output(file_path)
-    row_group_size = PropertyUtil.property_as_int(
-        properties=table_metadata.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table_metadata.default_spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
+    for task in tasks:
+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
+
+        file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}'  # generate_data_file_filename
+        schema = table_metadata.schema()
+        arrow_file_schema = schema_to_pyarrow(schema)
+
+        fo = io.new_output(file_path)
+        row_group_size = PropertyUtil.property_as_int(
+            properties=table_metadata.properties,
+            property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+            default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        )
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
+                writer.write_table(task.df, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=task.partition_key.partition if task.partition_key else None,
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table_metadata.default_spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(schema),
+        )

-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(schema),
-    )
-    return iter([data_file])
+        yield data_file


ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
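Note: the refactor above turns write_file from a one-shot function into a generator that writes one Parquet file per WriteTask and yields a DataFile for each. A minimal self-contained sketch of that pattern using plain pyarrow; the WriteTask dataclass, write_files, and the path layout here are hypothetical stand-ins, not pyiceberg's API:

import os
from dataclasses import dataclass
from typing import Iterator, Optional

import pyarrow as pa
import pyarrow.parquet as pq


@dataclass
class WriteTask:
    df: pa.Table  # rows belonging to this task
    partition_path: Optional[str]  # e.g. "category=fruit"; None when unpartitioned


def write_files(base_dir: str, tasks: Iterator[WriteTask]) -> Iterator[str]:
    for i, task in enumerate(tasks):
        sub = f"{task.partition_path}/" if task.partition_path else ""
        path = f"{base_dir}/data/{sub}part-{i}.parquet"
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with pq.ParquetWriter(path, schema=task.df.schema) as writer:
            writer.write_table(task.df)
        yield path  # lazy: each file is written only as the caller advances the iterator


table = pa.table({"category": ["fruit", "fruit"], "qty": [3, 5]})
tasks = iter([WriteTask(df=table, partition_path="category=fruit")])
print(list(write_files("/tmp/demo_table", tasks)))

One consequence of the generator form: callers must drain the iterator (for example with list(...)) before committing a snapshot, otherwise no files are written.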
pyiceberg/manifest.py · 8 changes: 2 additions & 6 deletions
@@ -41,7 +41,6 @@
from pyiceberg.types import (
    BinaryType,
    BooleanType,
-    DateType,
    IcebergType,
    IntegerType,
    ListType,
@@ -51,8 +50,6 @@
    PrimitiveType,
    StringType,
    StructType,
-    TimestampType,
-    TimestamptzType,
    TimeType,
)

@@ -289,10 +286,7 @@ def partition_field_to_data_file_partition_field(partition_field_type: IcebergTy


@partition_field_to_data_file_partition_field.register(LongType)
-@partition_field_to_data_file_partition_field.register(DateType)
@partition_field_to_data_file_partition_field.register(TimeType)
-@partition_field_to_data_file_partition_field.register(TimestampType)
-@partition_field_to_data_file_partition_field.register(TimestamptzType)
def _(partition_field_type: PrimitiveType) -> IntegerType:
    return IntegerType()

@@ -308,6 +302,7 @@ def data_file_with_partition(partition_type: StructType, format_version: Literal
            field_id=field.field_id,
            name=field.name,
            field_type=partition_field_to_data_file_partition_field(field.field_type),
+            required=False,
        )
        for field in partition_type.fields
    ])
@@ -484,6 +479,7 @@ def update(self, value: Any) -> None:
def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]:
    types = [field.field_type for field in spec.partition_type(schema).fields]
    field_stats = [PartitionFieldStats(field_type) for field_type in types]
+
    for partition_keys in partitions:
        for i, field_type in enumerate(types):
            if not isinstance(field_type, PrimitiveType):
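Note: the required=False addition above makes every field of the data-file partition struct optional, since a partition value can be null (for example, a null source column or a void transform). A short sketch of the resulting struct using pyiceberg's public types; the field id and name are made up for illustration:

from pyiceberg.types import IntegerType, NestedField, StructType

partition_type = StructType(
    NestedField(field_id=1000, name="order_date_day", field_type=IntegerType(), required=False),
)
print(partition_type)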
pyiceberg/partitioning.py · 15 changes: 7 additions & 8 deletions
@@ -230,7 +230,6 @@ def partition_to_path(self, data: Record, schema: Schema) -> str:
        for pos, value in enumerate(data.record_fields()):
            partition_field = self.fields[pos]
            value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=value)
-
            value_str = quote(value_str, safe='')
            value_strs.append(value_str)
            field_strs.append(partition_field.name)
@@ -388,7 +387,7 @@ def partition(self) -> Record: # partition key transformed with iceberg interna
                raise ValueError("partition_fields must contain exactly one field.")
            partition_field = partition_fields[0]
            iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
-            iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value)
+            iceberg_typed_value = arrow_to_iceberg_representation(iceberg_type, raw_partition_field_value.value)
            transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
            iceberg_typed_key_values[partition_field.name] = transformed_value
        return Record(**iceberg_typed_key_values)
@@ -398,26 +397,26 @@ def to_path(self) -> str:


@singledispatch
-def _to_partition_representation(type: IcebergType, value: Any) -> Any:
+def arrow_to_iceberg_representation(type: IcebergType, value: Any) -> Any:
    return TypeError(f"Unsupported partition field type: {type}")


-@_to_partition_representation.register(TimestampType)
-@_to_partition_representation.register(TimestamptzType)
+@arrow_to_iceberg_representation.register(TimestampType)
+@arrow_to_iceberg_representation.register(TimestamptzType)
def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]:
    return datetime_to_micros(value) if value is not None else None


-@_to_partition_representation.register(DateType)
+@arrow_to_iceberg_representation.register(DateType)
def _(type: IcebergType, value: Optional[date]) -> Optional[int]:
    return date_to_days(value) if value is not None else None


-@_to_partition_representation.register(UUIDType)
+@arrow_to_iceberg_representation.register(UUIDType)
def _(type: IcebergType, value: Optional[uuid.UUID]) -> Optional[str]:
    return str(value) if value is not None else None


-@_to_partition_representation.register(PrimitiveType)
+@arrow_to_iceberg_representation.register(PrimitiveType)
def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]:
    return value
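Note: the rename from _to_partition_representation to arrow_to_iceberg_representation spells out what the singledispatch above does: pick a converter by Iceberg type and turn a Python/Arrow value into Iceberg's internal representation (timestamps and dates become integers, UUIDs become strings, other primitives pass through). A self-contained sketch of the same pattern; the toy types and to_internal below are illustrative, not pyiceberg's implementations:

from datetime import date, datetime
from functools import singledispatch
from typing import Any, Optional


class IcebergType: ...
class DateType(IcebergType): ...
class TimestampType(IcebergType): ...


EPOCH = datetime(1970, 1, 1)  # sketch assumes naive UTC datetimes


@singledispatch
def to_internal(type_: IcebergType, value: Any) -> Any:
    return value  # primitives already use their internal representation


@to_internal.register(TimestampType)
def _(type_: IcebergType, value: Optional[datetime]) -> Optional[int]:
    # microseconds since the Unix epoch
    return None if value is None else int((value - EPOCH).total_seconds() * 1_000_000)


@to_internal.register(DateType)
def _(type_: IcebergType, value: Optional[date]) -> Optional[int]:
    # days since the Unix epoch
    return None if value is None else (value - EPOCH.date()).days


print(to_internal(DateType(), date(2024, 1, 2)))  # 19724
print(to_internal(TimestampType(), None))  # None
print(to_internal(IcebergType(), 42))  # 42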