refactor(rust): Add streaming groupby for reductions (#19291)
orlp authored Oct 23, 2024
1 parent ba35d07 commit 4cfe56f
Showing 43 changed files with 1,088 additions and 378 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/polars-arrow/src/compute/aggregate/mod.rs
@@ -1,4 +1,4 @@
-//! Contains different aggregation functions
+/// ! Contains different aggregation functions
#[cfg(feature = "compute_aggregate")]
mod sum;
#[cfg(feature = "compute_aggregate")]
18 changes: 18 additions & 0 deletions crates/polars-core/src/chunked_array/logical/categorical/mod.rs
@@ -203,6 +203,24 @@ impl CategoricalChunked {
}
}

/// Create a [`CategoricalChunked`] from a physical array and dtype.
///
/// # Safety
/// It's not checked that the indices are in-bounds or that the dtype is
/// correct.
pub unsafe fn from_cats_and_dtype_unchecked(idx: UInt32Chunked, dtype: DataType) -> Self {
debug_assert!(matches!(
dtype,
DataType::Enum { .. } | DataType::Categorical { .. }
));
let mut logical = Logical::<UInt32Type, _>::new_logical::<CategoricalType>(idx);
logical.2 = Some(dtype);
Self {
physical: logical,
bit_settings: Default::default(),
}
}

/// Create a [`CategoricalChunked`] from an array of `idx` and an existing [`RevMapping`]: `rev_map`.
///
/// # Safety
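For context, a minimal sketch of a call site for the new unchecked constructor (hypothetical code, not part of this diff); the caller must uphold the invariants from the safety docs:

// Hypothetical helper: reassemble a categorical column from physical codes
// that were previously split off a CategoricalChunked with this same dtype.
fn restore_categorical(codes: UInt32Chunked, dtype: DataType) -> CategoricalChunked {
    // SAFETY: the codes originate from a column of exactly this dtype, so
    // every index is in-bounds for the dtype's revmap and the dtype matches.
    unsafe { CategoricalChunked::from_cats_and_dtype_unchecked(codes, dtype) }
}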
1 change: 1 addition & 0 deletions crates/polars-core/src/chunked_array/ops/mod.rs
@@ -34,6 +34,7 @@ pub(crate) mod nulls;
mod reverse;
#[cfg(feature = "rolling_window")]
pub(crate) mod rolling_window;
pub mod row_encode;
pub mod search_sorted;
mod set;
mod shift;
220 changes: 220 additions & 0 deletions crates/polars-core/src/chunked_array/ops/row_encode.rs
@@ -0,0 +1,220 @@
use arrow::compute::utils::combine_validities_and_many;
use polars_row::{convert_columns, EncodingField, RowsEncoded};
use rayon::prelude::*;

use crate::prelude::*;
use crate::utils::_split_offsets;
use crate::POOL;

pub(crate) fn convert_series_for_row_encoding(s: &Series) -> PolarsResult<Series> {
use DataType::*;
let out = match s.dtype() {
#[cfg(feature = "dtype-categorical")]
Categorical(_, _) | Enum(_, _) => s.rechunk(),
Binary | Boolean => s.clone(),
BinaryOffset => s.clone(),
String => s.str().unwrap().as_binary().into_series(),
#[cfg(feature = "dtype-struct")]
Struct(_) => {
let ca = s.struct_().unwrap();
let new_fields = ca
.fields_as_series()
.iter()
.map(convert_series_for_row_encoding)
.collect::<PolarsResult<Vec<_>>>()?;
let mut out =
StructChunked::from_series(ca.name().clone(), ca.len(), new_fields.iter())?;
out.zip_outer_validity(ca);
out.into_series()
},
// We could fall back to the default branch, but decimal is not a numeric dtype for now, so it is handled explicitly here.
#[cfg(feature = "dtype-decimal")]
Decimal(_, _) => s.clone(),
List(inner) if !inner.is_nested() => s.clone(),
Null => s.clone(),
_ => {
let phys = s.to_physical_repr().into_owned();
polars_ensure!(
phys.dtype().is_numeric(),
InvalidOperation: "cannot sort column of dtype `{}`", s.dtype()
);
phys
},
};
Ok(out)
}
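// Editor's note (illustrative, not part of the committed file): this function
// maps logical dtypes onto row-encodable physical ones, e.g. String -> Binary
// and Date -> Int32 via `to_physical_repr`; Categorical/Enum are only
// rechunked here, since `_get_rows_encoded_compat_array` below extracts their
// physical codes. Structs are converted field-by-field with the outer
// validity preserved via `zip_outer_validity`.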

pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult<ArrayRef> {
let by = convert_series_for_row_encoding(by)?;
let by = by.rechunk();

let out = match by.dtype() {
#[cfg(feature = "dtype-categorical")]
DataType::Categorical(_, _) | DataType::Enum(_, _) => {
let ca = by.categorical().unwrap();
if ca.uses_lexical_ordering() {
by.to_arrow(0, CompatLevel::newest())
} else {
ca.physical().chunks[0].clone()
}
},
// Take physical
_ => by.chunks()[0].clone(),
};
Ok(out)
}

pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);

let chunks = splits.into_par_iter().map(|(offset, len)| {
let sliced = by
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded_unordered(&sliced)?;
Ok(rows.into_array())
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

Ok(BinaryOffsetChunked::from_chunk_iter(
PlSmallStr::EMPTY,
chunks?,
))
}

// Almost the same as `encode_rows_vertical_par_unordered`, but broadcasts nulls to the row-encoded array.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);

let chunks = splits.into_par_iter().map(|(offset, len)| {
let sliced = by
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded_unordered(&sliced)?;

let validities = sliced
.iter()
.flat_map(|s| {
let s = s.rechunk();
#[allow(clippy::unnecessary_to_owned)]
s.chunks()
.to_vec()
.into_iter()
.map(|arr| arr.validity().cloned())
})
.collect::<Vec<_>>();

let validity = combine_validities_and_many(&validities);
Ok(rows.into_array().with_validity_typed(validity))
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

Ok(BinaryOffsetChunked::from_chunk_iter(
PlSmallStr::EMPTY,
chunks?,
))
}

pub fn encode_rows_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let rows = _get_rows_encoded_unordered(by)?;
Ok(BinaryOffsetChunked::with_chunk(
PlSmallStr::EMPTY,
rows.into_array(),
))
}

pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
let mut cols = Vec::with_capacity(by.len());
let mut fields = Vec::with_capacity(by.len());
for by in by {
let arr = _get_rows_encoded_compat_array(by)?;
let field = EncodingField::new_unsorted();
match arr.dtype() {
// Flatten the struct fields.
ArrowDataType::Struct(_) => {
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
for arr in arr.values() {
cols.push(arr.clone() as ArrayRef);
fields.push(field)
}
},
_ => {
cols.push(arr);
fields.push(field)
},
}
}
Ok(convert_columns(&cols, &fields))
}

pub fn _get_rows_encoded(
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
debug_assert_eq!(by.len(), descending.len());
debug_assert_eq!(by.len(), nulls_last.len());

let mut cols = Vec::with_capacity(by.len());
let mut fields = Vec::with_capacity(by.len());

for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
let by = by.as_materialized_series();
let arr = _get_rows_encoded_compat_array(by)?;
let sort_field = EncodingField {
descending: *desc,
nulls_last: *null_last,
no_order: false,
};
match arr.dtype() {
// Flatten the struct fields.
ArrowDataType::Struct(_) => {
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
let arr = arr.propagate_nulls();
for value_arr in arr.values() {
cols.push(value_arr.clone() as ArrayRef);
fields.push(sort_field);
}
},
_ => {
cols.push(arr);
fields.push(sort_field);
},
}
}
Ok(convert_columns(&cols, &fields))
}

pub fn _get_rows_encoded_ca(
name: PlSmallStr,
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<BinaryOffsetChunked> {
_get_rows_encoded(by, descending, nulls_last)
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

pub fn _get_rows_encoded_arr(
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<BinaryArray<i64>> {
_get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
}

pub fn _get_rows_encoded_ca_unordered(
name: PlSmallStr,
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
_get_rows_encoded_unordered(by)
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}
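For context, a short sketch of how these entry points might be combined (hypothetical call site, not part of this diff; module path and re-exports assumed): unordered encoding yields opaque binary keys where equal rows encode equal, which is all a groupby needs, while the ordered variant additionally bakes per-column direction and null placement into the bytes so that lexicographic comparison of the encoded rows matches a multi-column sort.

use polars_core::chunked_array::ops::row_encode::{
    _get_rows_encoded_ca, encode_rows_vertical_par_unordered,
};
use polars_core::prelude::*;

// Hypothetical call site combining both encoders.
fn sketch(keys: &[Series], sort_cols: &[Column]) -> PolarsResult<()> {
    // Group keys: ordering is irrelevant, so use the thread-parallel
    // unordered encoding; rows that compare equal get identical bytes.
    let group_keys = encode_rows_vertical_par_unordered(keys)?;

    // Sort keys: per-column direction and null placement matter, so use
    // the ordered variant.
    let descending = vec![false; sort_cols.len()];
    let nulls_last = vec![true; sort_cols.len()];
    let sort_keys = _get_rows_encoded_ca(
        PlSmallStr::from_static("sort_key"),
        sort_cols,
        &descending,
        &nulls_last,
    )?;

    // One encoded binary row per input row in both cases.
    assert_eq!(group_keys.len(), keys[0].len());
    assert_eq!(sort_keys.len(), sort_cols[0].len());
    Ok(())
}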
@@ -1,6 +1,7 @@
use polars_utils::itertools::Itertools;

use super::*;
use crate::chunked_array::ops::row_encode::_get_rows_encoded;

#[derive(Eq)]
struct CompareRow<'a> {