Skip to content

Commit

Permalink
feat(rust): optimize column load of row groups with predicate (#13608)
Browse files Browse the repository at this point in the history
  • Loading branch information
bchalk101 authored and Boruch Chalk committed Aug 18, 2024
1 parent 1dc2533 commit 1200460
Show file tree
Hide file tree
Showing 24 changed files with 396 additions and 44 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ strength_reduce = "0.2"
strum_macros = "0.26"
thiserror = "1"
tokio = "1.26"
tokio-stream = "0.1.15"
tokio-util = "0.7.8"
unicode-reverse = "1.0.8"
url = "2.4"
Expand Down
13 changes: 13 additions & 0 deletions crates/polars-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,19 @@ impl PhysicalIoExpr for PhysicalIoHelper {
/// Delegate statistics-based evaluation to the wrapped physical expression.
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
self.expr.as_stats_evaluator()
}

/// Collect the names of all `Column` nodes referenced by the wrapped
/// expression (duplicates included, in arena pop order).
fn columns(&self) -> Vec<String> {
    // Lower the expression into an AExpr arena, then drain the arena,
    // keeping the name of every `Column` node encountered.
    let mut arena: Arena<AExpr> = Arena::new();
    let _ = to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena);
    let node_count = arena.len();
    let mut names = Vec::with_capacity(node_count);
    for _ in 0..node_count {
        if let AExpr::Column(name) = arena.pop().unwrap() {
            names.push(name.as_ref().to_string());
        }
    }
    names
}
}

pub fn phys_expr_to_io_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalIoExpr> {
Expand Down
20 changes: 1 addition & 19 deletions crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,26 +271,8 @@ impl FetchRowGroupsFromObjectStore {
.collect()
});

let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();

let mut row_groups = if let Some(pred) = predicate.as_deref() {
row_group_range
.filter_map(|i| {
let rg = &row_groups[i];
let should_be_read =
matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));

// Already add the row groups that will be skipped to the prefetched data.
if !should_be_read {
prefetched.insert(i, Default::default());
}
let mut row_groups = row_groups.iter().cloned().enumerate().collect::<Vec<_>>();

should_be_read.then(|| (i, rg.clone()))
})
.collect::<Vec<_>>()
} else {
row_groups.iter().cloned().enumerate().collect()
};
let reader = Arc::new(reader);
let msg_limit = get_rg_prefetch_size();

Expand Down
7 changes: 5 additions & 2 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
}
}

fn column_idx_to_series(
pub fn column_idx_to_series(
column_i: usize,
md: &RowGroupMetaData,
filter: Option<Filter>,
Expand Down Expand Up @@ -760,7 +760,10 @@ impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
}

impl RowGroupFetcher {
async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
pub async fn fetch_row_groups(
&mut self,
_row_groups: Range<usize>,
) -> PolarsResult<ColumnStore> {
match self {
RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
#[cfg(feature = "cloud")]
Expand Down
133 changes: 128 additions & 5 deletions crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::config::verbose;
use polars_core::prelude::*;
#[cfg(feature = "cloud")]
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
Expand All @@ -11,15 +12,20 @@ use polars_parquet::read;
use super::async_impl::FetchRowGroupsFromObjectStore;
#[cfg(feature = "cloud")]
use super::async_impl::ParquetObjectStore;
use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
pub use super::read_impl::BatchedParquetReader;
use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader};
use super::read_impl::{
column_idx_to_series, compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader,
};
#[cfg(feature = "cloud")]
use super::utils::materialize_empty_df;
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::hive::materialize_hive_partitions;
use crate::mmap::MmapBytesReader;
use crate::parquet::metadata::FileMetaDataRef;
use crate::predicates::PhysicalIoExpr;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::prelude::*;
use crate::RowIndex;

Expand Down Expand Up @@ -269,7 +275,7 @@ impl ParquetAsyncReader {
pub async fn from_uri(
uri: &str,
cloud_options: Option<&CloudOptions>,
metadata: Option<FileMetaDataRef>,
metadata: Option<FileMetaDataRef>
) -> PolarsResult<ParquetAsyncReader> {
Ok(ParquetAsyncReader {
reader: ParquetObjectStore::from_uri(uri, cloud_options, metadata).await?,
Expand Down Expand Up @@ -328,6 +334,27 @@ impl ParquetAsyncReader {
self
}

/// Count the rows that survive predicate pruning across all row groups.
///
/// Prunes with an unbounded limit (`usize::MAX`) so every matching row
/// group contributes its post-predicate row count to the total.
pub async fn num_rows_with_predicate(mut self) -> PolarsResult<usize> {
    let metadata = self.reader.get_metadata().await?.clone();
    let schema = self.schema().await?;

    let pruned = prune_row_groups(
        self.reader,
        schema.clone(),
        metadata,
        usize::MAX,
        self.predicate.clone(),
        self.projection.as_deref(),
        self.hive_partition_columns.as_deref(),
    )
    .await?;

    // Sum the per-row-group counts directly instead of materializing an
    // intermediate Vec<usize> and iterating it a second time.
    Ok(pruned.iter().map(|(row_count, _md)| *row_count).sum())
}

pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
self.row_index = row_index;
self
Expand Down Expand Up @@ -379,6 +406,13 @@ impl ParquetAsyncReader {
Some(schema) => schema,
None => self.schema().await?,
};
let mut row_groups = metadata.row_groups.clone();

if self.slice.1 != 0 {
self.slice = (0, usize::MAX);
} else {
row_groups = vec![];
}
// row group fetched deals with projection
let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
self.reader,
Expand All @@ -389,9 +423,9 @@ impl ParquetAsyncReader {
0,
metadata.row_groups.len(),
self.slice,
&metadata.row_groups,
&row_groups,
),
&metadata.row_groups,
&row_groups,
)?
.into();
BatchedParquetReader::new(
Expand Down Expand Up @@ -447,3 +481,92 @@ impl ParquetAsyncReader {
Ok(df)
}
}

#[cfg(feature = "cloud")]
/// Evaluate `predicate` against every row group and return the row groups
/// that contain at least one matching row, paired with their matching row
/// counts. Pruning stops early once `limit` rows have been accumulated.
///
/// NOTE(review): this function unwraps `predicate` and the per-column reads;
/// confirm callers always pass `Some` predicate before calling.
async fn prune_row_groups(
    reader: ParquetObjectStore,
    schema: Arc<ArrowSchema>,
    metadata: Arc<FileMetaData>,
    limit: usize,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<&[usize]>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<(usize, read::RowGroupMetaData)>> {
    use super::read_impl::RowGroupFetcher;

    // Project down to just the columns the predicate reads; borrowing via
    // `as_ref()` avoids a needless Arc clone.
    let predicate_columns = predicate.as_ref().unwrap().columns();
    let predicate_projection = materialize_projection(
        Some(&predicate_columns),
        &Schema::from(schema.clone()),
        hive_partition_columns,
        false,
    );

    // Fetch predicate columns for every row group in one pass.
    let mut predicate_row_group_fetcher: RowGroupFetcher = FetchRowGroupsFromObjectStore::new(
        reader,
        schema.clone(),
        projection,
        predicate.clone(),
        compute_row_group_range(
            0,
            metadata.row_groups.len(),
            (0, usize::MAX),
            &metadata.row_groups,
        ),
        &metadata.row_groups,
    )?
    .into();

    let predicate_store: ColumnStore = predicate_row_group_fetcher
        .fetch_row_groups(0..metadata.row_groups.len())
        .await?;

    let mut remaining_rows = limit;

    let row_groups = metadata.row_groups.clone();
    let final_row_groups = row_groups
        .iter()
        .map(|md| {
            // Statistics-based pruning first: skip the group entirely when
            // the predicate can never match, or once the limit is exhausted.
            if !read_this_row_group(predicate.as_deref(), md, &schema).unwrap()
                || remaining_rows == 0
            {
                return (0, md);
            }

            // Materialize only the predicate columns for this row group.
            let mut columns: Vec<Series> = vec![];

            for column_i in predicate_projection.as_ref().unwrap() {
                let column =
                    column_idx_to_series(*column_i, md, None, &schema, &predicate_store).unwrap();
                columns.push(column)
            }

            // Relies on all predicate columns having equal length, which
            // holds for columns read from the same row group.
            let mut df = unsafe { DataFrame::new_no_checks(columns) };
            let reader_schema = schema.as_ref();

            materialize_hive_partitions(
                &mut df,
                reader_schema,
                hive_partition_columns,
                md.num_rows(),
            );
            apply_predicate(&mut df, predicate.as_deref(), false).unwrap();

            let row_count = df.height();

            remaining_rows = remaining_rows.saturating_sub(row_count);

            (row_count, md)
        })
        .filter(|(row_count, _md)| *row_count != 0)
        .map(|(row_count, md)| (row_count, md.clone()))
        .collect::<Vec<_>>();
    if verbose() {
        eprintln!(
            "reduced the number of row groups in pruning by {}",
            row_groups.len() - final_row_groups.len()
        )
    }
    Ok(final_row_groups)
}
2 changes: 2 additions & 0 deletions crates/polars-io/src/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub trait PhysicalIoExpr: Send + Sync {
fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
None
}

/// Names of the columns referenced by this expression.
/// NOTE(review): implementations may return duplicates — confirm callers
/// dedupe where it matters.
fn columns(&self) -> Vec<String>;
}

pub trait StatsEvaluator {
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ polars-utils = { workspace = true }

ahash = { workspace = true }
bitflags = { workspace = true }
crossbeam-channel = { workspace = true }
glob = { version = "0.3" }
memchr = { workspace = true }
once_cell = { workspace = true }
pyo3 = { workspace = true, optional = true }
Expand Down
26 changes: 26 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::path::Path;
use std::sync::{Arc, Mutex};

pub use anonymous_scan::*;
use crossbeam_channel::{bounded, Receiver};
#[cfg(feature = "csv")]
pub use csv::*;
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -31,6 +32,7 @@ pub use ndjson::*;
#[cfg(feature = "parquet")]
pub use parquet::*;
use polars_core::prelude::*;
use polars_core::POOL;
use polars_expr::{create_physical_expr, ExpressionConversionState};
use polars_io::RowIndex;
use polars_mem_engine::{create_physical_plan, Executor};
Expand Down Expand Up @@ -875,6 +877,30 @@ impl LazyFrame {
)
}

/// Run the query on the streaming engine and receive the result as a
/// stream of `DataFrame` batches over a bounded channel.
///
/// # Errors
/// Returns `ComputeError` when the plan cannot run entirely on the
/// streaming engine.
pub fn sink_to_batches(mut self) -> Result<Receiver<DataFrame>, PolarsError> {
    self.opt_state.set(OptState::STREAMING, true);
    // Allow a few morsels per sink thread before the channel applies
    // backpressure to the producer.
    let morsels_per_sink = POOL.current_num_threads();
    let backpressure = morsels_per_sink * 4;
    let (sender, receiver) = bounded(backpressure);
    self.logical_plan = DslPlan::Sink {
        input: Arc::new(self.logical_plan),
        payload: SinkType::Batch {
            sender: BatchSender { id: 0, sender },
        },
    };

    let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
    // A static string needs no `format!` wrapper.
    polars_ensure!(
        is_streaming,
        ComputeError: "cannot run the whole query in a streaming order"
    );
    POOL.spawn(move || {
        // Errors cannot be returned from the worker thread; a panic here
        // surfaces to the consumer as a disconnected receiver.
        physical_plan.execute(&mut state).unwrap();
    });

    Ok(receiver)
}

#[cfg(any(
feature = "ipc",
feature = "parquet",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ impl PhysicalIoExpr for Wrap {
fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
self.0.as_stats_evaluator()
}

/// Gather the names of all `Column` references in the wrapped expression
/// (duplicates included, in arena pop order).
fn columns(&self) -> Vec<String> {
    // Build an AExpr arena from the expression, then pop every node and
    // keep only the column names.
    let mut arena: Arena<AExpr> = Arena::new();
    let _ = to_aexpr(self.0.as_expression().unwrap().clone(), &mut arena);
    let total = arena.len();
    (0..total)
        .filter_map(|_| match arena.pop().unwrap() {
            AExpr::Column(name) => Some(name.as_ref().to_string()),
            _ => None,
        })
        .collect()
}
}
impl PhysicalPipedExpr for Wrap {
fn evaluate(&self, chunk: &DataChunk, state: &ExecutionState) -> PolarsResult<Series> {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ impl ParquetExec {
.await?;

let num_rows = reader.num_rows().await?;

PolarsResult::Ok((num_rows, reader))
});
let readers_and_metadata = futures::future::try_join_all(iter).await?;
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ fn create_physical_plan_impl(
SinkType::Cloud { .. } => {
polars_bail!(InvalidOperation: "cloud sink not supported in standard engine.")
},
SinkType::Batch { .. } => {
polars_bail!(InvalidOperation: "batch sink not supported in the standard engine")
},
},
Union { inputs, options } => {
let inputs = inputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl PhysicalIoExpr for Len {
/// `Len` reads no columns, so the set of live variables is empty.
fn live_variables(&self) -> Option<Vec<Arc<str>>> {
Some(vec![])
}

/// Not supported for `Len`; this expression references no columns.
/// NOTE(review): panics if called — confirm no code path reaches this.
fn columns(&self) -> Vec<String> {
unimplemented!()
}
}
impl PhysicalPipedExpr for Len {
fn evaluate(&self, chunk: &DataChunk, _lazy_state: &ExecutionState) -> PolarsResult<Series> {
Expand Down
Loading

0 comments on commit 1200460

Please sign in to comment.