Skip to content

Commit

Permalink
Add logic for Sort Order updates (apache#476)
Browse files Browse the repository at this point in the history
* Implement sort order update

* Cleanup

* Add test

* Add test

* Update pyiceberg/table/metadata.py

Co-authored-by: Fokko Driesprong <[email protected]>

* Lint

* Nits

* Nits

---------

Co-authored-by: Fokko Driesprong <[email protected]>
  • Loading branch information
2 people authored and hpal committed Mar 1, 2024
1 parent ea3ec04 commit 29f1c04
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
37 changes: 37 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,13 @@ def is_added_schema(self, schema_id: int) -> bool:
update.schema_.schema_id == schema_id for update in self._updates if update.action == TableUpdateAction.add_schema
)

def is_added_sort_order(self, sort_order_id: int) -> bool:
return any(
update.sort_order.order_id == sort_order_id
for update in self._updates
if update.action == TableUpdateAction.add_sort_order
)


@singledispatch
def _apply_table_update(update: TableUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
Expand Down Expand Up @@ -759,6 +766,36 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl
return base_metadata.model_copy(update=metadata_updates)


@_apply_table_update.register(AddSortOrderUpdate)
def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
context.add_update(update)
return base_metadata.model_copy(
update={
"sort_orders": base_metadata.sort_orders + [update.sort_order],
}
)


@_apply_table_update.register(SetDefaultSortOrderUpdate)
def _(update: SetDefaultSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
new_sort_order_id = update.sort_order_id
if new_sort_order_id == -1:
# The last added sort order should be in base_metadata.sort_orders at this point
new_sort_order_id = max(sort_order.order_id for sort_order in base_metadata.sort_orders)
if not context.is_added_sort_order(new_sort_order_id):
raise ValueError("Cannot set current sort order to the last added one when no sort order has been added")

if new_sort_order_id == base_metadata.default_sort_order_id:
return base_metadata

sort_order = base_metadata.sort_order_by_id(new_sort_order_id)
if sort_order is None:
raise ValueError(f"Sort order with id {new_sort_order_id} does not exist")

context.add_update(update)
return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id})


def update_table_metadata(base_metadata: TableMetadata, updates: Tuple[TableUpdate, ...]) -> TableMetadata:
"""Update the table metadata with the given updates in one transaction.
Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ def schema_by_id(self, schema_id: int) -> Optional[Schema]:
"""Get the schema by schema_id."""
return next((schema for schema in self.schemas if schema.schema_id == schema_id), None)

def sort_order_by_id(self, sort_order_id: int) -> Optional[SortOrder]:
"""Get the sort order by sort_order_id."""
return next((sort_order for sort_order in self.sort_orders if sort_order.order_id == sort_order_id), None)


class TableMetadataV1(TableMetadataCommonFields, IcebergBaseModel):
"""Represents version 1 of the Table Metadata.
Expand Down
6 changes: 4 additions & 2 deletions pyiceberg/table/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ def __repr__(self) -> str:
UNSORTED_SORT_ORDER = SortOrder(order_id=UNSORTED_SORT_ORDER_ID)


def assign_fresh_sort_order_ids(sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema) -> SortOrder:
def assign_fresh_sort_order_ids(
sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema, sort_order_id: int = INITIAL_SORT_ORDER_ID
) -> SortOrder:
if sort_order.is_unsorted:
return UNSORTED_SORT_ORDER

Expand All @@ -189,4 +191,4 @@ def assign_fresh_sort_order_ids(sort_order: SortOrder, old_schema: Schema, fresh
)
)

return SortOrder(*fresh_fields, order_id=INITIAL_SORT_ORDER_ID)
return SortOrder(*fresh_fields, order_id=sort_order_id)
22 changes: 22 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from pyiceberg.schema import Schema
from pyiceberg.table import (
AddSnapshotUpdate,
AddSortOrderUpdate,
AssertCreate,
AssertCurrentSchemaId,
AssertDefaultSortOrderId,
Expand All @@ -52,6 +53,7 @@
AssertRefSnapshotId,
AssertTableUUID,
RemovePropertiesUpdate,
SetDefaultSortOrderUpdate,
SetPropertiesUpdate,
SetSnapshotRefUpdate,
SnapshotRef,
Expand Down Expand Up @@ -664,6 +666,26 @@ def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None:
)


def test_update_metadata_add_update_sort_order(table_v2: Table) -> None:
new_sort_order = SortOrder(order_id=table_v2.sort_order().order_id + 1)
new_metadata = update_table_metadata(
table_v2.metadata,
(AddSortOrderUpdate(sort_order=new_sort_order), SetDefaultSortOrderUpdate(sort_order_id=-1)),
)
assert len(new_metadata.sort_orders) == 2
assert new_metadata.sort_orders[-1] == new_sort_order
assert new_metadata.default_sort_order_id == new_sort_order.order_id


def test_update_metadata_update_sort_order_invalid(table_v2: Table) -> None:
with pytest.raises(ValueError, match="Cannot set current sort order to the last added one when no sort order has been added"):
update_table_metadata(table_v2.metadata, (SetDefaultSortOrderUpdate(sort_order_id=-1),))

invalid_order_id = 10
with pytest.raises(ValueError, match=f"Sort order with id {invalid_order_id} does not exist"):
update_table_metadata(table_v2.metadata, (SetDefaultSortOrderUpdate(sort_order_id=invalid_order_id),))


def test_update_metadata_with_multiple_updates(table_v1: Table) -> None:
base_metadata = table_v1.metadata
transaction = table_v1.transaction()
Expand Down

0 comments on commit 29f1c04

Please sign in to comment.