Glue catalog commit table #140

Merged · 39 commits · Jan 1, 2024

Commits
d53785a
Implement table metadata updater first draft
HonahX Nov 5, 2023
274b91b
fix updater error and add tests
HonahX Nov 5, 2023
8e8d39d
implement _commit_table for glue
HonahX Nov 6, 2023
c3e1311
implement apply_metadata_update which is simpler
HonahX Nov 11, 2023
2b7a7d1
remove old implementation
HonahX Nov 11, 2023
4fc25df
re-organize method place
HonahX Nov 11, 2023
facb43b
fix nit
HonahX Nov 11, 2023
ce4311f
Merge branch 'table_metadata_update' into table_metadata_update_and_g…
HonahX Nov 11, 2023
116c6fd
fix test
HonahX Nov 11, 2023
66a4f46
add another test
HonahX Nov 12, 2023
2882d0d
clear TODO
HonahX Nov 12, 2023
8a8d4ff
add a combined test
HonahX Nov 12, 2023
70b64d8
Merge remote-tracking branch 'origin/main' into table_metadata_update
HonahX Nov 12, 2023
1cfe9d2
Fix merge conflict
HonahX Nov 12, 2023
252dc36
Merge branch 'table_metadata_update' into table_metadata_update_and_g…
HonahX Nov 12, 2023
501e7a9
Merge branch 'main' into table_metadata_update_and_glue_commit
HonahX Dec 8, 2023
7afe318
update table metadata merged
HonahX Dec 8, 2023
f769101
implement requirements validation
HonahX Dec 10, 2023
4282d37
change the exception to CommitFailedException
HonahX Dec 10, 2023
94cfc69
add docstring
HonahX Dec 10, 2023
bb58e09
Merge branch 'table_update_requirements' into table_metadata_update_a…
HonahX Dec 10, 2023
6d4efc8
use regex to parse the metadata version
HonahX Dec 11, 2023
5efb155
fix lint issue
HonahX Dec 11, 2023
413935e
fix CI issue
HonahX Dec 11, 2023
fe7da26
Merge branch 'table_update_requirements' into table_metadata_update_a…
HonahX Dec 11, 2023
e8666dc
make base_metadata optional and add null check
HonahX Dec 11, 2023
52ceaf8
make base_metadata optional and add null check
HonahX Dec 11, 2023
8dfaf93
Merge branch 'table_update_requirements' into table_metadata_update_a…
HonahX Dec 12, 2023
5a23638
Merge branch 'main' into table_metadata_update_and_glue_commit
HonahX Dec 12, 2023
ccb787c
add integration test
HonahX Dec 14, 2023
8a29796
Merge branch 'main' into table_metadata_update_and_glue_commit
HonahX Dec 16, 2023
5e78af9
default skip-archive to true and comments
HonahX Dec 16, 2023
93f2cec
refactor tests
HonahX Dec 16, 2023
6932cee
Merge branch 'main' into table_metadata_update_and_glue_commit
HonahX Dec 20, 2023
0d38337
add doc and fix test after merge
HonahX Dec 20, 2023
0ad4909
Merge branch 'main' into table_metadata_update_and_glue_commit
HonahX Dec 22, 2023
dccad75
make regex more robust, thanks Fokko!
HonahX Dec 22, 2023
b967284
Merge branch 'main' into table_metadata_update_and_glue_commit
HonahX Jan 1, 2024
bf06d26
Fix review comments, thanks Patrick!
HonahX Jan 1, 2024
33 changes: 31 additions & 2 deletions pyiceberg/catalog/__init__.py
@@ -18,6 +18,7 @@
from __future__ import annotations

import logging
import re
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -74,6 +75,8 @@
LOCATION = "location"
EXTERNAL_TABLE = "EXTERNAL_TABLE"

TABLE_METADATA_FILE_NAME_REGEX = re.compile(r"""\d*-.*\.json""", re.X)


class CatalogType(Enum):
REST = "rest"
@@ -587,8 +590,34 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) ->
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))

@staticmethod
def _get_metadata_location(location: str) -> str:
return f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json"
def _get_metadata_location(location: str, new_version: int = 0) -> str:
if new_version < 0:
raise ValueError(f"Table metadata version: {new_version} cannot be negative")
Review comment — pdames (Contributor), Dec 26, 2023:

Should we also return a more friendly error on the off chance that new_version is None?

Suggested change:
-    if new_version < 0:
-        raise ValueError(f"Table metadata version: {new_version} cannot be negative")
+    if new_version is None or new_version < 0:
+        raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

Reply — HonahX (Contributor, Author):

Thanks for the suggestion! Since this is an internal method, we can rely on the type check to ensure new_version is never None; one benefit of type checking is that it reduces the number of None checks in our code. I have also incorporated your suggested change to the error message.

Please let me know if you think the type check is not enough and we should add the None check here. Thanks!

version_str = f"{new_version:05d}"
return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"

@staticmethod
def _parse_metadata_version(metadata_location: str) -> int:
"""Parse the version from the metadata location.

The version is the first part of the file name, before the first dash.
For example, the version of the metadata file
`s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json`
is 1.
If the path does not comply with the pattern, the version defaults to -1, ensuring
that the next metadata file is treated as having version 0.

Args:
metadata_location (str): The location of the metadata file.

Returns:
int: The version of the metadata file, or -1 if the file name does not have a valid version string.
"""
file_name = metadata_location.split("/")[-1]
Review comment — Contributor:

Should we also gracefully return -1 if metadata_location happens to be undefined?

Suggested change:
-    file_name = metadata_location.split("/")[-1]
+    file_name = metadata_location.split("/")[-1] if metadata_location else ""

Reply — HonahX (Contributor, Author):

For a similar reason as above: since this is an internal method, I think we can rely on the type check to ensure metadata_location is not None. But please let me know if you think the type check is not enough.

if TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name):
return int(file_name.split("-")[0])
else:
return -1

def _get_updated_props_and_update_summary(
self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
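
To make the file-naming convention concrete, here is a minimal standalone sketch of the two helpers added above; it mirrors the diff rather than importing pyiceberg, so the constant and function names are local stand-ins:

import re
import uuid

# Mirrors TABLE_METADATA_FILE_NAME_REGEX from the diff above.
TABLE_METADATA_FILE_NAME_REGEX = re.compile(r"\d*-.*\.json")


def get_metadata_location(location: str, new_version: int = 0) -> str:
    # Zero-padded version prefix plus a random UUID, as in Catalog._get_metadata_location.
    if new_version < 0:
        raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
    return f"{location}/metadata/{new_version:05d}-{uuid.uuid4()}.metadata.json"


def parse_metadata_version(metadata_location: str) -> int:
    # Returns -1 when the file name lacks a valid version prefix, as in
    # Catalog._parse_metadata_version.
    file_name = metadata_location.split("/")[-1]
    if TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name):
        return int(file_name.split("-")[0])
    return -1


location = get_metadata_location("s3://bucket/db/tb", new_version=1)
assert parse_metadata_version(location) == 1  # round-trips: 00001-<uuid>.metadata.json
assert parse_metadata_version("s3://bucket/db/tb/metadata/legacy.metadata.json") == -1
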
113 changes: 98 additions & 15 deletions pyiceberg/catalog/glue.py
@@ -40,13 +40,15 @@
ICEBERG,
LOCATION,
METADATA_LOCATION,
PREVIOUS_METADATA_LOCATION,
TABLE_TYPE,
Catalog,
Identifier,
Properties,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchIcebergTableError,
@@ -59,21 +61,40 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT


def _construct_parameters(metadata_location: str) -> Properties:
return {TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location}


def _construct_create_table_input(table_name: str, metadata_location: str, properties: Properties) -> TableInputTypeDef:
# Whether Glue should skip archiving an old table version when creating a new version during a
# commit. By default, Glue archives all old table versions after an UpdateTable call, but it
# enforces a default maximum number of archived table versions (which can be increased). For
# streaming use cases with many commits, it is recommended to set this value to true.
GLUE_SKIP_ARCHIVE = "glue.skip-archive"
GLUE_SKIP_ARCHIVE_DEFAULT = True


def _construct_parameters(
metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
) -> Properties:
new_parameters = glue_table.get("Parameters", {}) if glue_table else {}
new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})
if prev_metadata_location:
new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location
return new_parameters


def _construct_table_input(
table_name: str,
metadata_location: str,
properties: Properties,
glue_table: Optional[TableTypeDef] = None,
prev_metadata_location: Optional[str] = None,
) -> TableInputTypeDef:
table_input: TableInputTypeDef = {
"Name": table_name,
"TableType": EXTERNAL_TABLE,
"Parameters": _construct_parameters(metadata_location),
"Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location),
}

if "Description" in properties:
@@ -177,6 +198,28 @@ def _create_glue_table(self, database_name: str, table_name: str, table_input: T
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
try:
self.glue.update_table(
DatabaseName=database_name,
TableInput=table_input,
SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
VersionId=version_id,
)
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
except self.glue.exceptions.ConcurrentModificationException as e:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update"
HonahX marked this conversation as resolved.
Show resolved Hide resolved
) from e

def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
return load_table_response["Table"]
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

def create_table(
self,
identifier: Union[str, Identifier],
@@ -215,7 +258,7 @@ def create_table(
io = load_file_io(properties=self.properties, location=metadata_location)
self._write_metadata(metadata, io, metadata_location)

table_input = _construct_create_table_input(table_name, metadata_location, properties)
table_input = _construct_table_input(table_name, metadata_location, properties)
database_name, table_name = self.identifier_to_database_and_table(identifier)
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)

@@ -247,8 +290,52 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons

Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: If the commit failed.
"""
raise NotImplementedError
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
glue_table_version_id = current_glue_table.get("VersionId")
if glue_table_version_id is None:
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
@@ -267,12 +354,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

return self._convert_glue_to_iceberg(load_table_response["Table"])
return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))

def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.
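To illustrate the optimistic-locking behavior described in the comment above, here is a hedged sketch of two writers racing on the same table. The warehouse path and table identifier are illustrative assumptions, and the schema-update calls mirror the tests below:

from pyiceberg.catalog.glue import GlueCatalog
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.types import IntegerType

# Illustrative configuration; the bucket and table names are assumptions.
catalog = GlueCatalog("glue", **{"warehouse": "s3://my-bucket/warehouse", "glue.skip-archive": True})

# Two handles to the same table stand in for two concurrent writers.
writer_a = catalog.load_table("db.tb")
writer_b = catalog.load_table("db.tb")

# Writer A commits first; a new metadata file is written and Glue's VersionId advances.
txn_a = writer_a.transaction()
update_a = txn_a.update_schema()
update_a.add_column(path="a_col", field_type=IntegerType())
update_a.commit()
txn_a.commit_transaction()

# Writer B's base metadata is now stale. Its commit is expected to fail: the update
# requirements are validated against the freshly loaded metadata, and Glue's VersionId
# check rejects racing UpdateTable calls; both paths raise CommitFailedException.
try:
    txn_b = writer_b.transaction()
    update_b = txn_b.update_schema()
    update_b.add_column(path="b_col", field_type=IntegerType())
    update_b.commit()
    txn_b.commit_transaction()
except CommitFailedException as exc:
    print(f"Concurrent update detected: {exc}")
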
33 changes: 33 additions & 0 deletions tests/catalog/integration_test_glue.py
@@ -31,6 +31,7 @@
TableAlreadyExistsError,
)
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import clean_up, get_bucket_name, get_s3_path

# The number of tables/databases used in list_table/namespace test
@@ -61,6 +62,7 @@ def test_create_table(
assert table.identifier == (CATALOG_NAME,) + identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


def test_create_table_with_invalid_location(table_schema_nested: Schema, table_name: str, database_name: str) -> None:
@@ -82,6 +84,7 @@ def test_create_table_with_default_location(
assert table.identifier == (CATALOG_NAME,) + identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None:
@@ -105,6 +108,7 @@ def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_na
assert table.identifier == loaded_table.identifier
assert table.metadata_location == loaded_table.metadata_location
assert table.metadata == loaded_table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_list: List[str]) -> None:
@@ -126,6 +130,7 @@ def test_rename_table(
new_table_name = f"rename-{table_name}"
identifier = (database_name, table_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert table.identifier == (CATALOG_NAME,) + identifier
new_identifier = (new_database_name, new_table_name)
test_catalog.rename_table(identifier, new_identifier)
@@ -261,3 +266,31 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str)
else:
assert k in update_report.removed
assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]


def test_commit_table_update_schema(
test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
original_table_metadata = table.metadata

assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert original_table_metadata.current_schema_id == 0

transaction = table.transaction()
update = transaction.update_schema()
update.add_column(path="b", field_type=IntegerType())
update.commit()
transaction.commit_transaction()

updated_table_metadata = table.metadata

assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.current_schema_id == 1
assert len(updated_table_metadata.schemas) == 2
new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
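
As a sanity check on the naming convention these tests assert, here is a hedged sketch of how the parsed version is expected to advance by one per successful commit. Catalog construction and identifiers are illustrative assumptions; the unit tests below run an equivalent flow under @mock_glue:

from pyiceberg.catalog.glue import GlueCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField

catalog = GlueCatalog("glue", **{"warehouse": "s3://my-bucket/warehouse"})  # illustrative
schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))

catalog.create_namespace("db")
table = catalog.create_table(("db", "tb"), schema)
assert catalog._parse_metadata_version(table.metadata_location) == 0  # 00000-<uuid>.metadata.json

# Each committed transaction writes a new metadata file with the next version prefix.
for expected_version, column in enumerate(["c1", "c2"], start=1):
    txn = table.transaction()
    update = txn.update_schema()
    update.add_column(path=column, field_type=IntegerType())
    update.commit()
    txn.commit_transaction()
    assert catalog._parse_metadata_version(table.metadata_location) == expected_version
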
41 changes: 41 additions & 0 deletions tests/catalog/test_glue.py
@@ -31,6 +31,7 @@
TableAlreadyExistsError,
)
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX


@@ -45,6 +46,7 @@ def test_create_table_with_database_location(
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_glue
@@ -58,6 +60,7 @@ def test_create_table_with_default_warehouse(
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_glue
@@ -73,6 +76,7 @@ def test_create_table_with_given_location(
)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_glue
@@ -98,6 +102,7 @@ def test_create_table_with_strips(
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_glue
@@ -111,6 +116,7 @@ def test_create_table_with_strips_bucket_root(
table_strip = test_catalog.create_table(identifier, table_schema_nested)
assert table_strip.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table_strip.metadata_location)
assert test_catalog._parse_metadata_version(table_strip.metadata_location) == 0


@mock_glue
@@ -147,6 +153,7 @@ def test_load_table(
table = test_catalog.load_table(identifier)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_glue
@@ -229,6 +236,7 @@ def test_rename_table(
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
test_catalog.rename_table(identifier, new_identifier)
new_table = test_catalog.load_table(new_identifier)
assert new_table.identifier == (catalog_name,) + new_identifier
@@ -507,3 +515,36 @@ def test_passing_profile_name() -> None:

mock_session.assert_called_with(**session_properties)
assert test_catalog.glue is mock_session().client()


@mock_glue
def test_commit_table_update_schema(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
original_table_metadata = table.metadata

assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert original_table_metadata.current_schema_id == 0

transaction = table.transaction()
update = transaction.update_schema()
update.add_column(path="b", field_type=IntegerType())
update.commit()
transaction.commit_transaction()

updated_table_metadata = table.metadata

assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.current_schema_id == 1
assert len(updated_table_metadata.schemas) == 2
new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()