
Expose PyIceberg table as PyArrow Dataset #7598

Closed
wjones127 opened this issue May 12, 2023 · 7 comments
Comments

@wjones127
Member

Feature Request / Improvement

Hi, I've been looking at what we can do to make PyArrow Datasets extensible for various table formats and consumable by various compute engines (including DuckDB, Polars, DataFusion, and Dask). I've written up my observations here: https://docs.google.com/document/d/1r56nt5Un2E7yPrZO9YPknBN4EDtptpx-tqOZReHvq1U/edit?usp=sharing

What this means for PyIceberg's API

Currently, integration with engines like DuckDB means filters and projections have to be specified up front, rather than pushed down from the query:

con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")

Ideally, we should be able to export the table as a dataset, register it in DuckDB (or some other engine), and let filters and projections be pushed down as the engine sees fit. Then the following would perform equivalently to the above, but would be more user friendly:

dataset = table.to_pyarrow_dataset()
con.register("distant_taxi_trips", dataset)
con.sql("""SELECT VendorID, tpep_pickup_datetime, tpep_dropoff_datetime
    FROM distant_taxi_trips
    WHERE trip_distance > 10.0""")
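
For comparison, this register-then-push-down pattern already works for a plain PyArrow dataset in DuckDB today; a minimal sketch, with placeholder path and view name:

import duckdb
import pyarrow.dataset as ds

# A plain Parquet dataset on disk; the path is a placeholder.
dataset = ds.dataset("warehouse/taxi/", format="parquet")

con = duckdb.connect()
con.register("taxi", dataset)

# DuckDB pushes the column projection and the trip_distance predicate down
# into the Arrow dataset scan.
con.sql("SELECT VendorID, tpep_pickup_datetime FROM taxi WHERE trip_distance > 10.0")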

Query engine: Other

@wjones127
Member Author

cc @Fokko, who I think has been leading much of the work on PyIceberg.

@corleyma

corleyma commented May 14, 2023

I just wanted to second this as the best way to integrate with PyArrow. I've been doing essentially this approach using scan.plan_files(), but it's imperfect because PyIceberg does a lot of the important logic, such as reconciling schema evolution, after the scan planning.
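
A rough, untested sketch of that workaround, based on my reading of the PyIceberg and PyArrow APIs (the catalog and table names are placeholders):

import pyarrow.dataset as ds
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog("default")          # placeholder catalog name
table = catalog.load_table("nyc.taxis")    # placeholder table identifier

# Plan the scan ourselves and hand the raw data files to PyArrow.
scan = table.scan(row_filter=GreaterThanOrEqual("trip_distance", 10.0))
files = [task.file.file_path for task in scan.plan_files()]

# Assumes locally readable paths; an object-store filesystem would need to be
# passed explicitly. Crucially, this skips PyIceberg's per-file schema
# reconciliation, which is the part that is hard to reproduce outside project_table.
dataset = ds.dataset(files, format="parquet")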

Specifically, the reconciliation logic happens in the project_table function (to_duckdb calls to_arrow, which calls project_table). project_table both generates the projection plan based on the Iceberg schema and executes the projection, loading a PyArrow table into memory for every file in the scan plan and concatenating them together. The concatenated table is what ultimately gets passed around to pyarrow or duckdb, which is not great if, for example, your subsequent queries could have benefited from further pushdown that would have limited the amount of data that needed to be read.
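
Schematically, that eager path looks something like this (not pyiceberg's actual code, just the shape of it):

import pyarrow as pa

def project_table_sketch(tasks, read_and_project):
    # read_and_project stands in for the per-file read plus projection to the
    # current Iceberg schema; every file is materialized up front, so the
    # consuming engine never gets a chance to push anything further down.
    per_file_tables = [read_and_project(task) for task in tasks]
    return pa.concat_tables(per_file_tables)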

So, @wjones127, ideally there'd be some way to express this Iceberg schema reconciliation logic directly in a PyArrow dataset, which would become the logical source node passed to different engines to do their own querying/pushdown. Some time ago I looked at how feasible this would be to do in a pyarrow Dataset, and I think I concluded it wasn't possible yet -- specifically, that there was not a way for PyArrow dataset to have a different projection plan for every file in a dataset, which is potentially required in Iceberg given how the schema may have evolved over time -- but my recollection is a little hazy.

@Fokko
Contributor

Fokko commented May 15, 2023

@wjones127 Thanks for raising this and doing all the work. I've added some comments to the Google doc and also the pull request that describes the interface.

@corleyma has a good point here. I think the main reason why Arrow doesn't have an Iceberg implementation today is that it is quite a lot of work to get the details right. And those details are what make Iceberg so performant.

As apache/arrow#33986 suggests, I think it would be great for PyIceberg to be able to produce and consume Substrait plans. It could consume a high-level plan (SELECT * FROM s3://bucket/path/json@snapshot-id WHERE date_field >= 2023-01-01) and produce a low-level plan that tells Arrow which files to read and what kind of projection needs to be done. It will become complex, though: for example, how would we express positional deletes? It can be done, but it would need some changes to Substrait, I assume.

That said, I'm all in to see if we can integrate PyIceberg into Arrow. I agree that the dataset approach is the ideal situation. If there is anything you want me to try, please let me know; I'm happy to help and see if we can make this work.

@wjones127
Member Author

specifically, that there was not a way for PyArrow dataset to have a different projection plan for every file in a dataset, which is potentially required in Iceberg given how the schema may have evolved over time

Yes. I think what's new in this proposal is the idea that you would write your own classes for Scanner, Dataset, and Fragment. That would give you full control over how they are implemented, but using the standard API allows various engines to know how to push down projections and filters. So the question is: is there anything in the proposed interface that seems to be a blocker?
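
To make that concrete, here is a minimal sketch of what such a duck-typed class could look like. The method names simply mirror pyarrow.dataset.Dataset (schema, scanner) and are not the finalized interface, and the naive body below re-materializes the table instead of pruning files:

import pyarrow.dataset as ds

class IcebergDataset:
    # Illustrative only: wraps a pyiceberg Table behind a Dataset-like surface.
    def __init__(self, iceberg_table):
        self._table = iceberg_table

    @property
    def schema(self):
        # pyiceberg can convert its schema to Arrow; the helper's location may
        # vary between versions.
        from pyiceberg.io.pyarrow import schema_to_pyarrow
        return schema_to_pyarrow(self._table.schema())

    def scanner(self, columns=None, filter=None, **kwargs):
        # A real implementation would translate the pushed-down Arrow filter
        # into an Iceberg row filter before planning files; here we materialize
        # eagerly and let Arrow apply the projection and filter afterwards.
        arrow_table = self._table.scan(
            selected_fields=tuple(columns) if columns else ("*",)
        ).to_arrow()
        return ds.dataset(arrow_table).scanner(columns=columns, filter=filter, **kwargs)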

There might also be utilities you need from PyArrow. For example, the residual filter functionality you mention is handled in Arrow C++ by a function called SimplifyWithGuarantee, but that's not exposed in Python. (See the "how does filter pushdown work?" section here)

It will become complex, though: for example, how would we express positional deletes (#6775)? It can be done, but it would need some changes to Substrait, I assume.

Agreed. Positional deletes make me think we'll likely never see an Iceberg scan expressed purely in Substrait. Otherwise, it is an appealing idea.

@corleyma


Honestly, and perhaps dumbly, this option of PyIceberg implementing a dataset-compatible interface didn't occur to me! I like the idea a lot, though, particularly if the interface is settling on something relatively stable.

I looked at the code in PyIceberg again and remembered an idea I had that I never tested. Right now, the implementation eagerly loads a table for every file-level projection and concatenates them. Would it be possible instead to create a pyarrow dataset for every file and return a union dataset that combines them? I've never touched these lower-level features of PyArrow datasets before, so this idea is based on a hazy recollection of reading the source code long ago.

If this is something PyArrow supports today (unioning datasets with different projection plans that produce the same final schema, without materializing a table), then it could be the easiest way to achieve the "pyiceberg returns a dataset that is compatible with iceberg schema evolution", at least for copy-on-write workloads.
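
A minimal sketch of that union idea, assuming the per-file schemas can be expressed against one target Arrow schema (paths and target_schema are placeholders for whatever the Iceberg scan plan and schema conversion would produce); whether this actually covers Iceberg schema evolution (renames, field-id based column resolution) is exactly the open question:

import pyarrow.dataset as ds

# One lazy dataset per data file, all declared with the same target schema.
per_file = [
    ds.dataset(path, schema=target_schema, format="parquet") for path in paths
]

# dataset() over a list of datasets returns a UnionDataset; nothing is
# materialized until an engine scans it.
union = ds.dataset(per_file)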

I haven't thought much about positional deletes, but I suspect those would require a custom dataset interface implementation -- again, that might be the best solution in general anyway, though if pyiceberg were able to leverage only the interfaces as defined in pyarrow, there would be less concern about feature support drifting over time.

@wjones127
Member Author

Honestly, and perhaps dumbly, this option of PyIceberg implementing a dataset-compatible interface didn't occur to me! I like the idea a lot, though, particularly if the interface is settling on something relatively stable.

It took me a while to come around to it too.

Would it be possible instead to create a pyarrow dataset for every file and return a union dataset that combines them?

It might be worth a shot, but I'm not entirely sure.

@Fokko
Contributor

Fokko commented Oct 2, 2023

Hey everyone, still very excited to get dataset support. I've migrated this issue to the new repository: apache/iceberg-python#30
