feat(rust): optimize column load of row groups with predicate (#13608)
bchalk101 committed Jan 21, 2024
1 parent 79ae3b3 commit 75e7a1a
Showing 9 changed files with 193 additions and 37 deletions.
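In outline, the commit teaches the cloud Parquet reader to prune row groups before the main column fetch: when a pushed-down predicate is present (and no row index is requested), only the predicate's columns are downloaded, the predicate is evaluated per row group, and row groups that yield no matching rows are dropped from the batched read. A toy sketch of that filtering idea, with illustrative names rather than the Polars API:

// Toy model of predicate-based row-group pruning: per row group, look only
// at the predicate column's data and keep the group if any row matches.
struct RowGroup {
    values: Vec<i64>, // stand-in for the predicate column
}

fn prune<F>(row_groups: &[RowGroup], predicate: F) -> Vec<&RowGroup>
where
    F: Fn(i64) -> bool,
{
    row_groups
        .iter()
        .filter(|rg| rg.values.iter().copied().any(&predicate))
        .collect()
}

fn main() {
    let rgs = vec![
        RowGroup { values: vec![1, 2, 3] },
        RowGroup { values: vec![40, 50] },
    ];
    // only the second row group contains values > 10, so one group survives
    assert_eq!(prune(&rgs, |v| v > 10).len(), 1);
}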
3 changes: 1 addition & 2 deletions crates/polars-io/src/parquet/async_impl.rs
@@ -222,7 +222,7 @@ pub struct FetchRowGroupsFromObjectStore {

impl FetchRowGroupsFromObjectStore {
pub fn new(
reader: ParquetObjectStore,
reader: Arc<ParquetObjectStore>,
schema: ArrowSchemaRef,
projection: Option<&[usize]>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
@@ -260,7 +260,6 @@ impl FetchRowGroupsFromObjectStore {
} else {
row_groups.iter().cloned().enumerate().collect()
};
let reader = Arc::new(reader);
let msg_limit = get_rg_prefetch_size();

if verbose() {
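A note on the change above: `FetchRowGroupsFromObjectStore::new` now takes an `Arc<ParquetObjectStore>` instead of an owned reader, so the same object store connection can serve both the new predicate-pruning fetch and the main row-group fetch. The general sharing pattern, illustratively:

use std::sync::Arc;

// Illustrative only: two fetchers sharing one reader through Arc,
// instead of the constructor taking ownership of the reader.
struct Reader;

struct Fetcher {
    reader: Arc<Reader>,
}

fn main() {
    let reader = Arc::new(Reader);
    let predicate_fetcher = Fetcher { reader: reader.clone() };
    let main_fetcher = Fetcher { reader };
    let _ = (predicate_fetcher, main_fetcher);
}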
118 changes: 110 additions & 8 deletions crates/polars-io/src/parquet/read.rs
@@ -2,15 +2,24 @@ use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::config::verbose;
use polars_core::prelude::*;
#[cfg(feature = "cloud")]
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
#[cfg(feature = "cloud")]
use polars_core::POOL;
use polars_parquet::read;
use polars_parquet::write::FileMetaData;
#[cfg(feature = "cloud")]
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use super::read_impl::{read_parquet, FetchRowGroupsFromMmapReader};
use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
use super::read_impl::{
column_idx_to_series, materialize_hive_partitions, read_parquet, FetchRowGroupsFromMmapReader,
};
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
@@ -19,7 +28,7 @@ use crate::parquet::async_impl::FetchRowGroupsFromObjectStore;
#[cfg(feature = "cloud")]
use crate::parquet::async_impl::ParquetObjectStore;
pub use crate::parquet::read_impl::BatchedParquetReader;
use crate::predicates::PhysicalIoExpr;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::prelude::*;
use crate::RowIndex;

@@ -153,7 +162,7 @@ impl<R: MmapBytesReader + 'static> ParquetReader<R> {
let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
metadata.row_groups.clone(),
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
@@ -309,21 +318,40 @@ impl ParquetAsyncReader {
Some(schema) => schema,
None => self.schema().await?,
};
let limit = self.n_rows.unwrap_or(usize::MAX);

let reader = Arc::new(self.reader);

let mut final_row_groups = metadata.row_groups.clone();
if let Some(_predicates) = self.predicate.clone() {
if self.row_index.is_none() {
final_row_groups = prune_row_groups(
reader.clone(),
schema.clone(),
metadata.clone(),
limit,
self.predicate.clone(),
self.hive_partition_columns.as_deref(),
)
.await?;
}
}

// row group fetcher deals with projection
let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
self.reader,
reader.clone(),
schema.clone(),
self.projection.as_deref(),
self.predicate.clone(),
&metadata.row_groups,
self.n_rows.unwrap_or(usize::MAX),
&final_row_groups,
limit,
)?
.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
final_row_groups,
schema,
self.n_rows.unwrap_or(usize::MAX),
limit,
self.projection,
self.predicate.clone(),
self.row_index,
@@ -370,3 +398,77 @@ impl ParquetAsyncReader {
Ok(df)
}
}

#[cfg(feature = "cloud")]
async fn prune_row_groups(
reader: Arc<ParquetObjectStore>,
schema: Arc<ArrowSchema>,
metadata: Arc<FileMetaData>,
limit: usize,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<read::RowGroupMetaData>> {
let predicate_columns = predicate.clone().unwrap().columns();
let predicate_projection = materialize_projection(
Some(&predicate_columns),
&Schema::from(schema.clone()),
hive_partition_columns,
false,
);

let mut predicate_row_group_fetcher = FetchRowGroupsFromObjectStore::new(
reader.clone(),
schema.clone(),
predicate_projection.clone().as_deref(),
predicate.clone(),
&metadata.row_groups,
limit,
)?;

let predicate_store: ColumnStore<'_> = predicate_row_group_fetcher
.fetch_row_groups(0..metadata.row_groups.len())
.await?;

let row_groups = metadata.row_groups.clone();
let final_row_groups = POOL.install(|| {
row_groups
.par_iter()
.filter(|&md| {
if !read_this_row_group(predicate.clone().as_deref(), md, &schema).unwrap() {
return false;
}
let chunk_size = md.num_rows();

let mut columns: Vec<Series> = vec![];

for column_i in predicate_projection.as_ref().unwrap() {
let column = column_idx_to_series(
*column_i,
md,
limit,
&schema.clone(),
&predicate_store,
chunk_size,
)
.unwrap();
columns.push(column)
}

let mut df = DataFrame::new_no_checks(columns);

materialize_hive_partitions(&mut df, hive_partition_columns, md.num_rows());
apply_predicate(&mut df, predicate.as_deref(), false).unwrap();

df.height() != 0
})
.map(|md| md.clone())
.collect::<Vec<_>>()
});
if verbose() {
eprintln!(
"reduced the number of row groups in pruning by {}",
row_groups.len() - final_row_groups.len()
)
}
Ok(final_row_groups)
}
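This pruning path is driven by predicate pushdown on cloud scans, and it is skipped when a row index is requested, since dropping row groups would shift the index. A hypothetical lazy-scan usage that would exercise it (a sketch assuming the lazy and cloud features; not part of this diff):

// Hypothetical usage sketch: the filter is pushed down, so the reader can
// prune row groups whose `value` column never exceeds 10 before fetching
// the full projection.
use polars::prelude::*;

fn example() -> PolarsResult<DataFrame> {
    LazyFrame::scan_parquet("s3://bucket/data.parquet", ScanArgsParquet::default())?
        .filter(col("value").gt(lit(10)))
        .collect()
}

One trade-off visible in the code: the predicate columns of surviving row groups are fetched again by the main reader, so the optimization pays off when the predicate is selective relative to the width of the full projection.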
48 changes: 22 additions & 26 deletions crates/polars-io/src/parquet/read_impl.rs
@@ -10,7 +10,7 @@ use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::read;
use polars_parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
use polars_parquet::read::{ArrayIter, RowGroupMetaData};
use rayon::prelude::*;

use super::materialize_empty_df;
@@ -51,7 +51,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
}
}

fn column_idx_to_series(
pub fn column_idx_to_series(
column_i: usize,
md: &RowGroupMetaData,
remaining_rows: usize,
@@ -131,7 +131,7 @@ fn rg_to_dfs(
row_group_start: usize,
row_group_end: usize,
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
row_group_metadata: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -147,7 +147,7 @@ fn rg_to_dfs(
row_group_start,
row_group_end,
remaining_rows,
file_metadata,
row_group_metadata,
schema,
predicate,
row_index,
@@ -163,7 +163,7 @@
row_group_end,
previous_row_count,
remaining_rows,
file_metadata,
row_group_metadata,
schema,
predicate,
row_index,
@@ -182,7 +182,7 @@ fn rg_to_dfs_optionally_par_over_columns(
row_group_start: usize,
row_group_end: usize,
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
row_group_metadata: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -193,13 +193,14 @@
) -> PolarsResult<Vec<DataFrame>> {
let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];
for md in row_group_metadata
.iter()
.take(row_group_end)
.skip(row_group_start)
{
let current_row_count = md.num_rows() as IdxSize;

if use_statistics
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
{
if use_statistics && !read_this_row_group(predicate, md, schema)? {
*previous_row_count += current_row_count;
continue;
}
@@ -271,7 +272,7 @@ fn rg_to_dfs_par_over_rg(
row_group_end: usize,
previous_row_count: &mut IdxSize,
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
row_group_metadata: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -280,8 +281,7 @@
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
// compute the limits per row group and the row count offsets
let row_groups = file_metadata
.row_groups
let row_groups = row_group_metadata
.iter()
.enumerate()
.skip(row_group_start)
@@ -303,11 +303,7 @@
.map(|(rg_idx, md, projection_height, row_count_start)| {
if projection_height == 0
|| use_statistics
&& !read_this_row_group(
predicate,
&file_metadata.row_groups[rg_idx],
schema,
)?
&& !read_this_row_group(predicate, &row_group_metadata[rg_idx], schema)?
{
return Ok(None);
}
@@ -419,7 +415,7 @@ pub fn read_parquet<R: MmapBytesReader>(
0,
n_row_groups,
&mut limit,
&file_metadata,
&file_metadata.row_groups,
reader_schema,
predicate,
row_index.clone(),
@@ -519,7 +515,7 @@ pub struct BatchedParquetReader {
limit: usize,
projection: Vec<usize>,
schema: ArrowSchemaRef,
metadata: FileMetaDataRef,
row_group_metadata: Vec<RowGroupMetaData>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_index: Option<RowIndex>,
rows_read: IdxSize,
@@ -538,7 +534,7 @@ impl BatchedParquetReader {
#[allow(clippy::too_many_arguments)]
pub fn new(
row_group_fetcher: RowGroupFetcher,
metadata: FileMetaDataRef,
row_group_metadata: Vec<RowGroupMetaData>,
schema: ArrowSchemaRef,
limit: usize,
projection: Option<Vec<usize>>,
@@ -548,7 +544,7 @@
use_statistics: bool,
hive_partition_columns: Option<Vec<Series>>,
) -> PolarsResult<Self> {
let n_row_groups = metadata.row_groups.len();
let n_row_groups = row_group_metadata.len();
let projection = projection.unwrap_or_else(|| (0usize..schema.len()).collect::<Vec<_>>());

let parallel =
@@ -563,7 +559,7 @@
limit,
projection,
schema,
metadata,
row_group_metadata,
row_index,
rows_read: 0,
predicate,
@@ -608,7 +604,7 @@ impl BatchedParquetReader {
row_group_start,
row_group_start + n,
self.limit,
&self.metadata.row_groups,
&self.row_group_metadata,
);

let store = self
Expand All @@ -622,7 +618,7 @@ impl BatchedParquetReader {
row_group_start,
row_group_end,
&mut self.limit,
&self.metadata,
&self.row_group_metadata,
&self.schema,
self.predicate.as_deref(),
self.row_index.clone(),
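Across read_impl.rs the change is a signature-narrowing refactor: rg_to_dfs and its helpers now take `&[RowGroupMetaData]` rather than the whole `FileMetaData`, so the batched reader can be handed an already-pruned subset of row groups. A generic sketch of the pattern (illustrative types, not the Polars signatures):

// Accepting the narrowest input (a slice of row-group metadata) lets the
// caller pass a pruned subset; taking whole-file metadata would not.
struct RowGroupMeta {
    num_rows: usize,
}

fn total_rows(row_groups: &[RowGroupMeta]) -> usize {
    row_groups.iter().map(|rg| rg.num_rows).sum()
}

fn main() {
    let all = vec![RowGroupMeta { num_rows: 100 }, RowGroupMeta { num_rows: 50 }];
    // e.g. pruning dropped the 50-row group
    let pruned: Vec<RowGroupMeta> = all.into_iter().filter(|rg| rg.num_rows > 60).collect();
    assert_eq!(total_rows(&pruned), 100);
}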
2 changes: 2 additions & 0 deletions crates/polars-io/src/predicates.rs
@@ -13,6 +13,8 @@ pub trait PhysicalIoExpr: Send + Sync {
    fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
        None
    }

    fn columns(&self) -> Vec<String>;
}

pub trait StatsEvaluator {
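The trait change makes `columns` a required method on `PhysicalIoExpr`: every implementor must report which columns its predicate reads, which is what lets the reader build a predicate-only projection. A trimmed-down hypothetical version of that obligation:

// Hypothetical, trimmed-down trait: a single-column predicate reporting
// the one column it reads.
trait PredicateColumns {
    fn columns(&self) -> Vec<String>;
}

struct GreaterThanTen {
    column: String,
}

impl PredicateColumns for GreaterThanTen {
    fn columns(&self) -> Vec<String> {
        vec![self.column.clone()]
    }
}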
13 changes: 13 additions & 0 deletions crates/polars-lazy/src/physical_plan/expressions/mod.rs
@@ -655,6 +655,19 @@ impl PhysicalIoExpr for PhysicalIoHelper {
    fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
        self.expr.as_stats_evaluator()
    }

    fn columns(&self) -> Vec<String> {
        let mut arena: Arena<AExpr> = Arena::new();
        to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena);
        let mut columns = vec![];
        for _ in 0..arena.len() {
            let node = arena.pop().unwrap();
            if let AExpr::Column(s) = node {
                columns.push(s.as_ref().to_string())
            }
        }
        columns
    }
}

pub(crate) fn phys_expr_to_io_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalIoExpr> {
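The `columns` implementation above lowers the physical expression into an `AExpr` arena and drains it, collecting every `Column` leaf it pops (duplicates included). The same tree walk on a toy expression type, for illustration:

// Toy expression tree: collect the names of all column leaves.
enum Expr {
    Column(String),
    Gt(Box<Expr>, Box<Expr>),
    LitInt(i64),
}

fn columns(expr: &Expr, out: &mut Vec<String>) {
    match expr {
        Expr::Column(name) => out.push(name.clone()),
        Expr::Gt(lhs, rhs) => {
            columns(lhs, out);
            columns(rhs, out);
        },
        Expr::LitInt(_) => {},
    }
}

fn main() {
    // col("a") > 10
    let e = Expr::Gt(
        Box::new(Expr::Column("a".into())),
        Box::new(Expr::LitInt(10)),
    );
    let mut cols = Vec::new();
    columns(&e, &mut cols);
    assert_eq!(cols, vec!["a".to_string()]);
}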