diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 690e9c4a40..0a42460075 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -51,11 +51,12 @@ from pyiceberg.serializers import ToOutputFile from pyiceberg.table import ( DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, - CommitTableRequest, CommitTableResponse, CreateTableTransaction, StagedTable, Table, + TableRequirement, + TableUpdate, update_table_metadata, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata @@ -502,11 +503,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U """ @abstractmethod - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update one or more tables. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -881,13 +886,19 @@ def _create_staged_table( catalog=self, ) - def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable: - for requirement in table_request.requirements: + def _update_and_stage_table( + self, + current_table: Optional[Table], + table_identifier: Identifier, + requirements: Tuple[TableRequirement, ...], + updates: Tuple[TableUpdate, ...], + ) -> StagedTable: + for requirement in requirements: requirement.validate(current_table.metadata if current_table else None) updated_metadata = update_table_metadata( base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), - updates=table_request.updates, + updates=updates, enforce_validation=current_table is None, metadata_location=current_table.metadata_location if current_table else None, ) @@ -896,7 +907,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request: new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) return StagedTable( - identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]), + identifier=table_identifier, metadata=updated_metadata, metadata_location=new_metadata_location, io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index b308678826..52973f6de6 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -23,6 +23,7 @@ List, Optional, Set, + Tuple, Union, ) @@ -57,7 +58,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table +from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -215,11 +216,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update the table. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 05990325d2..f8af38bd79 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -23,6 +23,7 @@ List, Optional, Set, + Tuple, Union, cast, ) @@ -69,9 +70,10 @@ from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile from pyiceberg.table import ( - CommitTableRequest, CommitTableResponse, Table, + TableRequirement, + TableUpdate, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -449,11 +451,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) return self.load_table(identifier=identifier) - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update the table. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -462,10 +468,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - identifier_tuple = self._identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) - database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) + table_identifier = self._identifier_to_tuple_without_catalog(table.identifier) + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) current_glue_table: Optional[TableTypeDef] glue_table_version_id: Optional[str] @@ -479,7 +483,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons glue_table_version_id = None current_table = None - updated_staged_table = self._update_and_stage_table(current_table, table_request) + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index ce725ccd23..a7a69a38d5 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -26,6 +26,7 @@ List, Optional, Set, + Tuple, Type, Union, ) @@ -79,11 +80,12 @@ from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile from pyiceberg.table import ( - CommitTableRequest, CommitTableResponse, StagedTable, Table, TableProperties, + TableRequirement, + TableUpdate, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -421,11 +423,15 @@ def _do_wait_for_lock() -> LockResponse: return _do_wait_for_lock() - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update the table. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -434,10 +440,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - identifier_tuple = self._identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) - database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) + table_identifier = self._identifier_to_tuple_without_catalog(table.identifier) + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) # commit to hive # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 with self._client as open_client: @@ -448,7 +452,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons if lock.state == LockState.WAITING: self._wait_for_lock(database_name, table_name, lock.lockid, open_client) else: - raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") + raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") hive_table: Optional[HiveTable] current_table: Optional[Table] @@ -459,7 +463,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons hive_table = None current_table = None - updated_staged_table = self._update_and_stage_table(current_table, table_request) + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) @@ -489,7 +493,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) self._create_hive_table(open_client, hive_table) except WaitingForLockException as e: - raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e + raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index 0f16b6909f..f29bdd94b6 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -19,6 +19,7 @@ List, Optional, Set, + Tuple, Union, ) @@ -26,10 +27,11 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( - CommitTableRequest, CommitTableResponse, CreateTableTransaction, Table, + TableRequirement, + TableUpdate, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -91,7 +93,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None: def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: raise NotImplementedError - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: raise NotImplementedError def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index cc6d891e63..f54a698dc5 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -69,6 +69,8 @@ StagedTable, Table, TableIdentifier, + TableRequirement, + TableUpdate, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids @@ -753,11 +755,15 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: return [(*view.namespace, view.name) for view in ListViewsResponse(**response.json()).identifiers] @retry(**_RETRY_ARGS) - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update the table. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -767,9 +773,12 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons CommitFailedException: Requirement not met, or a conflict with a concurrent commit. CommitStateUnknownException: Failed due to an internal exception on the side of the catalog. """ + identifier = self._identifier_to_tuple_without_catalog(table.identifier) + table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1]) + table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates) response = self._session.post( self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)), - data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8), + data=table_request.model_dump_json().encode(UTF8), ) try: response.raise_for_status() diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index c7e546ba2b..6931515b05 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -20,6 +20,7 @@ List, Optional, Set, + Tuple, Union, ) @@ -60,7 +61,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table +from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -394,11 +395,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e return self.load_table(to_identifier) - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update one or more tables. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -407,20 +412,18 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - identifier_tuple = self._identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) - namespace_tuple = Catalog.namespace_from(identifier_tuple) + table_identifier = self._identifier_to_tuple_without_catalog(table.identifier) + namespace_tuple = Catalog.namespace_from(table_identifier) namespace = Catalog.namespace_to_string(namespace_tuple) - table_name = Catalog.table_name_from(identifier_tuple) + table_name = Catalog.table_name_from(table_identifier) current_table: Optional[Table] try: - current_table = self.load_table(identifier_tuple) + current_table = self.load_table(table_identifier) except NoSuchTableError: current_table = None - updated_staged_table = self._update_and_stage_table(current_table, table_request) + updated_staged_table = self._update_and_stage_table(current_table, table.identifier, requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0316b40443..43e79fb1cf 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1673,13 +1673,7 @@ def refs(self) -> Dict[str, SnapshotRef]: return self.metadata.refs def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: - response = self.catalog._commit_table( # pylint: disable=W0212 - CommitTableRequest( - identifier=TableIdentifier(namespace=self._identifier[:-1], name=self._identifier[-1]), - updates=updates, - requirements=requirements, - ) - ) # pylint: disable=W0212 + response = self.catalog.commit_table(self, requirements, updates) self.metadata = response.metadata self.metadata_location = response.metadata_location diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index e87de9b1ab..3f1753c943 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -24,6 +24,7 @@ List, Optional, Set, + Tuple, Union, ) @@ -45,12 +46,11 @@ from pyiceberg.schema import Schema from pyiceberg.table import ( AddSchemaUpdate, - CommitTableRequest, CommitTableResponse, - Namespace, SetCurrentSchemaUpdate, Table, - TableIdentifier, + TableRequirement, + TableUpdate, update_table_metadata, ) from pyiceberg.table.metadata import new_table_metadata @@ -128,17 +128,17 @@ def create_table( def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: raise NotImplementedError - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - identifier_tuple = self._identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + identifier_tuple = self._identifier_to_tuple_without_catalog(table.identifier) current_table = self.load_table(identifier_tuple) base_metadata = current_table.metadata - for requirement in table_request.requirements: + for requirement in requirements: requirement.validate(base_metadata) - updated_metadata = update_table_metadata(base_metadata, table_request.updates) + updated_metadata = update_table_metadata(base_metadata, updates) if updated_metadata == base_metadata: # no changes, do nothing return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) @@ -673,14 +673,13 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: ) # When - response = given_table.catalog._commit_table( # pylint: disable=W0212 - CommitTableRequest( - identifier=TableIdentifier(namespace=Namespace(given_table._identifier[:-1]), name=given_table._identifier[-1]), - updates=[ - AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), - SetCurrentSchemaUpdate(schema_id=-1), - ], - ) + response = given_table.catalog.commit_table( + given_table, + updates=( + AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), + SetCurrentSchemaUpdate(schema_id=-1), + ), + requirements=(), ) # Then diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index d7b5b673b9..f05e15df38 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -40,7 +40,7 @@ from pyiceberg.io import load_file_io from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import CommitTableRequest, Table, TableIdentifier +from pyiceberg.table import Table from pyiceberg.table.metadata import TableMetadataV1 from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import IdentityTransform, TruncateTransform @@ -1289,22 +1289,28 @@ def test_catalog_from_parameters_empty_env(rest_mock: Mocker) -> None: assert catalog.uri == "https://other-service.io/api" -def test_table_identifier_in_commit_table_request(rest_mock: Mocker, example_table_metadata_v2: Dict[str, Any]) -> None: - test_table_request = CommitTableRequest( - identifier=TableIdentifier(namespace=("catalog_name", "namespace"), name="table_name"), - updates=[], - requirements=[], - ) +def test_table_identifier_in_commit_table_request( + rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_v2: Dict[str, Any] +) -> None: + metadata_location = "s3://some_bucket/metadata.json" rest_mock.post( url=f"{TEST_URI}v1/namespaces/namespace/tables/table_name", json={ "metadata": example_table_metadata_v2, - "metadata-location": "test", + "metadata-location": metadata_location, }, status_code=200, request_headers=TEST_HEADERS, ) - RestCatalog("catalog_name", uri=TEST_URI, token=TEST_TOKEN)._commit_table(test_table_request) + catalog = RestCatalog("catalog_name", uri=TEST_URI, token=TEST_TOKEN) + table = Table( + identifier=("namespace", "table_name"), + metadata=None, # type: ignore + metadata_location=metadata_location, + io=None, # type: ignore + catalog=catalog, + ) + catalog.commit_table(table, (), ()) assert ( rest_mock.last_request.text == """{"identifier":{"namespace":["namespace"],"name":"table_name"},"requirements":[],"updates":[]}"""