diff --git a/crates/polars-io/src/parquet/async_impl.rs b/crates/polars-io/src/parquet/async_impl.rs
index 8d9f40755c6da..dd2b052e5b003 100644
--- a/crates/polars-io/src/parquet/async_impl.rs
+++ b/crates/polars-io/src/parquet/async_impl.rs
@@ -25,6 +25,7 @@
 type DownloadedRowGroup = Vec<(u64, Bytes)>;
 type QueuePayload = (usize, DownloadedRowGroup);
 type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;
+#[derive(Clone)]
 pub struct ParquetObjectStore {
     store: PolarsObjectStore,
     path: ObjectPath,
@@ -266,7 +267,7 @@ pub struct FetchRowGroupsFromObjectStore {
 
 impl FetchRowGroupsFromObjectStore {
     pub fn new(
-        reader: ParquetObjectStore,
+        reader: Arc<ParquetObjectStore>,
         schema: ArrowSchemaRef,
         projection: Option<&[usize]>,
         predicate: Option<Arc<dyn PhysicalIoExpr>>,
@@ -304,7 +305,6 @@ impl FetchRowGroupsFromObjectStore {
         } else {
             row_groups.iter().cloned().enumerate().collect()
         };
-        let reader = Arc::new(reader);
         let msg_limit = get_rg_prefetch_size();
 
         if verbose() {
diff --git a/crates/polars-io/src/parquet/read.rs b/crates/polars-io/src/parquet/read.rs
index 5691857a4ac4c..b49933d85d2e1 100644
--- a/crates/polars-io/src/parquet/read.rs
+++ b/crates/polars-io/src/parquet/read.rs
@@ -1,14 +1,19 @@
 use std::io::{Read, Seek};
 
 use arrow::datatypes::ArrowSchemaRef;
+use polars_core::config::verbose;
 use polars_core::prelude::*;
 #[cfg(feature = "cloud")]
 use polars_core::utils::accumulate_dataframes_vertical_unchecked;
-use polars_parquet::read;
+use polars_parquet::read::{self};
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 
-use super::read_impl::{read_parquet, FetchRowGroupsFromMmapReader};
+use super::mmap::ColumnStore;
+use super::predicates::read_this_row_group;
+use super::read_impl::{
+    column_idx_to_series, materialize_hive_partitions, read_parquet, FetchRowGroupsFromMmapReader,
+};
 #[cfg(feature = "cloud")]
 use crate::cloud::CloudOptions;
 use crate::mmap::MmapBytesReader;
@@ -17,7 +22,7 @@
 use crate::parquet::async_impl::FetchRowGroupsFromObjectStore;
 #[cfg(feature = "cloud")]
 use crate::parquet::async_impl::ParquetObjectStore;
 pub use crate::parquet::read_impl::BatchedParquetReader;
-use crate::predicates::PhysicalIoExpr;
+use crate::predicates::{apply_predicate, PhysicalIoExpr};
 use crate::prelude::*;
 use crate::RowIndex;
@@ -151,7 +156,7 @@ impl ParquetReader {
         let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
         BatchedParquetReader::new(
             row_group_fetcher,
-            metadata,
+            metadata.row_groups.clone(),
             schema,
             self.n_rows.unwrap_or(usize::MAX),
             self.projection,
@@ -272,6 +277,27 @@ impl ParquetAsyncReader {
         self.reader.num_rows().await
     }
 
+    pub async fn num_rows_with_predicate(&mut self) -> PolarsResult<usize> {
+        let metadata = self.reader.get_metadata().await?.clone();
+        let schema = self.schema().await?;
+        let reader = Arc::new(self.reader.clone());
+
+        let row_sizes = prune_row_groups(
+            reader.clone(),
+            schema.clone(),
+            metadata,
+            usize::MAX,
+            self.predicate.clone(),
+            self.hive_partition_columns.as_deref(),
+        )
+        .await?
+        .iter()
+        .map(|(row_size, _md)| *row_size)
+        .collect::<Vec<_>>();
+
+        Ok(row_sizes.iter().sum())
+    }
+
     pub fn with_n_rows(mut self, n_rows: Option<usize>) -> Self {
         self.n_rows = n_rows;
         self
@@ -316,25 +342,43 @@ impl ParquetAsyncReader {
     pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
         let metadata = self.reader.get_metadata().await?.clone();
-        let schema = match self.schema {
-            Some(schema) => schema,
-            None => self.schema().await?,
-        };
+        let schema = self.schema().await?;
+        let limit = self.n_rows.unwrap_or(usize::MAX);
+
+        let reader = Arc::new(self.reader);
+
+        let mut final_row_groups = metadata.row_groups.clone();
+        if let Some(_predicates) = self.predicate.clone() {
+            if self.row_index.is_none() {
+                final_row_groups = prune_row_groups(
+                    reader.clone(),
+                    schema.clone(),
+                    metadata.clone(),
+                    limit,
+                    self.predicate.clone(),
+                    self.hive_partition_columns.as_deref(),
+                )
+                .await?
+                .iter()
+                .map(|(_row_count, md)| md.clone())
+                .collect::<Vec<_>>();
+            }
+        }
         // row group fetched deals with projection
         let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
-            self.reader,
+            reader.clone(),
             schema.clone(),
             self.projection.as_deref(),
             self.predicate.clone(),
-            &metadata.row_groups,
-            self.n_rows.unwrap_or(usize::MAX),
+            &final_row_groups,
+            usize::MAX,
         )?
         .into();
         BatchedParquetReader::new(
             row_group_fetcher,
-            metadata,
+            final_row_groups,
             schema,
-            self.n_rows.unwrap_or(usize::MAX),
+            usize::MAX,
             self.projection,
             self.predicate.clone(),
             self.row_index,
@@ -351,7 +395,6 @@ impl ParquetAsyncReader {
 
     pub async fn finish(mut self) -> PolarsResult<DataFrame> {
         let rechunk = self.rechunk;
-        let metadata = self.get_metadata().await?.clone();
         let reader_schema = self.schema().await?;
         let row_index = self.row_index.clone();
         let hive_partition_columns = self.hive_partition_columns.clone();
@@ -359,7 +402,15 @@ impl ParquetAsyncReader {
 
         // batched reader deals with slice pushdown
        let reader = self.batched(usize::MAX).await?;
-        let n_batches = metadata.row_groups.len();
+        let n_batches = reader.num_row_groups();
+        if n_batches == 0 {
+            return Ok(materialize_empty_df(
+                projection.as_deref(),
+                reader_schema.as_ref(),
+                hive_partition_columns.as_deref(),
+                row_index.as_ref(),
+            ));
+        }
         let mut iter = reader.iter(n_batches);
 
         let mut chunks = Vec::with_capacity(n_batches);
@@ -382,3 +433,82 @@ impl ParquetAsyncReader {
         Ok(df)
     }
 }
+
+#[cfg(feature = "cloud")]
+async fn prune_row_groups(
+    reader: Arc<ParquetObjectStore>,
+    schema: Arc<ArrowSchema>,
+    metadata: Arc<FileMetaData>,
+    limit: usize,
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    hive_partition_columns: Option<&[Series]>,
+) -> PolarsResult<Vec<(usize, RowGroupMetaData)>> {
+    let predicate_columns = predicate.clone().unwrap().columns();
+    let predicate_projection = materialize_projection(
+        Some(&predicate_columns),
+        &Schema::from(schema.clone()),
+        hive_partition_columns,
+        false,
+    );
+
+    let mut predicate_row_group_fetcher = FetchRowGroupsFromObjectStore::new(
+        reader.clone(),
+        schema.clone(),
+        predicate_projection.clone().as_deref(),
+        predicate.clone(),
+        &metadata.row_groups,
+        usize::MAX,
+    )?;
+
+    let predicate_store: ColumnStore<'_> = predicate_row_group_fetcher
+        .fetch_row_groups(0..metadata.row_groups.len())
+        .await?;
+
+    let mut remaining_rows = limit;
+
+    let row_groups = metadata.row_groups.clone();
+    let final_row_groups = row_groups
+        .iter()
+        .map(|md| {
+            if !read_this_row_group(predicate.clone().as_deref(), md, &schema).unwrap()
+                || remaining_rows == 0
+            {
+                return (0, md);
+            }
+            let chunk_size = md.num_rows();
+
+            let mut columns: Vec<Series> = vec![];
+
+            for column_i in predicate_projection.as_ref().unwrap() {
+                let column = column_idx_to_series(
+                    *column_i,
+                    md,
+                    usize::MAX,
+                    &schema.clone(),
+                    &predicate_store,
+                    chunk_size,
+                )
+                .unwrap();
+                columns.push(column)
+            }
+
+            let mut df = unsafe { DataFrame::new_no_checks(columns) };
+
+            materialize_hive_partitions(&mut df, hive_partition_columns, md.num_rows());
+            apply_predicate(&mut df, predicate.as_deref(), false).unwrap();
+
+            let row_count = df.height();
+
+            remaining_rows = remaining_rows.saturating_sub(row_count);
+
+            (row_count, md)
+        })
+        .filter(|(row_count, _md)| *row_count != 0)
+        .map(|(row_count, md)| (row_count, md.clone()))
+        .collect::<Vec<_>>();
+    if verbose() {
+        eprintln!(
+            "reduced the number of row groups in pruning by {}",
+            row_groups.len() - final_row_groups.len()
+        )
+    }
+    Ok(final_row_groups)
+}
diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs
index 096fa9708c7a5..1774c8540bbe2 100644
--- a/crates/polars-io/src/parquet/read_impl.rs
+++ b/crates/polars-io/src/parquet/read_impl.rs
@@ -8,7 +8,7 @@ use polars_core::prelude::*;
 use polars_core::utils::{accumulate_dataframes_vertical, split_df};
 use polars_core::POOL;
 use polars_parquet::read;
-use polars_parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
+use polars_parquet::read::{ArrayIter, RowGroupMetaData};
 use rayon::prelude::*;
 
 use super::materialize_empty_df;
@@ -49,7 +49,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
     }
 }
 
-fn column_idx_to_series(
+pub fn column_idx_to_series(
     column_i: usize,
     md: &RowGroupMetaData,
     remaining_rows: usize,
@@ -132,7 +132,7 @@ fn rg_to_dfs(
     row_group_start: usize,
     row_group_end: usize,
     remaining_rows: &mut usize,
-    file_metadata: &FileMetaData,
+    row_group_metadata: &[RowGroupMetaData],
     schema: &ArrowSchemaRef,
     predicate: Option<&dyn PhysicalIoExpr>,
     row_index: Option<RowIndex>,
@@ -148,7 +148,7 @@
             row_group_start,
             row_group_end,
             remaining_rows,
-            file_metadata,
+            row_group_metadata,
             schema,
             predicate,
             row_index,
@@ -164,7 +164,7 @@
             row_group_end,
             previous_row_count,
             remaining_rows,
-            file_metadata,
+            row_group_metadata,
             schema,
             predicate,
             row_index,
@@ -183,7 +183,7 @@ fn rg_to_dfs_optionally_par_over_columns(
     row_group_start: usize,
     row_group_end: usize,
     remaining_rows: &mut usize,
-    file_metadata: &FileMetaData,
+    row_group_metadata: &[RowGroupMetaData],
     schema: &ArrowSchemaRef,
     predicate: Option<&dyn PhysicalIoExpr>,
     row_index: Option<RowIndex>,
@@ -194,13 +194,14 @@ fn rg_to_dfs_optionally_par_over_columns(
 ) -> PolarsResult<Vec<DataFrame>> {
     let mut dfs = Vec::with_capacity(row_group_end - row_group_start);
 
-    for rg_idx in row_group_start..row_group_end {
-        let md = &file_metadata.row_groups[rg_idx];
+    for md in row_group_metadata
+        .iter()
+        .take(row_group_end)
+        .skip(row_group_start)
+    {
         let current_row_count = md.num_rows() as IdxSize;
-        if use_statistics
-            && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
-        {
+        if use_statistics && !read_this_row_group(predicate, md, schema)? {
             *previous_row_count += current_row_count;
             continue;
         }
@@ -272,7 +273,7 @@ fn rg_to_dfs_par_over_rg(
     row_group_end: usize,
     previous_row_count: &mut IdxSize,
     remaining_rows: &mut usize,
-    file_metadata: &FileMetaData,
+    row_group_metadata: &[RowGroupMetaData],
     schema: &ArrowSchemaRef,
     predicate: Option<&dyn PhysicalIoExpr>,
     row_index: Option<RowIndex>,
@@ -281,8 +282,7 @@ fn rg_to_dfs_par_over_rg(
     hive_partition_columns: Option<&[Series]>,
 ) -> PolarsResult<Vec<DataFrame>> {
     // compute the limits per row group and the row count offsets
-    let row_groups = file_metadata
-        .row_groups
+    let row_groups = row_group_metadata
         .iter()
         .enumerate()
         .skip(row_group_start)
@@ -304,11 +304,7 @@
         .map(|(rg_idx, md, projection_height, row_count_start)| {
             if projection_height == 0
                 || use_statistics
-                    && !read_this_row_group(
-                        predicate,
-                        &file_metadata.row_groups[rg_idx],
-                        schema,
-                    )?
+                    && !read_this_row_group(predicate, &row_group_metadata[rg_idx], schema)?
             {
                 return Ok(None);
             }
@@ -420,7 +416,7 @@ pub fn read_parquet(
         0,
         n_row_groups,
         &mut limit,
-        &file_metadata,
+        &file_metadata.row_groups,
         reader_schema,
         predicate,
         row_index.clone(),
@@ -520,7 +516,7 @@ pub struct BatchedParquetReader {
     limit: usize,
     projection: Arc<[usize]>,
     schema: ArrowSchemaRef,
-    metadata: FileMetaDataRef,
+    row_group_metadata: Vec<RowGroupMetaData>,
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
     row_index: Option<RowIndex>,
     rows_read: IdxSize,
@@ -539,7 +535,7 @@ impl BatchedParquetReader {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         row_group_fetcher: RowGroupFetcher,
-        metadata: FileMetaDataRef,
+        row_group_metadata: Vec<RowGroupMetaData>,
         schema: ArrowSchemaRef,
         limit: usize,
         projection: Option<Vec<usize>>,
@@ -550,7 +546,7 @@
         hive_partition_columns: Option<Vec<Series>>,
         mut parallel: ParallelStrategy,
     ) -> PolarsResult<Self> {
-        let n_row_groups = metadata.row_groups.len();
+        let n_row_groups = row_group_metadata.len();
         let projection = projection
             .map(Arc::from)
             .unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[usize]>>());
@@ -575,7 +571,7 @@
             limit,
             projection,
             schema,
-            metadata,
+            row_group_metadata,
             row_index,
             rows_read: 0,
             predicate,
@@ -594,6 +590,10 @@
         self.limit == 0
     }
 
+    pub fn num_row_groups(&self) -> usize {
+        self.row_group_metadata.len()
+    }
+
     pub fn schema(&self) -> &ArrowSchemaRef {
         &self.schema
     }
@@ -626,7 +626,7 @@
                 row_group_start,
                 row_group_start + n,
                 self.limit,
-                &self.metadata.row_groups,
+                &self.row_group_metadata,
             );
 
             let store = self
@@ -641,7 +641,7 @@
                 row_group_start,
                 row_group_end,
                 &mut self.limit,
-                &self.metadata,
+                &self.row_group_metadata,
                 &self.schema,
                 self.predicate.as_deref(),
                 self.row_index.clone(),
@@ -665,7 +665,7 @@
             let row_index = self.row_index.clone();
             let predicate = self.predicate.clone();
             let schema = self.schema.clone();
-            let metadata = self.metadata.clone();
+            let metadata = self.row_group_metadata.clone();
             let parallel = self.parallel;
             let projection = self.projection.clone();
             let use_statistics = self.use_statistics;
diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs
index 7c3dca6b654e0..a377e89e9fc7d 100644
--- a/crates/polars-io/src/predicates.rs
+++ b/crates/polars-io/src/predicates.rs
@@ -13,6 +13,8 @@ pub trait PhysicalIoExpr: Send + Sync {
     fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
         None
     }
+
+    fn columns(&self) -> Vec<String>;
 }
 
 pub trait StatsEvaluator {
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index 083874daaf752..b6c69a1ff4b42 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -201,6 +201,7 @@ impl ParquetExec {
                 } else {
                     (None, None)
                 };
+
                 let mut reader = ParquetAsyncReader::from_uri(
                     &path.to_string_lossy(),
                     cloud_options,
@@ -221,6 +222,7 @@ impl ParquetExec {
                 }
 
                 let num_rows = reader.num_rows().await?;
+
                 PolarsResult::Ok((num_rows, reader))
             });
             let readers_and_metadata = futures::future::try_join_all(iter).await?;
@@ -240,7 +242,6 @@ impl ParquetExec {
             let use_statistics = self.options.use_statistics;
             let predicate = &self.predicate;
             let base_row_index_ref = &base_row_index;
-
             if verbose {
                 eprintln!("reading of {}/{} file...", processed, self.paths.len());
             }
diff --git a/crates/polars-lazy/src/physical_plan/expressions/mod.rs b/crates/polars-lazy/src/physical_plan/expressions/mod.rs
index 4642654a9fb6c..1d87bf34b814f 100644
--- a/crates/polars-lazy/src/physical_plan/expressions/mod.rs
+++ b/crates/polars-lazy/src/physical_plan/expressions/mod.rs
@@ -654,6 +654,19 @@ impl PhysicalIoExpr for PhysicalIoHelper {
     fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
         self.expr.as_stats_evaluator()
     }
+
+    fn columns(&self) -> Vec<String> {
+        let mut arena: Arena<AExpr> = Arena::new();
+        to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena);
+        let mut columns = vec![];
+        for _ in 0..arena.len() {
+            let node = arena.pop().unwrap();
+            if let AExpr::Column(s) = node {
+                columns.push(s.as_ref().to_string())
+            }
+        }
+        columns
+    }
 }
 
 pub(crate) fn phys_expr_to_io_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalIoExpr> {
diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
index d5c584c1eff96..12130bad03355 100644
--- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
+++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
@@ -31,6 +31,19 @@ impl PhysicalIoExpr for Wrap {
     fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
         self.0.as_stats_evaluator()
     }
+
+    fn columns(&self) -> Vec<String> {
+        let mut arena: Arena<AExpr> = Arena::new();
+        to_aexpr(self.0.as_expression().unwrap().clone(), &mut arena);
+        let mut columns = vec![];
+        for _ in 0..arena.len() {
+            let node = arena.pop().unwrap();
+            if let AExpr::Column(s) = node {
+                columns.push(s.as_ref().to_string())
+            }
+        }
+        columns
+    }
 }
 impl PhysicalPipedExpr for Wrap {
     fn evaluate(&self, chunk: &DataChunk, state: &dyn Any) -> PolarsResult<Series> {
diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs
index 5413592ed8340..17ec07a7a4ca7 100644
--- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs
+++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs
@@ -30,6 +30,10 @@ impl PhysicalIoExpr for Len {
     fn evaluate_io(&self, _df: &DataFrame) -> PolarsResult<Series> {
         unimplemented!()
     }
+
+    fn columns(&self) -> Vec<String> {
+        unimplemented!()
+    }
 }
 impl PhysicalPipedExpr for Len {
     fn evaluate(&self, chunk: &DataChunk, _lazy_state: &dyn Any) -> PolarsResult<Series> {
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index ecd2b20b40608..9f28c99816825 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -161,7 +161,11 @@ impl ParquetSource {
     }
 
     #[cfg(feature = "async")]
-    async fn init_reader_async(&self, index: usize) -> PolarsResult<BatchedParquetReader> {
+    async fn init_reader_async(
+        &self,
+        index: usize,
+        n_rows: usize,
+    ) -> PolarsResult<BatchedParquetReader> {
         let metadata = self.metadata.clone();
         let predicate = self.predicate.clone();
         let cloud_options = self.cloud_options.clone();
@@ -172,7 +176,7 @@ impl ParquetSource {
             let uri = path.to_string_lossy();
             ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, metadata)
                 .await?
-                .with_n_rows(file_options.n_rows)
+                .with_n_rows(Some(n_rows))
                 .with_row_index(file_options.row_index)
                 .with_projection(projection)
                 .with_predicate(predicate.clone())
@@ -184,6 +188,30 @@ impl ParquetSource {
         Ok(batched_reader)
     }
 
+    #[cfg(feature = "async")]
+    async fn num_rows_per_reader(&self, index: usize) -> PolarsResult<usize> {
+        let metadata = self.metadata.clone();
+        let predicate: Option<Arc<dyn PhysicalIoExpr>> = self.predicate.clone();
+        let cloud_options = self.cloud_options.clone();
+        let (path, options, _file_options, projection, _chunk_size, reader_schema, hive_partitions) =
+            self.prepare_init_reader(index)?;
+
+        let mut reader = {
+            let uri = path.to_string_lossy();
+            ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, metadata)
+                .await?
+                .with_projection(projection)
+                .with_predicate(predicate.clone())
+                .use_statistics(options.use_statistics)
+                .with_hive_partition_columns(hive_partitions)
+        };
+        let mut num_rows = reader.num_rows().await;
+        if predicate.is_some() {
+            num_rows = reader.num_rows_with_predicate().await;
+        }
+        num_rows
+    }
+
     #[allow(unused_variables)]
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
@@ -251,7 +279,33 @@ impl ParquetSource {
             .zip(&mut self.iter)
             .map(|(_, index)| index)
             .collect::<Vec<_>>();
-        let init_iter = range.into_iter().map(|index| self.init_reader_async(index));
+
+        let mut rows_left_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
+
+        let num_rows_to_read = range
+            .clone()
+            .into_iter()
+            .map(|index| self.num_rows_per_reader(index));
+
+        let num_rows_to_read = polars_io::pl_async::get_runtime().block_on(async {
+            futures::future::try_join_all(num_rows_to_read).await
+        })?;
+
+        let num_rows_to_read = num_rows_to_read
+            .into_iter()
+            .map(|rows_per_reader| {
+                if rows_left_to_read == 0 {
+                    return 0;
+                }
+                rows_left_to_read = rows_left_to_read.saturating_sub(rows_per_reader);
+                rows_per_reader
+            })
+            .collect::<Vec<_>>();
+
+        let init_iter = range
+            .into_iter()
+            .zip(num_rows_to_read)
+            .map(|(index, num_rows)| self.init_reader_async(index, num_rows));
 
         let batched_readers = polars_io::pl_async::get_runtime()
             .block_on_potential_spawn(async {
diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs
index dcb934db76f11..aa050deedb5ba 100644
--- a/crates/polars-pipe/src/pipeline/convert.rs
+++ b/crates/polars-pipe/src/pipeline/convert.rs
@@ -135,6 +135,18 @@ where
     fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
         self.p.as_stats_evaluator()
     }
+    fn columns(&self) -> Vec<String> {
+        let mut arena: Arena<AExpr> = Arena::new();
+        to_aexpr(self.p.expression(), &mut arena);
+        let mut columns = vec![];
+        for _ in 0..arena.len() {
+            let node = arena.pop().unwrap();
+            if let AExpr::Column(s) = node {
+                columns.push(s.as_ref().to_string())
+            }
+        }
+        columns
+    }
 }
 
 PolarsResult::Ok(Arc::new(Wrap { p }) as Arc<dyn PhysicalIoExpr>)
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 89886a569617d..e91ef967e87fc 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -3,7 +3,7 @@
 import io
 from datetime import datetime, time, timezone
 from decimal import Decimal
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING, Any, cast
 
 import numpy as np
 import pandas as pd
@@ -784,3 +784,19 @@ def test_parquet_array_statistics() -> None:
     assert pl.scan_parquet("test.parquet").filter(
         pl.col("a") != [1, 2, 3]
     ).collect().to_dict(as_series=False) == {"a": [[4, 5, 6], [7, 8, 9]], "b": [2, 3]}
+
+
+def test_skip_full_load_of_rgs_using_predicate(
+    tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capfd: Any
+) -> None:
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
+    df = pl.DataFrame(
+        {"a": pl.arange(0, 10, eager=True), "b": pl.arange(0, 10, eager=True)}
+    )
+    root = tmp_path / "test_rg_skip.parquet"
+    df.write_parquet(root, use_pyarrow=True, row_group_size=2)
+
+    q = pl.scan_parquet(root, parallel="row_groups")
+    assert q.filter(pl.col("a").gt(6)).collect().shape == (3, 2)
+    assert "reduced the number of row groups in pruning by 3" in capfd.readouterr().err