Skip to content

Commit

Permalink
Move to fast-appends
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Jan 16, 2024
1 parent 48ba852 commit 664e113
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 29 deletions.
98 changes: 98 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(

The static-table is considered read-only.

## Write support

With PyIceberg 0.6.0, write support is added through Arrow. Let's consider an Arrow Table:

```python
import pyarrow as pa
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
{"city": "Drachten", "lat": 53.11254, "long": 6.0989},
{"city": "Paris", "lat": 48.864716, "long": 2.349014},
],
)
```

Next, create a table based on the schema:

```python
from pyiceberg.catalog import load_catalog
catalog = load_catalog("default")
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType
schema = Schema(
NestedField(1, "city", StringType(), required=False),
NestedField(2, "lat", DoubleType(), required=False),
NestedField(3, "long", DoubleType(), required=False),
)
tbl = catalog.create_table("default.cities", schema=schema)
```

Now write the data to the table:

<!-- prettier-ignore-start -->

!!! note inline end "Fast append"
    PyIceberg defaults to the [fast append](https://iceberg.apache.org/spec/#snapshots) to minimize the amount of data written. This enables quick writes, reducing the possibility of conflicts. The downside of the fast append is that it creates more metadata than a normal commit. [Compaction is planned](https://github.com/apache/iceberg-python/issues/270) and will automatically rewrite all the metadata when a threshold is hit, to maintain performant reads.

<!-- prettier-ignore-end -->

```python
tbl.append(df)
# or
tbl.overwrite(df)
```

The data is written to the table, and when the table is read using `tbl.scan().to_arrow()`:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]
```

You can use either `append(df)` or `overwrite(df)` since there is no data yet. If we want to add more data, we can use `.append()` again:

```python
df = pa.Table.from_pylist(
[{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)
tbl.append(df)
```

When reading the table `tbl.scan().to_arrow()` you can see that `Groningen` is now also part of the table:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
```

The nested lists indicate the different Arrow buffers: the first write results in one buffer, and the second append in a separate buffer. This is expected, since the scan reads two Parquet files.

<!-- prettier-ignore-start -->

!!! example "Under development"
Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.

<!-- prettier-ignore-end -->

## Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (can be overridden).
Expand Down
35 changes: 6 additions & 29 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,6 @@ class _MergeAppend:
_snapshot_id: int
_parent_snapshot_id: Optional[int]
_added_data_files: List[DataFile]
_existing_manifest_entries: List[ManifestEntry]
_commit_uuid: uuid.UUID

def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None:
Expand All @@ -2078,19 +2077,7 @@ def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None
# Since we only support the main branch for now
self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None
self._added_data_files = []
self._existing_manifest_entries = []
self._commit_uuid = uuid.uuid4()
if self._operation == Operation.APPEND:
self._add_existing_files()

def _add_existing_files(self) -> None:
if current_snapshot := self._table.current_snapshot():
for manifest in current_snapshot.manifests(io=self._table.io):
if manifest.content != ManifestContent.DATA:
raise ValueError(f"Cannot write to tables that contain Merge-on-Write manifests: {manifest}")

for entry in manifest.fetch_manifest_entry(io=self._table.io):
self._existing_manifest_entries.append(entry)

def append_data_file(self, data_file: DataFile) -> _MergeAppend:
self._added_data_files.append(data_file)
Expand All @@ -2099,7 +2086,7 @@ def append_data_file(self, data_file: DataFile) -> _MergeAppend:
def _manifests(self) -> List[ManifestFile]:
manifests = []

if self._added_data_files or self._existing_manifest_entries:
if self._added_data_files:
output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid)
with write_manifest(
format_version=self._table.format_version,
Expand All @@ -2108,19 +2095,6 @@ def _manifests(self) -> List[ManifestFile]:
output_file=self._table.io.new_output(output_file_location),
snapshot_id=self._snapshot_id,
) as writer:
# First write the existing entries
for existing_entry in self._existing_manifest_entries:
writer.add_entry(
ManifestEntry(
status=ManifestEntryStatus.EXISTING,
snapshot_id=existing_entry.snapshot_id,
data_sequence_number=existing_entry.data_sequence_number,
file_sequence_number=existing_entry.file_sequence_number,
data_file=existing_entry.data_file,
)
)

# Write the newly added data
for data_file in self._added_data_files:
writer.add_entry(
ManifestEntry(
Expand All @@ -2145,7 +2119,7 @@ def _summary(self) -> Dict[str, str]:
return ssc.build()

def commit(self) -> Snapshot:
manifests = self._manifests()
new_manifests = self._manifests()
next_sequence_number = self._table.next_sequence_number()

previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None
Expand All @@ -2165,7 +2139,10 @@ def commit(self) -> Snapshot:
parent_snapshot_id=self._parent_snapshot_id,
sequence_number=next_sequence_number,
) as writer:
writer.add_manifests(manifests)
if self._operation == Operation.APPEND and previous_snapshot is not None:
# In case we want to append, just add the existing manifests
writer.add_manifests(previous_snapshot.manifests(io=self._table.io))
writer.add_manifests(new_manifests)

snapshot = Snapshot(
snapshot_id=self._snapshot_id,
Expand Down

0 comments on commit 664e113

Please sign in to comment.