Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Nov 7, 2023
1 parent d9c1a84 commit 1b50c8d
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 13 deletions.
15 changes: 10 additions & 5 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ def _task_to_table(
task: FileScanTask,
bound_row_filter: BooleanExpression,
projected_schema: Schema,
projected_arrow_schema: pa.schema,
projected_field_ids: Set[int],
positional_deletes: Optional[List[ChunkedArray]],
case_sensitive: bool,
Expand Down Expand Up @@ -836,9 +837,11 @@ def _task_to_table(
if file_schema is None:
raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")

{
columns = {
# Projecting nested fields doesn't work...
projected_schema.find_column_name(col.field_id): pc.field(col.name)
projected_schema.find_column_name(col.field_id): pc.field(col.name).cast(
schema_to_pyarrow(col.field_type)
)
for col in file_project_schema.columns
}

Expand All @@ -848,7 +851,7 @@ def _task_to_table(
# This will push down the query to Arrow.
# But in case there are positional deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes else None,
# columns=columns,
columns=columns,
)

if positional_deletes:
Expand Down Expand Up @@ -886,7 +889,8 @@ def _task_to_table(

row_counts.append(len(arrow_table))

return arrow_table
# arrow_table.select(projected_arrow_schema)
return arrow_table.cast(projected_arrow_schema)


def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
Expand Down Expand Up @@ -949,6 +953,7 @@ def project_table(

# Will raise an exception
_ = table.schema().is_compatible(projected_schema)
projected_schema_arrow = schema_to_pyarrow(projected_schema)

projected_field_ids = {
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
Expand All @@ -964,6 +969,7 @@ def project_table(
task,
bound_row_filter,
projected_schema,
projected_schema_arrow,
projected_field_ids,
deletes_per_file.get(task.file.file_path),
case_sensitive,
Expand All @@ -989,7 +995,6 @@ def project_table(

tables = [f.result() for f in completed_futures if f.result()]

projected_schema_arrow = schema_to_pyarrow(projected_schema)
empty_table = pa.Table.from_batches([], schema=projected_schema_arrow)

if len(tables) < 1:
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ markers = [
]

# Turns a warning into an error
filterwarnings = [
"error"
]
#filterwarnings = [
# "error"
#]

[tool.black]
line-length = 130
Expand Down
10 changes: 8 additions & 2 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,9 +946,15 @@ def test_read_map(schema_map: Schema, file_map: str) -> None:
assert (
repr(result_table.schema)
== """properties: map<string, string>
child 0, entries: struct<key: string not null, value: string> not null
child 0, entries: struct<key: string not null, value: string not null> not null
child 0, key: string not null
child 1, value: string"""
-- field metadata --
field_id: '51'
child 1, value: string not null
-- field metadata --
field_id: '52'
-- field metadata --
field_id: '5'"""
)


Expand Down
6 changes: 3 additions & 3 deletions tests/io/test_pyarrow_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,10 +752,10 @@ def test_stats_types(table_schema_nested: Schema) -> None:
# table_metadata = TableMetadataUtil.parse_obj(table_metadata)
# schema = schema_to_pyarrow(table_metadata.schemas[0])

# _ints = [0, 2, 4, 8, 1, 3, 5, 7]
# parity = [True, True, True, True, False, False, False, False]
_ints = [0, 2, 4, 8, 1, 3, 5, 7]
parity = [True, True, True, True, False, False, False, False]

# table = pa.Table.from_pydict({"ints": _ints, "even": parity}, schema=schema)
table = pa.Table.from_pydict({"ints": _ints, "even": parity}, schema=schema)

# visited_paths = []

Expand Down

0 comments on commit 1b50c8d

Please sign in to comment.