Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(deps): update dependency datafusion to v41 #10147

Merged
merged 6 commits into the base branch from the source branch
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ibis/backends/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
# self.con.register_table is broken, so we do this roundabout thing
# of constructing a datafusion DataFrame, which has a side effect
# of registering the table
self.con.from_arrow_table(op.data.to_pyarrow(op.schema), op.name)
self.con.from_arrow(op.data.to_pyarrow(op.schema), op.name)

def read_csv(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
Expand Down Expand Up @@ -757,14 +757,14 @@ def _polars(source, table_name, _conn, overwrite: bool = False):
def _pyarrow_table(source, table_name, _conn, overwrite: bool = False):
tmp_name = gen_name("pyarrow")
with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
_conn.con.from_arrow_table(source, name=tmp_name)
_conn.con.from_arrow(source, name=tmp_name)


@_read_in_memory.register("pyarrow.RecordBatchReader")
def _pyarrow_rbr(source, table_name, _conn, overwrite: bool = False):
tmp_name = gen_name("pyarrow")
with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
_conn.con.from_arrow_table(source.read_all(), name=tmp_name)
_conn.con.from_arrow(source.read_all(), name=tmp_name)


@_read_in_memory.register("pyarrow.RecordBatch")
Expand Down
4 changes: 3 additions & 1 deletion ibis/backends/tests/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,9 @@ def test_grouped_case(backend, con):

@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(
["datafusion"], raises=Exception, reason="not supported in datafusion"
["datafusion"],
raises=BaseException,
reason="because pyo3 panic exception is raised",
)
@pytest.mark.notyet(["flink"], raises=Py4JJavaError)
@pytest.mark.notyet(["impala"], raises=ImpalaHiveServer2Error)
Expand Down
45 changes: 40 additions & 5 deletions ibis/backends/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
["bigquery", "impala"], reason="Backend doesn't yet implement map types"
),
pytest.mark.notimpl(
["datafusion", "exasol", "polars", "druid", "oracle"],
["exasol", "polars", "druid", "oracle"],
reason="Not yet implemented in ibis",
),
]
Expand All @@ -39,6 +39,10 @@
reason="function hstore(character varying[], character varying[]) does not exist",
)

mark_notyet_datafusion = pytest.mark.notyet(
["datafusion"], raises=Exception, reason="only map and make_map are available"
)


@pytest.mark.notyet("clickhouse", reason="nested types can't be NULL")
@pytest.mark.notimpl(
Expand All @@ -54,6 +58,7 @@
param(None, None, id="null_both"),
],
)
@mark_notyet_datafusion
def test_map_nulls(con, k, v):
k = ibis.literal(k, type="array<string>")
v = ibis.literal(v, type="array<string>")
Expand All @@ -74,6 +79,7 @@ def test_map_nulls(con, k, v):
param(None, None, id="null_both"),
],
)
@mark_notyet_datafusion
def test_map_keys_nulls(con, k, v):
k = ibis.literal(k, type="array<string>")
v = ibis.literal(v, type="array<string>")
Expand Down Expand Up @@ -106,6 +112,7 @@ def test_map_keys_nulls(con, k, v):
param(ibis.literal(None, type="map<string, string>"), id="null_map"),
],
)
@mark_notyet_datafusion
def test_map_values_nulls(con, map):
assert con.execute(map.values()) is None

Expand Down Expand Up @@ -174,6 +181,7 @@ def test_map_values_nulls(con, map):
],
)
@pytest.mark.parametrize("method", ["get", "contains"])
@mark_notyet_datafusion
def test_map_get_contains_nulls(con, map, key, method):
expr = getattr(map, method)
assert con.execute(expr(key)) is None
Expand Down Expand Up @@ -205,18 +213,21 @@ def test_map_get_contains_nulls(con, map, key, method):
),
],
)
@mark_notyet_datafusion
def test_map_merge_nulls(con, m1, m2):
concatted = m1 + m2
assert con.execute(concatted) is None


@mark_notyet_datafusion
def test_map_table(backend):
table = backend.map
assert table.kv.type().is_map()
assert not table.limit(1).execute().empty


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_column_map_values(backend):
table = backend.map
expr = table.select("idx", vals=table.kv.values()).order_by("idx")
Expand All @@ -225,6 +236,7 @@ def test_column_map_values(backend):
backend.assert_series_equal(result, expected)


@mark_notyet_datafusion
def test_column_map_merge(backend):
table = backend.map
expr = table.select(
Expand All @@ -239,6 +251,7 @@ def test_column_map_merge(backend):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_literal_map_keys(con):
mapping = ibis.literal({"1": "a", "2": "b"})
expr = mapping.keys().name("tmp")
Expand All @@ -250,6 +263,7 @@ def test_literal_map_keys(con):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_literal_map_values(con):
mapping = ibis.literal({"1": "a", "2": "b"})
expr = mapping.values().name("tmp")
Expand All @@ -260,6 +274,7 @@ def test_literal_map_values(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_scalar_isin_literal_map_keys(con):
mapping = ibis.literal({"a": 1, "b": 2})
a = ibis.literal("a")
Expand All @@ -272,6 +287,7 @@ def test_scalar_isin_literal_map_keys(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_scalar_contains_key_scalar(con):
mapping = ibis.literal({"a": 1, "b": 2})
a = ibis.literal("a")
Expand All @@ -283,6 +299,7 @@ def test_map_scalar_contains_key_scalar(con):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_scalar_contains_key_column(backend, alltypes, df):
value = {"1": "a", "3": "c"}
mapping = ibis.literal(value)
Expand All @@ -294,6 +311,7 @@ def test_map_scalar_contains_key_column(backend, alltypes, df):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_column_contains_key_scalar(backend, alltypes, df):
expr = ibis.map(ibis.array([alltypes.string_col]), ibis.array([alltypes.int_col]))
series = df.apply(lambda row: {row["string_col"]: row["int_col"]}, axis=1)
Expand All @@ -306,6 +324,7 @@ def test_map_column_contains_key_scalar(backend, alltypes, df):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_column_contains_key_column(alltypes):
map_expr = ibis.map(
ibis.array([alltypes.string_col]), ibis.array([alltypes.int_col])
Expand All @@ -317,6 +336,7 @@ def test_map_column_contains_key_column(alltypes):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_literal_map_merge(con):
a = ibis.literal({"a": 0, "b": 2})
b = ibis.literal({"a": 1, "c": 3})
Expand All @@ -326,6 +346,7 @@ def test_literal_map_merge(con):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_literal_map_getitem_broadcast(backend, alltypes, df):
value = {"1": "a", "2": "b"}

Expand Down Expand Up @@ -472,6 +493,7 @@ def test_literal_map_getitem_broadcast(backend, alltypes, df):
@values
@keys
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_get_all_types(con, keys, values):
m = ibis.map(ibis.array(keys), ibis.array(values))
for key, val in zip(keys, values):
Expand All @@ -482,6 +504,7 @@ def test_map_get_all_types(con, keys, values):

@keys
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_contains_all_types(con, keys):
a = ibis.array(keys)
m = ibis.map(a, a)
Expand All @@ -490,6 +513,7 @@ def test_map_contains_all_types(con, keys):


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_literal_map_get_broadcast(backend, alltypes, df):
value = {"1": "a", "2": "b"}

Expand Down Expand Up @@ -524,13 +548,14 @@ def test_map_construct_dict(con, keys, values):
assert result == dict(zip(keys, values))


@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@pytest.mark.notimpl(
["flink"],
raises=pa.lib.ArrowInvalid,
reason="Map array child array should have no nulls",
)
@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_construct_array_column(con, alltypes, df):
expr = ibis.map(ibis.array([alltypes.string_col]), ibis.array([alltypes.int_col]))
result = con.execute(expr)
Expand All @@ -541,6 +566,7 @@ def test_map_construct_array_column(con, alltypes, df):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_get_with_compatible_value_smaller(con):
value = ibis.literal({"A": 1000, "B": 2000})
expr = value.get("C", 3)
Expand All @@ -549,6 +575,7 @@ def test_map_get_with_compatible_value_smaller(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_get_with_compatible_value_bigger(con):
value = ibis.literal({"A": 1, "B": 2})
expr = value.get("C", 3000)
Expand All @@ -557,6 +584,7 @@ def test_map_get_with_compatible_value_bigger(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_get_with_incompatible_value_different_kind(con):
value = ibis.literal({"A": 1000, "B": 2000})
expr = value.get("C", 3.0)
Expand All @@ -565,6 +593,7 @@ def test_map_get_with_incompatible_value_different_kind(con):

@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
@pytest.mark.parametrize("null_value", [None, ibis.null()])
def test_map_get_with_null_on_not_nullable(con, null_value):
map_type = dt.Map(dt.string, dt.Int16(nullable=False))
Expand All @@ -579,18 +608,20 @@ def test_map_get_with_null_on_not_nullable(con, null_value):
["flink"], raises=Py4JJavaError, reason="Flink cannot handle typeless nulls"
)
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_get_with_null_on_null_type_with_null(con, null_value):
value = ibis.literal({"A": None, "B": None})
expr = value.get("C", null_value)
result = con.execute(expr)
assert pd.isna(result)


@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@pytest.mark.notyet(
["flink"], raises=Py4JJavaError, reason="Flink cannot handle typeless nulls"
)
@mark_notimpl_risingwave_hstore
@mark_notyet_postgres
@mark_notyet_datafusion
def test_map_get_with_null_on_null_type_with_non_null(con):
value = ibis.literal({"A": None, "B": None})
expr = value.get("C", 1)
Expand All @@ -603,6 +634,7 @@ def test_map_get_with_null_on_null_type_with_non_null(con):
reason="`tbl_properties` is required when creating table with schema",
)
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_create_table(con, temp_table):
t = con.create_table(
temp_table,
Expand All @@ -617,18 +649,21 @@ def test_map_create_table(con, temp_table):
reason="No translation rule for <class 'ibis.expr.operations.maps.MapLength'>",
)
@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_length(con):
expr = ibis.literal(dict(a="A", b="B")).length()
assert con.execute(expr) == 2


@mark_notyet_datafusion
def test_map_keys_unnest(backend):
expr = backend.map.kv.keys().unnest()
result = expr.to_pandas()
assert frozenset(result) == frozenset("abcdef")


@mark_notimpl_risingwave_hstore
@mark_notyet_datafusion
def test_map_contains_null(con):
expr = ibis.map(["a"], ibis.literal([None], type="array<string>"))
assert con.execute(expr.contains("a"))
Expand Down
17 changes: 1 addition & 16 deletions ibis/backends/tests/tpc/ds/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@
from ibis import _, coalesce, cumulative_window, date, ifelse, null, rank, union
from ibis import literal as lit
from ibis import selectors as s
from ibis.backends.tests.errors import (
ArrowNotImplementedError,
ClickHouseDatabaseError,
TrinoUserError,
)
from ibis.backends.tests.errors import ClickHouseDatabaseError, TrinoUserError
from ibis.backends.tests.tpc.conftest import tpc_test
from ibis.common.exceptions import OperationNotDefinedError

Expand Down Expand Up @@ -1416,7 +1412,6 @@ def test_26(catalog_sales, customer_demographics, date_dim, item, promotion):


@tpc_test("ds")
@pytest.mark.notyet(["datafusion"], reason="Failed to plan")
def test_27(store_sales, customer_demographics, date_dim, store, item):
results = (
store_sales.join(customer_demographics, [("ss_cdemo_sk", "cd_demo_sk")])
Expand Down Expand Up @@ -1999,11 +1994,6 @@ def test_38(store_sales, catalog_sales, web_sales, date_dim, customer):


@tpc_test("ds")
@pytest.mark.notyet(
["datafusion"],
raises=ArrowNotImplementedError,
reason="Unsupported cast from double to null using function cast_null",
)
def test_39(inventory, item, warehouse, date_dim):
inv = (
inventory.join(item, [("inv_item_sk", "i_item_sk")])
Expand Down Expand Up @@ -4894,11 +4884,6 @@ def test_89(item, store_sales, date_dim, store):
).limit(100)


@pytest.mark.notyet(
["datafusion"],
raises=ArrowNotImplementedError,
reason="Unsupported cast from double to null using function cast_null",
)
@tpc_test("ds")
def test_90(web_sales, household_demographics, time_dim, web_page):
def am_pm(*, hour: int, name: str):
Expand Down
15 changes: 8 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading