Skip to content

Commit

Permalink
Initial partition evolution
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Jan 20, 2024
1 parent 567ec49 commit 21dd373
Show file tree
Hide file tree
Showing 3 changed files with 575 additions and 9 deletions.
143 changes: 135 additions & 8 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@
# under the License.
from __future__ import annotations

from abc import ABC, abstractmethod
from functools import cached_property
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar

from pydantic import (
BeforeValidator,
Expand All @@ -34,7 +29,18 @@
from typing_extensions import Annotated

from pyiceberg.schema import Schema
from pyiceberg.transforms import Transform, parse_transform
from pyiceberg.transforms import (
    BucketTransform,
    DayTransform,
    HourTransform,
    IdentityTransform,
    MonthTransform,
    Transform,
    TruncateTransform,
    UnknownTransform,
    VoidTransform,
    YearTransform,
    parse_transform,
)
from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.types import NestedField, StructType

Expand Down Expand Up @@ -85,6 +91,20 @@ def __str__(self) -> str:
"""Return the string representation of the PartitionField class."""
return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"

def __hash__(self) -> int:
    """Return a hash built from the name, source id, field id and transform repr."""
    # The transform is hashed through its repr, mirroring how __eq__ compares it.
    identity = (self.name, self.source_id, self.field_id, repr(self.transform))
    return hash(identity)

def __eq__(self, other: Any) -> bool:
    """Return True when other is a PartitionField with the same ids, name and transform.

    Transforms are compared through their repr rather than by object equality.
    """
    if not isinstance(other, PartitionField):
        return False

    theirs = (other.field_id, other.name, other.source_id, repr(other.transform))
    ours = (self.field_id, self.name, self.source_id, repr(self.transform))
    return theirs == ours


class PartitionSpec(IcebergBaseModel):
"""
Expand Down Expand Up @@ -215,3 +235,110 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
)
)
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)


# Result type produced by each visit method of a PartitionSpecVisitor.
T = TypeVar("T")


class PartitionSpecVisitor(Generic[T], ABC):
    """Abstract visitor with one callback per kind of partition transform.

    Every callback receives the partition field id, the name of the source
    column and the id of the source column. Transform-specific parameters
    (bucket count, truncation width, unknown transform text) are passed as
    an extra trailing argument.
    """

    @abstractmethod
    def identity(self, field_id: int, source_name: str, source_id: int) -> T:
        """Handle a partition field that uses the identity transform."""

    @abstractmethod
    def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> T:
        """Handle a partition field that uses a bucket transform with num_buckets buckets."""

    @abstractmethod
    def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> T:
        """Handle a partition field that uses a truncate transform of the given width."""

    @abstractmethod
    def year(self, field_id: int, source_name: str, source_id: int) -> T:
        """Handle a partition field that uses the year transform."""

    @abstractmethod
    def month(self, field_id: int, source_name: str, source_id: int) -> T:
        """Handle a partition field that uses the month transform."""

    @abstractmethod
    def day(self, field_id: int, source_name: str, source_id: int) -> T:
        """Handle a partition field that uses the day transform."""

    @abstractmethod
    def hour(self, field_id: int, source_name: str, source_id: int) -> T:
        """Handle a partition field that uses the hour transform."""

    @abstractmethod
    def always_null(self, field_id: int, source_name: str, source_id: int) -> T:
        """Handle a partition field that uses the void (always-null) transform."""

    @abstractmethod
    def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> T:
        """Handle a partition field whose transform is not recognised.

        Subclasses must override this method; the base body rejects the
        transform, so delegating to it via super() raises.

        Raises:
            ValueError: Always, when the base implementation is invoked.
        """
        raise ValueError(f"Unknown transform {transform} is not supported")


class _PartitionNameGenerator(PartitionSpecVisitor[str]):
    """Visitor that derives a partition field name from the source column name.

    Each method returns the source column name, suffixed with a marker for
    the transform (and its parameter, where it has one).
    """

    def identity(self, field_id: int, source_name: str, source_id: int) -> str:
        """Return the source column name unchanged."""
        return source_name

    def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> str:
        """Return ``<source>_bucket_<num_buckets>``."""
        return f"{source_name}_bucket_{num_buckets}"

    def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> str:
        """Return ``<source>_trunc_<width>``."""
        return f"{source_name}_trunc_{width}"

    def year(self, field_id: int, source_name: str, source_id: int) -> str:
        """Return ``<source>_year``."""
        return f"{source_name}_year"

    def month(self, field_id: int, source_name: str, source_id: int) -> str:
        """Return ``<source>_month``."""
        return f"{source_name}_month"

    def day(self, field_id: int, source_name: str, source_id: int) -> str:
        """Return ``<source>_day``."""
        return f"{source_name}_day"

    def hour(self, field_id: int, source_name: str, source_id: int) -> str:
        """Return ``<source>_hour``."""
        return f"{source_name}_hour"

    def always_null(self, field_id: int, source_name: str, source_id: int) -> str:
        """Return ``<source>_null``."""
        return f"{source_name}_null"

    def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> str:
        """Delegate to the parent implementation for unrecognised transforms."""
        return super().unknown(field_id, source_name, source_id, transform)


# Result type produced by the visitor passed to _visit / _visit_field.
R = TypeVar("R")


def _visit(spec: PartitionSpec, schema: Schema, visitor: PartitionSpecVisitor[R]) -> List[R]:
    """Visit every field of a partition spec and collect the visitor results.

    Args:
        spec: Partition spec whose fields are visited in order.
        schema: Schema handed to _visit_field together with each field.
        visitor: Visitor invoked once per partition field.

    Returns:
        One visitor result per field, in the order of ``spec.fields``.
    """
    return [_visit_field(schema, partition_field, visitor) for partition_field in spec.fields]


def _visit_field(schema: Schema, field: PartitionField, visitor: PartitionSpecVisitor[R]) -> R:
    """Dispatch one partition field to the visitor method matching its transform.

    Args:
        schema: Schema used to resolve the field's source column id to a name.
        field: Partition field to visit.
        visitor: Visitor whose transform-specific method is invoked.

    Returns:
        Whatever the selected visitor method returns.

    Raises:
        ValueError: If the schema has no column name for ``field.source_id``,
            or if the transform is of a type this dispatcher does not handle.
    """
    source_name = schema.find_column_name(field.source_id)
    if not source_name:
        raise ValueError(f"Could not find column with id {field.source_id}")

    transform = field.transform
    if isinstance(transform, IdentityTransform):
        return visitor.identity(field.field_id, source_name, field.source_id)
    elif isinstance(transform, BucketTransform):
        return visitor.bucket(field.field_id, source_name, field.source_id, transform.num_buckets)
    elif isinstance(transform, TruncateTransform):
        return visitor.truncate(field.field_id, source_name, field.source_id, transform.width)
    elif isinstance(transform, YearTransform):
        return visitor.year(field.field_id, source_name, field.source_id)
    elif isinstance(transform, MonthTransform):
        # Previously missing: month-partitioned fields fell through to the
        # final ValueError even though every visitor implements month().
        return visitor.month(field.field_id, source_name, field.source_id)
    elif isinstance(transform, DayTransform):
        return visitor.day(field.field_id, source_name, field.source_id)
    elif isinstance(transform, HourTransform):
        return visitor.hour(field.field_id, source_name, field.source_id)
    elif isinstance(transform, VoidTransform):
        return visitor.always_null(field.field_id, source_name, field.source_id)
    elif isinstance(transform, UnknownTransform):
        # The visitor receives the transform as text since it cannot be interpreted.
        return visitor.unknown(field.field_id, source_name, field.source_id, repr(transform))
    else:
        raise ValueError(f"Unknown transform {transform}")
Loading

0 comments on commit 21dd373

Please sign in to comment.