diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 682b1ac731..7081794aff 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1585,12 +1585,12 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     except StopIteration:
         pass
 
-    file_path = f'{table.location()}/data/{task.generate_datafile_filename("parquet")}'
+    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
    file_schema = schema_to_pyarrow(table.schema())
 
     collected_metrics: List[pq.FileMetaData] = []
     fo = table.io.new_output(file_path)
-    with fo.create() as fos:
+    with fo.create(overwrite=True) as fos:
         with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
             writer.write_table(task.df)
 
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index becf5a473d..6e1d11c23c 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -493,25 +493,41 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition
     return [field.to_summary() for field in field_stats]
 
 
-MANIFEST_FILE_SCHEMA: Schema = Schema(
-    NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
-    NestedField(501, "manifest_length", LongType(), required=True),
-    NestedField(502, "partition_spec_id", IntegerType(), required=True),
-    NestedField(517, "content", IntegerType(), required=False, initial_default=ManifestContent.DATA),
-    NestedField(515, "sequence_number", LongType(), required=False, initial_default=0),
-    NestedField(516, "min_sequence_number", LongType(), required=False, initial_default=0),
-    NestedField(503, "added_snapshot_id", LongType(), required=True),
-    NestedField(504, "added_files_count", IntegerType(), required=False),
-    NestedField(505, "existing_files_count", IntegerType(), required=False),
-    NestedField(506, "deleted_files_count", IntegerType(), required=False),
-    NestedField(512, "added_rows_count", LongType(), required=False),
-    NestedField(513, "existing_rows_count", LongType(), required=False),
-    NestedField(514, "deleted_rows_count", LongType(), required=False),
-    NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
-    NestedField(519, "key_metadata", BinaryType(), required=False),
-)
+MANIFEST_LIST_FILE_SCHEMAS: Dict[int, Schema] = {
+    1: Schema(
+        NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
+        NestedField(501, "manifest_length", LongType(), required=True),
+        NestedField(502, "partition_spec_id", IntegerType(), required=True),
+        NestedField(503, "added_snapshot_id", LongType(), required=True),
+        NestedField(504, "added_files_count", IntegerType(), required=False),
+        NestedField(505, "existing_files_count", IntegerType(), required=False),
+        NestedField(506, "deleted_files_count", IntegerType(), required=False),
+        NestedField(512, "added_rows_count", LongType(), required=False),
+        NestedField(513, "existing_rows_count", LongType(), required=False),
+        NestedField(514, "deleted_rows_count", LongType(), required=False),
+        NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
+        NestedField(519, "key_metadata", BinaryType(), required=False),
+    ),
+    2: Schema(
+        NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
+        NestedField(501, "manifest_length", LongType(), required=True),
+        NestedField(502, "partition_spec_id", IntegerType(), required=True),
+        NestedField(517, "content", IntegerType(), required=False, initial_default=ManifestContent.DATA),
+        NestedField(515, "sequence_number", LongType(), required=False, initial_default=0),
+        NestedField(516, "min_sequence_number", LongType(), required=False, initial_default=0),
+        NestedField(503, "added_snapshot_id", LongType(), required=True),
+        NestedField(504, "added_files_count", IntegerType(), required=True),
+        NestedField(505, "existing_files_count", IntegerType(), required=True),
+        NestedField(506, "deleted_files_count", IntegerType(), required=True),
+        NestedField(512, "added_rows_count", LongType(), required=True),
+        NestedField(513, "existing_rows_count", LongType(), required=True),
+        NestedField(514, "deleted_rows_count", LongType(), required=True),
+        NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
+        NestedField(519, "key_metadata", BinaryType(), required=False),
+    ),
+}
 
-MANIFEST_FILE_SCHEMA_STRUCT = MANIFEST_FILE_SCHEMA.as_struct()
+MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}
 
 
 POSITIONAL_DELETE_SCHEMA = Schema(
@@ -554,7 +570,7 @@ class ManifestFile(Record):
     key_metadata: Optional[bytes]
 
     def __init__(self, *data: Any, **named_data: Any) -> None:
-        super().__init__(*data, **{"struct": MANIFEST_FILE_SCHEMA_STRUCT, **named_data})
+        super().__init__(*data, **{"struct": MANIFEST_LIST_FILE_STRUCTS[DEFAULT_READ_VERSION], **named_data})
 
     def has_added_files(self) -> bool:
         return self.added_files_count is None or self.added_files_count > 0
@@ -599,7 +615,7 @@ def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
     """
     with AvroFile[ManifestFile](
         input_file,
-        MANIFEST_FILE_SCHEMA,
+        MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
         read_types={-1: ManifestFile, 508: PartitionFieldSummary},
         read_enums={517: ManifestContent},
     ) as reader:
@@ -630,12 +646,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani
 
     # in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
     # in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
-    if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
+    if entry.data_sequence_number is None and (manifest.sequence_number <= 0 or entry.status == ManifestEntryStatus.ADDED):
         entry.data_sequence_number = manifest.sequence_number
 
     # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
     # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
-    if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
+    if entry.file_sequence_number is None and (manifest.sequence_number <= 0 or entry.status == ManifestEntryStatus.ADDED):
         # Only available in V2, always 0 in V1
         entry.file_sequence_number = manifest.sequence_number
 
@@ -841,13 +857,15 @@ def write_manifest(
 
 
 class ManifestListWriter(ABC):
+    _format_version: Literal[1, 2]
     _output_file: OutputFile
     _meta: Dict[str, str]
     _manifest_files: List[ManifestFile]
     _commit_snapshot_id: int
     _writer: AvroOutputFile[ManifestFile]
 
-    def __init__(self, output_file: OutputFile, meta: Dict[str, Any]):
+    def __init__(self, format_version: Literal[1, 2], output_file: OutputFile, meta: Dict[str, Any]):
+        self._format_version = format_version
         self._output_file = output_file
         self._meta = meta
         self._manifest_files = []
@@ -855,7 +873,11 @@ def __init__(self, output_file: OutputFile, meta: Dict[str, Any]):
     def __enter__(self) -> ManifestListWriter:
         """Open the writer for writing."""
         self._writer = AvroOutputFile[ManifestFile](
-            output_file=self._output_file, file_schema=MANIFEST_FILE_SCHEMA, schema_name="manifest_file", metadata=self._meta
+            output_file=self._output_file,
+            record_schema=MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
+            file_schema=MANIFEST_LIST_FILE_SCHEMAS[self._format_version],
+            schema_name="manifest_file",
+            metadata=self._meta,
         )
         self._writer.__enter__()
         return self
@@ -881,7 +903,9 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite
 class ManifestListWriterV1(ManifestListWriter):
     def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
         super().__init__(
-            output_file, {"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"}
+            format_version=1,
+            output_file=output_file,
+            meta={"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"},
         )
 
     def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
@@ -896,8 +920,9 @@ class ManifestListWriterV2(ManifestListWriter):
     def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
         super().__init__(
-            output_file,
-            {
+            format_version=2,
+            output_file=output_file,
+            meta={
                 "snapshot-id": str(snapshot_id),
                 "parent-snapshot-id": str(parent_snapshot_id),
                 "sequence-number": str(sequence_number),
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 5dec768382..c74ee3f69b 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -853,9 +853,6 @@ def last_sequence_number(self) -> int:
     def next_sequence_number(self) -> int:
         return self.last_sequence_number + 1 if self.metadata.format_version > 1 else INITIAL_SEQUENCE_NUMBER
 
-    def _next_sequence_number(self) -> int:
-        return INITIAL_SEQUENCE_NUMBER if self.format_version == 1 else self.last_sequence_number + 1
-
     def new_snapshot_id(self) -> int:
         """Generate a new snapshot-id that's not in use."""
         snapshot_id = _generate_snapshot_id()
@@ -888,30 +885,45 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
         return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive)
 
     def append(self, df: pa.Table) -> None:
+        """
+        Append data to the table.
+
+        Args:
+            df: The Arrow dataframe that will be appended to the table
+        """
         if len(self.spec().fields) > 0:
             raise ValueError("Cannot write to partitioned tables")
 
+        if len(self.sort_order().fields) > 0:
+            raise ValueError("Cannot write to tables with a sort-order")
+
         snapshot_id = self.new_snapshot_id()
 
         data_files = _dataframe_to_data_files(self, df=df)
         merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id)
         for data_file in data_files:
-            merge.append_datafile(data_file)
-
-        if current_snapshot := self.current_snapshot():
-            for manifest in current_snapshot.manifests(io=self.io):
-                for entry in manifest.fetch_manifest_entry(io=self.io):
-                    merge.append_datafile(entry.data_file, added=False)
+            merge.append_data_file(data_file)
 
         merge.commit()
 
     def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
+        """
+        Overwrite all the data in the table.
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
+                or a boolean expression in case of a partial overwrite
+        """
         if overwrite_filter != AlwaysTrue():
             raise NotImplementedError("Cannot overwrite a subset of a table")
 
         if len(self.spec().fields) > 0:
             raise ValueError("Cannot write to partitioned tables")
 
+        if len(self.sort_order().fields) > 0:
+            raise ValueError("Cannot write to tables with a sort-order")
+
         snapshot_id = self.new_snapshot_id()
 
         data_files = _dataframe_to_data_files(self, df=df)
@@ -923,7 +935,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
         )
 
         for data_file in data_files:
-            merge.append_datafile(data_file)
+            merge.append_data_file(data_file)
 
         merge.commit()
 
@@ -1139,7 +1151,7 @@ def _min_data_file_sequence_number(manifests: List[ManifestFile]) -> int:
     return INITIAL_SEQUENCE_NUMBER
 
 
-def _match_deletes_to_datafile(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> Set[DataFile]:
+def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> Set[DataFile]:
     """Check if the delete file is relevant for the data file.
 
     Using the column metrics to see if the filename is in the lower and upper bound.
@@ -1283,7 +1295,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
         return [
             FileScanTask(
                 data_entry.data_file,
-                delete_files=_match_deletes_to_datafile(
+                delete_files=_match_deletes_to_data_file(
                     data_entry,
                     positional_delete_entries,
                 ),
@@ -2017,7 +2029,7 @@ class WriteTask:
 
     # Later to be extended with partition information
 
-    def generate_datafile_filename(self, extension: str) -> str:
+    def generate_data_file_filename(self, extension: str) -> str:
         # Mimics the behavior in the Java API:
         # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
         return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
@@ -2027,15 +2039,21 @@ def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
     return f'{location}/metadata/{commit_uuid}-m{num}.avro'
 
 
-def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
     # Mimics the behavior in Java:
     # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
-    return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+    return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
 
 
 def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
     from pyiceberg.io.pyarrow import write_file
 
+    if len(table.spec().fields) > 0:
+        raise ValueError("Cannot write to partitioned tables")
+
+    if len(table.sort_order().fields) > 0:
+        raise ValueError("Cannot write to tables with a sort-order")
+
     write_uuid = uuid.uuid4()
     counter = itertools.count(0)
 
@@ -2049,8 +2067,8 @@ class _MergeAppend:
     _table: Table
     _snapshot_id: int
     _parent_snapshot_id: Optional[int]
-    _added_datafiles: List[DataFile]
-    _existing_datafiles: List[DataFile]
+    _added_data_files: List[DataFile]
+    _existing_manifest_entries: List[ManifestEntry]
     _commit_uuid: uuid.UUID
 
     def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None:
@@ -2059,22 +2077,30 @@ def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None
         self._snapshot_id = snapshot_id
         # Since we only support the main branch for now
         self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None
-        self._added_datafiles = []
-        self._existing_datafiles = []
+        self._added_data_files = []
+        self._existing_manifest_entries = []
         self._commit_uuid = uuid.uuid4()
+        if self._operation == Operation.APPEND:
+            self._add_existing_files()
 
-    def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend:
-        if added:
-            self._added_datafiles.append(data_file)
-        else:
-            self._existing_datafiles.append(data_file)
+    def _add_existing_files(self) -> None:
+        if current_snapshot := self._table.current_snapshot():
+            for manifest in current_snapshot.manifests(io=self._table.io):
+                if manifest.content != ManifestContent.DATA:
+                    raise ValueError(f"Cannot write to tables that contain Merge-on-Write manifests: {manifest}")
+
+                for entry in manifest.fetch_manifest_entry(io=self._table.io):
+                    self._existing_manifest_entries.append(entry)
+
+    def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+        self._added_data_files.append(data_file)
         return self
 
     def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
         ssc = SnapshotSummaryCollector()
         manifests = []
 
-        if self._added_datafiles:
+        if self._added_data_files or self._existing_manifest_entries:
             output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid)
             with write_manifest(
                 format_version=self._table.format_version,
@@ -2083,7 +2109,20 @@ def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
                 output_file=self._table.io.new_output(output_file_location),
                 snapshot_id=self._snapshot_id,
             ) as writer:
-                for data_file in self._added_datafiles + self._existing_datafiles:
+                # First write the existing entries
+                for existing_entry in self._existing_manifest_entries:
+                    writer.add_entry(
+                        ManifestEntry(
+                            status=ManifestEntryStatus.EXISTING,
+                            snapshot_id=existing_entry.snapshot_id,
+                            data_sequence_number=existing_entry.data_sequence_number,
+                            file_sequence_number=existing_entry.file_sequence_number,
+                            data_file=existing_entry.data_file,
+                        )
+                    )
+
+                # Write the newly added data
+                for data_file in self._added_data_files:
                     writer.add_entry(
                         ManifestEntry(
                             status=ManifestEntryStatus.ADDED,
@@ -2093,8 +2132,6 @@ def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
                             data_file=data_file,
                         )
                    )
-
-                for data_file in self._added_datafiles:
                     ssc.add_file(data_file=data_file)
 
             manifests.append(writer.to_manifest_file())
@@ -2103,6 +2140,7 @@ def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
 
     def commit(self) -> Snapshot:
         new_summary, manifests = self._manifests()
+        next_sequence_number = self._table.next_sequence_number()
 
         previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None
         summary = update_snapshot_summaries(
@@ -2111,16 +2149,15 @@ def commit(self) -> Snapshot:
             truncate_full_table=self._operation == Operation.OVERWRITE,
         )
 
-        manifest_list_filename = _generate_manifest_list_filename(
-            snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
+        manifest_list_file_path = _generate_manifest_list_path(
+            location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
         )
-        manifest_list_file_path = f'{self._table.location()}/metadata/{manifest_list_filename}'
         with write_manifest_list(
             format_version=self._table.metadata.format_version,
             output_file=self._table.io.new_output(manifest_list_file_path),
             snapshot_id=self._snapshot_id,
             parent_snapshot_id=self._parent_snapshot_id,
-            sequence_number=self._table.next_sequence_number(),
+            sequence_number=next_sequence_number,
         ) as writer:
             writer.add_manifests(manifests)
 
@@ -2128,7 +2165,7 @@ def commit(self) -> Snapshot:
             snapshot_id=self._snapshot_id,
             parent_snapshot_id=self._parent_snapshot_id,
             manifest_list=manifest_list_file_path,
-            sequence_number=self._table._next_sequence_number(),
+            sequence_number=next_sequence_number,
             summary=summary,
             schema_id=self._table.schema().schema_id,
         )
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 475814a051..9943c6f94a 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -58,7 +58,7 @@
     UpdateSchema,
     _apply_table_update,
     _generate_snapshot_id,
-    _match_deletes_to_datafile,
+    _match_deletes_to_data_file,
     _TableMetadataUpdateContext,
     update_table_metadata,
 )
@@ -357,7 +357,7 @@ def test_match_deletes_to_datafile() -> None:
             upper_bounds={},
         ),
     )
-    assert _match_deletes_to_datafile(
+    assert _match_deletes_to_data_file(
         data_entry,
         SortedList(iterable=[delete_entry_1, delete_entry_2], key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER),
     ) == {
@@ -414,7 +414,7 @@ def test_match_deletes_to_datafile_duplicate_number() -> None:
             upper_bounds={},
         ),
     )
-    assert _match_deletes_to_datafile(
+    assert _match_deletes_to_data_file(
         data_entry,
         SortedList(iterable=[delete_entry_1, delete_entry_2], key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER),
     ) == {
diff --git a/tests/test_integration_write.py b/tests/test_integration_write.py
index caae67c7b8..d6e8f80129 100644
--- a/tests/test_integration_write.py
+++ b/tests/test_integration_write.py
@@ -23,7 +23,7 @@
 from pyspark.sql import SparkSession
 
 from pyiceberg.catalog import Catalog, load_catalog
-from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
 from pyiceberg.schema import Schema
 from pyiceberg.types import (
     BinaryType,
@@ -43,7 +43,7 @@
 
 @pytest.fixture()
 def catalog() -> Catalog:
-    return load_catalog(
+    catalog = load_catalog(
         "local",
         **{
             "type": "rest",
@@ -54,6 +54,13 @@ def catalog() -> Catalog:
         },
     )
 
+    try:
+        catalog.create_namespace("default")
+    except NamespaceAlreadyExistsError:
+        pass
+
+    return catalog
+
 
 TEST_DATA_WITH_NULL = {
     'bool': [False, None, True],
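
Reviewer note: a quick illustration (a sketch, not part of the change) of what the version-keyed `MANIFEST_LIST_FILE_SCHEMAS` mapping above encodes. The v2 manifest-list schema adds the `content` and sequence-number fields on top of v1 and tightens the count columns from optional to required.

```python
from pyiceberg.manifest import MANIFEST_LIST_FILE_SCHEMAS

# Compare the field names of the two manifest-list schemas defined in this diff
v1_fields = {field.name for field in MANIFEST_LIST_FILE_SCHEMAS[1].fields}
v2_fields = {field.name for field in MANIFEST_LIST_FILE_SCHEMAS[2].fields}

# Fields present only in the v2 manifest-list schema
assert v2_fields - v1_fields == {"content", "sequence_number", "min_sequence_number"}
```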
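And a minimal sketch of exercising the new write path end-to-end. The catalog properties and table identifier below are assumptions for illustration (mirroring the REST catalog in the test fixture); only `Table.append`, `Table.overwrite`, and their unpartitioned/unsorted constraints come from this diff.

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog

# Assumed catalog config, mirroring the integration-test fixture above
catalog = load_catalog("local", **{"type": "rest", "uri": "http://localhost:8181"})
table = catalog.load_table("default.example")  # hypothetical unpartitioned, unsorted table

# The Arrow schema has to line up with the Iceberg table schema
df = pa.Table.from_pydict({"bool": [False, None, True]})

# append() carries existing data files forward as EXISTING manifest entries
# and writes the new files as ADDED entries in a single new manifest
table.append(df)

# overwrite() is full-table replacement only: any filter other than
# ALWAYS_TRUE raises NotImplementedError
table.overwrite(df)
```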