diff --git a/Cargo.lock b/Cargo.lock
index 97bffdb060ea1..03e4eb19b6515 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3163,7 +3163,9 @@ version = "0.42.0"
 dependencies = [
  "ahash",
  "bitflags",
+ "crossbeam-channel",
  "futures",
+ "glob",
  "memchr",
  "once_cell",
  "polars-arrow",
@@ -3308,6 +3310,7 @@ dependencies = [
  "chrono",
  "chrono-tz",
  "ciborium",
+ "crossbeam-channel",
  "either",
  "futures",
  "hashbrown",
diff --git a/Cargo.toml b/Cargo.toml
index 44ba246bccae1..8229c994189ff 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -88,6 +88,7 @@ strength_reduce = "0.2"
 strum_macros = "0.26"
 thiserror = "1"
 tokio = "1.26"
+tokio-stream = "0.1.15"
 tokio-util = "0.7.8"
 unicode-reverse = "1.0.8"
 url = "2.4"
diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs
index 17179f89cbdd1..2e0e873c592f0 100644
--- a/crates/polars-expr/src/expressions/mod.rs
+++ b/crates/polars-expr/src/expressions/mod.rs
@@ -621,6 +621,19 @@ impl PhysicalIoExpr for PhysicalIoHelper {
     fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
         self.expr.as_stats_evaluator()
     }
+
+    fn columns(&self) -> Vec<String> {
+        let mut arena: Arena<AExpr> = Arena::new();
+        let _ = to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena);
+        let mut columns = vec![];
+        for _ in 0..arena.len() {
+            let node = arena.pop().unwrap();
+            if let AExpr::Column(s) = node {
+                columns.push(s.as_ref().to_string())
+            }
+        }
+        columns
+    }
 }
 
 pub fn phys_expr_to_io_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalIoExpr> {
diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs
index 97e4829581bca..f0133e564653a 100644
--- a/crates/polars-io/src/parquet/read/async_impl.rs
+++ b/crates/polars-io/src/parquet/read/async_impl.rs
@@ -271,26 +271,8 @@ impl FetchRowGroupsFromObjectStore {
                 .collect()
         });
 
-        let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();
-
-        let mut row_groups = if let Some(pred) = predicate.as_deref() {
-            row_group_range
-                .filter_map(|i| {
-                    let rg = &row_groups[i];
-                    let should_be_read =
-                        matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
-
-                    // Already add the row groups that will be skipped to the prefetched data.
-                    if !should_be_read {
-                        prefetched.insert(i, Default::default());
-                    }
+        let mut row_groups = row_groups.iter().cloned().enumerate().collect::<Vec<_>>();
-                    should_be_read.then(|| (i, rg.clone()))
-                })
-                .collect::<Vec<_>>()
-        } else {
-            row_groups.iter().cloned().enumerate().collect()
-        };
         let reader = Arc::new(reader);
 
         let msg_limit = get_rg_prefetch_size();
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 15d35fcd285ba..a04c33e229112 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -56,7 +56,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
     }
 }
 
-fn column_idx_to_series(
+pub fn column_idx_to_series(
     column_i: usize,
     md: &RowGroupMetaData,
     filter: Option<Filter>,
@@ -760,7 +760,10 @@ impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
     }
 }
 
 impl RowGroupFetcher {
-    async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
+    pub async fn fetch_row_groups(
+        &mut self,
+        _row_groups: Range<usize>,
+    ) -> PolarsResult<ColumnStore> {
         match self {
             RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
             #[cfg(feature = "cloud")]
diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs
index 30eb593191eb3..c17e1c6c69d72 100644
--- a/crates/polars-io/src/parquet/read/reader.rs
+++ b/crates/polars-io/src/parquet/read/reader.rs
@@ -2,6 +2,7 @@ use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use arrow::datatypes::ArrowSchemaRef;
+use polars_core::config::verbose;
 use polars_core::prelude::*;
 #[cfg(feature = "cloud")]
 use polars_core::utils::accumulate_dataframes_vertical_unchecked;
@@ -11,15 +12,20 @@ use polars_parquet::read;
 use super::async_impl::FetchRowGroupsFromObjectStore;
 #[cfg(feature = "cloud")]
 use super::async_impl::ParquetObjectStore;
+use super::mmap::ColumnStore;
+use super::predicates::read_this_row_group;
 pub use super::read_impl::BatchedParquetReader;
-use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader};
+use super::read_impl::{
+    column_idx_to_series, compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader,
+};
 #[cfg(feature = "cloud")]
 use super::utils::materialize_empty_df;
 #[cfg(feature = "cloud")]
 use crate::cloud::CloudOptions;
+use crate::hive::materialize_hive_partitions;
 use crate::mmap::MmapBytesReader;
 use crate::parquet::metadata::FileMetaDataRef;
-use crate::predicates::PhysicalIoExpr;
+use crate::predicates::{apply_predicate, PhysicalIoExpr};
 use crate::prelude::*;
 use crate::RowIndex;
 
@@ -269,7 +275,7 @@ impl ParquetAsyncReader {
     pub async fn from_uri(
         uri: &str,
         cloud_options: Option<&CloudOptions>,
-        metadata: Option<FileMetaDataRef>,
+        metadata: Option<FileMetaDataRef>
     ) -> PolarsResult<ParquetAsyncReader> {
         Ok(ParquetAsyncReader {
             reader: ParquetObjectStore::from_uri(uri, cloud_options, metadata).await?,
@@ -328,6 +334,27 @@ impl ParquetAsyncReader {
         self
     }
 
+    pub async fn num_rows_with_predicate(mut self) -> PolarsResult<usize> {
+        let metadata = self.reader.get_metadata().await?.clone();
+        let schema = self.schema().await?;
+
+        let row_sizes = prune_row_groups(
+            self.reader,
+            schema.clone(),
+            metadata,
+            usize::MAX,
+            self.predicate.clone(),
+            self.projection.as_deref(),
+            self.hive_partition_columns.as_deref(),
+        )
+        .await?
+        .iter()
+        .map(|(row_size, _md)| *row_size)
+        .collect::<Vec<_>>();
+
+        Ok(row_sizes.iter().sum())
+    }
+
     pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
         self.row_index = row_index;
         self
@@ -379,6 +406,13 @@ impl ParquetAsyncReader {
             Some(schema) => schema,
             None => self.schema().await?,
         };
+        let mut row_groups = metadata.row_groups.clone();
+
+        if self.slice.1 != 0 {
+            self.slice = (0, usize::MAX);
+        } else {
+            row_groups = vec![];
+        }
         // row group fetched deals with projection
         let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
             self.reader,
@@ -389,9 +423,9 @@
                 0,
                 metadata.row_groups.len(),
                 self.slice,
-                &metadata.row_groups,
+                &row_groups,
             ),
-            &metadata.row_groups,
+            &row_groups,
         )?
         .into();
         BatchedParquetReader::new(
@@ -447,3 +481,92 @@ impl ParquetAsyncReader {
         Ok(df)
     }
 }
+
+#[cfg(feature = "cloud")]
+async fn prune_row_groups(
+    reader: ParquetObjectStore,
+    schema: Arc<ArrowSchema>,
+    metadata: Arc<FileMetaData>,
+    limit: usize,
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    projection: Option<&[usize]>,
+    hive_partition_columns: Option<&[Series]>,
+) -> PolarsResult<Vec<(usize, RowGroupMetaData)>> {
+    use super::read_impl::RowGroupFetcher;
+
+    let predicate_columns = predicate.clone().unwrap().columns();
+    let predicate_projection = materialize_projection(
+        Some(&predicate_columns),
+        &Schema::from(schema.clone()),
+        hive_partition_columns,
+        false,
+    );
+
+    let mut predicate_row_group_fetcher: RowGroupFetcher = FetchRowGroupsFromObjectStore::new(
+        reader,
+        schema.clone(),
+        projection,
+        predicate.clone(),
+        compute_row_group_range(
+            0,
+            metadata.row_groups.len(),
+            (0, usize::MAX),
+            &metadata.row_groups,
+        ),
+        &metadata.row_groups,
+    )?
+    .into();
+
+    let predicate_store: ColumnStore = predicate_row_group_fetcher
+        .fetch_row_groups(0..metadata.row_groups.len())
+        .await?;
+
+    let mut remaining_rows = limit;
+
+    let row_groups = metadata.row_groups.clone();
+    let final_row_groups = row_groups
+        .iter()
+        .map(|md| {
+            if !read_this_row_group(predicate.clone().as_deref(), md, &schema).unwrap()
+                || remaining_rows == 0
+            {
+                return (0, md);
+            }
+
+            let mut columns: Vec<Series> = vec![];
+
+            for column_i in predicate_projection.as_ref().unwrap() {
+                let column =
+                    column_idx_to_series(*column_i, md, None, &schema.clone(), &predicate_store)
+                        .unwrap();
+                columns.push(column)
+            }
+
+            let mut df = unsafe { DataFrame::new_no_checks(columns) };
+            let reader_schema = schema.as_ref();
+
+            materialize_hive_partitions(
+                &mut df,
+                reader_schema,
+                hive_partition_columns,
+                md.num_rows(),
+            );
+            apply_predicate(&mut df, predicate.as_deref(), false).unwrap();
+
+            let row_count = df.height();
+
+            remaining_rows = remaining_rows.saturating_sub(row_count);
+
+            (row_count, md)
+        })
+        .filter(|(row_count, _md)| *row_count != 0)
+        .map(|(row_count, md)| (row_count, md.clone()))
+        .collect::<Vec<_>>();
+    if verbose() {
+        eprintln!(
+            "reduced the number of row groups in pruning by {}",
+            row_groups.len() - final_row_groups.len()
+        )
+    }
+    Ok(final_row_groups)
+}
diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs
index 08ad7685461ca..207fbc6986022 100644
--- a/crates/polars-io/src/predicates.rs
+++ b/crates/polars-io/src/predicates.rs
@@ -16,6 +16,8 @@ pub trait PhysicalIoExpr: Send + Sync {
     fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
         None
     }
+
+    fn columns(&self) -> Vec<String>;
 }
 
 pub trait StatsEvaluator {
diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml
index e784049cd78a4..0106d599dedda 100644
--- a/crates/polars-lazy/Cargo.toml
+++ b/crates/polars-lazy/Cargo.toml
@@ -25,6 +25,8 @@ polars-utils = { workspace = true }
 
 ahash = { workspace = true }
 bitflags = { workspace = true }
+crossbeam-channel = { workspace = true }
+glob = { version = "0.3" }
 memchr = { workspace = true }
 once_cell = { workspace = true }
 pyo3 = { workspace = true, optional = true }
diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs
index 9f81d07a97d3b..0aa0ccf65f341 100644
--- a/crates/polars-lazy/src/frame/mod.rs
+++ b/crates/polars-lazy/src/frame/mod.rs
@@ -19,6 +19,7 @@ use std::path::Path;
 use std::sync::{Arc, Mutex};
 
 pub use anonymous_scan::*;
+use crossbeam_channel::{bounded, Receiver};
 #[cfg(feature = "csv")]
 pub use csv::*;
 #[cfg(not(target_arch = "wasm32"))]
@@ -31,6 +32,7 @@ pub use ndjson::*;
 #[cfg(feature = "parquet")]
 pub use parquet::*;
 use polars_core::prelude::*;
+use polars_core::POOL;
 use polars_expr::{create_physical_expr, ExpressionConversionState};
 use polars_io::RowIndex;
 use polars_mem_engine::{create_physical_plan, Executor};
@@ -875,6 +877,30 @@ impl LazyFrame {
         )
     }
 
+    pub fn sink_to_batches(mut self) -> Result<Receiver<DataFrame>, PolarsError> {
+        self.opt_state.set(OptState::STREAMING, true);
+        let morsels_per_sink = POOL.current_num_threads();
+        let backpressure = morsels_per_sink * 4;
+        let (sender, receiver) = bounded(backpressure);
+        self.logical_plan = DslPlan::Sink {
+            input: Arc::new(self.logical_plan),
+            payload: SinkType::Batch {
+                sender: BatchSender { id: 0, sender },
+            },
+        };
+
+        let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
+        polars_ensure!(
+            is_streaming,
+            ComputeError: format!("cannot run the whole query in a streaming order")
+        );
+        POOL.spawn(move || {
+            let _ = physical_plan.execute(&mut state).unwrap();
+        });
+
+        Ok(receiver)
+    }
+
     #[cfg(any(
         feature = "ipc",
         feature = "parquet",
diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
index 92103b3d81706..86f3415ce1197 100644
--- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
+++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
@@ -33,6 +33,19 @@ impl PhysicalIoExpr for Wrap {
     fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
         self.0.as_stats_evaluator()
     }
+
+    fn columns(&self) -> Vec<String> {
+        let mut arena: Arena<AExpr> = Arena::new();
+        let _ = to_aexpr(self.0.as_expression().unwrap().clone(), &mut arena);
+        let mut columns = vec![];
+        for _ in 0..arena.len() {
+            let node = arena.pop().unwrap();
+            if let AExpr::Column(s) = node {
+                columns.push(s.as_ref().to_string())
+            }
+        }
+        columns
+    }
 }
 impl PhysicalPipedExpr for Wrap {
     fn evaluate(&self, chunk: &DataChunk, state: &ExecutionState) -> PolarsResult<Series> {
diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs
index a78dbf113151a..1417eb8e8fed3 100644
--- a/crates/polars-mem-engine/src/executors/scan/parquet.rs
+++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -358,6 +358,7 @@ impl ParquetExec {
                         .await?;
 
                     let num_rows = reader.num_rows().await?;
+
                     PolarsResult::Ok((num_rows, reader))
                 });
             let readers_and_metadata = futures::future::try_join_all(iter).await?;
diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs
index fc7a29435c770..d92fbce78af79 100644
--- a/crates/polars-mem-engine/src/planner/lp.rs
+++ b/crates/polars-mem-engine/src/planner/lp.rs
@@ -217,6 +217,9 @@ fn create_physical_plan_impl(
                 SinkType::Cloud { .. } => {
                     polars_bail!(InvalidOperation: "cloud sink not supported in standard engine.")
                 },
+                SinkType::Batch { .. } => {
+                    polars_bail!(InvalidOperation: "batch sink not supported in the standard engine")
+                },
             },
             Union { inputs, options } => {
                 let inputs = inputs
diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs
index 1603de3729fad..87e392a53334b 100644
--- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs
+++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs
@@ -34,6 +34,10 @@ impl PhysicalIoExpr for Len {
     fn live_variables(&self) -> Option<Vec<Arc<str>>> {
         Some(vec![])
     }
+
+    fn columns(&self) -> Vec<String> {
+        unimplemented!()
+    }
 }
 impl PhysicalPipedExpr for Len {
     fn evaluate(&self, chunk: &DataChunk, _lazy_state: &ExecutionState) -> PolarsResult<Series> {
diff --git a/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs b/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs
new file mode 100644
index 0000000000000..58fd05e037a54
--- /dev/null
+++ b/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs
@@ -0,0 +1,51 @@
+use std::any::Any;
+
+use crossbeam_channel::Sender;
+use polars_core::prelude::*;
+
+use crate::operators::{
+    chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
+};
+
+#[derive(Clone)]
+pub struct BatchSink {
+    sender: Sender<DataFrame>,
+}
+
+impl BatchSink {
+    pub fn new(sender: Sender<DataFrame>) -> PolarsResult<Self> {
+        Ok(Self { sender })
+    }
+}
+
+impl Sink for BatchSink {
+    fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
+        let df: DataFrame = chunks_to_df_unchecked(vec![chunk]);
+        let result = self.sender.send(df);
+        match result {
+            Ok(..) => Ok(SinkResult::CanHaveMoreInput),
+            Err(..) => Ok(SinkResult::Finished),
+        }
+    }
+
+    fn combine(&mut self, _other: &mut dyn Sink) {
+        // Nothing to do
+    }
+
+    fn split(&self, _thread_no: usize) -> Box<dyn Sink> {
+        Box::new(self.clone())
+    }
+
+    fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
+        let _ = self.sender.send(Default::default());
+        Ok(FinalizedSink::Finished(Default::default()))
+    }
+
+    fn as_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn fmt(&self) -> &str {
+        "batch_sink"
+    }
+}
diff --git a/crates/polars-pipe/src/executors/sinks/output/mod.rs b/crates/polars-pipe/src/executors/sinks/output/mod.rs
index 602e525fa8534..a2f8cc9b25f2a 100644
--- a/crates/polars-pipe/src/executors/sinks/output/mod.rs
+++ b/crates/polars-pipe/src/executors/sinks/output/mod.rs
@@ -1,3 +1,4 @@
+mod batch_sink;
 #[cfg(feature = "csv")]
 mod csv;
 #[cfg(any(
@@ -14,6 +15,7 @@ mod json;
 #[cfg(feature = "parquet")]
 mod parquet;
 
+pub use batch_sink::*;
 #[cfg(feature = "csv")]
 pub use csv::*;
 #[cfg(feature = "ipc")]
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index a897df0c24784..8139eb98e213b 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -35,6 +35,7 @@ pub struct ParquetSource {
     processed_rows: usize,
     iter: Range<usize>,
     paths: Arc<Vec<PathBuf>>,
+    total_files_read: usize,
     options: ParquetOptions,
     file_options: FileScanOptions,
     #[allow(dead_code)]
@@ -46,6 +47,7 @@ pub struct ParquetSource {
     run_async: bool,
     prefetch_size: usize,
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    rows_left_to_read: usize,
 }
 
 impl ParquetSource {
@@ -175,19 +177,19 @@ impl ParquetSource {
     }
 
     #[cfg(feature = "async")]
-    async fn init_reader_async(&self, index: usize) -> PolarsResult<BatchedParquetReader> {
-        let metadata = self.metadata.clone();
+    async fn init_reader_async(&self, index: usize, n_rows: usize) -> PolarsResult<BatchedParquetReader> {
+        let metadata: Option<Arc<FileMetaData>> = self.metadata.clone();
         let predicate = self.predicate.clone();
         let cloud_options = self.cloud_options.clone();
         let (path, options, file_options, projection, chunk_size, hive_partitions) =
             self.prepare_init_reader(index)?;
 
-        assert_eq!(file_options.slice, None);
-
         let batched_reader = {
             let uri = path.to_string_lossy();
             ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata)
                 .await?
+                .read_parallel(options.parallel)
+                .with_slice(Some((0, n_rows)))
                 .with_row_index(file_options.row_index)
                 .with_projection(projection)
                 .check_schema(
@@ -214,6 +216,30 @@ impl ParquetSource {
         Ok(batched_reader)
     }
 
+    #[cfg(feature = "async")]
+    async fn num_rows_per_reader(&self, index: usize) -> PolarsResult<usize> {
+        let predicate = self.predicate.clone();
+        let metadata = self.metadata.clone();
+        let cloud_options = self.cloud_options.clone();
+        let (path, options, _file_options, projection, _chunk_size, hive_partitions) =
+            self.prepare_init_reader(index)?;
+
+        let mut reader = {
+            let uri = path.to_string_lossy();
+            ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata)
+                .await?
+                .with_projection(projection)
+                .with_predicate(predicate.clone())
+                .use_statistics(options.use_statistics)
+                .with_hive_partition_columns(hive_partitions)
+        };
+        if predicate.is_some() {
+            reader.num_rows_with_predicate().await
+        } else {
+            reader.num_rows().await
+        }
+    }
+
     #[allow(unused_variables)]
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
@@ -236,6 +262,7 @@ impl ParquetSource {
             eprintln!("POLARS PREFETCH_SIZE: {}", prefetch_size)
         }
         let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async();
+        let rows_left_to_read = file_options.slice.unwrap_or((0, usize::MAX)).1;
 
         let mut source = ParquetSource {
             batched_readers: VecDeque::new(),
@@ -246,6 +273,7 @@ impl ParquetSource {
             file_options,
             iter,
             paths,
+            total_files_read: 0,
             cloud_options,
             metadata,
             file_info,
@@ -254,6 +282,7 @@ impl ParquetSource {
             run_async,
             prefetch_size,
            predicate,
+            rows_left_to_read,
         };
         // Already start downloading when we deal with cloud urls.
         if run_async {
@@ -269,21 +298,44 @@ impl ParquetSource {
         //
         // It is important we do this for a reasonable batch size, that's why we start this when we
         // have just 2 readers left.
-        if self.file_options.slice.is_none()
-            && self.run_async
-            && (self.batched_readers.len() <= 2 || self.batched_readers.is_empty())
+        if self.batched_readers.is_empty()
+            && self.rows_left_to_read != 0
+            && self.total_files_read != self.paths.len()
         {
-            #[cfg(not(feature = "async"))]
-            panic!("activate 'async' feature");
-
-            #[cfg(feature = "async")]
-            {
+            if self.run_async {
                 let range = 0..self.prefetch_size - self.batched_readers.len();
+
                 let range = range
                     .zip(&mut self.iter)
                     .map(|(_, index)| index)
                     .collect::<Vec<_>>();
-                let init_iter = range.into_iter().map(|index| self.init_reader_async(index));
+
+                let num_rows_to_read = range
+                    .clone()
+                    .into_iter()
+                    .map(|index| self.num_rows_per_reader(index));
+
+                let num_rows_to_read = polars_io::pl_async::get_runtime()
+                    .block_on(async { futures::future::try_join_all(num_rows_to_read).await })?;
+
+                let num_rows_to_read = num_rows_to_read
+                    .into_iter()
+                    .zip(range)
+                    .map(|(rows_per_reader, index)| {
+                        self.total_files_read += 1;
+                        if self.rows_left_to_read == 0 {
+                            return (index, 0);
+                        }
+                        self.rows_left_to_read =
+                            self.rows_left_to_read.saturating_sub(rows_per_reader);
+                        (index, rows_per_reader)
+                    })
+                    .filter(|(_index, rows_per_reader)| *rows_per_reader != 0)
+                    .collect::<Vec<_>>();
+
+                let init_iter = num_rows_to_read
+                    .into_iter()
+                    .map(|(index, num_rows)| self.init_reader_async(index, num_rows));
 
                 let batched_readers =
                     polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
@@ -293,10 +345,10 @@ impl ParquetSource {
                 for r in batched_readers {
                     self.finish_init_reader(r)?;
                 }
-            }
-        } else {
-            for _ in 0..self.prefetch_size - self.batched_readers.len() {
-                self.init_next_reader_sync()?
+            } else {
+                for _ in 0..self.prefetch_size - self.batched_readers.len() {
+                    self.init_next_reader_sync()?
+                }
             }
         }
         Ok(())
@@ -308,7 +360,9 @@ impl Source for ParquetSource {
         self.prefetch_files()?;
 
         let Some(mut reader) = self.batched_readers.pop_front() else {
-            // If there was no new reader, we depleted all of them and are finished.
+            if self.total_files_read != self.paths.len() && self.rows_left_to_read != 0 {
+                return self.get_batches(_context);
+            }
             return Ok(SourceResult::Finished);
         };
 
diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs
index 1f080941b38b4..c5a3d765621bb 100644
--- a/crates/polars-pipe/src/pipeline/convert.rs
+++ b/crates/polars-pipe/src/pipeline/convert.rs
@@ -138,6 +138,18 @@ where
         fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
             self.p.as_stats_evaluator()
         }
+        fn columns(&self) -> Vec<String> {
+            let mut arena: Arena<AExpr> = Arena::new();
+            let _ = to_aexpr(self.p.expression(), &mut arena);
+            let mut columns = vec![];
+            for _ in 0..arena.len() {
+                let node = arena.pop().unwrap();
+                if let AExpr::Column(s) = node {
+                    columns.push(s.as_ref().to_string())
+                }
+            }
+            columns
+        }
     }
 
     PolarsResult::Ok(Arc::new(Wrap { p }) as Arc<dyn PhysicalIoExpr>)
@@ -181,6 +193,9 @@ where
         SinkType::Memory => {
             Box::new(OrderedSink::new(input_schema.into_owned())) as Box<dyn SinkTrait>
         },
+        SinkType::Batch { sender } => {
+            Box::new(BatchSink::new(sender.sender.clone())?) as Box<dyn SinkTrait>
+        },
         #[allow(unused_variables)]
         SinkType::File {
             path, file_type, ..
diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml
index 28b867c8580c6..1858067650c92 100644
--- a/crates/polars-plan/Cargo.toml
+++ b/crates/polars-plan/Cargo.toml
@@ -29,6 +29,7 @@ bytemuck = { workspace = true }
 chrono = { workspace = true, optional = true }
 chrono-tz = { workspace = true, optional = true }
 ciborium = { workspace = true, optional = true }
+crossbeam-channel = { workspace = true }
 either = { workspace = true }
 futures = { workspace = true, optional = true }
 hashbrown = { workspace = true }
diff --git a/crates/polars-plan/src/plans/ir/dot.rs b/crates/polars-plan/src/plans/ir/dot.rs
index 49e9bef1a3dcf..14eca9da04345 100644
--- a/crates/polars-plan/src/plans/ir/dot.rs
+++ b/crates/polars-plan/src/plans/ir/dot.rs
@@ -317,6 +317,7 @@ impl<'a> IRDotDisplay<'a> {
                 write_label(f, id, |f| {
                     f.write_str(match payload {
                         SinkType::Memory => "SINK (MEMORY)",
+                        SinkType::Batch { .. } => "SINK (BATCH)",
                         SinkType::File { .. } => "SINK (FILE)",
                         #[cfg(feature = "cloud")]
                         SinkType::Cloud { .. } => "SINK (CLOUD)",
diff --git a/crates/polars-plan/src/plans/ir/format.rs b/crates/polars-plan/src/plans/ir/format.rs
index 45fa8efa117e6..dfd6b55d38c5d 100644
--- a/crates/polars-plan/src/plans/ir/format.rs
+++ b/crates/polars-plan/src/plans/ir/format.rs
@@ -367,6 +367,7 @@ impl<'a> IRDisplay<'a> {
             Sink { input, payload, .. } => {
                 let name = match payload {
                     SinkType::Memory => "SINK (memory)",
+                    SinkType::Batch { .. } => "SINK (batch)",
                     SinkType::File { .. } => "SINK (file)",
                     #[cfg(feature = "cloud")]
                     SinkType::Cloud { .. } => "SINK (cloud)",
diff --git a/crates/polars-plan/src/plans/ir/schema.rs b/crates/polars-plan/src/plans/ir/schema.rs
index 5b5042e503779..846a40eb3e2b1 100644
--- a/crates/polars-plan/src/plans/ir/schema.rs
+++ b/crates/polars-plan/src/plans/ir/schema.rs
@@ -38,6 +38,7 @@ impl IR {
             ExtContext { .. } => "ext_context",
             Sink { payload, .. } => match payload {
                 SinkType::Memory => "sink (memory)",
+                SinkType::Batch { .. } => "sink (batch)",
                 SinkType::File { .. } => "sink (file)",
                 #[cfg(feature = "cloud")]
                 SinkType::Cloud { .. } => "sink (cloud)",
diff --git a/crates/polars-plan/src/plans/ir/tree_format.rs b/crates/polars-plan/src/plans/ir/tree_format.rs
index 86dc04da82a2f..398ba9ded7ad5 100644
--- a/crates/polars-plan/src/plans/ir/tree_format.rs
+++ b/crates/polars-plan/src/plans/ir/tree_format.rs
@@ -358,6 +358,7 @@ impl<'a> TreeFmtNode<'a> {
                 match payload {
                     SinkType::Memory => "SINK (memory)",
                     SinkType::File { .. } => "SINK (file)",
+                    SinkType::Batch { .. } => "SINK (batch)",
                     #[cfg(feature = "cloud")]
                     SinkType::Cloud { .. } => "SINK (cloud)",
                 },
diff --git a/crates/polars-plan/src/plans/options.rs b/crates/polars-plan/src/plans/options.rs
index 0cff24124ff12..95602d8ec74d5 100644
--- a/crates/polars-plan/src/plans/options.rs
+++ b/crates/polars-plan/src/plans/options.rs
@@ -1,8 +1,10 @@
+use std::hash::{Hash, Hasher};
 #[cfg(feature = "json")]
 use std::num::NonZeroUsize;
 use std::path::PathBuf;
 
 use bitflags::bitflags;
+use crossbeam_channel::{bounded, Sender};
 use polars_core::prelude::*;
 use polars_core::utils::SuperTypeOptions;
 #[cfg(feature = "csv")]
@@ -285,10 +287,41 @@ pub struct AnonymousScanOptions {
     pub fmt_str: &'static str,
 }
 
+#[derive(Clone, Debug)]
+pub struct BatchSender {
+    pub id: u32,
+    pub sender: Sender<DataFrame>,
+}
+
+impl Default for BatchSender {
+    fn default() -> Self {
+        let (sender, _receiver) = bounded(1);
+        Self { id: 0, sender }
+    }
+}
+
+impl PartialEq for BatchSender {
+    fn eq(&self, other: &Self) -> bool {
+        self.sender.same_channel(&other.sender)
+    }
+}
+
+impl Eq for BatchSender {}
+
+impl Hash for BatchSender {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.id.hash(state)
+    }
+}
+
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub enum SinkType {
     Memory,
+    Batch {
+        #[serde(skip)]
+        sender: BatchSender,
+    },
     File {
         path: Arc<PathBuf>,
         file_type: FileType,
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 2f9c3436c4be7..d4f56ef8c6176 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -1510,3 +1510,19 @@ def test_delta_strings_encoding_roundtrip(
     f.seek(0)
 
     assert_frame_equal(pl.read_parquet(f), df)
+
+
+def test_skip_full_load_of_rgs_using_predicate(
+    tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capfd: Any
+) -> None:
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
+    df = pl.DataFrame(
+        {"a": pl.arange(0, 10, eager=True), "b": pl.arange(0, 10, eager=True)}
+    )
+    root = tmp_path / "test_rg_skip.parquet"
+    df.write_parquet(root, use_pyarrow=True, row_group_size=2)
+
+    q = pl.scan_parquet(root, parallel="row_groups")
+    assert q.filter(pl.col("a").gt(6)).collect().shape == (3, 2)
+    assert "reduced the number of row groups in pruning by 3" in capfd.readouterr().err