From 664e113fd703aa63fc6eac16d760da4fa5e41b09 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Tue, 16 Jan 2024 13:20:38 +0100
Subject: [PATCH] Move to fast-appends

---
 mkdocs/docs/api.md          | 98 +++++++++++++++++++++++++++++++++++++
 pyiceberg/table/__init__.py | 35 +++----------
 2 files changed, 104 insertions(+), 29 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 517e52f185..9d97d4f676 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(
The static-table is considered read-only.

## Write support

With PyIceberg 0.6.0, write support is added through Arrow. Let's consider an Arrow Table:

```python
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)
```

Next, create a table based on the schema:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table("default.cities", schema=schema)
```

Now write the data to the table:

!!! note inline end "Fast append"
    PyIceberg defaults to the [fast append](https://iceberg.apache.org/spec/#snapshots) to minimize the amount of data written. This enables quick writes, reducing the possibility of conflicts. The downside of the fast append is that it creates more metadata than a normal commit. [Compaction is planned](https://github.com/apache/iceberg-python/issues/270) and will automatically rewrite all the metadata when a threshold is hit, to maintain performant reads.

```python
tbl.append(df)

# or

tbl.overwrite(df)
```

The data is written to the table, and when the table is read using `tbl.scan().to_arrow()`:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]
```

You can use either `append(df)` or `overwrite(df)`, since there is no data yet. If we want to add more data, we can use `.append()` again:

```python
df = pa.Table.from_pylist(
    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)

tbl.append(df)
```

When reading the table with `tbl.scan().to_arrow()`, you can see that `Groningen` is now also part of the table:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
```

The nested lists indicate the different Arrow buffers: the first write results in one buffer, and the second append in a separate buffer. This is expected, since the scan reads two Parquet files.
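To see what the fast append does under the hood, it helps to look at the snapshots after the two writes above. The following is a minimal sketch, assuming the `tbl` from the previous snippets and using `Snapshot.manifests` together with `Table.io` as elsewhere in this patch:

```python
# Each append above wrote one Parquet file, so a full scan now plans two file tasks.
print(len(list(tbl.scan().plan_files())))  # expected: 2

# A fast append commits a new snapshot that keeps the previous manifests and adds
# one new manifest for the freshly written files, so the count grows by one per append.
for snapshot in tbl.metadata.snapshots:
    print(snapshot.snapshot_id, len(snapshot.manifests(io=tbl.io)))
```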

!!! example "Under development"
    Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.

## Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (this can be overridden).
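As a quick illustration against the `cities` table created in the write-support section above (a sketch; the `population` column is just an example, and PyIceberg assigns the new field-ID):

```python
from pyiceberg.types import IntegerType

# Adding an optional column is a non-breaking change, so it is allowed by default.
with tbl.update_schema() as update:
    update.add_column("population", IntegerType(), doc="Number of inhabitants")
```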

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index eae7f07ce9..f8d88f4686 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -2068,7 +2068,6 @@ class _MergeAppend:
     _snapshot_id: int
     _parent_snapshot_id: Optional[int]
     _added_data_files: List[DataFile]
-    _existing_manifest_entries: List[ManifestEntry]
     _commit_uuid: uuid.UUID
 
     def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None:
@@ -2078,19 +2077,7 @@ def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None
         # Since we only support the main branch for now
         self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None
         self._added_data_files = []
-        self._existing_manifest_entries = []
         self._commit_uuid = uuid.uuid4()
-        if self._operation == Operation.APPEND:
-            self._add_existing_files()
-
-    def _add_existing_files(self) -> None:
-        if current_snapshot := self._table.current_snapshot():
-            for manifest in current_snapshot.manifests(io=self._table.io):
-                if manifest.content != ManifestContent.DATA:
-                    raise ValueError(f"Cannot write to tables that contain Merge-on-Write manifests: {manifest}")
-
-                for entry in manifest.fetch_manifest_entry(io=self._table.io):
-                    self._existing_manifest_entries.append(entry)
 
     def append_data_file(self, data_file: DataFile) -> _MergeAppend:
         self._added_data_files.append(data_file)
@@ -2099,7 +2086,7 @@ def append_data_file(self, data_file: DataFile) -> _MergeAppend:
 
     def _manifests(self) -> List[ManifestFile]:
         manifests = []
-        if self._added_data_files or self._existing_manifest_entries:
+        if self._added_data_files:
             output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid)
             with write_manifest(
                 format_version=self._table.format_version,
                 spec=self._table.spec(),
                 schema=self._table.schema(),
                 output_file=self._table.io.new_output(output_file_location),
                 snapshot_id=self._snapshot_id,
             ) as writer:
-                # First write the existing entries
-                for existing_entry in self._existing_manifest_entries:
-                    writer.add_entry(
-                        ManifestEntry(
-                            status=ManifestEntryStatus.EXISTING,
-                            snapshot_id=existing_entry.snapshot_id,
-                            data_sequence_number=existing_entry.data_sequence_number,
-                            file_sequence_number=existing_entry.file_sequence_number,
-                            data_file=existing_entry.data_file,
-                        )
-                    )
-
-                # Write the newly added data
                 for data_file in self._added_data_files:
                     writer.add_entry(
                         ManifestEntry(
@@ -2145,7 +2119,7 @@ def _summary(self) -> Dict[str, str]:
         return ssc.build()
 
     def commit(self) -> Snapshot:
-        manifests = self._manifests()
+        new_manifests = self._manifests()
         next_sequence_number = self._table.next_sequence_number()
         previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None
@@ -2165,7 +2139,10 @@ def commit(self) -> Snapshot:
             parent_snapshot_id=self._parent_snapshot_id,
             sequence_number=next_sequence_number,
         ) as writer:
-            writer.add_manifests(manifests)
+            if self._operation == Operation.APPEND and previous_snapshot is not None:
+                # In case we want to append, just add the existing manifests
+                writer.add_manifests(previous_snapshot.manifests(io=self._table.io))
+            writer.add_manifests(new_manifests)
 
         snapshot = Snapshot(
             snapshot_id=self._snapshot_id,
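To sanity-check the new commit path from the user side, one option (a sketch, reusing the `cities` table and the `df` from the documentation section above) is to inspect the snapshot that an append produces:

```python
# Sketch: the snapshot written by _MergeAppend.commit() records its parent and
# carries the summary built by _summary().
tbl.append(df)

snap = tbl.current_snapshot()
print(snap.snapshot_id, snap.parent_snapshot_id)
print(snap.summary)  # operation plus counts such as added data files and records
```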