Skip to content

Commit

Permalink
Support snapshot management operations like creating tags by adding `…
Browse files Browse the repository at this point in the history
…ManageSnapshots` API (#728)
  • Loading branch information
chinmay-bhat authored Jun 15, 2024
1 parent c579e9f commit 1dde51a
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 0 deletions.
47 changes: 47 additions & 0 deletions dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,50 @@
(array(), map(), array(struct(1)))
"""
)

# Provision the `test_table_snapshot_operations` table (Iceberg format
# version 2, single integer column) in the `default` namespace of the catalog
# named by `catalog_name`.
spark.sql(
f"""
CREATE OR REPLACE TABLE {catalog_name}.default.test_table_snapshot_operations (
number integer
)
USING iceberg
TBLPROPERTIES (
'format-version'='2'
);
"""
)

# The statements below perform five separate writes (insert 1, insert 2,
# delete 2, insert 3, insert 4).
# NOTE(review): presumably each write yields its own snapshot so the table has
# a multi-entry history to tag/branch from — confirm against the tests that
# load this table.
spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_snapshot_operations
VALUES (1)
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_snapshot_operations
VALUES (2)
"""
)

# Remove the row inserted by the previous statement.
spark.sql(
f"""
DELETE FROM {catalog_name}.default.test_table_snapshot_operations
WHERE number = 2
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_snapshot_operations
VALUES (3)
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_snapshot_operations
VALUES (4)
"""
)
22 changes: 22 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,28 @@ tbl.overwrite(df, snapshot_properties={"abc": "def"})
assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
```

## Snapshot Management

Manage snapshots with operations through the `Table` API:

```python
# To run a specific operation
table.manage_snapshots().create_tag(snapshot_id, "tag123").commit()
# To run multiple operations
(
    table.manage_snapshots()
    .create_tag(snapshot_id1, "tag123")
    .create_tag(snapshot_id2, "tag456")
    .commit()
)
# Operations are applied on commit.
```

You can also use context managers to make more changes:

```python
with table.manage_snapshots() as ms:
    ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
```

## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
Expand Down
176 changes: 176 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.datetime import datetime_to_millis
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.singleton import _convert_to_hashable_type

if TYPE_CHECKING:
Expand Down Expand Up @@ -351,6 +352,88 @@ def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) ->
updates = properties or kwargs
return self._apply((SetPropertiesUpdate(updates=updates),))

@deprecated(
    deprecated_in="0.7.0",
    removed_in="0.8.0",
    help_message="Please use one of the functions in ManageSnapshots instead",
)
def add_snapshot(self, snapshot: Snapshot) -> Transaction:
    """Stage an add-snapshot update on this transaction.

    Deprecated in 0.7.0 and scheduled for removal in 0.8.0; use the
    operations on ManageSnapshots instead.

    Args:
        snapshot: The snapshot to add to the table.

    Returns:
        The transaction with the add-snapshot staged.
    """
    # A single update and an empty requirements tuple are handed to _apply.
    return self._apply((AddSnapshotUpdate(snapshot=snapshot),), ())

@deprecated(
    deprecated_in="0.7.0",
    removed_in="0.8.0",
    help_message="Please use one of the functions in ManageSnapshots instead",
)
def set_ref_snapshot(
    self,
    snapshot_id: int,
    parent_snapshot_id: Optional[int],
    ref_name: str,
    type: str,
    max_ref_age_ms: Optional[int] = None,
    max_snapshot_age_ms: Optional[int] = None,
    min_snapshots_to_keep: Optional[int] = None,
) -> Transaction:
    """Stage a set-snapshot-ref update that points a ref at a snapshot.

    Deprecated in 0.7.0 and scheduled for removal in 0.8.0; use the
    operations on ManageSnapshots instead.

    Args:
        snapshot_id: Id of the snapshot the ref should point to.
        parent_snapshot_id: Snapshot id used in the requirement on the "main" ref.
        ref_name: Name of the ref to set.
        type: Ref type forwarded to the update.
        max_ref_age_ms: Optional max ref age in milliseconds.
        max_snapshot_age_ms: Optional max snapshot age in milliseconds.
        min_snapshots_to_keep: Optional minimum number of snapshots to keep.

    Returns:
        The transaction with the set-snapshot-ref staged.
    """
    ref_update = SetSnapshotRefUpdate(
        snapshot_id=snapshot_id,
        ref_name=ref_name,
        type=type,
        max_ref_age_ms=max_ref_age_ms,
        max_snapshot_age_ms=max_snapshot_age_ms,
        min_snapshots_to_keep=min_snapshots_to_keep,
    )
    # NOTE(review): the requirement is always built against "main", even when
    # ref_name names a different ref — confirm this is intended.
    main_guard = AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main")

    return self._apply((ref_update,), (main_guard,))

def _set_ref_snapshot(
    self,
    snapshot_id: int,
    ref_name: str,
    type: str,
    max_ref_age_ms: Optional[int] = None,
    max_snapshot_age_ms: Optional[int] = None,
    min_snapshots_to_keep: Optional[int] = None,
) -> UpdatesAndRequirements:
    """Build the update and requirement for pointing a ref at a snapshot.

    Nothing is applied here; the pair is returned for the caller to stage.

    Args:
        snapshot_id: Id of the snapshot the ref should point to.
        ref_name: Name of the ref to set.
        type: Ref type forwarded to the update.
        max_ref_age_ms: Optional max ref age in milliseconds.
        max_snapshot_age_ms: Optional max snapshot age in milliseconds.
        min_snapshots_to_keep: Optional minimum number of snapshots to keep.

    Returns:
        A pair of one-element tuples: the set-snapshot-ref update and the
        requirement on the ref's current snapshot id.
    """
    ref_update = SetSnapshotRefUpdate(
        snapshot_id=snapshot_id,
        ref_name=ref_name,
        type=type,
        max_ref_age_ms=max_ref_age_ms,
        max_snapshot_age_ms=max_snapshot_age_ms,
        min_snapshots_to_keep=min_snapshots_to_keep,
    )

    # The requirement carries the snapshot id the ref has in the current table
    # metadata, or None when the ref is not present there.
    if ref_name in self.table_metadata.refs:
        expected_snapshot_id = self.table_metadata.refs[ref_name].snapshot_id
    else:
        expected_snapshot_id = None
    ref_guard = AssertRefSnapshotId(snapshot_id=expected_snapshot_id, ref=ref_name)

    return (ref_update,), (ref_guard,)

def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
"""Create a new UpdateSchema to alter the columns of this table.
Expand Down Expand Up @@ -1323,6 +1406,21 @@ def history(self) -> List[SnapshotLogEntry]:
"""Get the snapshot history of this table."""
return self.metadata.snapshot_log

def manage_snapshots(self) -> ManageSnapshots:
    """Start a chain of snapshot management operations (create branch, create tag, etc.).

    Run one operation with
    ``table.manage_snapshots().<operation>().commit()`` or chain several with
    ``table.manage_snapshots().<operation-one>().<operation-two>().commit()``.
    Pending changes are applied on commit.

    A context manager can be used as well, for example::

        with table.manage_snapshots() as ms:
            ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")

    Returns:
        A ManageSnapshots built on a new autocommit transaction for this table.
    """
    autocommit_txn = Transaction(self, autocommit=True)
    return ManageSnapshots(transaction=autocommit_txn)

def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
"""Create a new UpdateSchema to alter the columns of this table.
Expand Down Expand Up @@ -1835,6 +1933,84 @@ def __enter__(self) -> U:
return self # type: ignore


class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
    """Collect snapshot management operations (create branch, create tag, etc.).

    Run one operation with
    ``table.manage_snapshots().<operation>().commit()`` or chain several with
    ``table.manage_snapshots().<operation-one>().<operation-two>().commit()``.
    Pending changes are applied on commit.

    A context manager can be used as well, for example::

        with table.manage_snapshots() as ms:
            ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    """

    # Class-level empty defaults; each operation rebinds these on the instance
    # to a longer tuple.
    _updates: Tuple[TableUpdate, ...] = ()
    _requirements: Tuple[TableRequirement, ...] = ()

    def _commit(self) -> UpdatesAndRequirements:
        """Return the updates and requirements staged so far."""
        return self._updates, self._requirements

    def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots:
        """Stage a new tag pointing to the given snapshot id.

        Args:
            snapshot_id (int): snapshot id of the existing snapshot to tag
            tag_name (str): name of the tag
            max_ref_age_ms (Optional[int]): max ref age in milliseconds

        Returns:
            This for method chaining
        """
        tag_updates, tag_requirements = self._transaction._set_ref_snapshot(
            snapshot_id=snapshot_id,
            ref_name=tag_name,
            type="tag",
            max_ref_age_ms=max_ref_age_ms,
        )
        self._updates += tag_updates
        self._requirements += tag_requirements
        return self

    def create_branch(
        self,
        snapshot_id: int,
        branch_name: str,
        max_ref_age_ms: Optional[int] = None,
        max_snapshot_age_ms: Optional[int] = None,
        min_snapshots_to_keep: Optional[int] = None,
    ) -> ManageSnapshots:
        """Stage a new branch pointing to the given snapshot id.

        Args:
            snapshot_id (int): snapshot id of the existing snapshot at which the branch is created
            branch_name (str): name of the new branch
            max_ref_age_ms (Optional[int]): max ref age in milliseconds
            max_snapshot_age_ms (Optional[int]): max age of snapshots to keep in milliseconds
            min_snapshots_to_keep (Optional[int]): min number of snapshots to keep

        Returns:
            This for method chaining
        """
        branch_updates, branch_requirements = self._transaction._set_ref_snapshot(
            snapshot_id=snapshot_id,
            ref_name=branch_name,
            type="branch",
            max_ref_age_ms=max_ref_age_ms,
            max_snapshot_age_ms=max_snapshot_age_ms,
            min_snapshots_to_keep=min_snapshots_to_keep,
        )
        self._updates += branch_updates
        self._requirements += branch_requirements
        return self


class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
_schema: Schema
_last_column_id: itertools.count[int]
Expand Down
42 changes: 42 additions & 0 deletions tests/integration/test_snapshot_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.table.refs import SnapshotRef


@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog",
    [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")],
)
def test_create_tag(catalog: Catalog) -> None:
    """Creating a tag on a historical snapshot registers a matching tag ref."""
    table = catalog.load_table("default.test_table_snapshot_operations")

    # Pick the third-from-last history entry as the snapshot to tag.
    snapshot_log = table.history()
    assert len(snapshot_log) > 3
    target_snapshot_id = snapshot_log[-3].snapshot_id

    table.manage_snapshots().create_tag(snapshot_id=target_snapshot_id, tag_name="tag123").commit()

    assert table.metadata.refs["tag123"] == SnapshotRef(snapshot_id=target_snapshot_id, snapshot_ref_type="tag")


@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog",
    [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")],
)
def test_create_branch(catalog: Catalog) -> None:
    """Creating a branch on a historical snapshot registers a matching branch ref."""
    table = catalog.load_table("default.test_table_snapshot_operations")

    # Pick the second-from-last history entry as the branch point.
    snapshot_log = table.history()
    assert len(snapshot_log) > 2
    target_snapshot_id = snapshot_log[-2].snapshot_id

    table.manage_snapshots().create_branch(snapshot_id=target_snapshot_id, branch_name="branch123").commit()

    assert table.metadata.refs["branch123"] == SnapshotRef(snapshot_id=target_snapshot_id, snapshot_ref_type="branch")
24 changes: 24 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,30 @@ def test_update_metadata_add_snapshot(table_v2: Table) -> None:
assert new_metadata.last_updated_ms == new_snapshot.timestamp_ms


def test_update_metadata_set_ref_snapshot(table_v2: Table) -> None:
    """Applying the update from _set_ref_snapshot moves "main" to the chosen snapshot."""
    target_snapshot_id = 3051729675574597004

    # Only the updates are needed here; the requirements are discarded.
    staged_updates, _ = table_v2.transaction()._set_ref_snapshot(
        snapshot_id=target_snapshot_id,
        ref_name="main",
        type="branch",
        max_ref_age_ms=123123123,
        max_snapshot_age_ms=12312312312,
        min_snapshots_to_keep=1,
    )

    updated_metadata = update_table_metadata(table_v2.metadata, staged_updates)

    assert len(updated_metadata.snapshot_log) == 3
    assert updated_metadata.snapshot_log[2].snapshot_id == target_snapshot_id
    assert updated_metadata.current_snapshot_id == target_snapshot_id
    assert updated_metadata.last_updated_ms > table_v2.metadata.last_updated_ms
    assert updated_metadata.refs["main"] == SnapshotRef(
        snapshot_id=target_snapshot_id,
        snapshot_ref_type="branch",
        min_snapshots_to_keep=1,
        max_snapshot_age_ms=12312312312,
        max_ref_age_ms=123123123,
    )


def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None:
update = SetSnapshotRefUpdate(
ref_name="main",
Expand Down

0 comments on commit 1dde51a

Please sign in to comment.