Skip to content

Commit

Permalink
docs: improve the documentation for Aggregate code (apache#12617)
Browse files Browse the repository at this point in the history
* docs: improve the documentation for Aggregate code

* Add new example, fix references
  • Loading branch information
alamb committed Sep 30, 2024
1 parent f1aa27f commit ba4488f
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 12 deletions.
13 changes: 10 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use datafusion_physical_expr::binary_map::OutputType;

use hashbrown::raw::RawTable;

/// Compare GroupValue Rows column by column
/// A [`GroupValues`] that stores multiple columns of group values.
///
///
pub struct GroupValuesColumn {
/// The output schema
schema: SchemaRef,
Expand All @@ -55,8 +57,13 @@ pub struct GroupValuesColumn {
map_size: usize,

/// The actual group by values, stored column-wise. Compare from
/// the left to right, each column is stored as `ArrayRowEq`.
/// This is shown faster than the row format
/// the left to right, each column is stored as [`GroupColumn`].
///
/// Performance tests showed that this design is faster than using the
/// more general purpose [`GroupValuesRows`]. See the pull request for details:
/// <https://github.com/apache/datafusion/pull/12269>
///
/// [`GroupValuesRows`]: crate::aggregates::group_values::row::GroupValuesRows
group_values: Vec<Box<dyn GroupColumn>>,

/// reused buffer to store hashes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ use std::vec;

use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};

/// Trait for group values column-wise row comparison
/// Trait for storing a single column of group values in [`GroupValuesColumn`]
///
/// Implementations of this trait store a in-progress collection of group values
/// Implementations of this trait store an in-progress collection of group values
/// (similar to various builders in Arrow-rs) that allow for quick comparison to
/// incoming rows.
///
/// [`GroupValuesColumn`]: crate::aggregates::group_values::GroupValuesColumn
pub trait GroupColumn: Send + Sync {
/// Returns equal if the row stored in this builder at `lhs_row` is equal to
/// the row in `array` at `rhs_row`
Expand All @@ -60,11 +61,13 @@ pub trait GroupColumn: Send + Sync {
fn take_n(&mut self, n: usize) -> ArrayRef;
}

/// An implementation of [`GroupColumn`] for primitive types.
/// An implementation of [`GroupColumn`] for primitive types.
///
/// Holds the in-progress group values for a single column whose values are
/// of the Arrow primitive type `T`.
pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
    /// The group values stored so far, as native values of `T`.
    group_values: Vec<T::Native>,
    /// Null tracking for the stored values.
    // NOTE(review): presumably one flag per entry in `group_values`; the
    // polarity (true = null vs. true = valid) is not visible here, so confirm
    // against the `GroupColumn` impl.
    nulls: Vec<bool>,
    /// whether the array contains at least one null, for fast non-null path
    has_null: bool,
    /// Can the input array contain nulls?
    nullable: bool,
}

Expand Down Expand Up @@ -154,13 +157,14 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
}
}

/// An implementation of [`GroupColumn`] for binary and utf8 types.
pub struct ByteGroupValueBuilder<O>
where
O: OffsetSizeTrait,
{
output_type: OutputType,
buffer: BufferBuilder<u8>,
/// Offsets into `buffer` for each distinct value. These offsets as used
/// Offsets into `buffer` for each distinct value. These offsets are used
/// directly to create the final `GenericBinaryArray`. The `i`th string is
/// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
/// are stored as a zero length string.
Expand Down
54 changes: 50 additions & 4 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! [`GroupValues`] trait for storing and interning group keys

use arrow::record_batch::RecordBatch;
use arrow_array::{downcast_primitive, ArrayRef};
use arrow_schema::{DataType, SchemaRef};
Expand All @@ -37,18 +39,61 @@ use datafusion_physical_expr::binary_map::OutputType;

mod group_column;

/// An interning store for group keys
/// Stores the group values during hash aggregation.
///
/// # Background
///
/// In a query such as `SELECT a, b, count(*) FROM t GROUP BY a, b`, the group values
/// identify each group, and correspond to all the distinct values of `(a,b)`.
///
/// ```sql
/// -- Input has 4 rows with 3 distinct combinations of (a,b) ("groups")
/// create table t(a int, b varchar)
/// as values (1, 'a'), (2, 'b'), (1, 'a'), (3, 'c');
///
/// select a, b, count(*) from t group by a, b;
/// ----
/// 1 a 2
/// 2 b 1
/// 3 c 1
/// ```
///
/// # Design
///
/// Managing group values is a performance critical operation in hash
/// aggregation. The major operations are:
///
/// 1. Intern: Quickly finding existing and adding new group values
/// 2. Emit: Returning the group values as an array
///
/// There are multiple specialized implementations of this trait, each
/// optimized to perform these operations quickly for particular data types
/// and numbers of columns. See [`new_group_values`] for details.
///
/// # Group Ids
///
/// Each distinct group in a hash aggregation is identified by a unique group id
/// (usize) which is assigned by instances of this trait. Group ids are
/// continuous without gaps, starting from 0.
pub trait GroupValues: Send {
/// Calculates the `groups` for each input row of `cols`
/// Calculates the group id for each input row of `cols`, assigning new
/// group ids as necessary.
///
/// When the function returns, `groups` must contain the group id for each
/// row in `cols`.
///
/// If a row has the same value as a previous row, the same group id is
/// assigned. If a row has a new value, the next available group id is
/// assigned.
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;

/// Returns the number of bytes used by this [`GroupValues`]
/// Returns the number of bytes of memory used by this [`GroupValues`]
fn size(&self) -> usize;

/// Returns true if this [`GroupValues`] is empty
fn is_empty(&self) -> bool;

/// The number of values stored in this [`GroupValues`]
/// The number of values (distinct group values) stored in this [`GroupValues`]
fn len(&self) -> usize;

/// Emits the group values
Expand All @@ -58,6 +103,7 @@ pub trait GroupValues: Send {
fn clear_shrink(&mut self, batch: &RecordBatch);
}

/// Return a specialized implementation of [`GroupValues`] for the given schema.
pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();
Expand Down
10 changes: 9 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ use hashbrown::raw::RawTable;
use std::sync::Arc;

/// A [`GroupValues`] making use of [`Rows`]
///
/// This is a general implementation of [`GroupValues`] that works for any
/// combination of data types and number of columns, including nested types such as
/// structs and lists.
///
/// It uses the arrow-rs [`Rows`] to store the group values, which is a row-wise
/// representation.
pub struct GroupValuesRows {
/// The output schema
schema: SchemaRef,
Expand Down Expand Up @@ -220,7 +227,8 @@ impl GroupValues for GroupValuesRows {
}
};

// TODO: Materialize dictionaries in group keys (#7647)
// TODO: Materialize dictionaries in group keys
// https://github.com/apache/datafusion/issues/7647
for (field, array) in self.schema.fields.iter().zip(&mut output) {
let expected = field.data_type();
*array = dictionary_encode_if_necessary(
Expand Down

0 comments on commit ba4488f

Please sign in to comment.