Skip to content

Commit

Permalink
Make commit_table public (apache#1112)
Browse files Browse the repository at this point in the history
* Make `commit_table` public

* Comments

* Thanks Kevin!

* Update tests
  • Loading branch information
Fokko authored Sep 5, 2024
1 parent f2f428e commit 052a9cd
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 82 deletions.
27 changes: 19 additions & 8 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
TableRequirement,
TableUpdate,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
Expand Down Expand Up @@ -502,11 +503,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
"""

@abstractmethod
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update one or more tables.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.
Returns:
CommitTableResponse: The updated metadata.
Expand Down Expand Up @@ -881,13 +886,19 @@ def _create_staged_table(
catalog=self,
)

def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
for requirement in table_request.requirements:
def _update_and_stage_table(
self,
current_table: Optional[Table],
table_identifier: Identifier,
requirements: Tuple[TableRequirement, ...],
updates: Tuple[TableUpdate, ...],
) -> StagedTable:
for requirement in requirements:
requirement.validate(current_table.metadata if current_table else None)

updated_metadata = update_table_metadata(
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
updates=updates,
enforce_validation=current_table is None,
metadata_location=current_table.metadata_location if current_table else None,
)
Expand All @@ -896,7 +907,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request:
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)

return StagedTable(
identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
identifier=table_identifier,
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
Expand Down
13 changes: 9 additions & 4 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
List,
Optional,
Set,
Tuple,
Union,
)

Expand Down Expand Up @@ -57,7 +58,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -215,11 +216,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.
Returns:
CommitTableResponse: The updated metadata.
Expand Down
22 changes: 13 additions & 9 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
List,
Optional,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -69,9 +70,10 @@
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -449,11 +451,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
return self.load_table(identifier=identifier)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.
Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -462,10 +468,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)

current_glue_table: Optional[TableTypeDef]
glue_table_version_id: Optional[str]
Expand All @@ -479,7 +483,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
glue_table_version_id = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down
26 changes: 15 additions & 11 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
List,
Optional,
Set,
Tuple,
Type,
Union,
)
Expand Down Expand Up @@ -79,11 +80,12 @@
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
StagedTable,
Table,
TableProperties,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -421,11 +423,15 @@ def _do_wait_for_lock() -> LockResponse:

return _do_wait_for_lock()

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.
Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -434,10 +440,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
# commit to hive
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
with self._client as open_client:
Expand All @@ -448,7 +452,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
if lock.state == LockState.WAITING:
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
else:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")

hive_table: Optional[HiveTable]
current_table: Optional[Table]
Expand All @@ -459,7 +463,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
hive_table = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down Expand Up @@ -489,7 +493,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
self._create_hive_table(open_client, hive_table)
except WaitingForLockException as e:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))

Expand Down
8 changes: 6 additions & 2 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
List,
Optional,
Set,
Tuple,
Union,
)

from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
Table,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -91,7 +93,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
raise NotImplementedError

def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
Expand Down
17 changes: 13 additions & 4 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
StagedTable,
Table,
TableIdentifier,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
Expand Down Expand Up @@ -753,11 +755,15 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
return [(*view.namespace, view.name) for view in ListViewsResponse(**response.json()).identifiers]

@retry(**_RETRY_ARGS)
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.
Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -767,9 +773,12 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
"""
identifier = self._identifier_to_tuple_without_catalog(table.identifier)
table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1])
table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates)
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8),
data=table_request.model_dump_json().encode(UTF8),
)
try:
response.raise_for_status()
Expand Down
25 changes: 14 additions & 11 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
List,
Optional,
Set,
Tuple,
Union,
)

Expand Down Expand Up @@ -60,7 +61,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -394,11 +395,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e
return self.load_table(to_identifier)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update one or more tables.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.
Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -407,20 +412,18 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
namespace_tuple = Catalog.namespace_from(table_identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(table_identifier)

current_table: Optional[Table]
try:
current_table = self.load_table(identifier_tuple)
current_table = self.load_table(table_identifier)
except NoSuchTableError:
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table.identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down
8 changes: 1 addition & 7 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1673,13 +1673,7 @@ def refs(self) -> Dict[str, SnapshotRef]:
return self.metadata.refs

def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(
identifier=TableIdentifier(namespace=self._identifier[:-1], name=self._identifier[-1]),
updates=updates,
requirements=requirements,
)
) # pylint: disable=W0212
response = self.catalog.commit_table(self, requirements, updates)
self.metadata = response.metadata
self.metadata_location = response.metadata_location

Expand Down
Loading

0 comments on commit 052a9cd

Please sign in to comment.