Merge branch 'apache:main' into main
omkenge authored Nov 5, 2024
2 parents 6240f4c + e771190 commit 2364936
Showing 19 changed files with 647 additions and 645 deletions.
8 changes: 4 additions & 4 deletions mkdocs/docs/configuration.md
@@ -340,8 +340,8 @@ catalog:

<!-- prettier-ignore-start -->

!!! warning "Deprecated Properties"
`profile_name`, `region_name`, `botocore_session`, `aws_access_key_id`, `aws_secret_access_key`, `aws_session_token` are deprecated and will be removed in 0.8.0:
!!! warning "Removed Properties"
The properties `profile_name`, `region_name`, `botocore_session`, `aws_access_key_id`, `aws_secret_access_key`, and `aws_session_token` were deprecated and removed in 0.8.0

<!-- prettier-ignore-end -->

@@ -396,8 +396,8 @@ catalog:

<!-- prettier-ignore-start -->

!!! warning "Deprecated Properties"
`profile_name`, `region_name`, `botocore_session`, `aws_access_key_id`, `aws_secret_access_key`, `aws_session_token` are deprecated and will be removed in 0.8.0:
!!! warning "Removed Properties"
The properties `profile_name`, `region_name`, `botocore_session`, `aws_access_key_id`, `aws_secret_access_key`, and `aws_session_token` were deprecated and removed in 0.8.0

<!-- prettier-ignore-end -->
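For context, the replacement configuration passes AWS credentials through the shared `client.*` properties (or catalog-specific `glue.*`/`dynamodb.*` keys). A minimal sketch, assuming a Glue catalog named `default`; the credential values are placeholders, and the exact property keys are given as illustrative replacements rather than an authoritative list:

```python
from pyiceberg.catalog import load_catalog

# Sketch only: placeholder values, with "client.*" keys standing in for the
# removed profile_name/region_name/aws_* properties.
catalog = load_catalog(
    "default",
    **{
        "type": "glue",
        "client.region": "us-east-1",
        "client.access-key-id": "<ACCESS_KEY_ID>",
        "client.secret-access-key": "<SECRET_ACCESS_KEY>",
        "client.session-token": "<SESSION_TOKEN>",
    },
)
```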

2 changes: 1 addition & 1 deletion mkdocs/requirements.txt
@@ -23,6 +23,6 @@ mkdocstrings-python==1.11.1
mkdocs-literate-nav==0.6.1
mkdocs-autorefs==1.2.0
mkdocs-gen-files==0.5.0
mkdocs-material==9.5.36
mkdocs-material==9.5.42
mkdocs-material-extensions==1.3.1
mkdocs-section-index==0.3.9
776 changes: 397 additions & 379 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyiceberg/__init__.py
@@ -15,4 +15,4 @@
# specific language governing permissions and limitations
# under the License.

__version__ = "0.7.1"
__version__ = "0.8.0"
25 changes: 1 addition & 24 deletions pyiceberg/catalog/__init__.py
@@ -106,21 +106,6 @@
re.X,
)

DEPRECATED_PROFILE_NAME = "profile_name"
DEPRECATED_REGION = "region_name"
DEPRECATED_BOTOCORE_SESSION = "botocore_session"
DEPRECATED_ACCESS_KEY_ID = "aws_access_key_id"
DEPRECATED_SECRET_ACCESS_KEY = "aws_secret_access_key"
DEPRECATED_SESSION_TOKEN = "aws_session_token"
DEPRECATED_PROPERTY_NAMES = {
DEPRECATED_PROFILE_NAME,
DEPRECATED_REGION,
DEPRECATED_BOTOCORE_SESSION,
DEPRECATED_ACCESS_KEY_ID,
DEPRECATED_SECRET_ACCESS_KEY,
DEPRECATED_SESSION_TOKEN,
}


class CatalogType(Enum):
REST = "rest"
@@ -794,14 +779,6 @@ class MetastoreCatalog(Catalog, ABC):
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)

for property_name in DEPRECATED_PROPERTY_NAMES:
if self.properties.get(property_name):
deprecation_message(
deprecated_in="0.7.0",
removed_in="0.8.0",
help_message=f"The property {property_name} is deprecated. Please use properties that start with client., glue., and dynamo. instead",
)

def create_table_transaction(
self,
identifier: Union[str, Identifier],
@@ -1011,4 +988,4 @@ def _empty_table_metadata() -> TableMetadata:
Returns:
TableMetadata: An empty TableMetadata instance.
"""
return TableMetadataV1(location="", last_column_id=-1, schema=Schema())
return TableMetadataV1.model_construct(last_column_id=-1, schema=Schema())
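`model_construct` is Pydantic's validation-skipping constructor, which is why the empty placeholder metadata no longer needs a dummy `location`. A minimal sketch of the difference on a plain Pydantic model (not the pyiceberg classes):

```python
from pydantic import BaseModel


class Example(BaseModel):
    location: str
    last_column_id: int


# Normal construction validates and requires every field:
# Example(last_column_id=-1)  # would raise ValidationError: "location" is missing

# model_construct() skips validation, so a partially populated placeholder is fine.
placeholder = Example.model_construct(last_column_id=-1)
print(placeholder.last_column_id)  # -1
```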
23 changes: 5 additions & 18 deletions pyiceberg/catalog/dynamodb.py
@@ -30,12 +30,6 @@
import boto3

from pyiceberg.catalog import (
DEPRECATED_ACCESS_KEY_ID,
DEPRECATED_BOTOCORE_SESSION,
DEPRECATED_PROFILE_NAME,
DEPRECATED_REGION,
DEPRECATED_SECRET_ACCESS_KEY,
DEPRECATED_SESSION_TOKEN,
ICEBERG,
METADATA_LOCATION,
PREVIOUS_METADATA_LOCATION,
@@ -102,18 +96,11 @@ def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)

session = boto3.Session(
profile_name=get_first_property_value(properties, DYNAMODB_PROFILE_NAME, DEPRECATED_PROFILE_NAME),
region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION, DEPRECATED_REGION),
botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
aws_access_key_id=get_first_property_value(
properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, DEPRECATED_ACCESS_KEY_ID
),
aws_secret_access_key=get_first_property_value(
properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, DEPRECATED_SECRET_ACCESS_KEY
),
aws_session_token=get_first_property_value(
properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN, DEPRECATED_SESSION_TOKEN
),
profile_name=properties.get(DYNAMODB_PROFILE_NAME),
region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
)
self.dynamodb = session.client(DYNAMODB_CLIENT)
self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
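`get_first_property_value` returns the value of the first listed key that is set, so a catalog-specific key such as `dynamodb.region` still wins over the shared `client.region` even with the removed fallbacks gone. An illustrative, simplified re-implementation (not the pyiceberg source) of that lookup:

```python
from typing import Any, Dict, Optional


def get_first_property_value(properties: Dict[str, Any], *keys: str) -> Optional[Any]:
    # Return the first property that is present, mirroring the precedence above:
    # e.g. "dynamodb.region" is consulted before "client.region".
    for key in keys:
        if (value := properties.get(key)) is not None:
            return value
    return None


props = {"client.region": "us-west-2", "dynamodb.region": "eu-central-1"}
print(get_first_property_value(props, "dynamodb.region", "client.region"))  # eu-central-1
```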
25 changes: 6 additions & 19 deletions pyiceberg/catalog/glue.py
@@ -40,12 +40,6 @@
)

from pyiceberg.catalog import (
DEPRECATED_ACCESS_KEY_ID,
DEPRECATED_BOTOCORE_SESSION,
DEPRECATED_PROFILE_NAME,
DEPRECATED_REGION,
DEPRECATED_SECRET_ACCESS_KEY,
DEPRECATED_SESSION_TOKEN,
EXTERNAL_TABLE,
ICEBERG,
LOCATION,
@@ -303,18 +297,11 @@ def __init__(self, name: str, **properties: Any):
super().__init__(name, **properties)

session = boto3.Session(
profile_name=get_first_property_value(properties, GLUE_PROFILE_NAME, DEPRECATED_PROFILE_NAME),
region_name=get_first_property_value(properties, GLUE_REGION, AWS_REGION, DEPRECATED_REGION),
botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
aws_access_key_id=get_first_property_value(
properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, DEPRECATED_ACCESS_KEY_ID
),
aws_secret_access_key=get_first_property_value(
properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, DEPRECATED_SECRET_ACCESS_KEY
),
aws_session_token=get_first_property_value(
properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN, DEPRECATED_SESSION_TOKEN
),
profile_name=properties.get(GLUE_PROFILE_NAME),
region_name=get_first_property_value(properties, GLUE_REGION, AWS_REGION),
aws_access_key_id=get_first_property_value(properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
aws_secret_access_key=get_first_property_value(properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
aws_session_token=get_first_property_value(properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN),
)
self.glue: GlueClient = session.client("glue", endpoint_url=properties.get(GLUE_CATALOG_ENDPOINT))

@@ -784,4 +771,4 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None:

@staticmethod
def __is_iceberg_table(table: TableTypeDef) -> bool:
return table["Parameters"] is not None and table["Parameters"][TABLE_TYPE].lower() == ICEBERG
return table.get("Parameters", {}).get("table_type", "").lower() == ICEBERG
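The rewritten check is defensive: chained `.get()` calls with defaults mean a Glue table that has no `Parameters` block, or no `table_type` entry, is treated as non-Iceberg instead of raising `KeyError`. A standalone sketch with a made-up table dict:

```python
table = {"Name": "plain_hive_table"}  # no "Parameters" key at all

# Old check: table["Parameters"]["table_type"] would raise KeyError here.

# New check: the chained .get() calls fall back to defaults and return False.
is_iceberg = table.get("Parameters", {}).get("table_type", "").lower() == "iceberg"
print(is_iceberg)  # False
```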
36 changes: 16 additions & 20 deletions pyiceberg/io/pyarrow.py
@@ -57,7 +57,6 @@
)
from urllib.parse import urlparse

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
@@ -812,7 +811,17 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
all_chunks = positional_deletes[0]
else:
all_chunks = pa.chunked_array(itertools.chain(*[arr.chunks for arr in positional_deletes]))
return np.subtract(np.setdiff1d(np.arange(start_index, end_index), all_chunks, assume_unique=False), start_index)

# Create the full range array with pyarrow
full_range = pa.array(range(start_index, end_index))
# When available, replace with Arrow generator to improve performance
# See https://github.com/apache/iceberg-python/issues/1271 for details

# Filter out values in all_chunks from full_range
result = pc.filter(full_range, pc.invert(pc.is_in(full_range, value_set=all_chunks)))

# Subtract the start_index from each element in the result
return pc.subtract(result, pa.scalar(start_index))
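The NumPy set difference is replaced by a pure-Arrow pipeline: build the index range, drop the positions listed in the delete files via `is_in`/`invert`/`filter`, then shift to file-relative offsets with `subtract`. A toy run of the same compute calls with made-up positions:

```python
import pyarrow as pa
import pyarrow.compute as pc

start_index, end_index = 100, 110
deletes = pa.chunked_array([pa.array([102, 107])])  # made-up deleted row positions

full_range = pa.array(range(start_index, end_index))
kept = pc.filter(full_range, pc.invert(pc.is_in(full_range, value_set=deletes)))
relative = pc.subtract(kept, pa.scalar(start_index))
print(relative.to_pylist())  # [0, 1, 3, 4, 5, 6, 8, 9]
```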


def pyarrow_to_schema(
@@ -1679,23 +1688,6 @@ def project_batches(
total_row_count += len(batch)


@deprecated(
deprecated_in="0.7.0",
removed_in="0.8.0",
help_message="The public API for 'to_requested_schema' is deprecated and is replaced by '_to_requested_schema'",
)
def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table:
struct_array = visit_with_partner(requested_schema, table, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema))

arrays = []
fields = []
for pos, field in enumerate(requested_schema.fields):
array = struct_array.field(pos)
arrays.append(array)
fields.append(pa.field(field.name, array.type, field.optional))
return pa.Table.from_arrays(arrays, schema=pa.schema(fields))


def _to_requested_schema(
requested_schema: Schema,
file_schema: Schema,
@@ -1807,7 +1799,11 @@ def struct(
else:
raise ResolveError(f"Field is required, and could not be found in the file: {field}")

return pa.StructArray.from_arrays(arrays=field_arrays, fields=pa.struct(fields))
return pa.StructArray.from_arrays(
arrays=field_arrays,
fields=pa.struct(fields),
mask=struct_array.is_null() if isinstance(struct_array, pa.StructArray) else None,
)
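The added `mask=` argument preserves parent-level nulls when the struct column is rebuilt from its child arrays; without it, a null struct row silently becomes a struct of null children. A self-contained toy sketch of that effect:

```python
import pyarrow as pa

struct_type = pa.struct([pa.field("id", pa.int64())])
# A struct column whose second row is null at the struct (parent) level.
original = pa.array([{"id": 1}, None, {"id": 3}], type=struct_type)
ids = original.field(0)  # child values: [1, null, 3]

# Without a mask, the parent-level null is lost (row 2 becomes {"id": None}).
no_mask = pa.StructArray.from_arrays([ids], fields=struct_type)

# Passing the original validity as mask restores it, as in the change above.
with_mask = pa.StructArray.from_arrays([ids], fields=struct_type, mask=original.is_null())

print(no_mask.to_pylist())    # [{'id': 1}, {'id': None}, {'id': 3}]
print(with_mask.to_pylist())  # [{'id': 1}, None, {'id': 3}]
```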

def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]:
return field_array
71 changes: 13 additions & 58 deletions pyiceberg/table/__init__.py
@@ -23,6 +23,7 @@
from dataclasses import dataclass
from functools import cached_property
from itertools import chain
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
@@ -33,6 +34,7 @@
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
)
@@ -87,7 +89,6 @@
from pyiceberg.table.update import (
AddPartitionSpecUpdate,
AddSchemaUpdate,
AddSnapshotUpdate,
AddSortOrderUpdate,
AssertCreate,
AssertRefSnapshotId,
@@ -237,9 +238,12 @@ def __enter__(self) -> Transaction:
"""Start a transaction to update the table."""
return self

def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
"""Close and commit the transaction."""
self.commit_transaction()
def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
"""Close and commit the transaction if no exceptions have been raised."""
if exctype is None and excinst is None and exctb is None:
self.commit_transaction()
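With this change, a transaction used as a context manager commits only when the block exits cleanly; an exception now discards the staged updates instead of committing them. A usage sketch, assuming `tbl` is an already-loaded `Table` and `arrow_table` is a PyArrow table prepared elsewhere (both hypothetical names):

```python
# Clean exit: both staged changes are committed when the block ends.
with tbl.transaction() as txn:
    txn.set_properties(owner="analytics")
    txn.append(arrow_table)

# Exception: __exit__ sees the error and skips commit_transaction(),
# so nothing staged inside the block is applied.
try:
    with tbl.transaction() as txn:
        txn.set_properties(owner="oops")
        raise RuntimeError("abort this write")
except RuntimeError:
    pass
```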

def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
"""Check if the requirements are met, and applies the updates to the metadata."""
@@ -309,55 +313,6 @@ def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) ->
updates = properties or kwargs
return self._apply((SetPropertiesUpdate(updates=updates),))

@deprecated(
deprecated_in="0.7.0",
removed_in="0.8.0",
help_message="Please use one of the functions in ManageSnapshots instead",
)
def add_snapshot(self, snapshot: Snapshot) -> Transaction:
"""Add a new snapshot to the table.
Returns:
The transaction with the add-snapshot staged.
"""
updates = (AddSnapshotUpdate(snapshot=snapshot),)

return self._apply(updates, ())

@deprecated(
deprecated_in="0.7.0",
removed_in="0.8.0",
help_message="Please use one of the functions in ManageSnapshots instead",
)
def set_ref_snapshot(
self,
snapshot_id: int,
parent_snapshot_id: Optional[int],
ref_name: str,
type: str,
max_ref_age_ms: Optional[int] = None,
max_snapshot_age_ms: Optional[int] = None,
min_snapshots_to_keep: Optional[int] = None,
) -> Transaction:
"""Update a ref to a snapshot.
Returns:
The transaction with the set-snapshot-ref staged
"""
updates = (
SetSnapshotRefUpdate(
snapshot_id=snapshot_id,
ref_name=ref_name,
type=type,
max_ref_age_ms=max_ref_age_ms,
max_snapshot_age_ms=max_snapshot_age_ms,
min_snapshots_to_keep=min_snapshots_to_keep,
),
)

requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),)
return self._apply(updates, requirements)

def _set_ref_snapshot(
self,
snapshot_id: int,
@@ -698,22 +653,22 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None:

schema: Schema = table_metadata.schema()
self._updates += (
AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id, initial_change=True),
AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id),
SetCurrentSchemaUpdate(schema_id=-1),
)

spec: PartitionSpec = table_metadata.spec()
if spec.is_unpartitioned():
self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC, initial_change=True),)
self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
else:
self._updates += (AddPartitionSpecUpdate(spec=spec, initial_change=True),)
self._updates += (AddPartitionSpecUpdate(spec=spec),)
self._updates += (SetDefaultSpecUpdate(spec_id=-1),)

sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
if sort_order is None or sort_order.is_unsorted:
self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER, initial_change=True),)
self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
else:
self._updates += (AddSortOrderUpdate(sort_order=sort_order, initial_change=True),)
self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)

self._updates += (
16 changes: 16 additions & 0 deletions pyiceberg/table/metadata.py
@@ -587,5 +587,21 @@ def parse_obj(data: Dict[str, Any]) -> TableMetadata:
else:
raise ValidationError(f"Unknown format version: {format_version}")

@staticmethod
def _construct_without_validation(table_metadata: TableMetadata) -> TableMetadata:
"""Construct table metadata from an existing table without performing validation.
This method is useful during a sequence of table updates when the model needs to be re-constructed but is not yet ready for validation.
"""
if table_metadata.format_version is None:
raise ValidationError(f"Missing format-version in TableMetadata: {table_metadata}")

if table_metadata.format_version == 1:
return TableMetadataV1.model_construct(**dict(table_metadata))
elif table_metadata.format_version == 2:
return TableMetadataV2.model_construct(**dict(table_metadata))
else:
raise ValidationError(f"Unknown format version: {table_metadata.format_version}")


TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2], Field(discriminator="format_version")] # type: ignore
6 changes: 5 additions & 1 deletion pyiceberg/table/snapshots.py
@@ -17,6 +17,7 @@
from __future__ import annotations

import time
import warnings
from collections import defaultdict
from enum import Enum
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional
@@ -182,7 +183,10 @@ class Summary(IcebergBaseModel, Mapping[str, str]):
operation: Operation = Field()
_additional_properties: Dict[str, str] = PrivateAttr()

def __init__(self, operation: Operation, **data: Any) -> None:
def __init__(self, operation: Optional[Operation] = None, **data: Any) -> None:
if operation is None:
warnings.warn("Encountered invalid snapshot summary: operation is missing, defaulting to overwrite")
operation = Operation.OVERWRITE
super().__init__(operation=operation, **data)
self._additional_properties = data

Expand Down