Skip to content

Commit

Permalink
Comments and fixing some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Jan 15, 2024
1 parent d441af9 commit 2a65357
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 67 deletions.
4 changes: 2 additions & 2 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1585,12 +1585,12 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
except StopIteration:
pass

file_path = f'{table.location()}/data/{task.generate_datafile_filename("parquet")}'
file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
file_schema = schema_to_pyarrow(table.schema())

collected_metrics: List[pq.FileMetaData] = []
fo = table.io.new_output(file_path)
with fo.create() as fos:
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
writer.write_table(task.df)

Expand Down
79 changes: 52 additions & 27 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,25 +493,41 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition
return [field.to_summary() for field in field_stats]


MANIFEST_FILE_SCHEMA: Schema = Schema(
NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
NestedField(501, "manifest_length", LongType(), required=True),
NestedField(502, "partition_spec_id", IntegerType(), required=True),
NestedField(517, "content", IntegerType(), required=False, initial_default=ManifestContent.DATA),
NestedField(515, "sequence_number", LongType(), required=False, initial_default=0),
NestedField(516, "min_sequence_number", LongType(), required=False, initial_default=0),
NestedField(503, "added_snapshot_id", LongType(), required=True),
NestedField(504, "added_files_count", IntegerType(), required=False),
NestedField(505, "existing_files_count", IntegerType(), required=False),
NestedField(506, "deleted_files_count", IntegerType(), required=False),
NestedField(512, "added_rows_count", LongType(), required=False),
NestedField(513, "existing_rows_count", LongType(), required=False),
NestedField(514, "deleted_rows_count", LongType(), required=False),
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
NestedField(519, "key_metadata", BinaryType(), required=False),
)
# Manifest-list Avro schemas, keyed by table format version (1 or 2).
# Both versions share field IDs 500-507, 512-514 and 519; they differ as follows:
#   - v1 has no content (517), sequence_number (515) or min_sequence_number (516) field.
#   - v2 adds those three fields as optional, with an initial default
#     (ManifestContent.DATA for content, 0 for both sequence numbers).
#   - v2 declares the added/existing/deleted file counts (504-506) and row counts
#     (512-514) as required, whereas v1 leaves them optional.
MANIFEST_LIST_FILE_SCHEMAS: Dict[int, Schema] = {
# Format version 1
1: Schema(
NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
NestedField(501, "manifest_length", LongType(), required=True),
NestedField(502, "partition_spec_id", IntegerType(), required=True),
NestedField(503, "added_snapshot_id", LongType(), required=True),
NestedField(504, "added_files_count", IntegerType(), required=False),
NestedField(505, "existing_files_count", IntegerType(), required=False),
NestedField(506, "deleted_files_count", IntegerType(), required=False),
NestedField(512, "added_rows_count", LongType(), required=False),
NestedField(513, "existing_rows_count", LongType(), required=False),
NestedField(514, "deleted_rows_count", LongType(), required=False),
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
NestedField(519, "key_metadata", BinaryType(), required=False),
),
# Format version 2
2: Schema(
NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
NestedField(501, "manifest_length", LongType(), required=True),
NestedField(502, "partition_spec_id", IntegerType(), required=True),
# Fields 517, 515 and 516 exist only in the v2 schema
NestedField(517, "content", IntegerType(), required=False, initial_default=ManifestContent.DATA),
NestedField(515, "sequence_number", LongType(), required=False, initial_default=0),
NestedField(516, "min_sequence_number", LongType(), required=False, initial_default=0),
NestedField(503, "added_snapshot_id", LongType(), required=True),
# The six count fields below are required in v2 (optional in v1)
NestedField(504, "added_files_count", IntegerType(), required=True),
NestedField(505, "existing_files_count", IntegerType(), required=True),
NestedField(506, "deleted_files_count", IntegerType(), required=True),
NestedField(512, "added_rows_count", LongType(), required=True),
NestedField(513, "existing_rows_count", LongType(), required=True),
NestedField(514, "deleted_rows_count", LongType(), required=True),
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
NestedField(519, "key_metadata", BinaryType(), required=False),
),
}

MANIFEST_FILE_SCHEMA_STRUCT = MANIFEST_FILE_SCHEMA.as_struct()
# Struct form (via Schema.as_struct()) of every manifest-list schema, keyed by the
# same format version as MANIFEST_LIST_FILE_SCHEMAS.
MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}


POSITIONAL_DELETE_SCHEMA = Schema(
Expand Down Expand Up @@ -554,7 +570,7 @@ class ManifestFile(Record):
key_metadata: Optional[bytes]

def __init__(self, *data: Any, **named_data: Any) -> None:
super().__init__(*data, **{"struct": MANIFEST_FILE_SCHEMA_STRUCT, **named_data})
super().__init__(*data, **{"struct": MANIFEST_LIST_FILE_STRUCTS[DEFAULT_READ_VERSION], **named_data})

def has_added_files(self) -> bool:
return self.added_files_count is None or self.added_files_count > 0
Expand Down Expand Up @@ -599,7 +615,7 @@ def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
"""
with AvroFile[ManifestFile](
input_file,
MANIFEST_FILE_SCHEMA,
MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
read_types={-1: ManifestFile, 508: PartitionFieldSummary},
read_enums={517: ManifestContent},
) as reader:
Expand Down Expand Up @@ -630,12 +646,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani

# in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
if entry.data_sequence_number is None and (manifest.sequence_number <= 0 or entry.status == ManifestEntryStatus.ADDED):
entry.data_sequence_number = manifest.sequence_number

# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
if entry.file_sequence_number is None and (manifest.sequence_number <= 0 or entry.status == ManifestEntryStatus.ADDED):
# Only available in V2, always 0 in V1
entry.file_sequence_number = manifest.sequence_number

Expand Down Expand Up @@ -841,21 +857,27 @@ def write_manifest(


class ManifestListWriter(ABC):
_format_version: Literal[1, 2]
_output_file: OutputFile
_meta: Dict[str, str]
_manifest_files: List[ManifestFile]
_commit_snapshot_id: int
_writer: AvroOutputFile[ManifestFile]

def __init__(self, output_file: OutputFile, meta: Dict[str, Any]):
def __init__(self, format_version: Literal[1, 2], output_file: OutputFile, meta: Dict[str, Any]):
self._format_version = format_version
self._output_file = output_file
self._meta = meta
self._manifest_files = []

def __enter__(self) -> ManifestListWriter:
"""Open the writer for writing."""
self._writer = AvroOutputFile[ManifestFile](
output_file=self._output_file, file_schema=MANIFEST_FILE_SCHEMA, schema_name="manifest_file", metadata=self._meta
output_file=self._output_file,
record_schema=MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
file_schema=MANIFEST_LIST_FILE_SCHEMAS[self._format_version],
schema_name="manifest_file",
metadata=self._meta,
)
self._writer.__enter__()
return self
Expand All @@ -881,7 +903,9 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite
class ManifestListWriterV1(ManifestListWriter):
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
super().__init__(
output_file, {"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"}
format_version=1,
output_file=output_file,
meta={"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"},
)

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
Expand All @@ -896,8 +920,9 @@ class ManifestListWriterV2(ManifestListWriter):

def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
super().__init__(
output_file,
{
format_version=2,
output_file=output_file,
meta={
"snapshot-id": str(snapshot_id),
"parent-snapshot-id": str(parent_snapshot_id),
"sequence-number": str(sequence_number),
Expand Down
Loading

0 comments on commit 2a65357

Please sign in to comment.