diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 993be87ddd..4cb6c2ff4a 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -18,6 +18,7 @@ from __future__ import annotations import logging +import re import uuid from abc import ABC, abstractmethod from dataclasses import dataclass @@ -74,6 +75,17 @@ LOCATION = "location" EXTERNAL_TABLE = "EXTERNAL_TABLE" +TABLE_METADATA_FILE_NAME_REGEX = re.compile( + r""" + (\d+) # version number + - # separator + ([\w-]{36}) # UUID (36 characters, including hyphens) + (?:\.\w+)? # optional codec name + \.metadata\.json # file extension + """, + re.X, +) + class CatalogType(Enum): REST = "rest" @@ -587,8 +599,38 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> ToOutputFile.table_metadata(metadata, io.new_output(metadata_path)) @staticmethod - def _get_metadata_location(location: str) -> str: - return f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json" + def _get_metadata_location(location: str, new_version: int = 0) -> str: + if new_version < 0: + raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer") + version_str = f"{new_version:05d}" + return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json" + + @staticmethod + def _parse_metadata_version(metadata_location: str) -> int: + """Parse the version from the metadata location. + + The version is the first part of the file name, before the first dash. + For example, the version of the metadata file + `s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json` + is 1. + If the path does not comply with the pattern, the version is defaulted to be -1, ensuring + that the next metadata file is treated as having version 0. + + Args: + metadata_location (str): The location of the metadata file. + + Returns: + int: The version of the metadata file. 
-1 if the file name does not have valid version string + """ + file_name = metadata_location.split("/")[-1] + if file_name_match := TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name): + try: + uuid.UUID(file_name_match.group(2)) + except ValueError: + return -1 + return int(file_name_match.group(1)) + else: + return -1 def _get_updated_props_and_update_summary( self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 723de2f335..6cf9462b71 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -40,6 +40,7 @@ ICEBERG, LOCATION, METADATA_LOCATION, + PREVIOUS_METADATA_LOCATION, TABLE_TYPE, Catalog, Identifier, @@ -47,6 +48,7 @@ PropertiesUpdateSummary, ) from pyiceberg.exceptions import ( + CommitFailedException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchIcebergTableError, @@ -59,21 +61,40 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT - -def _construct_parameters(metadata_location: str) -> Properties: - return {TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location} - - -def _construct_create_table_input(table_name: str, metadata_location: str, properties: Properties) -> TableInputTypeDef: +# If Glue should skip archiving an old table version when creating a new version in a commit. By +# default, Glue archives all old table versions after an UpdateTable call, but Glue has a default +# max number of archived table versions (can be increased). 
So for streaming use case with lots +# of commits, it is recommended to set this value to true. +GLUE_SKIP_ARCHIVE = "glue.skip-archive" +GLUE_SKIP_ARCHIVE_DEFAULT = True + + +def _construct_parameters( + metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None +) -> Properties: + new_parameters = glue_table.get("Parameters", {}) if glue_table else {} + new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location}) + if prev_metadata_location: + new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location + return new_parameters + + +def _construct_table_input( + table_name: str, + metadata_location: str, + properties: Properties, + glue_table: Optional[TableTypeDef] = None, + prev_metadata_location: Optional[str] = None, +) -> TableInputTypeDef: table_input: TableInputTypeDef = { "Name": table_name, "TableType": EXTERNAL_TABLE, - "Parameters": _construct_parameters(metadata_location), + "Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location), } if "Description" in properties: @@ -177,6 +198,28 @@ def _create_glue_table(self, database_name: str, table_name: str, table_input: T except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e + def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None: + try: + self.glue.update_table( + DatabaseName=database_name, + TableInput=table_input, + SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT), + VersionId=version_id, + ) + except self.glue.exceptions.EntityNotFoundException as e: + raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name} (Glue table version {version_id})") from e + except self.glue.exceptions.ConcurrentModificationException as e: + raise CommitFailedException( + f"Cannot commit 
{database_name}.{table_name} because Glue detected concurrent update to table version {version_id}" + ) from e + + def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef: + try: + load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name) + return load_table_response["Table"] + except self.glue.exceptions.EntityNotFoundException as e: + raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e + def create_table( self, identifier: Union[str, Identifier], @@ -215,7 +258,7 @@ def create_table( io = load_file_io(properties=self.properties, location=metadata_location) self._write_metadata(metadata, io, metadata_location) - table_input = _construct_create_table_input(table_name, metadata_location, properties) + table_input = _construct_table_input(table_name, metadata_location, properties) database_name, table_name = self.identifier_to_database_and_table(identifier) self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) @@ -247,8 +290,52 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons Raises: NoSuchTableError: If a table with the given identifier does not exist. + CommitFailedException: If the commit failed. 
""" - raise NotImplementedError + identifier_tuple = self.identifier_to_tuple_without_catalog( + tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) + ) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) + + current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) + glue_table_version_id = current_glue_table.get("VersionId") + if not glue_table_version_id: + raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing") + current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) + base_metadata = current_table.metadata + + # Validate the update requirements + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + update_table_input = _construct_table_input( + table_name=table_name, + metadata_location=new_metadata_location, + properties=current_table.properties, + glue_table=current_glue_table, + prev_metadata_location=current_table.metadata_location, + ) + + # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent + # modifications occur. 
See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking + self._update_glue_table( + database_name=database_name, + table_name=table_name, + table_input=update_table_input, + version_id=glue_table_version_id, + ) + + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. @@ -267,12 +354,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: """ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) - try: - load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name) - except self.glue.exceptions.EntityNotFoundException as e: - raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e - return self._convert_glue_to_iceberg(load_table_response["Table"]) + return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name)) def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. 
diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index 2689ef14d3..99f0adac40 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -31,6 +31,7 @@ TableAlreadyExistsError, ) from pyiceberg.schema import Schema +from pyiceberg.types import IntegerType from tests.conftest import clean_up, get_bucket_name, get_s3_path # The number of tables/databases used in list_table/namespace test @@ -61,6 +62,7 @@ def test_create_table( assert table.identifier == (CATALOG_NAME,) + identifier metadata_location = table.metadata_location.split(get_bucket_name())[1][1:] s3.head_object(Bucket=get_bucket_name(), Key=metadata_location) + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 def test_create_table_with_invalid_location(table_schema_nested: Schema, table_name: str, database_name: str) -> None: @@ -82,6 +84,7 @@ def test_create_table_with_default_location( assert table.identifier == (CATALOG_NAME,) + identifier metadata_location = table.metadata_location.split(get_bucket_name())[1][1:] s3.head_object(Bucket=get_bucket_name(), Key=metadata_location) + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None: @@ -105,6 +108,7 @@ def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_na assert table.identifier == loaded_table.identifier assert table.metadata_location == loaded_table.metadata_location assert table.metadata == loaded_table.metadata + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_list: List[str]) -> None: @@ -126,6 +130,7 @@ def test_rename_table( new_table_name = f"rename-{table_name}" identifier = (database_name, table_name) table = test_catalog.create_table(identifier, 
def test_commit_table_update_schema(
    test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Commit a schema change through the catalog and check the resulting metadata version and schema."""
    identifier = (database_name, table_name)
    test_catalog.create_namespace(namespace=database_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    metadata_before = table.metadata

    # A freshly created table starts at metadata version 0 with schema id 0.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
    assert metadata_before.current_schema_id == 0

    # Add an integer column "b" inside a transaction and commit it.
    txn = table.transaction()
    schema_update = txn.update_schema()
    schema_update.add_column(path="b", field_type=IntegerType())
    schema_update.commit()
    txn.commit_transaction()

    metadata_after = table.metadata

    # The commit must have produced the next metadata file and a second schema.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
    assert metadata_after.current_schema_id == 1
    assert len(metadata_after.schemas) == 2
    committed_schema = next(filter(lambda candidate: candidate.schema_id == 1, metadata_after.schemas))
    assert committed_schema
    assert committed_schema == schema_update._apply()
    assert committed_schema.find_field("b").field_type == IntegerType()
+46,7 @@ def test_create_table_with_database_location( table = test_catalog.create_table(identifier, table_schema_nested) assert table.identifier == (catalog_name,) + identifier assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 @mock_glue @@ -58,6 +60,7 @@ def test_create_table_with_default_warehouse( table = test_catalog.create_table(identifier, table_schema_nested) assert table.identifier == (catalog_name,) + identifier assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 @mock_glue @@ -73,6 +76,7 @@ def test_create_table_with_given_location( ) assert table.identifier == (catalog_name,) + identifier assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 @mock_glue @@ -98,6 +102,7 @@ def test_create_table_with_strips( table = test_catalog.create_table(identifier, table_schema_nested) assert table.identifier == (catalog_name,) + identifier assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 @mock_glue @@ -111,6 +116,7 @@ def test_create_table_with_strips_bucket_root( table_strip = test_catalog.create_table(identifier, table_schema_nested) assert table_strip.identifier == (catalog_name,) + identifier assert TABLE_METADATA_LOCATION_REGEX.match(table_strip.metadata_location) + assert test_catalog._parse_metadata_version(table_strip.metadata_location) == 0 @mock_glue @@ -147,6 +153,7 @@ def test_load_table( table = test_catalog.load_table(identifier) assert table.identifier == (catalog_name,) + identifier assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 @mock_glue @@ -229,6 +236,7 @@ def test_rename_table( table = 
@mock_glue
def test_commit_table_update_schema(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Commit a schema change against the mocked Glue service and check the new metadata version and schema."""
    catalog_name = "glue"
    catalog_properties = {"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}
    test_catalog = GlueCatalog(catalog_name, **catalog_properties)
    test_catalog.create_namespace(namespace=database_name)
    table = test_catalog.create_table((database_name, table_name), table_schema_nested)
    metadata_before = table.metadata

    # A freshly created table starts at metadata version 0 with schema id 0.
    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
    assert metadata_before.current_schema_id == 0

    # Add an integer column "b" inside a transaction and commit it.
    txn = table.transaction()
    schema_update = txn.update_schema()
    schema_update.add_column(path="b", field_type=IntegerType())
    schema_update.commit()
    txn.commit_transaction()

    metadata_after = table.metadata

    # The commit must have produced the next metadata file and a second schema.
    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
    assert metadata_after.current_schema_id == 1
    assert len(metadata_after.schemas) == 2
    committed_schema = next(filter(lambda candidate: candidate.schema_id == 1, metadata_after.schemas))
    assert committed_schema
    assert committed_schema == schema_update._apply()
    assert committed_schema.find_field("b").field_type == IntegerType()