perf: Improve var/cov/corr performance (#19381)
orlp authored Oct 22, 2024
1 parent 1d210ab commit 94a58ff
Showing 6 changed files with 351 additions and 190 deletions.
9 changes: 9 additions & 0 deletions crates/polars-arrow/src/array/iterator.rs
@@ -117,3 +117,12 @@ impl<'a, A: ArrayAccessor<'a> + ?Sized> Iterator for NonNullValuesIter<'a, A> {
}

unsafe impl<'a, A: ArrayAccessor<'a> + ?Sized> TrustedLen for NonNullValuesIter<'a, A> {}

impl<'a, A: ?Sized> Clone for NonNullValuesIter<'a, A> {
    fn clone(&self) -> Self {
        Self {
            accessor: self.accessor,
            idxs: self.idxs.clone(),
        }
    }
}
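
A note on the hand-written impl (editor's aside, not from the commit): #[derive(Clone)] would emit `impl<'a, A: ?Sized + Clone> Clone for NonNullValuesIter<'a, A>`, and the `A: Clone` bound is both unnecessary (only the `&'a A` reference is copied) and impossible to satisfy for unsized `A`. A minimal standalone sketch of the pattern, with the hypothetical `Wrapper` standing in for the real iterator:

// Deriving Clone would add an `A: Clone` bound that never holds for
// `A: ?Sized`; copying the shared reference by hand needs no bound.
struct Wrapper<'a, A: ?Sized> {
    accessor: &'a A,
}

impl<'a, A: ?Sized> Clone for Wrapper<'a, A> {
    fn clone(&self) -> Self {
        Wrapper {
            accessor: self.accessor,
        }
    }
}
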
1 change: 1 addition & 0 deletions crates/polars-arrow/src/bitmap/iterator.rs
@@ -25,6 +25,7 @@ fn calc_iters_remaining(length: usize, min_length_for_iter: usize, consume: usize
    1 + obvious_iters // Thus always exactly 1 more iter.
}

#[derive(Clone)]
pub struct TrueIdxIter<'a> {
    mask: BitMask<'a>,
    first_unknown: usize,
3 changes: 3 additions & 0 deletions crates/polars-compute/src/lib.rs
@@ -1,4 +1,6 @@
#![cfg_attr(feature = "simd", feature(portable_simd))]
#![cfg_attr(feature = "simd", feature(core_intrinsics))] // For fadd_algebraic.
#![cfg_attr(feature = "simd", allow(internal_features))]
#![cfg_attr(feature = "simd", feature(avx512_target_feature))]
#![cfg_attr(
all(feature = "simd", target_arch = "x86_64"),
@@ -21,6 +23,7 @@ pub mod if_then_else;
pub mod min_max;
pub mod size;
pub mod unique;
pub mod var_cov;

// Trait to enable the scalar blanket implementation.
pub trait NotSimdPrimitive: NativeType {}
319 changes: 319 additions & 0 deletions crates/polars-compute/src/var_cov.rs
@@ -0,0 +1,319 @@
// Some formulae:
// mean_x = sum(weight[i] * x[i]) / sum(weight)
// dp_xy = weighted sum of deviation products of variables x, y, written in
// the paper as simply XY.
// dp_xy = sum(weight[i] * (x[i] - mean_x) * (y[i] - mean_y))
//
// cov(x, y) = dp_xy / sum(weight)
// var(x) = cov(x, x)
//
// Algorithms from:
// Numerically stable parallel computation of (co-)variance.
// Schubert, E., & Gertz, M. (2018).
//
// Key equations from the paper:
// (17) for mean update, (23) for dp update (and also Table 1).
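//
// Worked example of (17) and (23) (editor's illustration, not from the paper):
// chunk a = [1, 2] has weight 2, mean 1.5, dp 0.5; chunk b = [3, 5] has
// weight 2, mean 4.0, dp 2.0. Combining a with b gives
//   new_mean = 1.5 - (1.5 - 4.0) * (2 / 4)                = 2.75
//   new_dp   = 0.5 + 2.0 + 2 * (2.75 - 4.0) * (1.5 - 4.0) = 8.75
// which matches mean and dp computed directly over [1, 2, 3, 5].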

use arrow::array::{Array, PrimitiveArray};
use arrow::types::NativeType;
use num_traits::AsPrimitive;

const CHUNK_SIZE: usize = 128;

#[inline(always)]
fn alg_add(a: f64, b: f64) -> f64 {
    #[cfg(feature = "simd")]
    {
        std::intrinsics::fadd_algebraic(a, b)
    }
    #[cfg(not(feature = "simd"))]
    {
        a + b
    }
}

fn alg_sum(it: impl IntoIterator<Item = f64>) -> f64 {
    it.into_iter().fold(0.0, alg_add)
}
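
// Editor's note (not part of the commit): `fadd_algebraic` licenses the
// optimizer to reassociate the additions in the fold above, which is what
// allows LLVM to unroll and vectorize `alg_sum`; a plain `+` fold pins the
// left-to-right evaluation order. The price is that results can differ by
// rounding, because float addition is not associative:
#[test]
fn float_add_is_not_associative() {
    let (a, b, c) = (1e16_f64, -1e16_f64, 1.0_f64);
    assert_eq!((a + b) + c, 1.0);
    assert_eq!(a + (b + c), 0.0); // b + c rounds back to -1e16
}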

#[derive(Default)]
pub struct VarState {
    weight: f64,
    mean: f64,
    dp: f64,
}

#[derive(Default)]
pub struct CovState {
    weight: f64,
    mean_x: f64,
    mean_y: f64,
    dp_xy: f64,
}

#[derive(Default)]
pub struct PearsonState {
    weight: f64,
    mean_x: f64,
    mean_y: f64,
    dp_xx: f64,
    dp_xy: f64,
    dp_yy: f64,
}

impl VarState {
    fn new(x: &[f64]) -> Self {
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let mean = alg_sum(x.iter().copied()) / weight;
        Self {
            weight,
            mean,
            dp: alg_sum(x.iter().map(|&xi| (xi - mean) * (xi - mean))),
        }
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        }

        let new_weight = self.weight + other.weight;
        let inv_weight = 1.0 / new_weight;
        let other_weight_frac = other.weight * inv_weight;
        let delta_mean = self.mean - other.mean;
        let new_mean = self.mean - delta_mean * other_weight_frac;
        self.dp += other.dp + other.weight * (new_mean - other.mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
    }

    pub fn finalize(&mut self, ddof: u8) -> Option<f64> {
        if self.weight <= ddof as f64 {
            None
        } else {
            Some(self.dp / (self.weight - ddof as f64))
        }
    }
}
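
// Editor's sketch (not in the commit; relies on the module-private `new`):
// merging partial states agrees with a single pass over the whole slice,
// which is what justifies the per-chunk and per-array `combine` calls below.
#[test]
fn combine_matches_single_pass() {
    let data = [1.0, 2.0, 3.0, 5.0, 8.0, 13.0];
    let (a, b) = data.split_at(2);

    let mut whole = VarState::new(&data);
    let mut split = VarState::new(a);
    split.combine(&VarState::new(b));

    let v1 = whole.finalize(1).unwrap();
    let v2 = split.finalize(1).unwrap();
    assert!((v1 - v2).abs() < 1e-12);
}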

impl CovState {
    fn new(x: &[f64], y: &[f64]) -> Self {
        assert!(x.len() == y.len());
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let inv_weight = 1.0 / weight;
        let mean_x = alg_sum(x.iter().copied()) * inv_weight;
        let mean_y = alg_sum(y.iter().copied()) * inv_weight;
        Self {
            weight,
            mean_x,
            mean_y,
            dp_xy: alg_sum(
                x.iter()
                    .zip(y)
                    .map(|(&xi, &yi)| (xi - mean_x) * (yi - mean_y)),
            ),
        }
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        }

        let new_weight = self.weight + other.weight;
        let inv_weight = 1.0 / new_weight;
        let other_weight_frac = other.weight * inv_weight;
        let delta_mean_x = self.mean_x - other.mean_x;
        let delta_mean_y = self.mean_y - other.mean_y;
        let new_mean_x = self.mean_x - delta_mean_x * other_weight_frac;
        let new_mean_y = self.mean_y - delta_mean_y * other_weight_frac;
        self.dp_xy += other.dp_xy + other.weight * (new_mean_x - other.mean_x) * delta_mean_y;
        self.weight = new_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
    }

    pub fn finalize(&mut self, ddof: u8) -> Option<f64> {
        if self.weight <= ddof as f64 {
            None
        } else {
            Some(self.dp_xy / (self.weight - ddof as f64))
        }
    }
}

impl PearsonState {
    fn new(x: &[f64], y: &[f64]) -> Self {
        assert!(x.len() == y.len());
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let inv_weight = 1.0 / weight;
        let mean_x = alg_sum(x.iter().copied()) * inv_weight;
        let mean_y = alg_sum(y.iter().copied()) * inv_weight;
        let mut dp_xx = 0.0;
        let mut dp_xy = 0.0;
        let mut dp_yy = 0.0;
        for (xi, yi) in x.iter().zip(y.iter()) {
            dp_xx = alg_add(dp_xx, (xi - mean_x) * (xi - mean_x));
            dp_xy = alg_add(dp_xy, (xi - mean_x) * (yi - mean_y));
            dp_yy = alg_add(dp_yy, (yi - mean_y) * (yi - mean_y));
        }
        Self {
            weight,
            mean_x,
            mean_y,
            dp_xx,
            dp_xy,
            dp_yy,
        }
    }

    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        }

        let new_weight = self.weight + other.weight;
        let inv_weight = 1.0 / new_weight;
        let other_weight_frac = other.weight * inv_weight;
        let delta_mean_x = self.mean_x - other.mean_x;
        let delta_mean_y = self.mean_y - other.mean_y;
        let new_mean_x = self.mean_x - delta_mean_x * other_weight_frac;
        let new_mean_y = self.mean_y - delta_mean_y * other_weight_frac;
        self.dp_xx += other.dp_xx + other.weight * (new_mean_x - other.mean_x) * delta_mean_x;
        self.dp_xy += other.dp_xy + other.weight * (new_mean_x - other.mean_x) * delta_mean_y;
        self.dp_yy += other.dp_yy + other.weight * (new_mean_y - other.mean_y) * delta_mean_y;
        self.weight = new_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
    }

    pub fn finalize(&mut self, _ddof: u8) -> f64 {
        // The division by sample_weight - ddof on both sides cancels out:
        // corr = (dp_xy / (w - ddof)) / sqrt((dp_xx / (w - ddof)) * (dp_yy / (w - ddof)))
        //      = dp_xy / sqrt(dp_xx * dp_yy).
        let denom = (self.dp_xx * self.dp_yy).sqrt();
        if denom == 0.0 {
            f64::NAN
        } else {
            self.dp_xy / denom
        }
    }
}

fn chunk_as_float<T, I, F>(it: I, mut f: F)
where
    T: NativeType + AsPrimitive<f64>,
    I: IntoIterator<Item = T>,
    F: FnMut(&[f64]),
{
    let mut chunk = [0.0; CHUNK_SIZE];
    let mut i = 0;
    for val in it {
        if i >= CHUNK_SIZE {
            f(&chunk);
            i = 0;
        }
        chunk[i] = val.as_();
        i += 1;
    }
    if i > 0 {
        f(&chunk[..i]);
    }
}
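
// Editor's note (not in the commit): staging the iterator through fixed
// 128-element buffers means the hot loops in `VarState::new` and friends
// always see a plain `&[f64]`, which is easy to vectorize, while the caller
// folds the per-chunk states together with `combine`. The binary variant
// below does the same with a pair of buffers. A small illustration:
#[test]
fn chunks_are_flushed_at_chunk_size() {
    let vals: Vec<i32> = (0..300).collect();
    let mut lens = Vec::new();
    chunk_as_float(vals, |chunk: &[f64]| lens.push(chunk.len()));
    assert_eq!(lens, [128, 128, 44]);
}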

fn chunk_as_float_binary<T, U, I, F>(it: I, mut f: F)
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
    I: IntoIterator<Item = (T, U)>,
    F: FnMut(&[f64], &[f64]),
{
    let mut left_chunk = [0.0; CHUNK_SIZE];
    let mut right_chunk = [0.0; CHUNK_SIZE];
    let mut i = 0;
    for (l, r) in it {
        if i >= CHUNK_SIZE {
            f(&left_chunk, &right_chunk);
            i = 0;
        }
        left_chunk[i] = l.as_();
        right_chunk[i] = r.as_();
        i += 1;
    }
    if i > 0 {
        f(&left_chunk[..i], &right_chunk[..i]);
    }
}

pub fn var<T>(arr: &PrimitiveArray<T>) -> VarState
where
    T: NativeType + AsPrimitive<f64>,
{
    let mut out = VarState::default();
    if arr.has_nulls() {
        chunk_as_float(arr.non_null_values_iter(), |chunk| {
            out.combine(&VarState::new(chunk))
        });
    } else {
        chunk_as_float(arr.values().iter().copied(), |chunk| {
            out.combine(&VarState::new(chunk))
        });
    }
    out
}

pub fn cov<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> CovState
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
{
    assert!(x.len() == y.len());
    let mut out = CovState::default();
    if x.has_nulls() || y.has_nulls() {
        chunk_as_float_binary(
            x.iter()
                .zip(y.iter())
                .filter_map(|(l, r)| l.copied().zip(r.copied())),
            |l, r| out.combine(&CovState::new(l, r)),
        );
    } else {
        chunk_as_float_binary(
            x.values().iter().copied().zip(y.values().iter().copied()),
            |l, r| out.combine(&CovState::new(l, r)),
        );
    }
    out
}

pub fn pearson_corr<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> PearsonState
where
    T: NativeType + AsPrimitive<f64>,
    U: NativeType + AsPrimitive<f64>,
{
    assert!(x.len() == y.len());
    let mut out = PearsonState::default();
    if x.has_nulls() || y.has_nulls() {
        chunk_as_float_binary(
            x.iter()
                .zip(y.iter())
                .filter_map(|(l, r)| l.copied().zip(r.copied())),
            |l, r| out.combine(&PearsonState::new(l, r)),
        );
    } else {
        chunk_as_float_binary(
            x.values().iter().copied().zip(y.values().iter().copied()),
            |l, r| out.combine(&PearsonState::new(l, r)),
        );
    }
    out
}
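
// Editor's sketch of the entry points above (not part of the commit; assumes
// the arrow2-style `PrimitiveArray::from_slice` constructor). Each kernel
// returns a partial state, so callers can fold states across many arrays
// before calling `finalize`.
#[test]
fn kernels_on_arrays() {
    let x = PrimitiveArray::<f64>::from_slice([1.0, 2.0, 3.0]);
    let y = PrimitiveArray::<f64>::from_slice([2.0, 4.0, 6.0]);

    let variance = var(&x).finalize(1).unwrap(); // sample variance, ddof = 1
    let covariance = cov(&x, &y).finalize(1).unwrap();
    let r = pearson_corr(&x, &y).finalize(1); // plain f64; NaN when degenerate

    assert!((variance - 1.0).abs() < 1e-12);
    assert!((covariance - 2.0).abs() < 1e-12);
    assert!((r - 1.0).abs() < 1e-12);
}
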
19 changes: 5 additions & 14 deletions crates/polars-core/src/chunked_array/ops/aggregate/var.rs
@@ -1,4 +1,4 @@
-use arity::unary_elementwise_values;
+use polars_compute::var_cov::VarState;
 
 use super::*;
 
@@ -15,20 +15,11 @@ where
     ChunkedArray<T>: ChunkAgg<T::Native>,
 {
     fn var(&self, ddof: u8) -> Option<f64> {
-        let n_values = self.len() - self.null_count();
-        if n_values <= ddof as usize {
-            return None;
+        let mut out = VarState::default();
+        for arr in self.downcast_iter() {
+            out.combine(&polars_compute::var_cov::var(arr))
         }
-
-        let mean = self.mean()?;
-        let squared: Float64Chunked = unary_elementwise_values(self, |value| {
-            let tmp = value.to_f64().unwrap() - mean;
-            tmp * tmp
-        });
-
-        squared
-            .sum()
-            .map(|sum| sum / (n_values as f64 - ddof as f64))
+        out.finalize(ddof)
     }
 
     fn std(&self, ddof: u8) -> Option<f64> {