From a4137258d329cc75e95715c58f0f07a95ef0259c Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Wed, 23 Oct 2024 22:09:42 +1100
Subject: [PATCH] fix(streaming): CSV file path column and hstack projection pushdown

The streaming CSV source previously received the full output schema and
replaced an all-null file-path column in every chunk. Pass the reader
schema instead and append the file-path column after reading. Also mark
hstack/with_columns as schema-expanding in projection pushdown so its
fast path is not taken for nodes that add columns.

Fixes #18257 and #19172.
---
 .../polars-pipe/src/executors/sources/csv.rs  | 17 +++-------
 crates/polars-pipe/src/pipeline/convert.rs    |  2 +-
 .../optimizer/projection_pushdown/hstack.rs   |  2 +-
 py-polars/tests/unit/io/test_scan.py          | 26 ++++++++++++++++
 py-polars/tests/unit/test_projections.py      | 30 +++++++++++++++++++
 5 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs
index 38484f9c7255..900be25256b4 100644
--- a/crates/polars-pipe/src/executors/sources/csv.rs
+++ b/crates/polars-pipe/src/executors/sources/csv.rs
@@ -216,18 +216,11 @@ impl Source for CsvSource {
         };
 
         for data_chunk in &mut out {
-            // The batched reader creates the column containing all nulls because the schema it
-            // gets passed contains the column.
-            //
-            // SAFETY: Columns are only replaced with columns
-            // 1. of the same name, and
-            // 2. of the same length.
-            for s in unsafe { data_chunk.data.get_columns_mut() } {
-                if s.name() == ca.name() {
-                    *s = ca.slice(0, s.len()).into_column();
-                    break;
-                }
-            }
+            let n = data_chunk.data.height();
+            // The reader schema no longer contains this column, so append it.
+            // SAFETY: the appended column is sliced to the chunk height, so all
+            // columns in the chunk keep the same length.
+            unsafe { data_chunk.data.get_columns_mut() }.push(ca.slice(0, n).into_column());
         }
 
     }
diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs
index 23df59fa59f7..b0b19aa26708 100644
--- a/crates/polars-pipe/src/pipeline/convert.rs
+++ b/crates/polars-pipe/src/pipeline/convert.rs
@@ -109,7 +109,7 @@ where
         FileScan::Csv { options, .. } => {
             let src = sources::CsvSource::new(
                 sources,
-                file_info.schema,
+                file_info.reader_schema.clone().unwrap().unwrap_right(),
                 options,
                 file_options,
                 verbose,
diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/hstack.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/hstack.rs
index 8096b5bde3d8..81328fe208e6 100644
--- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/hstack.rs
+++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/hstack.rs
@@ -63,7 +63,7 @@ pub(super) fn process_hstack(
         acc_projections,
         &lp_arena.get(input).schema(lp_arena),
         expr_arena,
-        false,
+        true, // expands_schema
     );
 
     proj_pd.pushdown_and_assign(
diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py
index 4977fa115749..30af7b830ff8 100644
--- a/py-polars/tests/unit/io/test_scan.py
+++ b/py-polars/tests/unit/io/test_scan.py
@@ -809,3 +809,29 @@ def test_scan_include_file_paths_respects_projection_pushdown() -> None:
     )
 
     assert_frame_equal(q.collect(), pl.DataFrame({"a": "a1", "b": "b1"}))
+
+
+def test_streaming_scan_csv_include_file_paths_18257(io_files_path: Path) -> None:
+    lf = pl.scan_csv(
+        io_files_path / "foods1.csv",
+        include_file_paths="path",
+    ).select("category", "path")
+
+    assert lf.collect(streaming=True).columns == ["category", "path"]
+
+
+def test_streaming_scan_csv_with_row_index_19172(io_files_path: Path) -> None:
+    lf = (
+        pl.scan_csv(io_files_path / "foods1.csv", infer_schema=False)
+        .with_row_index()
+        .select("calories", "index")
+        .head(1)
+    )
+
+    assert_frame_equal(
+        lf.collect(streaming=True),
+        pl.DataFrame(
+            {"calories": "45", "index": 0},
+            schema={"calories": pl.String, "index": pl.UInt32},
+        ),
+    )
diff --git a/py-polars/tests/unit/test_projections.py b/py-polars/tests/unit/test_projections.py
index 48b3077423e0..e075fdb6acc2 100644
--- a/py-polars/tests/unit/test_projections.py
+++ b/py-polars/tests/unit/test_projections.py
@@ -582,3 +582,33 @@ def test_projections_collapse_17781() -> None:
         else:
             lf = lf.join(lfj, on="index", how="left")
     assert "SELECT " not in lf.explain()  # type: ignore[union-attr]
+
+
+def test_with_columns_projection_pushdown() -> None:
+    # Summary
+    # `process_hstack` in projection PD incorrectly took a fast path meant for
+    # LP nodes that don't add new columns to the schema; that fast path stops
+    # projection PD when the schema length matches the number of projections.
+    #
+    # To trigger this, we drop the same number of columns before and after the
+    # with_columns, and in the with_columns we also add the same number of
+    # columns, so the schema lengths match while the column sets differ.
+    lf = (
+        pl.scan_csv(
+            b"""\
+a,b,c,d,e
+1,1,1,1,1
+""",
+            include_file_paths="path",
+        )
+        .drop("a", "b")
+        .with_columns(pl.lit(1).alias(x) for x in ["x", "y"])
+        .drop("c", "d")
+    )
+
+    plan = lf.explain().strip()
+
+    assert plan.startswith("WITH_COLUMNS:")
+    # [dyn int: 1.alias("x"), dyn int: 1.alias("y")]
+    # Csv SCAN [20 in-mem bytes]
+    assert plan.endswith("PROJECT 1/6 COLUMNS")
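
For readers tracing the `expands_schema` flag flipped above: a minimal Python
sketch of the fast-path condition it guards, using hypothetical names
(`fast_path_applies`, `n_projections`, `schema_len`) in place of the actual
Rust internals around `process_hstack`:

    def fast_path_applies(n_projections: int, schema_len: int, expands_schema: bool) -> bool:
        # If as many columns are projected as the schema holds, pushdown can
        # stop early: everything below is needed anyway. But a node that adds
        # columns (hstack/with_columns) can make the lengths equal without the
        # column sets matching, so the fast path must be skipped for it.
        return not expands_schema and n_projections == schema_len

    # Dropping two columns and adding two keeps the lengths equal:
    assert fast_path_applies(4, 4, expands_schema=False)      # old behavior: stops early
    assert not fast_path_applies(4, 4, expands_schema=True)   # fixed: keeps pushing down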