feat: Conserve Parquet SortingColumns for ints (#19251)
coastalwhite authored Oct 16, 2024
1 parent 21dc469 commit 3720494
Showing 6 changed files with 184 additions and 25 deletions.
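
In short: Parquet row groups can declare `SortingColumn` metadata, and after this change the reader maps that metadata onto Polars' `IsSorted` flag for integer columns, so sorted-aware code paths can skip work on pre-sorted data. A minimal sketch of the user-visible effect (the helper below is hypothetical and assumes the `parquet` feature plus a file whose row groups declare SortingColumns):

```rust
use polars::prelude::*;

// Hypothetical check: with this change, an integer column read from a
// Parquet file whose row groups declare SortingColumn metadata comes back
// with its sorted flag already set, so sorted-aware operations kick in.
fn read_presorted(path: &str) -> PolarsResult<DataFrame> {
    let file = std::fs::File::open(path)?;
    ParquetReader::new(file).finish()
}
```

With `POLARS_VERBOSE=1`, the reader logs one `Parquet conserved SortingColumn ...` line per conserved column chunk (see `try_set_sorted_flag` below).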
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -33,6 +33,7 @@ or set 'streaming',

pub use options::{ParallelStrategy, ParquetOptions};
use polars_error::{ErrString, PolarsError};
pub use read_impl::{create_sorting_map, try_set_sorted_flag};
#[cfg(feature = "cloud")]
pub use reader::ParquetAsyncReader;
pub use reader::{BatchedParquetReader, ParquetReader};
103 changes: 88 additions & 15 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -7,8 +7,9 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_core::{config, POOL};
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::parquet::statistics::Statistics;
use polars_parquet::read::{
@@ -60,6 +61,57 @@ fn assert_dtypes(dtype: &ArrowDataType) {
}
}

fn should_copy_sortedness(dtype: &DataType) -> bool {
// @NOTE: For now, we are a bit conservative with this.
use DataType as D;

matches!(
dtype,
D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
)
}

pub fn try_set_sorted_flag(
series: &mut Series,
col_idx: usize,
sorting_map: &PlHashMap<usize, IsSorted>,
) {
if let Some(is_sorted) = sorting_map.get(&col_idx) {
if should_copy_sortedness(series.dtype()) {
if config::verbose() {
eprintln!(
"Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
series.name()
);
}

series.set_sorted_flag(*is_sorted);
}
}
}

pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
let capacity = md.sorting_columns().map_or(0, |s| s.len());
let mut sorting_map = PlHashMap::with_capacity(capacity);

if let Some(sorting_columns) = md.sorting_columns() {
for sorting in sorting_columns {
let prev_value = sorting_map.insert(
sorting.column_idx as usize,
if sorting.descending {
IsSorted::Descending
} else {
IsSorted::Ascending
},
);

debug_assert!(prev_value.is_none());
}
}

sorting_map
}

fn column_idx_to_series(
column_i: usize,
// The metadata belonging to this column
@@ -327,6 +379,8 @@ fn rg_to_dfs_prefiltered(
}
}

let sorting_map = create_sorting_map(md);

// Collect the data for the live columns
let live_columns = (0..num_live_columns)
.into_par_iter()
@@ -345,8 +399,12 @@

let part = iter.collect::<Vec<_>>();

column_idx_to_series(col_idx, part.as_slice(), None, schema, store)
.map(Column::from)
let mut series =
column_idx_to_series(col_idx, part.as_slice(), None, schema, store)?;

try_set_sorted_flag(&mut series, col_idx, &sorting_map);

Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()?;

@@ -452,7 +510,7 @@ fn rg_to_dfs_prefiltered(
array.filter(&mask_arr)
};

let array = if mask_setting.should_prefilter(
let mut series = if mask_setting.should_prefilter(
prefilter_cost,
&schema.get_at_index(col_idx).unwrap().1.dtype,
) {
@@ -461,9 +519,11 @@
post()?
};

debug_assert_eq!(array.len(), filter_mask.set_bits());
debug_assert_eq!(series.len(), filter_mask.set_bits());

try_set_sorted_flag(&mut series, col_idx, &sorting_map);

Ok(array.into_column())
Ok(series.into_column())
})
.collect::<PolarsResult<Vec<Column>>>()?;

@@ -576,6 +636,8 @@ fn rg_to_dfs_optionally_par_over_columns(
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

let sorting_map = create_sorting_map(md);

let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
projection
@@ -593,14 +655,17 @@

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
let mut series = column_idx_to_series(
*column_i,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
)
.map(Column::from)
)?;

try_set_sorted_flag(&mut series, *column_i, &sorting_map);

Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()
})?
@@ -620,14 +685,17 @@

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
let mut series = column_idx_to_series(
*column_i,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
)
.map(Column::from)
)?;

try_set_sorted_flag(&mut series, *column_i, &sorting_map);

Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()?
};
@@ -712,6 +780,8 @@ fn rg_to_dfs_par_over_rg(
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

let sorting_map = create_sorting_map(md);

let columns = projection
.iter()
.map(|column_i| {
@@ -727,14 +797,17 @@

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
let mut series = column_idx_to_series(
*column_i,
part.as_slice(),
Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
schema,
store,
)
.map(Column::from)
)?;

try_set_sorted_flag(&mut series, *column_i, &sorting_map);

Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()?;

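
The two helpers above are re-exported from `polars-io` (see the `mod.rs` hunk in the first file) and together form a small per-row-group protocol: build the index-to-`IsSorted` map once per row group, then tag each decoded column chunk. A minimal sketch of how they compose; the driver function and its `columns` input are hypothetical, while the imports follow the paths used elsewhere in this diff:

```rust
use polars_core::prelude::*;
use polars_io::prelude::{create_sorting_map, try_set_sorted_flag};
use polars_parquet::read::RowGroupMetadata;

// Hypothetical driver: `columns` pairs each decoded Series with its leaf
// column index inside the row group.
fn tag_sorted_columns(md: &RowGroupMetadata, columns: &mut [(usize, Series)]) {
    // One lookup table per row group: leaf column index -> IsSorted.
    let sorting_map = create_sorting_map(md);
    for (col_idx, series) in columns.iter_mut() {
        // No-op unless the column appears in the map *and* has an integer
        // dtype (see `should_copy_sortedness`).
        try_set_sorted_flag(series, *col_idx, &sorting_map);
    }
}
```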
15 changes: 14 additions & 1 deletion crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;

use hashbrown::hash_map::RawEntryMut;
use parquet_format_safe::RowGroup;
use parquet_format_safe::{RowGroup, SortingColumn};
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::idx_vec::UnitVec;
use polars_utils::pl_str::PlSmallStr;
@@ -41,6 +41,7 @@ pub struct RowGroupMetadata {
num_rows: usize,
total_byte_size: usize,
full_byte_range: core::ops::Range<u64>,
sorting_columns: Option<Vec<SortingColumn>>,
}

impl RowGroupMetadata {
Expand All @@ -59,6 +60,11 @@ impl RowGroupMetadata {
.map(|x| x.iter().map(|&x| &self.columns[x]))
}

/// Fetch the indexes of all leaf columns under this root name, if the root exists.
pub fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> {
self.column_lookup.get(root_name).map(|x| x.as_slice())
}

/// Number of rows in this row group.
pub fn num_rows(&self) -> usize {
self.num_rows
Expand All @@ -85,6 +91,10 @@ impl RowGroupMetadata {
self.columns.iter().map(|x| x.byte_range())
}

pub fn sorting_columns(&self) -> Option<&[SortingColumn]> {
self.sorting_columns.as_deref()
}

/// Method to convert from Thrift.
pub(crate) fn try_from_thrift(
schema_descr: &SchemaDescriptor,
@@ -106,6 +116,8 @@
0..0
};

let sorting_columns = rg.sorting_columns.clone();

let columns = rg
.columns
.into_iter()
Expand All @@ -131,6 +143,7 @@ impl RowGroupMetadata {
num_rows,
total_byte_size,
full_byte_range,
sorting_columns,
})
}
}
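
A short sketch of consuming the new accessor directly. Note that the Thrift `SortingColumn` struct also carries a `nulls_first` flag, which `create_sorting_map` currently ignores:

```rust
use polars_parquet::read::RowGroupMetadata;

// Print the declared sort order for each leaf column in one row group.
fn describe_sorting(md: &RowGroupMetadata) {
    let Some(sorting_columns) = md.sorting_columns() else {
        println!("row group declares no SortingColumns");
        return;
    };
    for sc in sorting_columns {
        println!(
            "leaf column {}: {}, nulls {}",
            sc.column_idx,
            if sc.descending { "descending" } else { "ascending" },
            if sc.nulls_first { "first" } else { "last" },
        );
    }
}
```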
crates/polars-stream/src/nodes/parquet_source/row_group_data_fetcher.rs
@@ -2,11 +2,12 @@ use std::future::Future;
use std::sync::Arc;

use polars_core::prelude::{ArrowSchema, InitHashMaps, PlHashMap};
use polars_core::series::IsSorted;
use polars_core::utils::operation_exceeded_idxsize_msg;
use polars_error::{polars_err, PolarsResult};
use polars_io::predicates::PhysicalIoExpr;
use polars_io::prelude::FileMetadata;
use polars_io::prelude::_internal::read_this_row_group;
use polars_io::prelude::{create_sorting_map, FileMetadata};
use polars_io::utils::byte_source::{ByteSource, DynByteSource};
use polars_io::utils::slice::SplitSlicePosition;
use polars_parquet::read::RowGroupMetadata;
Expand All @@ -27,6 +28,7 @@ pub(super) struct RowGroupData {
pub(super) slice: Option<(usize, usize)>,
pub(super) file_max_row_group_height: usize,
pub(super) row_group_metadata: RowGroupMetadata,
pub(super) sorting_map: PlHashMap<usize, IsSorted>,
pub(super) shared_file_state: Arc<tokio::sync::OnceCell<SharedFileState>>,
}

@@ -86,6 +88,7 @@ impl RowGroupDataFetcher {
let current_row_group_idx = self.current_row_group_idx;

let num_rows = row_group_metadata.num_rows();
let sorting_map = create_sorting_map(&row_group_metadata);

self.current_row_offset = current_row_offset.saturating_add(num_rows);
self.current_row_group_idx += 1;
@@ -246,6 +249,7 @@ impl RowGroupDataFetcher {
slice,
file_max_row_group_height: current_max_row_group_height,
row_group_metadata,
sorting_map,
shared_file_state: current_shared_file_state.clone(),
})
});
35 changes: 27 additions & 8 deletions crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
@@ -11,6 +11,7 @@ use polars_error::{polars_bail, PolarsResult};
use polars_io::predicates::PhysicalIoExpr;
use polars_io::prelude::_internal::calc_prefilter_cost;
pub use polars_io::prelude::_internal::PrefilterMaskSetting;
use polars_io::prelude::try_set_sorted_flag;
use polars_io::RowIndex;
use polars_plan::plans::hive::HivePartitions;
use polars_plan::plans::ScanSources;
@@ -367,11 +368,20 @@ fn decode_column(

assert_eq!(array.len(), expected_num_rows);

let series = Series::try_from((arrow_field, array))?;
let mut series = Series::try_from((arrow_field, array))?;

if let Some(col_idxs) = row_group_data
.row_group_metadata
.columns_idxs_under_root_iter(&arrow_field.name)
{
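// SortingColumn indexes refer to leaf columns, and nested roots map to
// several leaves, so conservatively apply the flag only when this root
// corresponds to exactly one leaf column.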
if col_idxs.len() == 1 {
try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
}
}

// TODO: Also load in the metadata.

Ok(series.into())
Ok(series.into_column())
}

/// # Safety
@@ -652,17 +662,26 @@ fn decode_column_prefiltered(
deserialize_filter,
)?;

let column = Series::try_from((arrow_field, array))?.into_column();
let mut series = Series::try_from((arrow_field, array))?;

if let Some(col_idxs) = row_group_data
.row_group_metadata
.columns_idxs_under_root_iter(&arrow_field.name)
{
if col_idxs.len() == 1 {
try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
}
}

let column = if !prefilter {
column.filter(mask)?
let series = if !prefilter {
series.filter(mask)?
} else {
column
series
};

assert_eq!(column.len(), expected_num_rows);
assert_eq!(series.len(), expected_num_rows);

Ok(column)
Ok(series.into_column())
}

mod tests {