test(python): Fix spurious test failures (#13961)
stinodego authored Jan 24, 2024
1 parent 0d01a3a commit 6273995
Showing 13 changed files with 169 additions and 178 deletions.
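
Context: the streaming tests exercise process-wide state (for example POLARS_VERBOSE together with captured stderr), which appears to be what made them fail spuriously when pytest-xdist scheduled them alongside other tests. The change pins every streaming test to a single xdist worker via a shared xdist_group("streaming") mark, declared once in the package __init__.py, and moves streaming-dependent tests from other files into that group. A minimal sketch of how such a mark behaves (the test names are hypothetical; this assumes the suite is run with pytest-xdist's group-aware scheduling, e.g. pytest -n auto --dist loadgroup):

import pytest

# With `pytest -n auto --dist loadgroup`, all tests carrying the same
# xdist_group name are sent to the same worker process, so they never
# run concurrently with one another.
pytestmark = pytest.mark.xdist_group("streaming")


def test_reads_shared_state() -> None:  # hypothetical
    assert True


def test_writes_shared_state() -> None:  # hypothetical
    assert True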
3 changes: 3 additions & 0 deletions py-polars/tests/unit/io/test_hive.py
@@ -12,6 +12,7 @@
 @pytest.mark.skip(
     reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
 )
+@pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_predicate_pushdown(
     io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any
@@ -91,6 +92,7 @@ def test_hive_partitioned_predicate_pushdown_skips_correct_number_of_files(
 @pytest.mark.skip(
     reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
 )
+@pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> None:
     df = pl.read_ipc(io_files_path / "*.ipc")
@@ -127,6 +129,7 @@ def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) ->
 @pytest.mark.skip(
     reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
 )
+@pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_projection_pushdown(
     io_files_path: Path, tmp_path: Path
16 changes: 0 additions & 16 deletions py-polars/tests/unit/io/test_lazy_json.py
@@ -140,19 +140,3 @@ def test_anonymous_scan_explain(io_files_path: Path) -> None:
     q = pl.scan_ndjson(source=file)
     assert "Anonymous" in q.explain()
     assert "Anonymous" in q.show_graph(raw_output=True)  # type: ignore[operator]
-
-
-def test_sink_ndjson_should_write_same_data(
-    io_files_path: Path, tmp_path: Path
-) -> None:
-    tmp_path.mkdir(exist_ok=True)
-    # Arrange
-    source_path = io_files_path / "foods1.csv"
-    target_path = tmp_path / "foods_test.ndjson"
-    expected = pl.read_csv(source_path)
-    lf = pl.scan_csv(source_path)
-    # Act
-    lf.sink_ndjson(target_path)
-    df = pl.read_ndjson(target_path)
-    # Assert
-    assert_frame_equal(df, expected)
42 changes: 1 addition & 41 deletions py-polars/tests/unit/io/test_lazy_parquet.py
@@ -201,46 +201,6 @@ def test_row_index_schema_parquet(parquet_file_path: Path) -> None:
     ).dtypes == [pl.UInt32, pl.String]
 
 
-@pytest.mark.write_disk()
-def test_parquet_eq_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
-    tmp_path.mkdir(exist_ok=True)
-
-    monkeypatch.setenv("POLARS_VERBOSE", "1")
-
-    df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
-        (pl.col("idx") // 25).alias("part")
-    )
-    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
-    assert df.n_chunks("all") == [4, 4]
-
-    file_path = tmp_path / "stats.parquet"
-    df.write_parquet(file_path, statistics=True, use_pyarrow=False)
-
-    file_path = tmp_path / "stats.parquet"
-    df.write_parquet(file_path, statistics=True, use_pyarrow=False)
-
-    for streaming in [False, True]:
-        for pred in [
-            pl.col("idx") == 50,
-            pl.col("idx") == 150,
-            pl.col("idx") == 210,
-        ]:
-            result = (
-                pl.scan_parquet(file_path).filter(pred).collect(streaming=streaming)
-            )
-            assert_frame_equal(result, df.filter(pred))
-
-    captured = capfd.readouterr().err
-    assert (
-        "parquet file must be read, statistics not sufficient for predicate."
-        in captured
-    )
-    assert (
-        "parquet file can be skipped, the statistics were sufficient"
-        " to apply the predicate." in captured
-    )
-
-
 @pytest.mark.write_disk()
 def test_parquet_is_in_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
     tmp_path.mkdir(exist_ok=True)
@@ -314,7 +274,7 @@ def test_parquet_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> Non
 
 
 @pytest.mark.write_disk()
-def test_streaming_categorical(tmp_path: Path) -> None:
+def test_categorical(tmp_path: Path) -> None:
     tmp_path.mkdir(exist_ok=True)
 
     df = pl.DataFrame(
16 changes: 0 additions & 16 deletions py-polars/tests/unit/operations/test_group_by.py
@@ -831,22 +831,6 @@ def test_group_by_rolling_deprecated() -> None:
     assert_frame_equal(result_lazy, expected, check_row_order=False)
 
 
-def test_group_by_multiple_keys_one_literal() -> None:
-    df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})
-
-    expected = {"a": [1, 2], "literal": [1, 1], "b": [5, 6]}
-    for streaming in [True, False]:
-        assert (
-            df.lazy()
-            .group_by("a", pl.lit(1))
-            .agg(pl.col("b").max())
-            .sort(["a", "b"])
-            .collect(streaming=streaming)
-            .to_dict(as_series=False)
-            == expected
-        )
-
-
 def test_group_by_list_scalar_11749() -> None:
     df = pl.DataFrame(
         {
82 changes: 0 additions & 82 deletions py-polars/tests/unit/operations/test_join.py
@@ -738,88 +738,6 @@ def test_outer_join_bool() -> None:
     }
 
 
-@pytest.mark.parametrize("streaming", [False, True])
-def test_join_null_matches(streaming: bool) -> None:
-    # null values in joins should never find a match.
-    df_a = pl.LazyFrame(
-        {
-            "idx_a": [0, 1, 2],
-            "a": [None, 1, 2],
-        }
-    )
-
-    df_b = pl.LazyFrame(
-        {
-            "idx_b": [0, 1, 2, 3],
-            "a": [None, 2, 1, None],
-        }
-    )
-
-    expected = pl.DataFrame({"idx_a": [2, 1], "a": [2, 1], "idx_b": [1, 2]})
-    assert_frame_equal(
-        df_a.join(df_b, on="a", how="inner").collect(streaming=streaming), expected
-    )
-    expected = pl.DataFrame(
-        {"idx_a": [0, 1, 2], "a": [None, 1, 2], "idx_b": [None, 2, 1]}
-    )
-    assert_frame_equal(
-        df_a.join(df_b, on="a", how="left").collect(streaming=streaming), expected
-    )
-    expected = pl.DataFrame(
-        {
-            "idx_a": [None, 2, 1, None, 0],
-            "a": [None, 2, 1, None, None],
-            "idx_b": [0, 1, 2, 3, None],
-            "a_right": [None, 2, 1, None, None],
-        }
-    )
-    assert_frame_equal(df_a.join(df_b, on="a", how="outer").collect(), expected)
-
-
-@pytest.mark.parametrize("streaming", [False, True])
-def test_join_null_matches_multiple_keys(streaming: bool) -> None:
-    df_a = pl.LazyFrame(
-        {
-            "a": [None, 1, 2],
-            "idx": [0, 1, 2],
-        }
-    )
-
-    df_b = pl.LazyFrame(
-        {
-            "a": [None, 2, 1, None, 1],
-            "idx": [0, 1, 2, 3, 1],
-            "c": [10, 20, 30, 40, 50],
-        }
-    )
-
-    expected = pl.DataFrame({"a": [1], "idx": [1], "c": [50]})
-    assert_frame_equal(
-        df_a.join(df_b, on=["a", "idx"], how="inner").collect(streaming=streaming),
-        expected,
-    )
-    expected = pl.DataFrame(
-        {"a": [None, 1, 2], "idx": [0, 1, 2], "c": [None, 50, None]}
-    )
-    assert_frame_equal(
-        df_a.join(df_b, on=["a", "idx"], how="left").collect(streaming=streaming),
-        expected,
-    )
-
-    expected = pl.DataFrame(
-        {
-            "a": [None, None, None, None, None, 1, 2],
-            "idx": [None, None, None, None, 0, 1, 2],
-            "a_right": [None, 2, 1, None, None, 1, None],
-            "idx_right": [0, 1, 2, 3, None, 1, None],
-            "c": [10, 20, 30, 40, None, 50, None],
-        }
-    )
-    assert_frame_equal(
-        df_a.join(df_b, on=["a", "idx"], how="outer").sort("a").collect(), expected
-    )
-
-
 def test_outer_join_coalesce_different_names_13450() -> None:
     df1 = pl.DataFrame({"L1": ["a", "b", "c"], "L3": ["b", "c", "d"], "L2": [1, 2, 3]})
     df2 = pl.DataFrame({"L3": ["a", "c", "d"], "R2": [7, 8, 9]})
3 changes: 3 additions & 0 deletions py-polars/tests/unit/streaming/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytestmark = pytest.mark.xdist_group("streaming")
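
Note: a pytestmark defined in a package's __init__.py is applied by pytest to every test module in that package, which is what makes the per-module pytestmark lines removed in the files below redundant.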
2 changes: 0 additions & 2 deletions py-polars/tests/unit/streaming/test_streaming.py
@@ -17,8 +17,6 @@
 if TYPE_CHECKING:
     from polars.type_aliases import JoinStrategy
 
-pytestmark = pytest.mark.xdist_group("streaming")
-
 
 def test_streaming_categoricals_5921() -> None:
     with pl.StringCache():
2 changes: 0 additions & 2 deletions py-polars/tests/unit/streaming/test_streaming_cse.py
@@ -7,8 +7,6 @@
 import polars as pl
 from polars.testing import assert_frame_equal
 
-pytestmark = pytest.mark.xdist_group("streaming")
-
 
 def test_cse_expr_selection_streaming(monkeypatch: Any, capfd: Any) -> None:
     monkeypatch.setenv("POLARS_VERBOSE", "1")
18 changes: 16 additions & 2 deletions py-polars/tests/unit/streaming/test_streaming_group_by.py
@@ -9,8 +9,6 @@
 import polars as pl
 from polars.testing import assert_frame_equal
 
-pytestmark = pytest.mark.xdist_group("streaming")
-
 
 @pytest.mark.slow()
 def test_streaming_group_by_sorted_fast_path_nulls_10273() -> None:
@@ -422,3 +420,19 @@ def test_streaming_group_by_literal(literal: Any) -> None:
"a_count": [20],
"a_sum": [190],
}


@pytest.mark.parametrize("streaming", [True, False])
def test_group_by_multiple_keys_one_literal(streaming: bool) -> None:
df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})

expected = {"a": [1, 2], "literal": [1, 1], "b": [5, 6]}
assert (
df.lazy()
.group_by("a", pl.lit(1))
.agg(pl.col("b").max())
.sort(["a", "b"])
.collect(streaming=streaming)
.to_dict(as_series=False)
== expected
)
67 changes: 61 additions & 6 deletions py-polars/tests/unit/streaming/test_streaming_io.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
-import unittest
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
+from unittest.mock import patch
 
 import pytest
 
@@ -12,9 +12,6 @@
     from pathlib import Path
 
 
-pytestmark = pytest.mark.xdist_group("streaming")
-
-
 @pytest.mark.write_disk()
 def test_streaming_parquet_glob_5900(df: pl.DataFrame, tmp_path: Path) -> None:
     tmp_path.mkdir(exist_ok=True)
@@ -122,7 +119,7 @@ def test_sink_csv_with_options() -> None:
     passed into the rust-polars correctly.
     """
     df = pl.LazyFrame({"dummy": ["abc"]})
-    with unittest.mock.patch.object(df, "_ldf") as ldf:
+    with patch.object(df, "_ldf") as ldf:
         df.sink_csv(
             "path",
             include_bom=True,
@@ -198,3 +195,61 @@ def test_streaming_cross_join_schema(tmp_path: Path) -> None:
     a.join(b, how="cross").sink_parquet(file_path)
     read = pl.read_parquet(file_path, parallel="none")
     assert read.to_dict(as_series=False) == {"a": [1, 2], "b": ["b", "b"]}
+
+
+@pytest.mark.write_disk()
+def test_sink_ndjson_should_write_same_data(
+    io_files_path: Path, tmp_path: Path
+) -> None:
+    tmp_path.mkdir(exist_ok=True)
+
+    source_path = io_files_path / "foods1.csv"
+    target_path = tmp_path / "foods_test.ndjson"
+
+    expected = pl.read_csv(source_path)
+
+    lf = pl.scan_csv(source_path)
+    lf.sink_ndjson(target_path)
+    df = pl.read_ndjson(target_path)
+
+    assert_frame_equal(df, expected)
+
+
+@pytest.mark.write_disk()
+def test_parquet_eq_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
+    tmp_path.mkdir(exist_ok=True)
+
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+
+    df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
+        (pl.col("idx") // 25).alias("part")
+    )
+    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
+    assert df.n_chunks("all") == [4, 4]
+
+    file_path = tmp_path / "stats.parquet"
+    df.write_parquet(file_path, statistics=True, use_pyarrow=False)
+
+    file_path = tmp_path / "stats.parquet"
+    df.write_parquet(file_path, statistics=True, use_pyarrow=False)
+
+    for streaming in [False, True]:
+        for pred in [
+            pl.col("idx") == 50,
+            pl.col("idx") == 150,
+            pl.col("idx") == 210,
+        ]:
+            result = (
+                pl.scan_parquet(file_path).filter(pred).collect(streaming=streaming)
+            )
+            assert_frame_equal(result, df.filter(pred))
+
+    captured = capfd.readouterr().err
+    assert (
+        "parquet file must be read, statistics not sufficient for predicate."
+        in captured
+    )
+    assert (
+        "parquet file can be skipped, the statistics were sufficient"
+        " to apply the predicate." in captured
+    )