feat(rust): Prune row groups before loading all columns #13746

Closed · wants to merge 1 commit
3 changes: 3 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -88,6 +88,7 @@ strength_reduce = "0.2"
strum_macros = "0.26"
thiserror = "1"
tokio = "1.26"
tokio-stream = "0.1.15"
tokio-util = "0.7.8"
unicode-reverse = "1.0.8"
url = "2.4"
13 changes: 13 additions & 0 deletions crates/polars-expr/src/expressions/mod.rs
@@ -623,6 +623,19 @@ impl PhysicalIoExpr for PhysicalIoHelper {
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
self.expr.as_stats_evaluator()
}

fn columns(&self) -> Vec<PlSmallStr> {
let mut arena: Arena<AExpr> = Arena::new();
let _ = to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena);
let mut columns = vec![];
for _ in 0..arena.len() {
let node = arena.pop().unwrap();
if let AExpr::Column(s) = node {
columns.push(s)
}
}
columns
}
}

pub fn phys_expr_to_io_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalIoExpr> {
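The new `columns()` method above lowers the physical expression into an `AExpr` arena and then drains the arena, keeping every `AExpr::Column` node, so the reader can learn which columns a predicate touches. Below is a minimal, self-contained sketch of that collect-the-column-references pattern; the `Expr` enum and `live_columns` helper are hypothetical stand-ins, not polars' real `Arena`/`AExpr` types.

```rust
// Hypothetical stand-in for an expression tree such as polars' AExpr.
#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    Literal(i64),
    BinaryExpr { left: Box<Expr>, right: Box<Expr> },
}

/// Walk the expression and collect every column name it references.
fn live_columns(expr: &Expr) -> Vec<String> {
    let mut out = Vec::new();
    let mut stack = vec![expr];
    while let Some(node) = stack.pop() {
        match node {
            Expr::Column(name) => out.push(name.clone()),
            Expr::Literal(_) => {}
            Expr::BinaryExpr { left, right } => {
                stack.push(left);
                stack.push(right);
            },
        }
    }
    out
}

fn main() {
    // Roughly corresponds to the predicate: a > 1 AND b < 10
    let predicate = Expr::BinaryExpr {
        left: Box::new(Expr::BinaryExpr {
            left: Box::new(Expr::Column("a".into())),
            right: Box::new(Expr::Literal(1)),
        }),
        right: Box::new(Expr::BinaryExpr {
            left: Box::new(Expr::Column("b".into())),
            right: Box::new(Expr::Literal(10)),
        }),
    };
    println!("{:?}", live_columns(&predicate)); // ["b", "a"]
}
```

The order is traversal-dependent; what matters for pruning is the set of names, which downstream code can match against Parquet column chunks.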
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/ipc_stream.rs
@@ -248,7 +248,7 @@
);

ipc_stream_writer.start(&df.schema().to_arrow(self.compat_level), None)?;
let df = chunk_df_for_writing(df, 512 * 512)?;
let df = chunk_df_for_writing(df, 10)?;
let iter = df.iter_chunks(self.compat_level, true);

for batch in iter {
33 changes: 2 additions & 31 deletions crates/polars-io/src/parquet/read/async_impl.rs
@@ -13,7 +13,6 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;

use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
use crate::cloud::{
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
@@ -26,6 +25,7 @@ type DownloadedRowGroup = Vec<(u64, Bytes)>;
type QueuePayload = (usize, DownloadedRowGroup);
type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;

#[derive(Debug, Clone)]
pub struct ParquetObjectStore {
store: PolarsObjectStore,
path: ObjectPath,
@@ -272,37 +272,8 @@ impl FetchRowGroupsFromObjectStore {
.collect()
});

let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();

let mut row_groups = if let Some(pred) = predicate.as_deref() {
row_group_range
.filter_map(|i| {
let rg = &row_groups[i];

// TODO!
// Optimize this. Now we partition the predicate columns twice. (later on reading as well)
// I think we must add metadata context where we can cache and amortize the partitioning.
let mut part_md = PartitionedColumnChunkMD::new(rg);
let live = pred.live_variables();
part_md.set_partitions(
live.as_ref()
.map(|vars| vars.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>())
.as_ref(),
);
let should_be_read =
matches!(read_this_row_group(Some(pred), &part_md, &schema), Ok(true));

// Already add the row groups that will be skipped to the prefetched data.
if !should_be_read {
prefetched.insert(i, Default::default());
}
let mut row_groups = row_groups.iter().cloned().enumerate().collect::<Vec<_>>();

should_be_read.then(|| (i, rg.clone()))
})
.collect::<Vec<_>>()
} else {
row_groups.iter().cloned().enumerate().collect()
};
let reader = Arc::new(reader);
let msg_limit = get_rg_prefetch_size();

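For reference, the block removed above is where the object-store fetcher itself consulted `read_this_row_group` and skipped row groups whose statistics ruled out the predicate; after this change the fetcher simply enumerates the row groups it is handed, and any pruning is expected to happen earlier, before column data is downloaded. The sketch below shows the general idea of statistics-based pruning in isolation; `RowGroupStats` and the equality predicate are illustrative assumptions, not polars' actual metadata API.

```rust
// A self-contained illustration of min/max-statistics pruning for a predicate
// of the form `col == target`. RowGroupStats is a hypothetical stand-in for the
// per-row-group column statistics stored in a Parquet footer.
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: Option<i64>,
    max: Option<i64>,
}

/// Return the indices of row groups whose [min, max] range could contain `target`.
fn prune_row_groups(stats: &[RowGroupStats], target: i64) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter_map(|(idx, rg)| match (rg.min, rg.max) {
            // The whole range misses the target: no row in this group can match.
            (Some(min), Some(max)) if target < min || target > max => None,
            // Overlapping range, or missing statistics: keep the group to stay correct.
            _ => Some(idx),
        })
        .collect()
}

fn main() {
    let stats = [
        RowGroupStats { min: Some(0), max: Some(5) },
        RowGroupStats { min: Some(10), max: Some(20) },
        RowGroupStats { min: None, max: None },
    ];
    // Predicate col == 7: only the group without statistics must still be read.
    println!("{:?}", prune_row_groups(&stats, 7)); // [2]
}
```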
49 changes: 31 additions & 18 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -61,7 +61,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
}
}

fn column_idx_to_series(
pub fn column_idx_to_series(
column_i: usize,
// The metadata belonging to this column
field_md: &[&ColumnChunkMetaData],
@@ -144,6 +144,7 @@ fn rg_to_dfs(
row_group_end: usize,
slice: (usize, usize),
file_metadata: &FileMetaData,
row_groups: &Vec<RowGroupMetaData>,
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -177,6 +178,7 @@ fn rg_to_dfs(
row_group_start,
row_group_end,
file_metadata,
row_groups,
schema,
live_variables,
predicate,
@@ -196,7 +198,7 @@ fn rg_to_dfs(
row_group_start,
row_group_end,
slice,
file_metadata,
row_groups,
schema,
predicate,
row_index,
@@ -211,7 +213,7 @@ fn rg_to_dfs(
row_group_end,
previous_row_count,
slice,
file_metadata,
row_groups,
schema,
predicate,
row_index,
@@ -247,6 +249,7 @@ fn rg_to_dfs_prefiltered(
row_group_start: usize,
row_group_end: usize,
file_metadata: &FileMetaData,
row_groups: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
live_variables: Vec<PlSmallStr>,
predicate: &dyn PhysicalIoExpr,
@@ -529,7 +532,7 @@ fn rg_to_dfs_optionally_par_over_columns(
row_group_start: usize,
row_group_end: usize,
slice: (usize, usize),
file_metadata: &FileMetaData,
row_groups: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -540,13 +543,11 @@
) -> PolarsResult<Vec<DataFrame>> {
let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

let mut n_rows_processed: usize = (0..row_group_start)
.map(|i| file_metadata.row_groups[i].num_rows())
.sum();
let mut n_rows_processed: usize = (0..row_group_start).map(|i| row_groups[i].num_rows()).sum();
let slice_end = slice.0 + slice.1;

for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];
let md = &row_groups[rg_idx];

// Set partitioned fields to prevent quadratic behavior.
let projected_columns = projected_columns_set(schema, projection);
@@ -636,7 +637,7 @@ fn rg_to_dfs_par_over_rg(
row_group_end: usize,
previous_row_count: &mut IdxSize,
slice: (usize, usize),
file_metadata: &FileMetaData,
row_groups: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -645,16 +646,16 @@ fn rg_to_dfs_par_over_rg(
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
// compute the limits per row group and the row count offsets
let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);
let mut row_groups_iter = Vec::with_capacity(row_group_end - row_group_start);

let mut n_rows_processed: usize = (0..row_group_start)
.map(|i| file_metadata.row_groups[i].num_rows())
.map(|i| row_groups[i].num_rows())
.sum();
let slice_end = slice.0 + slice.1;

for i in row_group_start..row_group_end {
let row_count_start = *previous_row_count;
let rg_md = &file_metadata.row_groups[i];
let rg_md = &row_groups[i];
let rg_slice =
split_slice_at_file(&mut n_rows_processed, rg_md.num_rows(), slice.0, slice_end);
*previous_row_count = previous_row_count
@@ -665,15 +666,15 @@ fn rg_to_dfs_par_over_rg(
continue;
}

row_groups.push((i, rg_md, rg_slice, row_count_start));
row_groups_iter.push((i, rg_md, rg_slice, row_count_start));
}

let dfs = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
let part_mds = {
let projected_columns = projected_columns_set(schema, projection);
row_groups
row_groups_iter
.par_iter()
.map(|(_, rg, _, _)| {
let mut ccmd = PartitionedColumnChunkMD::new(rg);
@@ -683,7 +684,7 @@ fn rg_to_dfs_par_over_rg(
.collect::<Vec<_>>()
};

row_groups
row_groups_iter
.into_par_iter()
.enumerate()
.map(|(iter_idx, (_rg_idx, _md, slice, row_count_start))| {
@@ -747,6 +748,7 @@ pub fn read_parquet<R: MmapBytesReader>(
projection: Option<&[usize]>,
reader_schema: &ArrowSchemaRef,
metadata: Option<FileMetaDataRef>,
row_groups: Option<Vec<RowGroupMetaData>>,
predicate: Option<&dyn PhysicalIoExpr>,
mut parallel: ParallelStrategy,
row_index: Option<RowIndex>,
@@ -766,6 +768,7 @@ pub fn read_parquet<R: MmapBytesReader>(
let file_metadata = metadata
.map(Ok)
.unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
let row_groups = row_groups.unwrap_or_else(|| file_metadata.row_groups.clone());
let n_row_groups = file_metadata.row_groups.len();

// if there are multiple row groups and categorical data
@@ -820,6 +823,7 @@ pub fn read_parquet<R: MmapBytesReader>(
n_row_groups,
slice,
&file_metadata,
&row_groups,
reader_schema,
predicate,
row_index.clone(),
@@ -887,7 +891,10 @@ impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
}

impl RowGroupFetcher {
async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
pub async fn fetch_row_groups(
&mut self,
_row_groups: Range<usize>,
) -> PolarsResult<ColumnStore> {
match self {
RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
#[cfg(feature = "cloud")]
@@ -947,6 +954,7 @@ pub struct BatchedParquetReader {
projection: Arc<[usize]>,
schema: ArrowSchemaRef,
metadata: FileMetaDataRef,
row_groups: Vec<RowGroupMetaData>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_index: Option<RowIndex>,
rows_read: IdxSize,
@@ -967,6 +975,7 @@ impl BatchedParquetReader {
pub fn new(
row_group_fetcher: RowGroupFetcher,
metadata: FileMetaDataRef,
row_groups: Vec<RowGroupMetaData>,
schema: ArrowSchemaRef,
slice: (usize, usize),
projection: Option<Vec<usize>>,
@@ -978,7 +987,7 @@
include_file_path: Option<(PlSmallStr, Arc<str>)>,
mut parallel: ParallelStrategy,
) -> PolarsResult<Self> {
let n_row_groups = metadata.row_groups.len();
let n_row_groups = row_groups.len();
let projection = projection
.map(Arc::from)
.unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());
@@ -1004,6 +1013,7 @@ impl BatchedParquetReader {
projection,
schema,
metadata,
row_groups,
row_index,
rows_read: 0,
predicate,
@@ -1054,7 +1064,7 @@ impl BatchedParquetReader {
self.row_group_offset,
self.row_group_offset + n,
self.slice,
&self.metadata.row_groups,
&self.row_groups,
);

let store = self
@@ -1070,6 +1080,7 @@ impl BatchedParquetReader {
row_group_range.end,
self.slice,
&self.metadata,
&self.row_groups,
&self.schema,
self.predicate.as_deref(),
self.row_index.clone(),
@@ -1093,6 +1104,7 @@ impl BatchedParquetReader {
let predicate = self.predicate.clone();
let schema = self.schema.clone();
let metadata = self.metadata.clone();
let row_groups = self.row_groups.clone();
let parallel = self.parallel;
let projection = self.projection.clone();
let use_statistics = self.use_statistics;
@@ -1107,6 +1119,7 @@ impl BatchedParquetReader {
row_group_range.end,
slice,
&metadata,
&row_groups,
&schema,
predicate.as_deref(),
row_index,
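Taken together, the read_impl.rs changes thread an explicit `row_groups` list through `rg_to_dfs*`, `read_parquet`, and `BatchedParquetReader::new`, falling back to `file_metadata.row_groups` when the caller passes `None`, so a caller can prune row groups once and hand only the survivors to the reader. The following is a small, self-contained sketch of that API shape; `FileMetadata`, `RowGroupMeta`, and `Reader` are hypothetical stand-ins, not the real polars-io signatures.

```rust
// Sketch of a reader that accepts an explicit (possibly pruned) row-group list,
// defaulting to everything recorded in the file footer when none is supplied.
#[derive(Debug, Clone)]
struct RowGroupMeta {
    num_rows: usize,
}

#[derive(Debug)]
struct FileMetadata {
    row_groups: Vec<RowGroupMeta>,
}

struct Reader {
    /// The row groups this reader will actually materialize.
    row_groups: Vec<RowGroupMeta>,
}

impl Reader {
    /// `row_groups: None` keeps the old behaviour (read every group in the footer);
    /// `Some(pruned)` restricts decoding to the surviving groups.
    fn new(metadata: &FileMetadata, row_groups: Option<Vec<RowGroupMeta>>) -> Self {
        let row_groups = row_groups.unwrap_or_else(|| metadata.row_groups.clone());
        Reader { row_groups }
    }

    fn total_rows(&self) -> usize {
        self.row_groups.iter().map(|rg| rg.num_rows).sum()
    }
}

fn main() {
    let metadata = FileMetadata {
        row_groups: vec![
            RowGroupMeta { num_rows: 1000 },
            RowGroupMeta { num_rows: 1000 },
            RowGroupMeta { num_rows: 500 },
        ],
    };
    // Suppose a predicate pruned the first row group before any column data was fetched.
    let pruned: Vec<RowGroupMeta> = metadata.row_groups[1..].to_vec();

    let full = Reader::new(&metadata, None);
    let restricted = Reader::new(&metadata, Some(pruned));
    println!("all groups: {} rows", full.total_rows());          // 2500
    println!("after pruning: {} rows", restricted.total_rows()); // 1500
}
```

The design point is that pruning happens once, up front, and every downstream helper simply iterates over whatever row-group list it was handed.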