Support for REPLACE TABLE operation #433

Open

anupam-saini wants to merge 31 commits into base: main

Changes from 27 commits

Commits (31)
ac59bd6
Method to generate IDs based on schema
anupam-saini Feb 15, 2024
efc71de
Support generating IDs based on existing schema
anupam-saini Feb 15, 2024
e1f81f0
Merge branch 'main' into as-replace-table-as-select
anupam-saini Feb 15, 2024
359b8ec
Fix bug
anupam-saini Feb 15, 2024
5a9f078
Add API for replacing table
anupam-saini Feb 15, 2024
b344516
Cleanup
anupam-saini Feb 15, 2024
24fcda4
Cleanup
anupam-saini Feb 15, 2024
93587fb
Fix bug
anupam-saini Feb 15, 2024
b1ea25c
Merge branch 'apache:main' into as-replace-table-as-select
anupam-saini Feb 16, 2024
ec36946
Catalog API for create or replace table
anupam-saini Feb 16, 2024
2e6b8ec
Merge branch 'as-replace-table-as-select' of github.com:anupam-saini/…
anupam-saini Feb 16, 2024
34f559a
Fix bug
anupam-saini Feb 16, 2024
bb7a591
Remove visiting_schema param
anupam-saini Feb 16, 2024
1f397a8
Fix bug
anupam-saini Feb 16, 2024
e25366e
Cleanup
anupam-saini Feb 16, 2024
38f4855
Merge branch 'main' into as-replace-table-as-select
anupam-saini Feb 19, 2024
d644906
Merge branch 'apache:main' into as-replace-table-as-select
anupam-saini Feb 20, 2024
24b6b9f
Merge branch 'apache:main' into as-replace-table-as-select
anupam-saini Feb 21, 2024
aa34637
Remove poetry.lock from PR
anupam-saini Feb 21, 2024
6b33671
Add more functionality and test
anupam-saini Feb 21, 2024
0326af3
Merge branch 'as-replace-table-as-select' of github.com:anupam-saini/…
anupam-saini Feb 21, 2024
8d25dda
Add TODO note for sort order update
anupam-saini Feb 21, 2024
53f2d2c
Merge branch 'apache:main' into as-replace-table-as-select
anupam-saini Feb 25, 2024
db62f56
Merge branch 'main' into as-replace-table-as-select
Fokko Feb 29, 2024
f9ea0cf
Merge branch 'apache:main' into as-replace-table-as-select
anupam-saini Feb 29, 2024
9eea538
Add partition spec and sort order update
anupam-saini Mar 1, 2024
d7dd4ae
Merge branch 'main' into as-replace-table-as-select
anupam-saini Mar 1, 2024
ab9ddf2
Merge branch 'main' into as-replace-table-as-select
anupam-saini Mar 19, 2024
a8eb07a
Merge branch 'apache:main' into as-replace-table-as-select
anupam-saini Mar 24, 2024
b54b357
Merge branch 'apache:main' into as-replace-table-as-select
anupam-saini Mar 31, 2024
044896d
Merge branch 'apache:main' into as-replace-table-as-select
anupam-saini Apr 2, 2024
28 changes: 28 additions & 0 deletions mkdocs/docs/api.md
@@ -165,6 +165,34 @@ catalog.create_table(
)
```

To create a table, or to replace it if it already exists:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    DoubleType,
    StringType,
    NestedField,
)

catalog = load_catalog("default")

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_or_replace_table(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
)
```

If a table with the specified identifier already exists, its table metadata is updated with the provided parameters (schema, location, etc.).
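Because the replace happens through a regular commit, earlier metadata files stay referenced by the new metadata. A minimal sketch of inspecting that history (assuming the `docs_example.bids` table from above):

```python
tbl = catalog.load_table("docs_example.bids")

# Each metadata-log entry points at a previous metadata file of the table
for entry in tbl.metadata.metadata_log:
    print(entry.timestamp_ms, entry.metadata_file)
```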

## Load a table

### Catalog table
95 changes: 92 additions & 3 deletions pyiceberg/catalog/__init__.py
@@ -39,16 +39,25 @@
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, NotInstalledError, TableAlreadyExistsError
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import ManifestFile
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
    AddPartitionSpecUpdate,
    AddSchemaUpdate,
    AddSortOrderUpdate,
    AssertTableUUID,
    CommitTableRequest,
    CommitTableResponse,
    SetCurrentSchemaUpdate,
    SetDefaultSortOrderUpdate,
    SetDefaultSpecUpdate,
    Table,
    TableMetadata,
    TableRequirement,
    TableUpdate,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.typedef import (
    EMPTY_DICT,
    Identifier,
@@ -646,6 +655,47 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
        delete_files(io, prev_metadata_files, PREVIOUS_METADATA)
        delete_files(io, {table.metadata_location}, METADATA)

    def create_or_replace_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """Create a new table or replace an existing one. Replacing the table retains the table metadata history.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            Table: the new table instance.
        """
        try:
            return self._replace_table(
                identifier=identifier,
                new_schema=schema,
                new_location=location,
                new_partition_spec=partition_spec,
                new_sort_order=sort_order,
                new_properties=properties,
            )
        except NoSuchTableError:
            return self.create_table(
                identifier=identifier,
                schema=schema,
                location=location,
                partition_spec=partition_spec,
                sort_order=sort_order,
                properties=properties,
            )

    @staticmethod
    def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
        ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
@@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary(

        return properties_update_summary, updated_properties

    def _replace_table(
        self,
        identifier: Union[str, Identifier],
        new_schema: Union[Schema, "pa.Schema"],
        new_partition_spec: PartitionSpec,
        new_sort_order: SortOrder,
        new_properties: Properties,
        new_location: Optional[str] = None,
    ) -> Table:
        table = self.load_table(identifier)
        with table.transaction() as tx:
            base_schema = table.schema()
            new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, base_schema=base_schema)
Fokko (Contributor) commented:

Let's consider the following:

CREATE TABLE default.t1 (name string);

Results in:

{
  "format-version" : 2,
  "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
  "location" : "s3://warehouse/default/t1",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710407936565,
  "last-column-id" : 1,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}
CREATE OR REPLACE TABLE default.t1 (name string, age int);

The second schema is added:

{
  "format-version" : 2,
  "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
  "location" : "s3://warehouse/default/t1",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710407992389,
  "last-column-id" : 2,
  "current-schema-id" : 1,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  }, {
    "type" : "struct",
    "schema-id" : 1,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710407936565,
    "metadata-file" : "s3://warehouse/default/t1/metadata/00000-83e6fd53-d3c9-41f9-aeea-b978fb882c7f.metadata.json"
  } ]
}

And then go back to the original schema:

CREATE OR REPLACE TABLE default.t1 (name string);

You'll see that no new schema is being added:

{
  "format-version" : 2,
  "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
  "location" : "s3://warehouse/default/t1",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710408026710,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  }, {
    "type" : "struct",
    "schema-id" : 1,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710407936565,
    "metadata-file" : "s3://warehouse/default/t1/metadata/00000-83e6fd53-d3c9-41f9-aeea-b978fb882c7f.metadata.json"
  }, {
    "timestamp-ms" : 1710407992389,
    "metadata-file" : "s3://warehouse/default/t1/metadata/00001-00192fa8-9019-481a-b4da-ebd99d11eeb9.metadata.json"
  } ]
}

What do you think of re-using the update_schema() class:

with table.transaction() as transaction:
    with transaction.update_schema(allow_incompatible_changes=True) as update_schema:
        # Remove old fields
        removed_column_names = base_schema._name_to_id.keys() - schema._name_to_id.keys()
        for removed_column_name in removed_column_names:
            update_schema.delete_column(removed_column_name)
        # Add new and evolve existing fields
        update_schema.union_by_name(schema)

^ Pseudocode, could be cleaner. Ideally, the removal should be done with a visit_with_partner (that's the opposite of the union_by_name).

anupam-saini (Contributor, Author) commented on Mar 17, 2024:

Thanks @Fokko for this detailed explanation.
But we also need to cover step 2 of this example, where we add a new schema, right?

So, from my understanding: if the schema fields match an old schema in the metadata, we do union_by_name with the old schema and set it as the current one;
else, we add the new schema.
Is this a correct assessment?

Fokko (Contributor) replied:

That's correct!
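A minimal sketch of that check (a hypothetical helper, not part of this PR, shown only to pin down the agreed behavior):

```python
from typing import Optional

from pyiceberg.schema import Schema
from pyiceberg.table import TableMetadata


def find_matching_schema_id(metadata: TableMetadata, candidate: Schema) -> Optional[int]:
    """Return the schema-id of an existing schema with the same fields, if any."""
    for schema in metadata.schemas:
        # Compare the field structure only; the schema_id itself is ignored.
        if schema.fields == candidate.fields:
            return schema.schema_id
    return None  # no match: add the candidate as a new schema
```

If a match is found, that schema-id is set as current; otherwise the new schema is added.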

            new_sort_order = assign_fresh_sort_order_ids(
                sort_order=new_sort_order,
                old_schema=base_schema,
                fresh_schema=new_schema,
                sort_order_id=table.sort_order().order_id + 1,
            )
            new_partition_spec = assign_fresh_partition_spec_ids(
                spec=new_partition_spec, old_schema=base_schema, fresh_schema=new_schema, spec_id=table.spec().spec_id + 1
            )

            requirements: Tuple[TableRequirement, ...] = (AssertTableUUID(uuid=table.metadata.table_uuid),)
            updates: Tuple[TableUpdate, ...] = (
Contributor:

We need to clear the snapshots here as well:

CREATE TABLE default.t3 AS SELECT 'Fokko' as name
{
  "format-version" : 2,
  "table-uuid" : "c61404c8-2211-46c7-866f-2eb87022b728",
  "location" : "s3://warehouse/default/t3",
  "last-sequence-number" : 1,
  "last-updated-ms" : 1710409653861,
  "last-column-id" : 1,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "created-at" : "2024-03-14T09:47:10.455199504Z",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ {
    "sequence-number" : 1,
    "snapshot-id" : 3622792816294171432,
    "timestamp-ms" : 1710409631964,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "local-1710405058122",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "416",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-files-size" : "416",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "s3://warehouse/default/t3/metadata/snap-3622792816294171432-1-e457c732-62e5-41eb-998b-abbb8f021ed5.avro",
    "schema-id" : 0
  } ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710409631964,
    "metadata-file" : "s3://warehouse/default/t3/metadata/00000-c82191f3-e6e2-4001-8e85-8623e3915ff7.metadata.json"
  } ]
}
CREATE OR REPLACE TABLE default.t3 (name string);
{
  "format-version" : 2,
  "table-uuid" : "c61404c8-2211-46c7-866f-2eb87022b728",
  "location" : "s3://warehouse/default/t3",
  "last-sequence-number" : 1,
  "last-updated-ms" : 1710411760623,
  "last-column-id" : 1,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "created-at" : "2024-03-14T09:47:10.455199504Z",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ {
    "sequence-number" : 1,
    "snapshot-id" : 3622792816294171432,
    "timestamp-ms" : 1710409631964,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "local-1710405058122",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "416",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-files-size" : "416",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "s3://warehouse/default/t3/metadata/snap-3622792816294171432-1-e457c732-62e5-41eb-998b-abbb8f021ed5.avro",
    "schema-id" : 0
  } ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710409631964,
    "metadata-file" : "s3://warehouse/default/t3/metadata/00000-c82191f3-e6e2-4001-8e85-8623e3915ff7.metadata.json"
  }, {
    "timestamp-ms" : 1710409653861,
    "metadata-file" : "s3://warehouse/default/t3/metadata/00001-0297d4e7-2468-4c0d-b4ed-ea717df8c3e6.metadata.json"
  } ]
}
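One possible way to handle this in the same commit (a sketch; it assumes the `RemoveSnapshotsUpdate` update type is available and accepted by the catalog, which this PR does not yet verify):

```python
# Hypothetical addition to the updates tuple in _replace_table:
snapshot_ids = [snapshot.snapshot_id for snapshot in table.metadata.snapshots]
if snapshot_ids:
    updates += (RemoveSnapshotsUpdate(snapshot_ids=snapshot_ids),)
```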

                AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id),
                SetCurrentSchemaUpdate(schema_id=-1),
                AddSortOrderUpdate(sort_order=new_sort_order),
                SetDefaultSortOrderUpdate(sort_order_id=-1),
                AddPartitionSpecUpdate(spec=new_partition_spec),
                SetDefaultSpecUpdate(spec_id=-1),
Comment on lines +799 to +800
Contributor:

Same goes here, the spec is being re-used:

CREATE TABLE default.t2 (name string, age int) PARTITIONED BY (name);
{
  "format-version" : 2,
  "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
  "location" : "s3://warehouse/default/t2",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710409060360,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    } ]
  } ],
  "last-partition-id" : 1000,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}
CREATE OR REPLACE TABLE default.t2 (name string, age int) PARTITIONED BY (name, age);
{
  "format-version" : 2,
  "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
  "location" : "s3://warehouse/default/t2",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710409079414,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 1,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    } ]
  }, {
    "spec-id" : 1,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    }, {
      "name" : "age",
      "transform" : "identity",
      "source-id" : 2,
      "field-id" : 1001
    } ]
  } ],
  "last-partition-id" : 1001,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710409060360,
    "metadata-file" : "s3://warehouse/default/t2/metadata/00000-bc21fe27-d037-4b98-a8ca-cc1502eb19ec.metadata.json"
  } ]
}
CREATE OR REPLACE TABLE default.t2 (name string, age int) PARTITIONED BY (name);
{
  "format-version" : 2,
  "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
  "location" : "s3://warehouse/default/t2",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1710409086268,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "age",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    } ]
  }, {
    "spec-id" : 1,
    "fields" : [ {
      "name" : "name",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    }, {
      "name" : "age",
      "transform" : "identity",
      "source-id" : 2,
      "field-id" : 1001
    } ]
  } ],
  "last-partition-id" : 1001,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1710409060360,
    "metadata-file" : "s3://warehouse/default/t2/metadata/00000-bc21fe27-d037-4b98-a8ca-cc1502eb19ec.metadata.json"
  }, {
    "timestamp-ms" : 1710409079414,
    "metadata-file" : "s3://warehouse/default/t2/metadata/00001-8062294f-a8d6-493d-905f-b82dfe01cb29.metadata.json"
  } ]
}

Ideally, we also want to re-use the update_spec class here.
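In the same spirit as the update_schema() suggestion above, a rough sketch (pseudocode; method names assume pyiceberg's UpdateSpec API):

```python
with table.transaction() as transaction:
    with transaction.update_spec() as update_spec:
        # Drop partition fields that are absent from the new spec
        new_field_names = {f.name for f in new_partition_spec.fields}
        for field in table.spec().fields:
            if field.name not in new_field_names:
                update_spec.remove_field(field.name)
        # Add partition fields that are not in the current spec
        old_field_names = {f.name for f in table.spec().fields}
        for field in new_partition_spec.fields:
            if field.name not in old_field_names:
                update_spec.add_field(
                    source_column_name=table.schema().find_column_name(field.source_id),
                    transform=field.transform,
                    partition_field_name=field.name,
                )
```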

            )
            tx._apply(updates, requirements)
Contributor:

With #471 in, this is not necessary anymore! 🥳

Suggested change: drop the explicit tx._apply(updates, requirements) call.


            tx.set_properties(**new_properties)
            if new_location is not None:
                tx.update_location(new_location)
        return table

    def __repr__(self) -> str:
        """Return the string representation of the Catalog class."""
        return f"{self.name} ({self.__class__})"
2 changes: 1 addition & 1 deletion pyiceberg/catalog/rest.py
@@ -484,7 +484,7 @@ def create_table(
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        iceberg_schema = self._convert_schema_if_needed(schema)
        fresh_schema = assign_fresh_schema_ids(iceberg_schema)
        fresh_schema = assign_fresh_schema_ids(schema_or_type=iceberg_schema)
        fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
        fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

6 changes: 4 additions & 2 deletions pyiceberg/partitioning.py
@@ -242,7 +242,9 @@ def partition_to_path(self, data: Record, schema: Schema) -> str:
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)


def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fresh_schema: Schema) -> PartitionSpec:
def assign_fresh_partition_spec_ids(
    spec: PartitionSpec, old_schema: Schema, fresh_schema: Schema, spec_id: int = INITIAL_PARTITION_SPEC_ID
) -> PartitionSpec:
    partition_fields = []
    for pos, field in enumerate(spec.fields):
        original_column_name = old_schema.find_column_name(field.source_id)
@@ -259,7 +261,7 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
                transform=field.transform,
            )
        )
    return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
    return PartitionSpec(*partition_fields, spec_id=spec_id)


T = TypeVar("T")
60 changes: 42 additions & 18 deletions pyiceberg/schema.py
@@ -1225,34 +1225,58 @@ def build_position_accessors(schema_or_type: Union[Schema, IcebergType]) -> Dict
    return visit(schema_or_type, _BuildPositionAccessors())


def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], next_id: Optional[Callable[[], int]] = None) -> Schema:
    """Traverses the schema, and sets new IDs."""
    return pre_order_visit(schema_or_type, _SetFreshIDs(next_id_func=next_id))
def assign_fresh_schema_ids(
    schema_or_type: Union[Schema, IcebergType], base_schema: Optional[Schema] = None, next_id: Optional[Callable[[], int]] = None
) -> Schema:
    """Traverse the schema and assign IDs from the base_schema, or the next_id function."""
    visiting_schema = schema_or_type if isinstance(schema_or_type, Schema) else None
    return pre_order_visit(
        schema_or_type, _SetFreshIDs(visiting_schema=visiting_schema, base_schema=base_schema, next_id_func=next_id)
    )


class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
"""Traverses the schema and assigns monotonically increasing ids."""

old_id_to_new_id: Dict[int, int]

def __init__(self, next_id_func: Optional[Callable[[], int]] = None) -> None:
self.old_id_to_new_id = {}
counter = itertools.count(1)
"""Assign IDs from the base_schema, or generate fresh IDs from either the next_id function or monotonically increasing IDs starting from schema's highest ID."""

name_to_id: Dict[str, int]

def __init__(
self,
visiting_schema: Optional[Schema] = None,
base_schema: Optional[Schema] = None,
next_id_func: Optional[Callable[[], int]] = None,
) -> None:
self.name_to_id = {}
self.visiting_schema = visiting_schema
self.base_schema = base_schema
counter = itertools.count(1 + (base_schema.highest_field_id if base_schema is not None else 0))
self.next_id_func = next_id_func if next_id_func is not None else lambda: next(counter)

    def _get_and_increment(self, current_id: int) -> int:
        new_id = self.next_id_func()
        self.old_id_to_new_id[current_id] = new_id
    def _generate_id(self, field_name: Optional[str] = None) -> int:
        field = None
        if self.base_schema is not None and field_name is not None:
            try:
                field = self.base_schema.find_field(field_name)
            except ValueError:
                pass  # field not found, generate new ID below

        new_id = field.field_id if field is not None else self.next_id_func()
        if field_name is not None:
            self.name_to_id[field_name] = new_id
        return new_id

    def _name(self, id: int) -> Optional[str]:
        return self.visiting_schema.find_column_name(id) if self.visiting_schema is not None else None

    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
        return Schema(
            *struct_result().fields,
            identifier_field_ids=[self.old_id_to_new_id[field_id] for field_id in schema.identifier_field_ids],
            identifier_field_ids=[self.name_to_id[field_name] for field_name in schema.identifier_field_names()],
            schema_id=self.base_schema.schema_id + 1 if self.base_schema is not None else INITIAL_SCHEMA_ID,
        )

    def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType:
        new_ids = [self._get_and_increment(field.field_id) for field in struct.fields]
        new_ids = [self._generate_id(self._name(field.field_id)) for field in struct.fields]
        new_fields = []
        for field_id, field, field_type in zip(new_ids, struct.fields, field_results):
            new_fields.append(
@@ -1270,16 +1294,16 @@ def field(self, field: NestedField, field_result: Callable[[], IcebergType]) ->
        return field_result()

    def list(self, list_type: ListType, element_result: Callable[[], IcebergType]) -> ListType:
        element_id = self._get_and_increment(list_type.element_id)
        element_id = self._generate_id(self._name(list_type.element_id))
        return ListType(
            element_id=element_id,
            element=element_result(),
            element_required=list_type.element_required,
        )

    def map(self, map_type: MapType, key_result: Callable[[], IcebergType], value_result: Callable[[], IcebergType]) -> MapType:
        key_id = self._get_and_increment(map_type.key_id)
        value_id = self._get_and_increment(map_type.value_id)
        key_id = self._generate_id(self._name(map_type.key_id))
        value_id = self._generate_id(self._name(map_type.value_id))
        return MapType(
            key_id=key_id,
            key_type=key_result(),
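To illustrate the intended behavior of assign_fresh_schema_ids with a base_schema (a sketch derived from the code above, not a test in this PR): IDs of fields that also exist in the base schema are preserved, and genuinely new fields are numbered starting above the base schema's highest field ID.

```python
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.types import IntegerType, NestedField, StringType

base = Schema(NestedField(1, "name", StringType(), required=False), schema_id=0)
new = Schema(
    NestedField(100, "name", StringType(), required=False),
    NestedField(101, "age", IntegerType(), required=False),
)

fresh = assign_fresh_schema_ids(schema_or_type=new, base_schema=base)
# "name" keeps ID 1 (found in base); "age" gets ID 2 (1 + base.highest_field_id);
# the resulting schema_id is base.schema_id + 1.
```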
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
@@ -1724,7 +1724,7 @@ def add_column(
        self._added_name_to_id[full_name] = new_id
        self._id_to_parent[new_id] = parent_full_path

        new_type = assign_fresh_schema_ids(field_type, self.assign_new_column_id)
        new_type = assign_fresh_schema_ids(schema_or_type=field_type, next_id=self.assign_new_column_id)
        field = NestedField(field_id=new_id, name=name, field_type=new_type, required=required, doc=doc)

        if parent_id in self._adds:
2 changes: 1 addition & 1 deletion pyiceberg/table/metadata.py
@@ -460,7 +460,7 @@ def new_table_metadata(
) -> TableMetadata:
    from pyiceberg.table import TableProperties

    fresh_schema = assign_fresh_schema_ids(schema)
    fresh_schema = assign_fresh_schema_ids(schema_or_type=schema)
    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema)
    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)

51 changes: 48 additions & 3 deletions tests/catalog/test_base.py
@@ -53,13 +53,14 @@
    SetCurrentSchemaUpdate,
    Table,
    TableIdentifier,
    UpdateSchema,
    update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortField, SortOrder
from pyiceberg.transforms import IdentityTransform, TruncateTransform
from pyiceberg.typedef import EMPTY_DICT
from pyiceberg.types import IntegerType, LongType, NestedField
from pyiceberg.types import IntegerType, LongType, NestedField, StringType, StructType


class InMemoryCatalog(Catalog):
@@ -101,6 +102,7 @@ def create_table(
"last-updated-ms": 1602638573874,
"last-column-id": schema.highest_field_id,
"schema": schema.model_dump(),
"current_schema_id": schema.schema_id,
"partition-spec": partition_spec.model_dump()["fields"],
"properties": properties,
"current-snapshot-id": -1,
@@ -667,6 +669,49 @@ def test_catalog_repr(catalog: InMemoryCatalog) -> None:
    assert s == "test.in.memory.catalog (<class 'test_base.InMemoryCatalog'>)"


def test_catalog_create_or_replace_table(catalog: InMemoryCatalog, table_schema_nested: Schema) -> None:
    # Given
    table = catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=table_schema_nested,
        location=TEST_TABLE_LOCATION,
        partition_spec=PartitionSpec(PartitionField(name="foo", transform=IdentityTransform(), source_id=1, field_id=1000)),
        properties=TEST_TABLE_PROPERTIES,
    )
    highest_field_id = table_schema_nested.highest_field_id
    new_schema = (
        UpdateSchema(transaction=None, schema=table_schema_nested)  # type: ignore
        .update_column("bar", LongType())
        .add_column(
            "another_person",
            field_type=StructType(
                NestedField(field_id=highest_field_id + 2, name="name", field_type=StringType(), required=False),
                NestedField(field_id=highest_field_id + 3, name="age", field_type=LongType(), required=True),
            ),
        )
        ._apply()
    )
    new_sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
    new_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id"), spec_id=1
    )
    new_properties = TEST_TABLE_PROPERTIES
    new_properties["key3"] = "value3"
    # When
    new_table = catalog.create_or_replace_table(
        identifier=table.identifier,
        schema=new_schema,
        partition_spec=new_spec,
        sort_order=new_sort_order,
        properties=new_properties,
    )
    # Then
    assert new_table.schema() == new_schema
    assert new_table.spec() == new_spec
    assert new_table.sort_order() == new_sort_order
    assert new_table.properties == new_properties


def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
    # table properties can be set to int, but still serialized to string
    property_with_int = {"property_name": 42}