Skip to content

Commit

Permalink
fix(deps): update dependency datafusion to v41 (#10147)
Browse files Browse the repository at this point in the history
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Phillip Cloud <[email protected]>
  • Loading branch information
renovate[bot] and cpcloud authored Sep 17, 2024
1 parent b31fcc6 commit e7cfc11
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 34 deletions.
6 changes: 3 additions & 3 deletions ibis/backends/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
# self.con.register_table is broken, so we do this roundabout thing
# of constructing a datafusion DataFrame, which has a side effect
# of registering the table
self.con.from_arrow_table(op.data.to_pyarrow(op.schema), op.name)
self.con.from_arrow(op.data.to_pyarrow(op.schema), op.name)

def read_csv(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
Expand Down Expand Up @@ -757,14 +757,14 @@ def _polars(source, table_name, _conn, overwrite: bool = False):
def _pyarrow_table(source, table_name, _conn, overwrite: bool = False):
tmp_name = gen_name("pyarrow")
with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
_conn.con.from_arrow_table(source, name=tmp_name)
_conn.con.from_arrow(source, name=tmp_name)


@_read_in_memory.register("pyarrow.RecordBatchReader")
def _pyarrow_rbr(source, table_name, _conn, overwrite: bool = False):
tmp_name = gen_name("pyarrow")
with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
_conn.con.from_arrow_table(source.read_all(), name=tmp_name)
_conn.con.from_arrow(source.read_all(), name=tmp_name)


@_read_in_memory.register("pyarrow.RecordBatch")
Expand Down
4 changes: 3 additions & 1 deletion ibis/backends/tests/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,9 @@ def test_grouped_case(backend, con):

@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(
["datafusion"], raises=Exception, reason="not supported in datafusion"
["datafusion"],
raises=BaseException,
reason="because pyo3 panic exception is raised",
)
@pytest.mark.notyet(["flink"], raises=Py4JJavaError)
@pytest.mark.notyet(["impala"], raises=ImpalaHiveServer2Error)
Expand Down
45 changes: 40 additions & 5 deletions ibis/backends/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
["bigquery", "impala"], reason="Backend doesn't yet implement map types"
),
pytest.mark.notimpl(
["datafusion", "exasol", "polars", "druid", "oracle"],
["exasol", "polars", "druid", "oracle"],
reason="Not yet implemented in ibis",
),
]
Expand All @@ -39,6 +39,10 @@
reason="function hstore(character varying[], character varying[]) does not exist",
)

mark_notyet_datafusion = pytest.mark.notyet(
["datafusion"], raises=Exception, reason="only map and make_map are available"
)


@pytest.mark.notyet("clickhouse", reason="nested types can't be NULL")
@pytest.mark.notimpl(
Expand All @@ -54,6 +58,7 @@
param(None, None, id="null_both"),
],
)
@mark_notyet_datafusion
def test_map_nulls(con, k, v):
k = ibis.literal(k, type="array<string>")
v = ibis.literal(v, type="array<string>")
Expand All @@ -74,6 +79,7 @@ def test_map_nulls(con, k, v):
param(None, None, id="null_both"),
],
)
@mark_notyet_datafusion
def test_map_keys_nulls(con, k, v):
k = ibis.literal(k, type="array<string>")
v = ibis.literal(v, type="array<string>")
Expand Down Expand Up @@ -106,6 +112,7 @@ def test_map_keys_nulls(con, k, v):
param(ibis.literal(None, type="map<string, string>"), id="null_map"),
],
)
@mark_notyet_datafusion
def test_map_values_nulls(con, map):
assert con.execute(map.values()) is None

Expand Down Expand Up @@ -174,6 +181,7 @@ def test_map_values_nulls(con, map):
],
)
@pytest.mark.parametrize("method", ["get", "contains"])
@mark_notyet_datafusion
def test_map_get_contains_nulls(con, map, key, method):
expr = getattr(map, method)
assert con.execute(expr(key)) is None
Expand Down Expand Up @@ -205,18 +213,21 @@ def test_map_get_contains_nulls(con, map, key, method):
),
],
)
@mark_notyet_datafusion
def test_map_merge_nulls(con, m1, m2):
concatted = m1 + m2
assert con.execute(concatted) is None


@mark_notyet_datafusion
def test_map_table(backend):
table = backend.map
assert table.kv.type().is_map()
assert not table.limit(1).execute().empty


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_column_map_values(backend):
table = backend.map
expr = table.select("idx", vals=table.kv.values()).order_by("idx")
Expand All @@ -225,6 +236,7 @@ def test_column_map_values(backend):
backend.assert_series_equal(result, expected)


@mark_notyet_datafusion
def test_column_map_merge(backend):
table = backend.map
expr = table.select(
Expand All @@ -239,6 +251,7 @@ def test_column_map_merge(backend):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_literal_map_keys(con):
mapping = ibis.literal({"1": "a", "2": "b"})
expr = mapping.keys().name("tmp")
Expand All @@ -250,6 +263,7 @@ def test_literal_map_keys(con):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_literal_map_values(con):
mapping = ibis.literal({"1": "a", "2": "b"})
expr = mapping.values().name("tmp")
Expand All @@ -260,6 +274,7 @@ def test_literal_map_values(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_scalar_isin_literal_map_keys(con):
mapping = ibis.literal({"a": 1, "b": 2})
a = ibis.literal("a")
Expand All @@ -272,6 +287,7 @@ def test_scalar_isin_literal_map_keys(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_scalar_contains_key_scalar(con):
mapping = ibis.literal({"a": 1, "b": 2})
a = ibis.literal("a")
Expand All @@ -283,6 +299,7 @@ def test_map_scalar_contains_key_scalar(con):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_scalar_contains_key_column(backend, alltypes, df):
value = {"1": "a", "3": "c"}
mapping = ibis.literal(value)
Expand All @@ -294,6 +311,7 @@ def test_map_scalar_contains_key_column(backend, alltypes, df):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_column_contains_key_scalar(backend, alltypes, df):
expr = ibis.map(ibis.array([alltypes.string_col]), ibis.array([alltypes.int_col]))
series = df.apply(lambda row: {row["string_col"]: row["int_col"]}, axis=1)
Expand All @@ -306,6 +324,7 @@ def test_map_column_contains_key_scalar(backend, alltypes, df):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_column_contains_key_column(alltypes):
map_expr = ibis.map(
ibis.array([alltypes.string_col]), ibis.array([alltypes.int_col])
Expand All @@ -317,6 +336,7 @@ def test_map_column_contains_key_column(alltypes):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_literal_map_merge(con):
a = ibis.literal({"a": 0, "b": 2})
b = ibis.literal({"a": 1, "c": 3})
Expand All @@ -326,6 +346,7 @@ def test_literal_map_merge(con):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_literal_map_getitem_broadcast(backend, alltypes, df):
value = {"1": "a", "2": "b"}

Expand Down Expand Up @@ -472,6 +493,7 @@ def test_literal_map_getitem_broadcast(backend, alltypes, df):
@values
@keys
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_get_all_types(con, keys, values):
m = ibis.map(ibis.array(keys), ibis.array(values))
for key, val in zip(keys, values):
Expand All @@ -482,6 +504,7 @@ def test_map_get_all_types(con, keys, values):

@keys
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_contains_all_types(con, keys):
a = ibis.array(keys)
m = ibis.map(a, a)
Expand All @@ -490,6 +513,7 @@ def test_map_contains_all_types(con, keys):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_literal_map_get_broadcast(backend, alltypes, df):
value = {"1": "a", "2": "b"}

Expand Down Expand Up @@ -524,13 +548,14 @@ def test_map_construct_dict(con, keys, values):
assert result == dict(zip(keys, values))


@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@pytest.mark.notimpl(
["flink"],
raises=pa.lib.ArrowInvalid,
reason="Map array child array should have no nulls",
)
@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_construct_array_column(con, alltypes, df):
expr = ibis.map(ibis.array([alltypes.string_col]), ibis.array([alltypes.int_col]))
result = con.execute(expr)
Expand All @@ -541,6 +566,7 @@ def test_map_construct_array_column(con, alltypes, df):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_get_with_compatible_value_smaller(con):
value = ibis.literal({"A": 1000, "B": 2000})
expr = value.get("C", 3)
Expand All @@ -549,6 +575,7 @@ def test_map_get_with_compatible_value_smaller(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_get_with_compatible_value_bigger(con):
value = ibis.literal({"A": 1, "B": 2})
expr = value.get("C", 3000)
Expand All @@ -557,6 +584,7 @@ def test_map_get_with_compatible_value_bigger(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_get_with_incompatible_value_different_kind(con):
value = ibis.literal({"A": 1000, "B": 2000})
expr = value.get("C", 3.0)
Expand All @@ -565,6 +593,7 @@ def test_map_get_with_incompatible_value_different_kind(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
@pytest.mark.parametrize("null_value", [None, ibis.null()])
def test_map_get_with_null_on_not_nullable(con, null_value):
map_type = dt.Map(dt.string, dt.Int16(nullable=False))
Expand All @@ -579,18 +608,20 @@ def test_map_get_with_null_on_not_nullable(con, null_value):
["flink"], raises=Py4JJavaError, reason="Flink cannot handle typeless nulls"
)
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_get_with_null_on_null_type_with_null(con, null_value):
value = ibis.literal({"A": None, "B": None})
expr = value.get("C", null_value)
result = con.execute(expr)
assert pd.isna(result)


@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@pytest.mark.notyet(
["flink"], raises=Py4JJavaError, reason="Flink cannot handle typeless nulls"
)
@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_get_with_null_on_null_type_with_non_null(con):
value = ibis.literal({"A": None, "B": None})
expr = value.get("C", 1)
Expand All @@ -603,6 +634,7 @@ def test_map_get_with_null_on_null_type_with_non_null(con):
reason="`tbl_properties` is required when creating table with schema",
)
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_create_table(con, temp_table):
t = con.create_table(
temp_table,
Expand All @@ -617,18 +649,21 @@ def test_map_create_table(con, temp_table):
reason="No translation rule for <class 'ibis.expr.operations.maps.MapLength'>",
)
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_length(con):
expr = ibis.literal(dict(a="A", b="B")).length()
assert con.execute(expr) == 2


@mark_notyet_datafusion
def test_map_keys_unnest(backend):
expr = backend.map.kv.keys().unnest()
result = expr.to_pandas()
assert frozenset(result) == frozenset("abcdef")


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_contains_null(con):
expr = ibis.map(["a"], ibis.literal([None], type="array<string>"))
assert con.execute(expr.contains("a"))
Expand Down
17 changes: 1 addition & 16 deletions ibis/backends/tests/tpc/ds/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@
from ibis import _, coalesce, cumulative_window, date, ifelse, null, rank, union
from ibis import literal as lit
from ibis import selectors as s
from ibis.backends.tests.errors import (
ArrowNotImplementedError,
ClickHouseDatabaseError,
TrinoUserError,
)
from ibis.backends.tests.errors import ClickHouseDatabaseError, TrinoUserError
from ibis.backends.tests.tpc.conftest import tpc_test
from ibis.common.exceptions import OperationNotDefinedError

Expand Down Expand Up @@ -1416,7 +1412,6 @@ def test_26(catalog_sales, customer_demographics, date_dim, item, promotion):


@tpc_test("ds")
@pytest.mark.notyet(["datafusion"], reason="Failed to plan")
def test_27(store_sales, customer_demographics, date_dim, store, item):
results = (
store_sales.join(customer_demographics, [("ss_cdemo_sk", "cd_demo_sk")])
Expand Down Expand Up @@ -1999,11 +1994,6 @@ def test_38(store_sales, catalog_sales, web_sales, date_dim, customer):


@tpc_test("ds")
@pytest.mark.notyet(
["datafusion"],
raises=ArrowNotImplementedError,
reason="Unsupported cast from double to null using function cast_null",
)
def test_39(inventory, item, warehouse, date_dim):
inv = (
inventory.join(item, [("inv_item_sk", "i_item_sk")])
Expand Down Expand Up @@ -4894,11 +4884,6 @@ def test_89(item, store_sales, date_dim, store):
).limit(100)


@pytest.mark.notyet(
["datafusion"],
raises=ArrowNotImplementedError,
reason="Unsupported cast from double to null using function cast_null",
)
@tpc_test("ds")
def test_90(web_sales, household_demographics, time_dim, web_page):
def am_pm(*, hour: int, name: str):
Expand Down
15 changes: 8 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e7cfc11

Please sign in to comment.