diff --git a/crates/polars-expr/src/groups/mod.rs b/crates/polars-expr/src/groups/mod.rs index 37103006de7a..43091244c661 100644 --- a/crates/polars-expr/src/groups/mod.rs +++ b/crates/polars-expr/src/groups/mod.rs @@ -25,8 +25,8 @@ pub trait Grouper: Any + Send { /// Partitions this Grouper into the given number of partitions. /// - /// Updates partition_idxs and group_idxs such that the ith group of self - /// has group index group_idxs[i] in partition partition_idxs[i]. + /// Updates partition_idxs such that the ith group of self moves to partition + /// partition_idxs[i]. /// /// It is guaranteed that two equal keys in two independent partition_into /// calls map to the same partition index if the seed and the number of @@ -36,7 +36,6 @@ pub trait Grouper: Any + Send { seed: u64, num_partitions: usize, partition_idxs: &mut Vec, - group_idxs: &mut Vec, ) -> Vec>; /// Returns the keys in this Grouper in group order, that is the key for diff --git a/crates/polars-expr/src/groups/row_encoded.rs b/crates/polars-expr/src/groups/row_encoded.rs index 6745d8952b47..52b1c3aaa348 100644 --- a/crates/polars-expr/src/groups/row_encoded.rs +++ b/crates/polars-expr/src/groups/row_encoded.rs @@ -196,7 +196,6 @@ impl Grouper for RowEncodedHashGrouper { seed: u64, num_partitions: usize, partition_idxs: &mut Vec, - group_idxs: &mut Vec, ) -> Vec> { assert!(num_partitions > 0); @@ -226,21 +225,16 @@ impl Grouper for RowEncodedHashGrouper { unsafe { partition_idxs.clear(); - group_idxs.clear(); partition_idxs.reserve(self.table.len()); - group_idxs.reserve(self.table.len()); let partition_idxs_out = partition_idxs.spare_capacity_mut(); - let group_idxs_out = group_idxs.spare_capacity_mut(); for group in self.table.iter() { let ph = folded_multiply(group.key_hash, seed | 1); let p_idx = hash_to_partition(ph, num_partitions); let p = partitions.get_unchecked_mut(p_idx); - let group_idx = p.insert_key_unique(group.key_hash, group.key(&self.key_data)); + p.insert_key_unique(group.key_hash, group.key(&self.key_data)); *partition_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(p_idx as IdxSize); - *group_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(group_idx); } partition_idxs.set_len(self.table.len()); - group_idxs.set_len(self.table.len()); } partitions.into_iter().map(|p| Box::new(p) as _).collect() diff --git a/crates/polars-expr/src/reduce/len.rs b/crates/polars-expr/src/reduce/len.rs index 99ae57323a75..89bd4ca3a5aa 100644 --- a/crates/polars-expr/src/reduce/len.rs +++ b/crates/polars-expr/src/reduce/len.rs @@ -67,9 +67,8 @@ impl GroupedReduction for LenReduce { self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { - partition_vec(self.groups, partition_sizes, partition_idxs, group_idxs) + partition_vec(self.groups, partition_sizes, partition_idxs) .into_iter() .map(|groups| Box::new(Self { groups }) as _) .collect() diff --git a/crates/polars-expr/src/reduce/min_max.rs b/crates/polars-expr/src/reduce/min_max.rs index 38c148c16206..00b85e99f059 100644 --- a/crates/polars-expr/src/reduce/min_max.rs +++ b/crates/polars-expr/src/reduce/min_max.rs @@ -349,24 +349,21 @@ impl GroupedReduction for BoolMinGroupedReduction { self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { let p_values = partition_mask( &self.values.freeze(), partition_sizes, partition_idxs, - group_idxs, ); let p_mask = partition_mask( &self.mask.freeze(), partition_sizes, partition_idxs, - group_idxs, ); p_values .into_iter() .zip(p_mask) - .map(|(values, mask)| Box::new(Self { values, mask }) as _) + .map(|(values, mask)| Box::new(Self { values: values.into_mut(), mask: mask.into_mut() }) as _) .collect() } @@ -480,24 +477,21 @@ impl GroupedReduction for BoolMaxGroupedReduction { self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { let p_values = partition_mask( &self.values.freeze(), partition_sizes, partition_idxs, - group_idxs, ); let p_mask = partition_mask( &self.mask.freeze(), partition_sizes, partition_idxs, - group_idxs, ); p_values .into_iter() .zip(p_mask) - .map(|(values, mask)| Box::new(Self { values, mask }) as _) + .map(|(values, mask)| Box::new(Self { values: values.into_mut(), mask: mask.into_mut() }) as _) .collect() } diff --git a/crates/polars-expr/src/reduce/mod.rs b/crates/polars-expr/src/reduce/mod.rs index a5199277cd83..bd1e55e8bb00 100644 --- a/crates/polars-expr/src/reduce/mod.rs +++ b/crates/polars-expr/src/reduce/mod.rs @@ -64,7 +64,6 @@ pub trait GroupedReduction: Any + Send { self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec>; /// Returns the finalized value per group as a Series. @@ -267,9 +266,8 @@ where self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { - partition::partition_vec(self.values, partition_sizes, partition_idxs, group_idxs) + partition::partition_vec(self.values, partition_sizes, partition_idxs) .into_iter() .map(|values| { Box::new(Self { @@ -393,14 +391,13 @@ where self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { - partition::partition_vec_mask(self.values, &self.mask.freeze(), partition_sizes, partition_idxs, group_idxs) + partition::partition_vec_mask(self.values, &self.mask.freeze(), partition_sizes, partition_idxs) .into_iter() .map(|(values, mask)| { Box::new(Self { values, - mask, + mask: mask.into_mut(), in_dtype: self.in_dtype.clone(), reducer: self.reducer.clone(), }) as _ diff --git a/crates/polars-expr/src/reduce/partition.rs b/crates/polars-expr/src/reduce/partition.rs index 70c7d4fea2a6..33e94e579ab5 100644 --- a/crates/polars-expr/src/reduce/partition.rs +++ b/crates/polars-expr/src/reduce/partition.rs @@ -1,5 +1,6 @@ -use arrow::bitmap::{Bitmap, MutableBitmap}; +use arrow::bitmap::{Bitmap, BitmapBuilder}; use polars_utils::itertools::Itertools; +use polars_utils::vec::PushUnchecked; use polars_utils::IdxSize; /// Partitions this Vec into multiple Vecs. @@ -13,9 +14,7 @@ pub unsafe fn partition_vec( v: Vec, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - idx_in_partition: &[IdxSize], ) -> Vec> { - assert!(idx_in_partition.len() == v.len()); assert!(partition_idxs.len() == v.len()); let mut partitions = partition_sizes @@ -27,11 +26,9 @@ pub unsafe fn partition_vec( // Scatter into each partition. for (i, val) in v.into_iter().enumerate() { let p_idx = *partition_idxs.get_unchecked(i) as usize; - let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; debug_assert!(p_idx < partitions.len()); let p = partitions.get_unchecked_mut(p_idx); - debug_assert!(idx_in_p < p.capacity()); - p.as_mut_ptr().add(idx_in_p).write(val); + p.push_unchecked(val); } for (p, sz) in partitions.iter_mut().zip(partition_sizes) { @@ -48,25 +45,20 @@ pub unsafe fn partition_mask( m: &Bitmap, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - idx_in_partition: &[IdxSize], -) -> Vec { - assert!(idx_in_partition.len() == m.len()); +) -> Vec { assert!(partition_idxs.len() == m.len()); let mut partitions = partition_sizes .iter() - .map(|sz| MutableBitmap::from_len_zeroed(*sz as usize)) + .map(|sz| BitmapBuilder::with_capacity(*sz as usize)) .collect_vec(); unsafe { // Scatter into each partition. for i in 0..m.len() { let p_idx = *partition_idxs.get_unchecked(i) as usize; - let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; - debug_assert!(p_idx < partitions.len()); let p = partitions.get_unchecked_mut(p_idx); - debug_assert!(idx_in_p < p.capacity()); - p.set_unchecked(idx_in_p, m.get_bit_unchecked(i)); + p.push_unchecked(m.get_bit_unchecked(i)); } } @@ -81,28 +73,22 @@ pub unsafe fn partition_vec_mask( m: &Bitmap, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - idx_in_partition: &[IdxSize], -) -> Vec<(Vec, MutableBitmap)> { - assert!(idx_in_partition.len() == v.len()); +) -> Vec<(Vec, BitmapBuilder)> { assert!(partition_idxs.len() == v.len()); assert!(m.len() == v.len()); let mut partitions = partition_sizes .iter() - .map(|sz| (Vec::::with_capacity(*sz as usize), MutableBitmap::from_len_zeroed(*sz as usize))) + .map(|sz| (Vec::::with_capacity(*sz as usize), BitmapBuilder::with_capacity(*sz as usize))) .collect_vec(); unsafe { // Scatter into each partition. for (i, val) in v.into_iter().enumerate() { let p_idx = *partition_idxs.get_unchecked(i) as usize; - let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; - debug_assert!(p_idx < partitions.len()); let (pv, pm) = partitions.get_unchecked_mut(p_idx); - debug_assert!(idx_in_p < pv.capacity()); - debug_assert!(idx_in_p < pm.len()); - pv.as_mut_ptr().add(idx_in_p).write(val); - pm.set_unchecked(idx_in_p, m.get_bit_unchecked(i)); + pv.push_unchecked(val); + pm.push_unchecked(m.get_bit_unchecked(i)); } for (p, sz) in partitions.iter_mut().zip(partition_sizes) { diff --git a/crates/polars-expr/src/reduce/sum.rs b/crates/polars-expr/src/reduce/sum.rs index 0e36b038c331..466d5ffb9f9d 100644 --- a/crates/polars-expr/src/reduce/sum.rs +++ b/crates/polars-expr/src/reduce/sum.rs @@ -130,9 +130,8 @@ where self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { - partition::partition_vec(self.sums, partition_sizes, partition_idxs, group_idxs) + partition::partition_vec(self.sums, partition_sizes, partition_idxs) .into_iter() .map(|sums| { Box::new(Self { diff --git a/crates/polars-stream/src/nodes/group_by.rs b/crates/polars-stream/src/nodes/group_by.rs index eb03ef45ef85..e9ca1fd3eb21 100644 --- a/crates/polars-stream/src/nodes/group_by.rs +++ b/crates/polars-stream/src/nodes/group_by.rs @@ -119,20 +119,18 @@ impl GroupBySinkState { .into_par_iter() .with_max_len(1) .map(|local| { - let mut group_idxs = Vec::new(); let mut partition_idxs = Vec::new(); let p_groupers = local.grouper.partition( seed, num_partitions, &mut partition_idxs, - &mut group_idxs, ); let partition_sizes = p_groupers.iter().map(|g| g.num_groups()).collect_vec(); let grouped_reductions_p = local .grouped_reductions .into_iter() .map(|r| unsafe { - r.partition(&partition_sizes, &partition_idxs, &group_idxs) + r.partition(&partition_sizes, &partition_idxs) }) .collect_vec(); (p_groupers, grouped_reductions_p)