Skip to content

Commit

Permalink
Partition Evolution Support
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Feb 2, 2024
1 parent 0f08806 commit f2d34f6
Show file tree
Hide file tree
Showing 4 changed files with 874 additions and 10 deletions.
57 changes: 57 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,63 @@ with table.update_schema(allow_incompatible_changes=True) as update:
update.delete_column("some_field")
```

## Partition evolution

PyIceberg supports partition evolution. See the [partition evolution](https://iceberg.apache.org/spec/#partition-evolution)
for more details.

The API to use when evolving partitions is the `update_spec` API on the table.

```python
with table.update_spec() as update:
update.add_field("id", BucketTransform(16), "bucketed_id")
update.add_field("event_ts", DayTransform(), "day_ts")
```

Updating the partition spec can also be done as part of a transaction with other operations.

```python
with table.transaction() as transaction:
with transaction.update_spec() as update_spec:
update_spec.add_field("id", BucketTransform(16), "bucketed_id")
update_spec.add_field("event_ts", DayTransform(), "day_ts")
# ... Update properties etc
```

### Add fields

New partition fields can be added via the `add_field` API which takes in the field name to partition on,
the partition transform, and an optional partition name. If the partition name is not specified,
one will be created.

```python
with table.update_spec() as update:
update.add_field("id", BucketTransform(16), "bucketed_id")
update.add_field("event_ts", DayTransform(), "day_ts")
# identity is a shortcut API for adding an IdentityTransform
update.identity("some_field")
```

### Remove fields

Partition fields can also be removed via the `remove_field` API if it no longer makes sense to partition on those fields.

```python
with table.update_spec() as update:some_partition_name
# Remove the partition field with the name
update.remove_field("some_partition_name")
```

### Rename fields

Partition fields can also be renamed via the `rename_field` API.

```python
with table.update_spec() as update:
# Rename the partition field with the name bucketed_id to sharded_id
update.rename_field("bucketed_id", "sharded_id")
```

## Table properties

Set and remove properties through the `Transaction` API:
Expand Down
129 changes: 120 additions & 9 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from abc import ABC, abstractmethod
from functools import cached_property, singledispatch
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar

from pydantic import (
BeforeValidator,
Expand All @@ -34,7 +29,18 @@
from typing_extensions import Annotated

from pyiceberg.schema import Schema
from pyiceberg.transforms import Transform, parse_transform
from pyiceberg.transforms import (
BucketTransform,
DayTransform,
HourTransform,
IdentityTransform,
Transform,
TruncateTransform,
UnknownTransform,
VoidTransform,
YearTransform,
parse_transform,
)
from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.types import NestedField, StructType

Expand Down Expand Up @@ -215,3 +221,108 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
)
)
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)


T = TypeVar("T")


class PartitionSpecVisitor(Generic[T], ABC):
@abstractmethod
def identity(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit identity partition field."""

@abstractmethod
def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> T:
"""Visit bucket partition field."""

@abstractmethod
def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> T:
"""Visit truncate partition field."""

@abstractmethod
def year(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit year partition field."""

@abstractmethod
def month(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit month partition field."""

@abstractmethod
def day(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit day partition field."""

@abstractmethod
def hour(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit hour partition field."""

@abstractmethod
def always_null(self, field_id: int, source_name: str, source_id: int) -> T:
"""Visit void partition field."""

@abstractmethod
def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> T:
"""Visit unknown partition field."""
raise ValueError(f"Unknown transform is not supported: {transform}")


class _PartitionNameGenerator(PartitionSpecVisitor[str]):
def identity(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name

def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> str:
return f"{source_name}_bucket_{num_buckets}"

def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> str:
return source_name + "_trunc_" + str(width)

def year(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_year"

def month(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_month"

def day(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_day"

def hour(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_hour"

def always_null(self, field_id: int, source_name: str, source_id: int) -> str:
return source_name + "_null"

def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> str:
return super().unknown(field_id, source_name, source_id, transform)


R = TypeVar("R")


@singledispatch
def _visit(spec: PartitionSpec, schema: Schema, visitor: PartitionSpecVisitor[R]) -> List[R]:
return [_visit_partition_field(schema, field, visitor) for field in spec.fields]


def _visit_partition_field(schema: Schema, field: PartitionField, visitor: PartitionSpecVisitor[R]) -> R:
source_name = schema.find_column_name(field.source_id)
if not source_name:
raise ValueError(f"Could not find field with id {field.source_id}")

transform = field.transform
if isinstance(transform, IdentityTransform):
return visitor.identity(field.field_id, source_name, field.source_id)
elif isinstance(transform, BucketTransform):
return visitor.bucket(field.field_id, source_name, field.source_id, transform.num_buckets)
elif isinstance(transform, TruncateTransform):
return visitor.truncate(field.field_id, source_name, field.source_id, transform.width)
elif isinstance(transform, DayTransform):
return visitor.day(field.field_id, source_name, field.source_id)
elif isinstance(transform, HourTransform):
return visitor.hour(field.field_id, source_name, field.source_id)
elif isinstance(transform, YearTransform):
return visitor.year(field.field_id, source_name, field.source_id)
elif isinstance(transform, VoidTransform):
return visitor.always_null(field.field_id, source_name, field.source_id)
elif isinstance(transform, UnknownTransform):
return visitor.unknown(field.field_id, source_name, field.source_id, repr(transform))
else:
raise ValueError(f"Unknown transform {transform}")
Loading

0 comments on commit f2d34f6

Please sign in to comment.