Skip to content

Commit

Permalink
avoid unnecessary materialization of group indices
Browse files: browse the repository at this point in the history
  • Loading branch information
orlp committed Oct 25, 2024
1 parent 91ad500 commit 76d9d45
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 55 deletions.
5 changes: 2 additions & 3 deletions crates/polars-expr/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub trait Grouper: Any + Send {

/// Partitions this Grouper into the given number of partitions.
///
/// Updates partition_idxs and group_idxs such that the ith group of self
/// has group index group_idxs[i] in partition partition_idxs[i].
/// Updates partition_idxs such that the ith group of self moves to partition
/// partition_idxs[i].
///
/// It is guaranteed that two equal keys in two independent partition_into
/// calls map to the same partition index if the seed and the number of
Expand All @@ -36,7 +36,6 @@ pub trait Grouper: Any + Send {
seed: u64,
num_partitions: usize,
partition_idxs: &mut Vec<IdxSize>,
group_idxs: &mut Vec<IdxSize>,
) -> Vec<Box<dyn Grouper>>;

/// Returns the keys in this Grouper in group order, that is the key for
Expand Down
8 changes: 1 addition & 7 deletions crates/polars-expr/src/groups/row_encoded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ impl Grouper for RowEncodedHashGrouper {
seed: u64,
num_partitions: usize,
partition_idxs: &mut Vec<IdxSize>,
group_idxs: &mut Vec<IdxSize>,
) -> Vec<Box<dyn Grouper>> {
assert!(num_partitions > 0);

Expand Down Expand Up @@ -226,21 +225,16 @@ impl Grouper for RowEncodedHashGrouper {

unsafe {
partition_idxs.clear();
group_idxs.clear();
partition_idxs.reserve(self.table.len());
group_idxs.reserve(self.table.len());
let partition_idxs_out = partition_idxs.spare_capacity_mut();
let group_idxs_out = group_idxs.spare_capacity_mut();
for group in self.table.iter() {
let ph = folded_multiply(group.key_hash, seed | 1);
let p_idx = hash_to_partition(ph, num_partitions);
let p = partitions.get_unchecked_mut(p_idx);
let group_idx = p.insert_key_unique(group.key_hash, group.key(&self.key_data));
p.insert_key_unique(group.key_hash, group.key(&self.key_data));
*partition_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(p_idx as IdxSize);
*group_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(group_idx);
}
partition_idxs.set_len(self.table.len());
group_idxs.set_len(self.table.len());
}

partitions.into_iter().map(|p| Box::new(p) as _).collect()
Expand Down
3 changes: 1 addition & 2 deletions crates/polars-expr/src/reduce/len.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ impl GroupedReduction for LenReduce {
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
group_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
partition_vec(self.groups, partition_sizes, partition_idxs, group_idxs)
partition_vec(self.groups, partition_sizes, partition_idxs)
.into_iter()
.map(|groups| Box::new(Self { groups }) as _)
.collect()
Expand Down
10 changes: 2 additions & 8 deletions crates/polars-expr/src/reduce/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,24 +349,21 @@ impl GroupedReduction for BoolMinGroupedReduction {
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
group_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
let p_values = partition_mask(
&self.values.freeze(),
partition_sizes,
partition_idxs,
group_idxs,
);
let p_mask = partition_mask(
&self.mask.freeze(),
partition_sizes,
partition_idxs,
group_idxs,
);
p_values
.into_iter()
.zip(p_mask)
.map(|(values, mask)| Box::new(Self { values, mask }) as _)
.map(|(values, mask)| Box::new(Self { values: values.into_mut(), mask: mask.into_mut() }) as _)
.collect()
}

Expand Down Expand Up @@ -480,24 +477,21 @@ impl GroupedReduction for BoolMaxGroupedReduction {
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
group_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
let p_values = partition_mask(
&self.values.freeze(),
partition_sizes,
partition_idxs,
group_idxs,
);
let p_mask = partition_mask(
&self.mask.freeze(),
partition_sizes,
partition_idxs,
group_idxs,
);
p_values
.into_iter()
.zip(p_mask)
.map(|(values, mask)| Box::new(Self { values, mask }) as _)
.map(|(values, mask)| Box::new(Self { values: values.into_mut(), mask: mask.into_mut() }) as _)
.collect()
}

Expand Down
9 changes: 3 additions & 6 deletions crates/polars-expr/src/reduce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub trait GroupedReduction: Any + Send {
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
group_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>>;

/// Returns the finalized value per group as a Series.
Expand Down Expand Up @@ -267,9 +266,8 @@ where
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
group_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
partition::partition_vec(self.values, partition_sizes, partition_idxs, group_idxs)
partition::partition_vec(self.values, partition_sizes, partition_idxs)
.into_iter()
.map(|values| {
Box::new(Self {
Expand Down Expand Up @@ -393,14 +391,13 @@ where
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
group_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
partition::partition_vec_mask(self.values, &self.mask.freeze(), partition_sizes, partition_idxs, group_idxs)
partition::partition_vec_mask(self.values, &self.mask.freeze(), partition_sizes, partition_idxs)
.into_iter()
.map(|(values, mask)| {
Box::new(Self {
values,
mask,
mask: mask.into_mut(),
in_dtype: self.in_dtype.clone(),
reducer: self.reducer.clone(),
}) as _
Expand Down
34 changes: 10 additions & 24 deletions crates/polars-expr/src/reduce/partition.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::bitmap::{Bitmap, BitmapBuilder};
use polars_utils::itertools::Itertools;
use polars_utils::vec::PushUnchecked;
use polars_utils::IdxSize;

/// Partitions this Vec into multiple Vecs.
Expand All @@ -13,9 +14,7 @@ pub unsafe fn partition_vec<T>(
v: Vec<T>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
idx_in_partition: &[IdxSize],
) -> Vec<Vec<T>> {
assert!(idx_in_partition.len() == v.len());
assert!(partition_idxs.len() == v.len());

let mut partitions = partition_sizes
Expand All @@ -27,11 +26,9 @@ pub unsafe fn partition_vec<T>(
// Scatter into each partition.
for (i, val) in v.into_iter().enumerate() {
let p_idx = *partition_idxs.get_unchecked(i) as usize;
let idx_in_p = *idx_in_partition.get_unchecked(i) as usize;
debug_assert!(p_idx < partitions.len());
let p = partitions.get_unchecked_mut(p_idx);
debug_assert!(idx_in_p < p.capacity());
p.as_mut_ptr().add(idx_in_p).write(val);
p.push_unchecked(val);
}

for (p, sz) in partitions.iter_mut().zip(partition_sizes) {
Expand All @@ -48,25 +45,20 @@ pub unsafe fn partition_mask(
m: &Bitmap,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
idx_in_partition: &[IdxSize],
) -> Vec<MutableBitmap> {
assert!(idx_in_partition.len() == m.len());
) -> Vec<BitmapBuilder> {
assert!(partition_idxs.len() == m.len());

let mut partitions = partition_sizes
.iter()
.map(|sz| MutableBitmap::from_len_zeroed(*sz as usize))
.map(|sz| BitmapBuilder::with_capacity(*sz as usize))
.collect_vec();

unsafe {
// Scatter into each partition.
for i in 0..m.len() {
let p_idx = *partition_idxs.get_unchecked(i) as usize;
let idx_in_p = *idx_in_partition.get_unchecked(i) as usize;
debug_assert!(p_idx < partitions.len());
let p = partitions.get_unchecked_mut(p_idx);
debug_assert!(idx_in_p < p.capacity());
p.set_unchecked(idx_in_p, m.get_bit_unchecked(i));
p.push_unchecked(m.get_bit_unchecked(i));
}
}

Expand All @@ -81,28 +73,22 @@ pub unsafe fn partition_vec_mask<T>(
m: &Bitmap,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
idx_in_partition: &[IdxSize],
) -> Vec<(Vec<T>, MutableBitmap)> {
assert!(idx_in_partition.len() == v.len());
) -> Vec<(Vec<T>, BitmapBuilder)> {
assert!(partition_idxs.len() == v.len());
assert!(m.len() == v.len());

let mut partitions = partition_sizes
.iter()
.map(|sz| (Vec::<T>::with_capacity(*sz as usize), MutableBitmap::from_len_zeroed(*sz as usize)))
.map(|sz| (Vec::<T>::with_capacity(*sz as usize), BitmapBuilder::with_capacity(*sz as usize)))
.collect_vec();

unsafe {
// Scatter into each partition.
for (i, val) in v.into_iter().enumerate() {
let p_idx = *partition_idxs.get_unchecked(i) as usize;
let idx_in_p = *idx_in_partition.get_unchecked(i) as usize;
debug_assert!(p_idx < partitions.len());
let (pv, pm) = partitions.get_unchecked_mut(p_idx);
debug_assert!(idx_in_p < pv.capacity());
debug_assert!(idx_in_p < pm.len());
pv.as_mut_ptr().add(idx_in_p).write(val);
pm.set_unchecked(idx_in_p, m.get_bit_unchecked(i));
pv.push_unchecked(val);
pm.push_unchecked(m.get_bit_unchecked(i));
}

for (p, sz) in partitions.iter_mut().zip(partition_sizes) {
Expand Down
3 changes: 1 addition & 2 deletions crates/polars-expr/src/reduce/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,8 @@ where
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
group_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
partition::partition_vec(self.sums, partition_sizes, partition_idxs, group_idxs)
partition::partition_vec(self.sums, partition_sizes, partition_idxs)
.into_iter()
.map(|sums| {
Box::new(Self {
Expand Down
4 changes: 1 addition & 3 deletions crates/polars-stream/src/nodes/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,18 @@ impl GroupBySinkState {
.into_par_iter()
.with_max_len(1)
.map(|local| {
let mut group_idxs = Vec::new();
let mut partition_idxs = Vec::new();
let p_groupers = local.grouper.partition(
seed,
num_partitions,
&mut partition_idxs,
&mut group_idxs,
);
let partition_sizes = p_groupers.iter().map(|g| g.num_groups()).collect_vec();
let grouped_reductions_p = local
.grouped_reductions
.into_iter()
.map(|r| unsafe {
r.partition(&partition_sizes, &partition_idxs, &group_idxs)
r.partition(&partition_sizes, &partition_idxs)
})
.collect_vec();
(p_groupers, grouped_reductions_p)
Expand Down

0 comments on commit 76d9d45

Please sign in to comment.