Skip to content

Commit

Permalink
feat(polars): allow user to specify "engine" kwarg (#10151)
Browse files Browse the repository at this point in the history
Co-authored-by: Phillip Cloud <[email protected]>
Co-authored-by: Jim Crist-Harif <[email protected]>
  • Loading branch information
3 people authored Sep 23, 2024
1 parent 4487ff4 commit 3877d6d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
29 changes: 24 additions & 5 deletions ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections.abc import Iterable, Mapping
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal

import polars as pl

Expand Down Expand Up @@ -492,6 +492,7 @@ def _to_dataframe(
params: Mapping[ir.Expr, object] | None = None,
limit: int | None = None,
streaming: bool = False,
engine: Literal["cpu", "gpu"] | pl.GPUEngine = "cpu",
**kwargs: Any,
) -> pl.DataFrame:
self._run_pre_execute_hooks(expr)
Expand All @@ -501,7 +502,7 @@ def _to_dataframe(
limit = ibis.options.sql.default_limit
if limit is not None:
lf = lf.limit(limit)
df = lf.collect(streaming=streaming)
df = lf.collect(streaming=streaming, engine=engine)
# XXX: Polars sometimes returns data with the incorrect column names.
# For now we catch this case and rename them here if needed.
expected_cols = tuple(table_expr.columns)
Expand All @@ -515,10 +516,16 @@ def execute(
params: Mapping[ir.Expr, object] | None = None,
limit: int | None = None,
streaming: bool = False,
engine: Literal["cpu", "gpu"] | pl.GPUEngine = "cpu",
**kwargs: Any,
):
df = self._to_dataframe(
expr, params=params, limit=limit, streaming=streaming, **kwargs
expr,
params=params,
limit=limit,
streaming=streaming,
engine=engine,
**kwargs,
)
if isinstance(expr, (ir.Table, ir.Scalar)):
return expr.__pandas_result__(df.to_pandas())
Expand All @@ -540,10 +547,16 @@ def to_polars(
params: Mapping[ir.Expr, object] | None = None,
limit: int | None = None,
streaming: bool = False,
engine: Literal["cpu", "gpu"] | pl.GPUEngine = "cpu",
**kwargs: Any,
):
df = self._to_dataframe(
expr, params=params, limit=limit, streaming=streaming, **kwargs
expr,
params=params,
limit=limit,
streaming=streaming,
engine=engine,
**kwargs,
)
return expr.__polars_result__(df)

Expand All @@ -553,12 +566,18 @@ def _to_pyarrow_table(
params: Mapping[ir.Expr, object] | None = None,
limit: int | None = None,
streaming: bool = False,
engine: Literal["cpu", "gpu"] | pl.GPUEngine = "cpu",
**kwargs: Any,
):
from ibis.formats.pyarrow import PyArrowData

df = self._to_dataframe(
expr, params=params, limit=limit, streaming=streaming, **kwargs
expr,
params=params,
limit=limit,
streaming=streaming,
engine=engine,
**kwargs,
)
return PyArrowData.convert_table(df.to_arrow(), expr.as_table().schema())

Expand Down
16 changes: 16 additions & 0 deletions ibis/backends/polars/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,19 @@ def test_memtable_polars_types(con):
res = con.to_polars((t.x + t.y + t.z).name("test"))
sol = (df["x"] + df["y"] + df["z"]).rename("test")
pl.testing.assert_series_equal(res, sol)


@pytest.mark.parametrize("to_method", ["to_pyarrow", "to_polars"])
def test_streaming(con, mocker, to_method):
    """Check that ``streaming=True`` reaches ``polars.LazyFrame.collect``.

    ``collect`` is patched out, so nothing is actually executed; the test
    only inspects the keyword arguments the backend hands to it, which must
    be ``streaming=True`` together with ``engine="cpu"``.
    """
    alltypes = con.table("functional_alltypes")
    collect_spy = mocker.patch("polars.LazyFrame.collect")

    # Resolve the export method by name so both parametrized paths share
    # the same assertion.
    export = getattr(con, to_method)
    export(alltypes, streaming=True)

    collect_spy.assert_called_once_with(streaming=True, engine="cpu")


@pytest.mark.parametrize("to_method", ["to_pyarrow", "to_polars"])
def test_engine(con, mocker, to_method):
    """Check that the ``engine`` keyword reaches ``polars.LazyFrame.collect``.

    ``collect`` is patched out, so no GPU (or any real execution) is needed;
    the test only inspects the keyword arguments the backend hands to it,
    which must be ``streaming=False`` together with ``engine="gpu"``.
    """
    alltypes = con.table("functional_alltypes")
    collect_spy = mocker.patch("polars.LazyFrame.collect")

    # Resolve the export method by name so both parametrized paths share
    # the same assertion.
    export = getattr(con, to_method)
    export(alltypes, engine="gpu")

    collect_spy.assert_called_once_with(streaming=False, engine="gpu")

0 comments on commit 3877d6d

Please sign in to comment.