Glue catalog commit table (apache#140)
* Implement table metadata updater first draft

* fix updater error and add tests

* implement _commit_table for glue

* implement apply_metadata_update which is simpler

* remove old implementation

* re-organize method place

* fix nit

* fix test

* add another test

* clear TODO

* add a combined test

* Fix merge conflict

* update table metadata merged

* implement requirements validation

* change the exception to CommitFailedException

* add docstring

* use regex to parse the metadata version

* fix lint issue

* fix CI issue

* make base_metadata optional and add null check

* make base_metadata optional and add null check

* add integration test

* default skip-archive to true and comments

* refactor tests

* add doc and fix test after merge

* make regex more robust, thanks Fokko!

* Fix review comments, thanks Patrick!
HonahX authored and sungwy committed Jan 13, 2024
1 parent f5b4be8 commit afa482f
Showing 5 changed files with 218 additions and 19 deletions.
46 changes: 44 additions & 2 deletions pyiceberg/catalog/__init__.py
@@ -18,6 +18,7 @@
from __future__ import annotations

import logging
import re
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -74,6 +75,17 @@
LOCATION = "location"
EXTERNAL_TABLE = "EXTERNAL_TABLE"

TABLE_METADATA_FILE_NAME_REGEX = re.compile(
r"""
(\d+) # version number
- # separator
([\w-]{36}) # UUID (36 characters, including hyphens)
(?:\.\w+)? # optional codec name
\.metadata\.json # file extension
""",
re.X,
)
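
As a quick illustration (a hypothetical snippet, not part of this diff), group 1 of the regex captures the version and group 2 the UUID:

# Hypothetical check against the regex defined above.
match = TABLE_METADATA_FILE_NAME_REGEX.fullmatch("00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json")
assert match is not None
assert match.group(1) == "00001"  # version part, parsed as an int downstream
assert match.group(2) == "6c97e413-d51b-4538-ac70-12fe2a85cb83"  # UUID part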


class CatalogType(Enum):
REST = "rest"
@@ -587,8 +599,38 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) ->
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))

@staticmethod
def _get_metadata_location(location: str) -> str:
return f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json"
def _get_metadata_location(location: str, new_version: int = 0) -> str:
if new_version < 0:
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
version_str = f"{new_version:05d}"
return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"

@staticmethod
def _parse_metadata_version(metadata_location: str) -> int:
"""Parse the version from the metadata location.
The version is the first part of the file name, before the first dash.
For example, the version of the metadata file
`s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json`
is 1.
If the path does not comply with the pattern, the version is defaulted to be -1, ensuring
that the next metadata file is treated as having version 0.
Args:
metadata_location (str): The location of the metadata file.
Returns:
int: The version of the metadata file. -1 if the file name does not have valid version string
"""
file_name = metadata_location.split("/")[-1]
if file_name_match := TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name):
try:
uuid.UUID(file_name_match.group(2))
except ValueError:
return -1
return int(file_name_match.group(1))
else:
return -1
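
A minimal sketch (hypothetical, assuming `catalog` is an instance of a concrete `Catalog` subclass) of how the two helpers round-trip a version:

# Hypothetical round-trip between _get_metadata_location and _parse_metadata_version.
location = catalog._get_metadata_location("s3://bucket/db/tb", new_version=1)
# e.g. s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json
assert catalog._parse_metadata_version(location) == 1
# A file name without a valid version prefix falls back to -1,
# so the next metadata file is written as version 0.
assert catalog._parse_metadata_version("s3://bucket/db/tb/metadata/bogus.metadata.json") == -1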

def _get_updated_props_and_update_summary(
self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
113 changes: 98 additions & 15 deletions pyiceberg/catalog/glue.py
@@ -40,13 +40,15 @@
ICEBERG,
LOCATION,
METADATA_LOCATION,
PREVIOUS_METADATA_LOCATION,
TABLE_TYPE,
Catalog,
Identifier,
Properties,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchIcebergTableError,
@@ -59,21 +61,40 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT


def _construct_parameters(metadata_location: str) -> Properties:
return {TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location}


def _construct_create_table_input(table_name: str, metadata_location: str, properties: Properties) -> TableInputTypeDef:
# Whether Glue should skip archiving an old table version when creating a new version in a commit. By
# default, Glue archives all old table versions after an UpdateTable call, but Glue has a default
# max number of archived table versions (which can be increased). So for streaming use cases with lots
# of commits, it is recommended to set this value to true.
GLUE_SKIP_ARCHIVE = "glue.skip-archive"
GLUE_SKIP_ARCHIVE_DEFAULT = True
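
A hedged sketch of overriding the default (catalog name and property values are illustrative; note that this draft passes the raw property value straight to Glue's `SkipArchive` flag):

from pyiceberg.catalog import load_catalog

# Illustrative only: keep Glue's table-version archiving enabled for this catalog.
catalog = load_catalog("glue", **{"type": "glue", "glue.skip-archive": False})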


def _construct_parameters(
metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
) -> Properties:
new_parameters = glue_table.get("Parameters", {}) if glue_table else {}
new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})
if prev_metadata_location:
new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location
return new_parameters
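
For example (hypothetical values), committing over an existing Glue table preserves its existing parameters and records the previous metadata pointer:

# Hypothetical values; assumes TABLE_TYPE/METADATA_LOCATION/PREVIOUS_METADATA_LOCATION
# resolve to "table_type"/"metadata_location"/"previous_metadata_location".
params = _construct_parameters(
    metadata_location="s3://bucket/db/tb/metadata/00001-abc.metadata.json",
    glue_table={"Parameters": {"owner": "analytics"}},
    prev_metadata_location="s3://bucket/db/tb/metadata/00000-abc.metadata.json",
)
assert params["table_type"] == "ICEBERG"
assert params["owner"] == "analytics"  # pre-existing Glue parameters survive
assert params["previous_metadata_location"] == "s3://bucket/db/tb/metadata/00000-abc.metadata.json"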


def _construct_table_input(
table_name: str,
metadata_location: str,
properties: Properties,
glue_table: Optional[TableTypeDef] = None,
prev_metadata_location: Optional[str] = None,
) -> TableInputTypeDef:
table_input: TableInputTypeDef = {
"Name": table_name,
"TableType": EXTERNAL_TABLE,
"Parameters": _construct_parameters(metadata_location),
"Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location),
}

if "Description" in properties:
@@ -177,6 +198,28 @@ def _create_glue_table(self, database_name: str, table_name: str, table_input: T
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
try:
self.glue.update_table(
DatabaseName=database_name,
TableInput=table_input,
SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
VersionId=version_id,
)
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name} (Glue table version {version_id})") from e
except self.glue.exceptions.ConcurrentModificationException as e:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
) from e
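
Because the Glue `VersionId` acts as an optimistic lock, a losing concurrent writer surfaces as `CommitFailedException`; a hedged caller-side sketch (the retry policy is illustrative, not part of this commit):

from pyiceberg.exceptions import CommitFailedException

try:
    catalog._commit_table(table_request)  # assumes `catalog` and `table_request` exist
except CommitFailedException:
    # Another writer won the race between get_table and update_table;
    # reload the table and reapply the updates before retrying.
    ...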

def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
return load_table_response["Table"]
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

def create_table(
self,
identifier: Union[str, Identifier],
@@ -215,7 +258,7 @@ def create_table(
io = load_file_io(properties=self.properties, location=metadata_location)
self._write_metadata(metadata, io, metadata_location)

table_input = _construct_create_table_input(table_name, metadata_location, properties)
table_input = _construct_table_input(table_name, metadata_location, properties)
database_name, table_name = self.identifier_to_database_and_table(identifier)
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)

@@ -247,8 +290,52 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: If the commit failed.
"""
raise NotImplementedError
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
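
End to end, a schema change committed through a transaction exercises this path; a minimal sketch mirroring the integration test added below (`table` is assumed to be loaded from a GlueCatalog):

from pyiceberg.types import IntegerType

transaction = table.transaction()
update = transaction.update_schema()
update.add_column(path="b", field_type=IntegerType())
update.commit()
transaction.commit_transaction()  # routes through GlueCatalog._commit_table above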

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
@@ -267,12 +354,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

return self._convert_glue_to_iceberg(load_table_response["Table"])
return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))

def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.
33 changes: 33 additions & 0 deletions tests/catalog/integration_test_glue.py
@@ -31,6 +31,7 @@
TableAlreadyExistsError,
)
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import clean_up, get_bucket_name, get_s3_path

# The number of tables/databases used in list_table/namespace test
@@ -61,6 +62,7 @@ def test_create_table(
assert table.identifier == (CATALOG_NAME,) + identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


def test_create_table_with_invalid_location(table_schema_nested: Schema, table_name: str, database_name: str) -> None:
@@ -82,6 +84,7 @@ def test_create_table_with_default_location(
assert table.identifier == (CATALOG_NAME,) + identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None:
@@ -105,6 +108,7 @@ def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_na
assert table.identifier == loaded_table.identifier
assert table.metadata_location == loaded_table.metadata_location
assert table.metadata == loaded_table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_list: List[str]) -> None:
@@ -126,6 +130,7 @@ def test_rename_table(
new_table_name = f"rename-{table_name}"
identifier = (database_name, table_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert table.identifier == (CATALOG_NAME,) + identifier
new_identifier = (new_database_name, new_table_name)
test_catalog.rename_table(identifier, new_identifier)
@@ -261,3 +266,31 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str)
else:
assert k in update_report.removed
assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]


def test_commit_table_update_schema(
test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
original_table_metadata = table.metadata

assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert original_table_metadata.current_schema_id == 0

transaction = table.transaction()
update = transaction.update_schema()
update.add_column(path="b", field_type=IntegerType())
update.commit()
transaction.commit_transaction()

updated_table_metadata = table.metadata

assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.current_schema_id == 1
assert len(updated_table_metadata.schemas) == 2
new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()