Skip to content

Commit

Permalink
Table Metadata Update: Support SetPropertiesUpdate and RemoveProperti…
Browse files Browse the repository at this point in the history
…esUpdate (#266)

* Support table properties update

* Add test for glue catalog
  • Loading branch information
HonahX authored Jan 16, 2024
1 parent d047f1b commit 3085c40
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 0 deletions.
25 changes: 25 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,31 @@ def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, context:
return TableMetadataUtil.parse_obj(updated_metadata_data)


@_apply_table_update.register(SetPropertiesUpdate)
def _(update: SetPropertiesUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a SetPropertiesUpdate by merging its entries into the table properties.

    Args:
        update: The update whose ``updates`` mapping holds the properties to set.
        base_metadata: The metadata the update is applied to; it is not modified.
        context: Update context; the update is recorded on it only when it carries entries.

    Returns:
        ``base_metadata`` itself when ``updates`` is empty, otherwise a copy of it
        (via ``model_copy``) whose ``properties`` are the existing ones overlaid
        with ``update.updates``.
    """
    if not update.updates:
        # Nothing to set: hand back the input untouched and do not record the update.
        return base_metadata

    # Later entries win, so values from the update override existing ones.
    merged_properties = {**base_metadata.properties, **update.updates}

    context.add_update(update)
    return base_metadata.model_copy(update={"properties": merged_properties})


@_apply_table_update.register(RemovePropertiesUpdate)
def _(update: RemovePropertiesUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a RemovePropertiesUpdate by dropping the listed keys from the table properties.

    Args:
        update: The update whose ``removals`` lists the property keys to drop.
        base_metadata: The metadata the update is applied to; it is not modified.
        context: Update context; the update is recorded on it only when ``removals`` is non-empty.

    Returns:
        ``base_metadata`` itself when ``removals`` is empty, otherwise a copy of it
        (via ``model_copy``) whose ``properties`` no longer contain the listed keys.
    """
    if len(update.removals) == 0:
        return base_metadata

    properties = dict(base_metadata.properties)
    for key in update.removals:
        # A key that is not present is skipped instead of raising a bare KeyError,
        # so removing an already-absent property is a harmless no-op.
        properties.pop(key, None)

    context.add_update(update)
    return base_metadata.model_copy(update={"properties": properties})


@_apply_table_update.register(AddSchemaUpdate)
def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
if update.last_column_id < base_metadata.last_column_id:
Expand Down
17 changes: 17 additions & 0 deletions tests/catalog/integration_test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,20 @@ def test_commit_table_update_schema(
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()


def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
    """Commit set/remove property changes in one transaction and check the resulting table state."""
    test_catalog.create_namespace(namespace=database_name)
    table = test_catalog.create_table(
        identifier=(database_name, table_name),
        schema=table_schema_nested,
        properties={"test_a": "test_a"},
    )

    # A freshly created table is expected to sit at metadata version 0.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 0

    txn = table.transaction()
    txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
    txn.remove_properties("test_b")
    txn.commit_transaction()

    metadata_after_commit = table.metadata
    # One committed transaction is expected to bump the metadata version to 1.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
    # test_a was overwritten, test_b was set and then removed, test_c was added.
    assert metadata_after_commit.properties == {"test_a": "test_aa", "test_c": "test_c"}
22 changes: 22 additions & 0 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,3 +553,25 @@ def test_commit_table_update_schema(
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()


@mock_glue
def test_commit_table_properties(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Commit set/remove property changes through a GlueCatalog and check the resulting table state."""
    catalog_properties = {"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}
    test_catalog = GlueCatalog("glue", **catalog_properties)
    test_catalog.create_namespace(namespace=database_name)
    table = test_catalog.create_table(
        identifier=(database_name, table_name),
        schema=table_schema_nested,
        properties={"test_a": "test_a"},
    )

    # A freshly created table is expected to sit at metadata version 0.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 0

    txn = table.transaction()
    txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
    txn.remove_properties("test_b")
    txn.commit_transaction()

    metadata_after_commit = table.metadata
    # One committed transaction is expected to bump the metadata version to 1.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
    # test_a was overwritten, test_b was set and then removed, test_c was added.
    assert metadata_after_commit.properties == {"test_a": "test_aa", "test_c": "test_c"}
53 changes: 53 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
AssertLastAssignedPartitionId,
AssertRefSnapshotId,
AssertTableUUID,
RemovePropertiesUpdate,
SetPropertiesUpdate,
SetSnapshotRefUpdate,
SnapshotRef,
Expand Down Expand Up @@ -529,6 +530,51 @@ def test_add_nested_list_type_column(table_v2: Table) -> None:
assert new_schema.highest_field_id == 7


def test_apply_set_properties_update(table_v2: Table) -> None:
    """Check SetPropertiesUpdate: empty update, overwrite plus additions, and add-only."""
    base_metadata = table_v2.metadata

    # An update with no entries must leave the metadata equal to the input.
    empty_update = SetPropertiesUpdate(updates={})
    assert update_table_metadata(base_metadata, (empty_update,)) == base_metadata

    overwrite_and_add = SetPropertiesUpdate(updates={"read.split.target.size": "123", "test_a": "test_a", "test_b": "test_b"})
    new_metadata = update_table_metadata(base_metadata, (overwrite_and_add,))

    # The base metadata must not be touched by applying the update.
    assert base_metadata.properties == {"read.split.target.size": "134217728"}
    assert new_metadata.properties == {"read.split.target.size": "123", "test_a": "test_a", "test_b": "test_b"}

    add_only = SetPropertiesUpdate(updates={"test_c": "test_c"})
    new_metadata_add_only = update_table_metadata(new_metadata, (add_only,))

    expected_after_add = {
        "read.split.target.size": "123",
        "test_a": "test_a",
        "test_b": "test_b",
        "test_c": "test_c",
    }
    assert new_metadata_add_only.properties == expected_after_add


def test_apply_remove_properties_update(table_v2: Table) -> None:
    """Check RemovePropertiesUpdate: empty removal list and removal of existing keys."""
    # Seed the table with four extra properties so there is something to remove.
    seed_update = SetPropertiesUpdate(updates={"test_a": "test_a", "test_b": "test_b", "test_c": "test_c", "test_d": "test_d"})
    base_metadata = update_table_metadata(table_v2.metadata, (seed_update,))

    # An empty removal list must leave the metadata equal to the input.
    no_removal = RemovePropertiesUpdate(removals=[])
    assert base_metadata == update_table_metadata(base_metadata, (no_removal,))

    remove_a_and_c = RemovePropertiesUpdate(removals=["test_a", "test_c"])
    new_metadata = update_table_metadata(base_metadata, (remove_a_and_c,))

    # The base metadata must not be touched by applying the removal.
    expected_base_properties = {
        "read.split.target.size": "134217728",
        "test_a": "test_a",
        "test_b": "test_b",
        "test_c": "test_c",
        "test_d": "test_d",
    }
    assert base_metadata.properties == expected_base_properties
    assert new_metadata.properties == {"read.split.target.size": "134217728", "test_b": "test_b", "test_d": "test_d"}


def test_apply_add_schema_update(table_v2: Table) -> None:
transaction = table_v2.transaction()
update = transaction.update_schema()
Expand Down Expand Up @@ -625,6 +671,8 @@ def test_update_metadata_with_multiple_updates(table_v1: Table) -> None:
schema_update_1.add_column(path="b", field_type=IntegerType())
schema_update_1.commit()

transaction.set_properties(owner="test", test_a="test_a", test_b="test_b", test_c="test_c")

test_updates = transaction._updates # pylint: disable=W0212

new_snapshot = Snapshot(
Expand All @@ -639,6 +687,7 @@ def test_update_metadata_with_multiple_updates(table_v1: Table) -> None:

test_updates += (
AddSnapshotUpdate(snapshot=new_snapshot),
SetPropertiesUpdate(updates={"test_a": "test_a1"}),
SetSnapshotRefUpdate(
ref_name="main",
type="branch",
Expand All @@ -647,6 +696,7 @@ def test_update_metadata_with_multiple_updates(table_v1: Table) -> None:
max_snapshot_age_ms=12312312312,
min_snapshots_to_keep=1,
),
RemovePropertiesUpdate(removals=["test_c", "test_b"]),
)

new_metadata = update_table_metadata(base_metadata, test_updates)
Expand Down Expand Up @@ -681,6 +731,9 @@ def test_update_metadata_with_multiple_updates(table_v1: Table) -> None:
max_ref_age_ms=123123123,
)

# Set/RemovePropertiesUpdate
assert new_metadata.properties == {"owner": "test", "test_a": "test_a1"}


def test_metadata_isolation_from_illegal_updates(table_v1: Table) -> None:
base_metadata = table_v1.metadata
Expand Down

0 comments on commit 3085c40

Please sign in to comment.