REPLACE TABLE Support #281

Open · sungwy opened this issue Jan 18, 2024 · 11 comments · May be fixed by #433

Comments
@sungwy (Collaborator) commented Jan 18, 2024

Feature Request / Improvement

Spark supports "REPLACE TABLE ... AS SELECT" as an atomic operation when using a SparkCatalog.

Similarly, we should introduce support for an atomic RTAS operation in PyIceberg.

Atomic table replacement creates a new snapshot with the results of the SELECT query, but keeps table history.

https://iceberg.apache.org/docs/latest/spark-ddl/#replace-table--as-select
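
For reference, this is roughly the Spark operation we would be mirroring (a minimal PySpark sketch; the catalog, namespace, and table names are placeholders):

from pyspark.sql import SparkSession

# Assumes a Spark session already configured with an Iceberg catalog named "my_catalog".
spark = SparkSession.builder.getOrCreate()

# Atomically replaces the table's schema and data with the result of the SELECT,
# while keeping the table's snapshot history.
spark.sql(
    """
    REPLACE TABLE my_catalog.db.sample
    USING iceberg
    AS SELECT id, data FROM my_catalog.db.staging_table
    """
)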

@Fokko (Contributor) commented Jan 19, 2024

@syun64 thanks for raising this. With the freshly merged write support, we can do:

from pyiceberg.catalog import load_catalog

cat = load_catalog('default')
tbl = cat.load_table('default.some_table')

tbl.overwrite(df)  # df is a pa.Table holding the new data

The overwrite will create a new snapshot with the existing schema. This would only replace the data, and keep the metadata. Would that work for you? The next step would be to automatically evolve the schema as well (with flags for compatible/incompatible changes).

@nicor88 commented Jan 19, 2024

> The next step would be to automatically evolve the schema as well (with flags for compatible/incompatible changes).

@Fokko evolving the schema would be nice. Would it also be possible to evolve partitions? E.g. after a specific overwrite I want schema evolution and the possibility of replacing the table, adding partitions on specific columns.

@sungwy (Collaborator, Author) commented Jan 19, 2024

Thank you for the great points, @Fokko and @nicor88.

Just like @nicor88 mentioned, I think RTAS will be slightly different from overwrite in the sense that the schema, partition spec, sort order, or any of the table properties can also be updated atomically with this operation. In short, the function needs to support updating any of the arguments currently supported by the create_table function, in addition to overwriting the Iceberg table data with the input PyArrow table.

I'm wondering if it would be better to have a separate function that achieves these goals in a single transaction?

class Table:
    ...

    def replace(
        self,
        schema: Schema,
        df: pa.Table,
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> None:
        # Update the table properties, partition spec, sort order, and schema,
        # overwrite all data in the table with the new data from df,
        # and commit the transaction as a single metadata update.
        ...
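
Purely as an illustration, a call to such a hypothetical replace could look like the following (none of this API exists yet; the schema and data are made up):

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

catalog = load_catalog("default")
tbl = catalog.load_table("default.some_table")

new_schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="category", field_type=StringType(), required=False),
)
df = pa.table({"id": [1, 2, 3], "category": ["a", "b", "c"]})

# Hypothetical: swap in the new schema, overwrite all data with df,
# and commit everything as a single transaction.
tbl.replace(schema=new_schema, df=df)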

@Fokko (Contributor) commented Jan 19, 2024

> Evolving the schema would be nice. Would it also be possible to evolve partitions? E.g. after a specific overwrite I want schema evolution and the possibility of replacing the table, adding partitions on specific columns.

Schema evolution is already possible today. What's missing there is the unionByName method that will promote the schema to a new schema:

with table.update_schema() as update:
    update.union_by_name_with(new_schema)

This will promote the schema to the new schema if compatible. If you also want to allow incompatible changes, you can do:

with table.update_schema(allow_incompatible_changes=True) as update:
    update.union_by_name_with(new_schema)

By making this explicit, you make sure that you don't break the table for the downstream consumers. If you want to add partition columns in a single transaction, you can do:

with table.transaction() as tx:
    with tx.update_schema() as update:
        update.union_by_name_with(new_schema)
    with tx.update_spec() as update:
        update.add_field("dt_transaction", "transaction", DayTransform())

Note: updating the partition spec is still underway (#245), but it is expected to be included in 0.6.0.

> I'm wondering if it would be better to have a separate function that achieves these goals in a single transaction?

We could do something similar to CREATE OR REPLACE TABLE in Spark. It isn't just a DROP/CREATE, since it will create a new schema with new field IDs and such. Please refer to the Java code. This might be more applicable if you just want to replace everything, instead of evolving the existing table.

@sungwy (Collaborator, Author) commented Jan 19, 2024

Makes sense, @Fokko. Thank you very much for taking the time to lay out all these options for cases where a user may have to handle compatible or incompatible schema updates.

As you suggested, I think it would make sense to port over the Java code to support the replace operation for the case where we just want to replace the existing table with a new schema and data in a single transaction. I'll read through the code and take a stab at a PR that introduces the operation to PyIceberg.

@sungwy (Collaborator, Author) commented Jan 22, 2024

In order to reduce code duplication, would it make sense to combine the job of TypeUtil.assignFreshIds with UnionByNameVisitor?

They seem to be doing the same task of ensuring that the new schema reuses the field_id of a column that existed in the original schema. The only difference is that TypeUtil.assignFreshIds drops columns that are not in the target schema, whereas UnionByNameVisitor unions the original and target schemas.

If that sounds like a good idea, #284 will be a prerequisite to building this feature.

@Fokko (Contributor) commented Jan 22, 2024

@syun64 I started on #284 today. It re-uses the UpdateSchema API, which already sets the right IDs and maintains the IDs of existing fields. I don't think that helps us here, because the UnionByName visitor requires an Iceberg schema instead of a PyArrow schema.

My suggestion is to create a name mapping from an Arrow table that we can use in pyarrow_to_schema to correctly assign fields by name.
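
To make that concrete, here is one rough way such a name mapping could be derived from the table's current schema (a sketch only; it covers top-level fields and uses a plain dict rather than whatever shape the real name mapping ends up taking):

from typing import Dict

from pyiceberg.schema import Schema


def name_to_id_mapping(current_schema: Schema) -> Dict[str, int]:
    """Map each top-level field name in the table's Iceberg schema to its field ID.

    A conversion from a PyArrow schema could consult this mapping to resolve fields
    by name instead of by position; nested types would need a recursive walk.
    """
    return {field.name: field.field_id for field in current_schema.fields}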

@sungwy (Collaborator, Author) commented Jan 23, 2024

Hi @Fokko - sounds like you beat me to it 😄 Please let me know if you need any additional heavy lifting on #284 . Happy to help as always.

The reason I was curious whether there's an opportunity to deduplicate code here is that the buildReplacement code in Java also takes an Iceberg Schema as input. It then compares the new updated schema against the existing schema, reusing the existing ID if the corresponding field name exists, or assigning a new ID starting from the next increment after the table's last_column_id.

On second thought, I'm wondering if it would actually make sense to extend the functionality of the PyArrow schema visitor we are planning to implement for #278, and have the visitor take the last_column_id and the base Schema as input, so that it assigns the existing field ID when one exists and otherwise assigns a new field ID starting from last_column_id. What are your thoughts on this idea?
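
A sketch of what that ID assignment could look like (illustrative only; assign_field_ids is a hypothetical helper and only top-level fields are handled):

from typing import Dict, Tuple

import pyarrow as pa

from pyiceberg.schema import Schema


def assign_field_ids(
    arrow_schema: pa.Schema, base_schema: Schema, last_column_id: int
) -> Tuple[Dict[str, int], int]:
    """Reuse the base schema's field ID when a name matches, otherwise allocate a new ID
    counting up from last_column_id. Returns the assignment and the updated last_column_id."""
    existing = {field.name: field.field_id for field in base_schema.fields}
    assigned: Dict[str, int] = {}
    for field in arrow_schema:
        if field.name in existing:
            assigned[field.name] = existing[field.name]
        else:
            last_column_id += 1
            assigned[field.name] = last_column_id
    return assigned, last_column_id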

@sungwy (Collaborator, Author) commented Feb 22, 2024

There's a PR in progress that will introduce 'REPLACE TABLE' support, but I don't think we've reached a consensus yet on how we would want to support 'AS SELECT' semantics in PyIceberg, or whether we want to introduce it at all.

If we wanted to, I think we could update all create_table... and replace_table... operations to take something like

as_select: pa.Table = None

as an optional parameter, and add a snapshot update with a full-table static overwrite to the new table metadata. Does that sound like a reasonable enhancement?
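
From the user's side, that could look roughly like this (hypothetical; as_select does not exist, and passing a PyArrow schema directly to create_table is an assumption here):

import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
df = pa.table({"id": [1, 2, 3], "data": ["a", "b", "c"]})

# Hypothetical: create (or replace) the table and commit the dataframe as its first
# snapshot in a single metadata update, mirroring CREATE/REPLACE TABLE ... AS SELECT.
tbl = catalog.create_table("default.ctas_table", schema=df.schema, as_select=df)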

@Fokko (Contributor) commented Feb 29, 2024

@syun64 Yes, that sounds like a great suggestion to me. I think it makes a lot of sense to create a table including a snapshot. I'm not sure as_select is the best name, though, since we just pass in a dataframe.

For most catalogs, this will be quite straightforward. For the REST catalog, there is one subtle implementation detail that we want to take into account: we need to leverage staged creation. Details can be found here: https://github.com/apache/iceberg/blob/bb53c3d4e0e27ac6706803c2371793ad2476ae04/open-api/rest-catalog-open-api.yaml#L480-L492
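
In other words, the REST flow would be two steps: stage the table creation, then commit the staged table together with its first snapshot. A heavily simplified sketch of that interaction (endpoint, auth, and the update payload are placeholders; in practice this goes through PyIceberg's REST catalog client rather than raw HTTP):

import requests

BASE = "https://rest-catalog.example.com/v1"  # placeholder endpoint, auth omitted

# Step 1: stage the table. The catalog writes metadata but does not commit the table yet.
requests.post(
    f"{BASE}/namespaces/default/tables",
    json={
        "name": "ctas_table",
        "stage-create": True,
        "schema": {
            "type": "struct",
            "schema-id": 0,
            "fields": [{"id": 1, "name": "id", "required": True, "type": "long"}],
        },
    },
)

# Step 2: after writing the data files, commit the staged table and its first snapshot
# in one request. The assert-create requirement rejects the commit if the table was
# created concurrently in the meantime.
requests.post(
    f"{BASE}/namespaces/default/tables/ctas_table",
    json={
        "requirements": [{"type": "assert-create"}],
        "updates": [],  # add-snapshot / set-snapshot-ref updates produced by the write
    },
)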

@sungwy sungwy removed this from the PyIceberg 0.7.0 release milestone Mar 26, 2024
@github-actions (bot) commented Sep 23, 2024

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Sep 23, 2024
@ndrluis ndrluis removed the stale label Sep 23, 2024