Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Oct 23, 2024
1 parent c58fbcb commit a413725
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 11 deletions.
11 changes: 2 additions & 9 deletions crates/polars-pipe/src/executors/sources/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,11 @@ impl Source for CsvSource {
};

for data_chunk in &mut out {
// The batched reader creates the column containing all nulls because the schema it
// gets passed contains the column.
//
let n = data_chunk.data.height();
// SAFETY: Columns are only replaced with columns
// 1. of the same name, and
// 2. of the same length.
for s in unsafe { data_chunk.data.get_columns_mut() } {
if s.name() == ca.name() {
*s = ca.slice(0, s.len()).into_column();
break;
}
}
unsafe { data_chunk.data.get_columns_mut() }.push(ca.slice(0, n).into_column())
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ where
FileScan::Csv { options, .. } => {
let src = sources::CsvSource::new(
sources,
file_info.schema,
file_info.reader_schema.clone().unwrap().unwrap_right(),
options,
file_options,
verbose,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub(super) fn process_hstack(
acc_projections,
&lp_arena.get(input).schema(lp_arena),
expr_arena,
false,
true, // expands_schema
);

proj_pd.pushdown_and_assign(
Expand Down
26 changes: 26 additions & 0 deletions py-polars/tests/unit/io/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,3 +809,29 @@ def test_scan_include_file_paths_respects_projection_pushdown() -> None:
)

assert_frame_equal(q.collect(), pl.DataFrame({"a": "a1", "b": "b1"}))


def test_streaming_scan_csv_include_file_paths_18257(io_files_path: Path) -> None:
    # Regression test (the suffix in the test name suggests issue 18257): a
    # streaming CSV scan that projects both a regular column and the column
    # added via `include_file_paths` must return exactly those two columns,
    # in the selected order.
    csv_path = io_files_path / "foods1.csv"
    projected = ["category", "path"]

    lf = pl.scan_csv(csv_path, include_file_paths="path").select(*projected)
    result = lf.collect(streaming=True)

    assert result.columns == ["category", "path"]


def test_streaming_scan_csv_with_row_index_19172(io_files_path: Path) -> None:
    # Regression test (the suffix in the test name suggests issue 19172): a
    # streaming CSV scan combined with `with_row_index()` must yield both the
    # projected file column and the generated "index" column.
    scan = pl.scan_csv(io_files_path / "foods1.csv", infer_schema=False)
    lf = scan.with_row_index().select("calories", "index").head(1)

    result = lf.collect(streaming=True)

    # Schema inference is disabled, so "calories" is expected as a String;
    # the row index is expected to start at 0 with dtype UInt32.
    expected = pl.DataFrame(
        {"calories": "45", "index": 0},
        schema={"calories": pl.String, "index": pl.UInt32},
    )
    assert_frame_equal(result, expected)
30 changes: 30 additions & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,3 +582,33 @@ def test_projections_collapse_17781() -> None:
else:
lf = lf.join(lfj, on="index", how="left")
assert "SELECT " not in lf.explain() # type: ignore[union-attr]


def test_with_columns_projection_pushdown() -> None:
    # Summary
    # -------
    # `process_hstack` in projection pushdown (PD) incorrectly took a
    # fast-path meant for LP nodes that do not add new columns to the schema.
    # That fast-path stops projection PD when it sees that the schema length
    # of the upper node matches.
    #
    # To trigger this, the same number of columns is dropped before and after
    # the `with_columns`, and the `with_columns` itself adds that same number
    # of columns.
    added_columns = [pl.lit(1).alias(name) for name in ["x", "y"]]

    scan = pl.scan_csv(
        b"""\
a,b,c,d,e
1,1,1,1,1
""",
        include_file_paths="path",
    )
    lf = scan.drop("a", "b").with_columns(added_columns).drop("c", "d")

    plan = lf.explain().strip()

    # Expected plan shape:
    #   WITH_COLUMNS:
    #   [dyn int: 1.alias("x"), dyn int: 1.alias("y")]
    #   Csv SCAN [20 in-mem bytes]
    #   PROJECT 1/6 COLUMNS
    # Only one of the six scan columns (a-e plus "path") should still be
    # projected once pushdown has passed through the `with_columns` node.
    assert plan.startswith("WITH_COLUMNS:")
    assert plan.endswith("PROJECT 1/6 COLUMNS")

0 comments on commit a413725

Please sign in to comment.