Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partition Evolution #245

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,63 @@ with table.update_schema(allow_incompatible_changes=True) as update:
update.delete_column("some_field")
```

## Partition evolution

PyIceberg supports partition evolution. See the [partition evolution](https://iceberg.apache.org/spec/#partition-evolution)
for more details.

The API to use when evolving partitions is the `update_spec` API on the table.

```python
with table.update_spec() as update:
update.add_field("id", BucketTransform(16), "bucketed_id")
update.add_field("event_ts", DayTransform(), "day_ts")
```

Updating the partition spec can also be done as part of a transaction with other operations.

```python
with table.transaction() as transaction:
with transaction.update_spec() as update_spec:
update_spec.add_field("id", BucketTransform(16), "bucketed_id")
update_spec.add_field("event_ts", DayTransform(), "day_ts")
# ... Update properties etc
```

### Add fields

New partition fields can be added via the `add_field` API which takes in the field name to partition on,
the partition transform, and an optional partition name. If the partition name is not specified,
one will be created.

```python
with table.update_spec() as update:
update.add_field("id", BucketTransform(16), "bucketed_id")
update.add_field("event_ts", DayTransform(), "day_ts")
# identity is a shortcut API for adding an IdentityTransform
update.identity("some_field")
```

### Remove fields

Partition fields can also be removed via the `remove_field` API if it no longer makes sense to partition on those fields.

```python
with table.update_spec() as update:some_partition_name
# Remove the partition field with the name
update.remove_field("some_partition_name")
```

### Rename fields

Partition fields can also be renamed via the `rename_field` API.

```python
with table.update_spec() as update:
# Rename the partition field with the name bucketed_id to sharded_id
update.rename_field("bucketed_id", "sharded_id")
```

## Table properties

Set and remove properties through the `Transaction` API:
Expand Down
131 changes: 121 additions & 10 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from abc import ABC, abstractmethod
from functools import cached_property, singledispatch
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar

from pydantic import (
BeforeValidator,
Expand All @@ -34,7 +29,18 @@
from typing_extensions import Annotated

from pyiceberg.schema import Schema
from pyiceberg.transforms import Transform, parse_transform
from pyiceberg.transforms import (
BucketTransform,
DayTransform,
HourTransform,
IdentityTransform,
Transform,
TruncateTransform,
UnknownTransform,
VoidTransform,
YearTransform,
parse_transform,
)
from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.types import NestedField, StructType

Expand Down Expand Up @@ -143,7 +149,7 @@ def is_unpartitioned(self) -> bool:
def last_assigned_field_id(self) -> int:
if self.fields:
return max(pf.field_id for pf in self.fields)
return PARTITION_FIELD_ID_START
return PARTITION_FIELD_ID_START - 1

@cached_property
def source_id_to_fields_map(self) -> Dict[int, List[PartitionField]]:
Expand Down Expand Up @@ -215,3 +221,108 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
)
)
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)


T = TypeVar("T")


class PartitionSpecVisitor(Generic[T], ABC):
@abstractmethod
def identity(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit identity partition field."""

@abstractmethod
def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> T:
"""Visit bucket partition field."""

@abstractmethod
def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> T:
"""Visit truncate partition field."""

@abstractmethod
def year(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit year partition field."""

@abstractmethod
def month(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit month partition field."""

@abstractmethod
def day(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit day partition field."""

@abstractmethod
def hour(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit hour partition field."""

@abstractmethod
def always_null(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit void partition field."""

@abstractmethod
def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> T:
"""Visit unknown partition field."""
raise ValueError(f"Unknown transform is not supported: {transform}")


class _PartitionNameGenerator(PartitionSpecVisitor[str]):
def identity(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name

def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> str:
return f"{source_name}_bucket_{num_buckets}"

def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> str:
return source_name + "_trunc_" + str(width)

def year(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_year"

def month(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_month"

def day(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_day"

def hour(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_hour"

def always_null(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_null"

def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> str:
return super().unknown(field_id, source_name, source_id, transform)


R = TypeVar("R")


@singledispatch
def _visit(spec: PartitionSpec, schema: Schema, visitor: PartitionSpecVisitor[R]) -> List[R]:
return [_visit_partition_field(schema, field, visitor) for field in spec.fields]


def _visit_partition_field(schema: Schema, field: PartitionField, visitor: PartitionSpecVisitor[R]) -> R:
source_name = schema.find_column_name(field.source_id)
if not source_name:
raise ValueError(f"Could not find field with id {field.source_id}")

transform = field.transform
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to have: Do we also want to have a single dispatch to map these types?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create an issue for this?

if isinstance(transform, IdentityTransform):
return visitor.identity(field.field_id, source_name, field.source_id)
elif isinstance(transform, BucketTransform):
return visitor.bucket(field.field_id, source_name, field.source_id, transform.num_buckets)
elif isinstance(transform, TruncateTransform):
return visitor.truncate(field.field_id, source_name, field.source_id, transform.width)
elif isinstance(transform, DayTransform):
return visitor.day(field.field_id, source_name, field.source_id)
elif isinstance(transform, HourTransform):
return visitor.hour(field.field_id, source_name, field.source_id)
elif isinstance(transform, YearTransform):
return visitor.year(field.field_id, source_name, field.source_id)
elif isinstance(transform, VoidTransform):
return visitor.always_null(field.field_id, source_name, field.source_id)
elif isinstance(transform, UnknownTransform):
return visitor.unknown(field.field_id, source_name, field.source_id, repr(transform))
else:
raise ValueError(f"Unknown transform {transform}")
Loading