feat(rust): optimize column load of row groups with predicate (#13608)
bchalk101 authored and Boruch Chalk committed Mar 25, 2024
1 parent 252702a commit 917cfc9
Showing 11 changed files with 294 additions and 49 deletions.
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/async_impl.rs
@@ -25,6 +25,7 @@ type DownloadedRowGroup = Vec<(u64, Bytes)>;
type QueuePayload = (usize, DownloadedRowGroup);
type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;

#[derive(Clone)]
pub struct ParquetObjectStore {
store: PolarsObjectStore,
path: ObjectPath,
@@ -266,7 +267,7 @@ pub struct FetchRowGroupsFromObjectStore {

impl FetchRowGroupsFromObjectStore {
pub fn new(
reader: ParquetObjectStore,
reader: Arc<ParquetObjectStore>,
schema: ArrowSchemaRef,
projection: Option<&[usize]>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
@@ -304,7 +305,6 @@ impl FetchRowGroupsFromObjectStore {
} else {
row_groups.iter().cloned().enumerate().collect()
};
let reader = Arc::new(reader);
let msg_limit = get_rg_prefetch_size();

if verbose() {
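The hunks above remove the internal Arc::new(reader) and instead take an Arc<ParquetObjectStore> from the caller, with Clone now derived, so a single store handle can be shared between the new predicate-pruning pass and the regular row-group fetcher. A minimal sketch of that sharing pattern, using a hypothetical ObjectStoreHandle stand-in rather than the real polars-io types:

use std::sync::Arc;

// Hypothetical stand-in for ParquetObjectStore; the real type lives in polars-io.
#[derive(Clone)]
struct ObjectStoreHandle {
    path: String,
}

// The pruning pass reads only the predicate columns of each row group.
fn pruning_pass(_store: Arc<ObjectStoreHandle>) {}

// The main fetcher later reads the projected columns of the surviving row groups.
fn main_fetch(_store: Arc<ObjectStoreHandle>) {}

fn main() {
    let store = Arc::new(ObjectStoreHandle {
        path: "s3://bucket/file.parquet".into(),
    });
    // Cloning the Arc is a cheap pointer copy; no second store is opened.
    pruning_pass(Arc::clone(&store));
    main_fetch(store);
}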
160 changes: 145 additions & 15 deletions crates/polars-io/src/parquet/read.rs
@@ -1,14 +1,19 @@
use std::io::{Read, Seek};

use arrow::datatypes::ArrowSchemaRef;
use polars_core::config::verbose;
use polars_core::prelude::*;
#[cfg(feature = "cloud")]
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_parquet::read;
use polars_parquet::read::{self};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use super::read_impl::{read_parquet, FetchRowGroupsFromMmapReader};
use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
use super::read_impl::{
column_idx_to_series, materialize_hive_partitions, read_parquet, FetchRowGroupsFromMmapReader,
};
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
@@ -17,7 +22,7 @@ use crate::parquet::async_impl::FetchRowGroupsFromObjectStore;
#[cfg(feature = "cloud")]
use crate::parquet::async_impl::ParquetObjectStore;
pub use crate::parquet::read_impl::BatchedParquetReader;
use crate::predicates::PhysicalIoExpr;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::prelude::*;
use crate::RowIndex;

@@ -151,7 +156,7 @@ impl<R: MmapBytesReader + 'static> ParquetReader<R> {
let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
metadata.row_groups.clone(),
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
@@ -272,6 +277,27 @@ impl ParquetAsyncReader {
self.reader.num_rows().await
}

pub async fn num_rows_with_predicate(&mut self) -> PolarsResult<usize> {
let metadata = self.reader.get_metadata().await?.clone();
let schema = self.schema().await?;
let reader = Arc::new(self.reader.clone());

let row_sizes = prune_row_groups(
reader.clone(),
schema.clone(),
metadata,
usize::MAX,
self.predicate.clone(),
self.hive_partition_columns.as_deref(),
)
.await?
.iter()
.map(|(row_size, _md)| *row_size)
.collect::<Vec<_>>();

Ok(row_sizes.iter().sum())
}

pub fn with_n_rows(mut self, n_rows: Option<usize>) -> Self {
self.n_rows = n_rows;
self
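The new num_rows_with_predicate above runs the pruning pass with an unbounded limit and sums the per-row-group match counts, so the reported total reflects rows that actually satisfy the predicate rather than the counts in the file metadata. A small self-contained sketch of that aggregation, with a hypothetical RowGroupMeta standing in for polars-parquet's RowGroupMetaData:

// Hypothetical mirror of prune_row_groups' output: one
// (matching_row_count, row_group_metadata) pair per surviving row group.
struct RowGroupMeta {
    num_rows: usize,
}

fn num_rows_with_predicate(pruned: &[(usize, RowGroupMeta)]) -> usize {
    // Sum the rows that pass the predicate, not md.num_rows().
    pruned.iter().map(|(matching, _md)| *matching).sum()
}

fn main() {
    let pruned = vec![
        (10, RowGroupMeta { num_rows: 100 }),
        (3, RowGroupMeta { num_rows: 50 }),
    ];
    assert_eq!(num_rows_with_predicate(&pruned), 13);
}

Only the first element of each pair is summed here; the metadata half is what batched() reuses to build its final row-group list.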
@@ -316,25 +342,43 @@ impl ParquetAsyncReader {

pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.reader.get_metadata().await?.clone();
let schema = match self.schema {
Some(schema) => schema,
None => self.schema().await?,
};
let schema = self.schema().await?;
let limit = self.n_rows.unwrap_or(usize::MAX);

let reader = Arc::new(self.reader);

let mut final_row_groups = metadata.row_groups.clone();
if let Some(_predicates) = self.predicate.clone() {
if self.row_index.is_none() {
final_row_groups = prune_row_groups(
reader.clone(),
schema.clone(),
metadata.clone(),
limit,
self.predicate.clone(),
self.hive_partition_columns.as_deref(),
)
.await?
.iter()
.map(|(_row_count, md)| md.clone())
.collect::<Vec<_>>();
}
}
// row group fetched deals with projection
let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
self.reader,
reader.clone(),
schema.clone(),
self.projection.as_deref(),
self.predicate.clone(),
&metadata.row_groups,
self.n_rows.unwrap_or(usize::MAX),
&final_row_groups,
usize::MAX,
)?
.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
final_row_groups,
schema,
self.n_rows.unwrap_or(usize::MAX),
usize::MAX,
self.projection,
self.predicate.clone(),
self.row_index,
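In batched() above, pruning only runs when a predicate is present and no row index is requested, and the n_rows limit is consumed inside the pruning pass itself; that is why both the row-group fetcher and BatchedParquetReader are now given usize::MAX. A simplified model of that limit accounting (an assumption based on the remaining_rows logic in prune_row_groups further down, reduced to plain integers):

// `matches_per_group` holds the number of predicate-matching rows per row group;
// groups reached after the limit is exhausted report 0 matches and are dropped.
fn apply_limit(limit: usize, matches_per_group: &[usize]) -> Vec<usize> {
    let mut remaining = limit;
    matches_per_group
        .iter()
        .map(|&matched| {
            if remaining == 0 {
                return 0;
            }
            remaining = remaining.saturating_sub(matched);
            matched
        })
        .filter(|&m| m != 0)
        .collect()
}

fn main() {
    // With a limit of 12, the third group is still read (2 rows were still needed),
    // but the fourth group is skipped entirely.
    assert_eq!(apply_limit(12, &[10, 0, 5, 7]), vec![10, 5]);
}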
Expand All @@ -351,15 +395,22 @@ impl ParquetAsyncReader {

pub async fn finish(mut self) -> PolarsResult<DataFrame> {
let rechunk = self.rechunk;
let metadata = self.get_metadata().await?.clone();
let reader_schema = self.schema().await?;
let row_index = self.row_index.clone();
let hive_partition_columns = self.hive_partition_columns.clone();
let projection = self.projection.clone();

// batched reader deals with slice pushdown
let reader = self.batched(usize::MAX).await?;
let n_batches = metadata.row_groups.len();
let n_batches = reader.num_row_groups();
if n_batches == 0 {
return Ok(materialize_empty_df(
projection.as_deref(),
reader_schema.as_ref(),
hive_partition_columns.as_deref(),
row_index.as_ref(),
));
}
let mut iter = reader.iter(n_batches);

let mut chunks = Vec::with_capacity(n_batches);
@@ -382,3 +433,82 @@
Ok(df)
}
}

#[cfg(feature = "cloud")]
async fn prune_row_groups(
reader: Arc<ParquetObjectStore>,
schema: Arc<ArrowSchema>,
metadata: Arc<FileMetaData>,
limit: usize,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<(usize, read::RowGroupMetaData)>> {
let predicate_columns = predicate.clone().unwrap().columns();
let predicate_projection = materialize_projection(
Some(&predicate_columns),
&Schema::from(schema.clone()),
hive_partition_columns,
false,
);

let mut predicate_row_group_fetcher = FetchRowGroupsFromObjectStore::new(
reader.clone(),
schema.clone(),
predicate_projection.clone().as_deref(),
predicate.clone(),
&metadata.row_groups,
usize::MAX,
)?;

let predicate_store: ColumnStore<'_> = predicate_row_group_fetcher
.fetch_row_groups(0..metadata.row_groups.len())
.await?;

let mut remaining_rows = limit;

let row_groups = metadata.row_groups.clone();
let final_row_groups = row_groups
.iter()
.map(|md| {
if !read_this_row_group(predicate.clone().as_deref(), md, &schema).unwrap() || remaining_rows == 0 {
return (0, md);
}
let chunk_size = md.num_rows();

let mut columns: Vec<Series> = vec![];

for column_i in predicate_projection.as_ref().unwrap() {
let column = column_idx_to_series(
*column_i,
md,
usize::MAX,
&schema.clone(),
&predicate_store,
chunk_size,
)
.unwrap();
columns.push(column)
}

let mut df = unsafe { DataFrame::new_no_checks(columns) };

materialize_hive_partitions(&mut df, hive_partition_columns, md.num_rows());
apply_predicate(&mut df, predicate.as_deref(), false).unwrap();

let row_count = df.height();

remaining_rows = remaining_rows.saturating_sub(row_count);

(row_count, md)
})
.filter(|(row_count, _md)| *row_count != 0)
.map(|(row_count, md)| (row_count, md.clone()))
.collect::<Vec<_>>();
if verbose() {
eprintln!(
"reduced the number of row groups in pruning by {}",
row_groups.len() - final_row_groups.len()
)
}
Ok(final_row_groups)
}
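prune_row_groups above fetches only the predicate columns of every row group, skips groups whose statistics already fail read_this_row_group, materializes the predicate projection into a DataFrame, applies the predicate, and keeps just the groups with at least one matching row. A self-contained sketch of that two-stage filter for a predicate of the form col >= threshold, using hypothetical types in place of the Polars metadata and column store:

// Hypothetical row-group summary: a max statistic plus the decoded values of the
// single predicate column (the real code builds a DataFrame per group instead).
struct RowGroup {
    max: i64,          // max statistic for the predicate column
    values: Vec<i64>,  // decoded values of the predicate column
}

// Stage 1: cheap statistics check; stage 2: decode the predicate column and
// count matching rows, keeping only groups with at least one match.
fn prune(groups: &[RowGroup], threshold: i64) -> Vec<(usize, &RowGroup)> {
    groups
        .iter()
        .filter(|g| g.max >= threshold)
        .map(|g| {
            let matches = g.values.iter().filter(|&&v| v >= threshold).count();
            (matches, g)
        })
        .filter(|(matches, _)| *matches != 0)
        .collect()
}

fn main() {
    let groups = vec![
        RowGroup { max: 4, values: vec![0, 1, 2, 3, 4] }, // pruned by statistics alone
        RowGroup { max: 9, values: vec![3, 5, 7, 9, 2] }, // 3 rows match the predicate
    ];
    let kept = prune(&groups, 5);
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].0, 3);
}

In the real function the second stage goes through column_idx_to_series and apply_predicate on the predicate projection only, so columns outside that projection are never downloaded during pruning, and fully pruned groups are skipped by the main fetcher afterwards.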
