From e0cc8c8218c32de2d77e00148c6b31500e71ca13 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 13 Jul 2023 10:24:40 -0400 Subject: [PATCH] Vectorized hash grouping (#6904) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Vectorized hash grouping * Prepare for merge to main * Improve comments and update size calculations * Implement test for accumulate_boolean refactor * Use resize instead of resize_with * fix avg size calculation * Simplify sum accumulator * Add comments explaining i64 as counts * Clarify `aggreate_arguments` * Apply suggestions from code review Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> * Clarify rationale for ScratchSpace being a field * use slice syntax * Update datafusion/physical-expr/src/aggregate/average.rs Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> * Update datafusion/physical-expr/src/aggregate/count.rs Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> * Update datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> * fix diagram * Update datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> * simplify the supported logic * Add a log message when using slow adapter * fmt * Revert "chore(deps): update bigdecimal requirement from 0.3.0 to 0.4.0 (#6848)" (#6896) This reverts commit d0def42c828246fed0ac7aa2ccb8877eac248a3d. * Make FileScanConfig::project pub (#6931) Co-authored-by: Daniël Heres * feat: add round trip test of physical plan in tpch unit tests (#6918) * Use thiserror to implement the From trait for DFSqlLogicTestError (#6924) * parallel csv scan (#6801) * parallel csv scan * add max line length * Update according to review comments * Update Configuration doc --------- Co-authored-by: Andrew Lamb * Add additional test coverage for aggregaes using dates/times/timestamps/decimals (#6939) * Add additional test coverage for aggregaes using dates/times/timestamps/decimals * Add coverage for date32/date64 * Support timestamp types for min/max * Fix aggregate nullability calculation --------- Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Co-authored-by: Daniël Heres Co-authored-by: Daniël Heres Co-authored-by: r.4ntix Co-authored-by: Jonah Gao Co-authored-by: Yongting You <2010youy01@gmail.com> --- Cargo.toml | 2 +- datafusion-cli/Cargo.lock | 1 + .../core/src/physical_plan/aggregates/mod.rs | 2 +- .../src/physical_plan/aggregates/row_hash.rs | 901 +++++++----------- .../user_defined_window_functions.rs | 7 +- datafusion/execution/src/memory_pool/proxy.rs | 8 + datafusion/physical-expr/Cargo.toml | 1 + .../physical-expr/src/aggregate/average.rs | 242 ++++- .../src/aggregate/bit_and_or_xor.rs | 167 +++- .../src/aggregate/bool_and_or.rs | 42 +- .../physical-expr/src/aggregate/count.rs | 130 ++- .../groups_accumulator/accumulate.rs | 854 +++++++++++++++++ .../aggregate/groups_accumulator/adapter.rs | 355 +++++++ .../aggregate/groups_accumulator/bool_op.rs | 127 +++ .../src/aggregate/groups_accumulator/mod.rs | 122 +++ .../aggregate/groups_accumulator/prim_op.rs | 131 +++ .../physical-expr/src/aggregate/min_max.rs | 412 +++++++- datafusion/physical-expr/src/aggregate/mod.rs | 21 + datafusion/physical-expr/src/aggregate/sum.rs | 80 +- .../physical-expr/src/aggregate/utils.rs | 24 + 
datafusion/physical-expr/src/lib.rs | 2 + 21 files changed, 3056 insertions(+), 575 deletions(-) create mode 100644 datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs create mode 100644 datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs create mode 100644 datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs create mode 100644 datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs create mode 100644 datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs diff --git a/Cargo.toml b/Cargo.toml index cc13c8077100..2db0379b0657 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,4 +56,4 @@ lto = false opt-level = 3 overflow-checks = false panic = 'unwind' -rpath = false +rpath = false \ No newline at end of file diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index af50b5c22a24..da31a8ce65d0 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1197,6 +1197,7 @@ dependencies = [ "itertools 0.11.0", "lazy_static", "libc", + "log", "md-5", "paste", "petgraph", diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 60d483d8c800..5edb0476778d 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -998,7 +998,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { Arc::new(Schema::new(group_fields)) } -/// returns physical expressions to evaluate against a batch +/// returns physical expressions for arguments to evaluate against a batch /// The expressions are different depending on `mode`: /// * Partial: AggregateExpr::expressions /// * Final: columns of `AggregateExpr::state_fields()` diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index beb70f1b4c55..e272b60b054a 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -16,100 +16,189 @@ // under the License. //! Hash aggregation through row format +//! +//! 
POC demonstration of GroupByHashApproach -use std::cmp::min; -use std::ops::Range; +use datafusion_physical_expr::{ + AggregateExpr, GroupsAccumulator, GroupsAccumulatorAdapter, +}; +use log::debug; use std::sync::Arc; use std::task::{Context, Poll}; use std::vec; use ahash::RandomState; -use arrow::row::{RowConverter, SortField}; +use arrow::row::{RowConverter, Rows, SortField}; use datafusion_physical_expr::hash_utils::create_hashes; use futures::ready; use futures::stream::{Stream, StreamExt}; -use crate::physical_plan::aggregates::utils::{ - aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters, - read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState, -}; use crate::physical_plan::aggregates::{ evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode, - PhysicalGroupBy, RowAccumulatorItem, + PhysicalGroupBy, }; use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; -use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr}; +use crate::physical_plan::{aggregates, PhysicalExpr}; use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; -use arrow::compute::cast; -use arrow::datatypes::DataType; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; -use datafusion_common::cast::as_boolean_array; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; -use datafusion_expr::Accumulator; -use datafusion_row::accessor::RowAccessor; -use datafusion_row::layout::RowLayout; use hashbrown::raw::RawTable; -use itertools::izip; + +#[derive(Debug, Clone)] +/// This object tracks the aggregation phase (input/output) +pub(crate) enum ExecutionState { + ReadingInput, + /// When producing output, the remaining rows to output are stored + /// here and are sliced off as needed in batch_size chunks + ProducingOutput(RecordBatch), + Done, +} use super::AggregateExec; -/// Grouping aggregate with row-format aggregation states inside. +/// Hash based Grouping Aggregator +/// +/// # Design Goals +/// +/// This structure is designed so that updating the aggregates can be +/// vectorized (done in a tight loop) without allocations. The +/// accumulator state is *not* managed by this operator (e.g in the +/// hash table) and instead is delegated to the individual +/// accumulators which have type specialized inner loops that perform +/// the aggregation. +/// +/// # Architecture +/// +/// ```text +/// +/// stores "group stores group values, internally stores aggregate +/// indexes" in arrow_row format values, for all groups /// -/// For each aggregation entry, we use: -/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes. -/// - [WordAligned] row to store aggregation state, designed to be CPU-friendly when updates over every field are often. +/// ┌─────────────┐ ┌────────────┐ ┌──────────────┐ ┌──────────────┐ +/// │ ┌─────┐ │ │ ┌────────┐ │ │┌────────────┐│ │┌────────────┐│ +/// │ │ 5 │ │ ┌────┼▶│ "A" │ │ ││accumulator ││ ││accumulator ││ +/// │ ├─────┤ │ │ │ ├────────┤ │ ││ 0 ││ ││ N ││ +/// │ │ 9 │ │ │ │ │ "Z" │ │ ││ ┌────────┐ ││ ││ ┌────────┐ ││ +/// │ └─────┘ │ │ │ └────────┘ │ ││ │ state │ ││ ││ │ state │ ││ +/// │ ... │ │ │ │ ││ │┌─────┐ │ ││ ... ││ │┌─────┐ │ ││ +/// │ ┌─────┐ │ │ │ ... 
│ ││ │├─────┤ │ ││ ││ │├─────┤ │ ││ +/// │ │ 1 │───┼─┘ │ │ ││ │└─────┘ │ ││ ││ │└─────┘ │ ││ +/// │ ├─────┤ │ │ │ ││ │ │ ││ ││ │ │ ││ +/// │ │ 13 │───┼─┐ │ ┌────────┐ │ ││ │ ... │ ││ ││ │ ... │ ││ +/// │ └─────┘ │ └────┼▶│ "Q" │ │ ││ │ │ ││ ││ │ │ ││ +/// └─────────────┘ │ └────────┘ │ ││ │┌─────┐ │ ││ ││ │┌─────┐ │ ││ +/// │ │ ││ │└─────┘ │ ││ ││ │└─────┘ │ ││ +/// └────────────┘ ││ └────────┘ ││ ││ └────────┘ ││ +/// │└────────────┘│ │└────────────┘│ +/// └──────────────┘ └──────────────┘ /// -/// The architecture is the following: +/// map group_values accumulators +/// (Hash Table) /// -/// 1. For each input RecordBatch, update aggregation states corresponding to all appeared grouping keys. -/// 2. At the end of the aggregation (e.g. end of batches in a partition), the accumulator converts its state to a RecordBatch of a single row -/// 3. The RecordBatches of all accumulators are merged (`concatenate` in `rust/arrow`) together to a single RecordBatch. -/// 4. The state's RecordBatch is `merge`d to a new state -/// 5. The state is mapped to the final value +/// ``` /// -/// [WordAligned]: datafusion_row::layout +/// For example, given a query like `COUNT(x), SUM(y) ... GROUP BY z`, +/// [`group_values`] will store the distinct values of `z`. There will +/// be one accumulator for `COUNT(x)`, specialized for the data type +/// of `x` and one accumulator for `SUM(y)`, specialized for the data +/// type of `y`. +/// +/// # Description +/// +/// The hash table does not store any aggregate state inline. It only +/// stores "group indices", one for each (distinct) group value. The +/// accumulators manage the in-progress aggregate state for each +/// group, and the group values themselves are stored in +/// [`group_values`] at the corresponding group index. +/// +/// The accumulator state (e.g partial sums) is managed by and stored +/// by a [`GroupsAccumulator`] accumulator. There is one accumulator +/// per aggregate expression (COUNT, AVG, etc) in the +/// stream. Internally, each `GroupsAccumulator` manages the state for +/// multiple groups, and is passed `group_indexes` during update. Note +/// The accumulator state is not managed by this operator (e.g in the +/// hash table). +/// +/// [`group_values`]: Self::group_values pub(crate) struct GroupedHashAggregateStream { schema: SchemaRef, input: SendableRecordBatchStream, mode: AggregateMode, - normal_aggr_expr: Vec>, - /// Aggregate expressions not supporting row accumulation - normal_aggregate_expressions: Vec>>, - /// Filter expression for each normal aggregate expression - normal_filter_expressions: Vec>>, - - /// Aggregate expressions supporting row accumulation - row_aggregate_expressions: Vec>>, - /// Filter expression for each row aggregate expression - row_filter_expressions: Vec>>, - row_accumulators: Vec, + /// Accumulators, one for each `AggregateExpr` in the query + /// + /// For example, if the query has aggregates, `SUM(x)`, + /// `COUNT(y)`, there will be two accumulators, each one + /// specialized for that particular aggregate and its input types + accumulators: Vec>, + + /// Arguments to pass to each accumulator. + /// + /// The arguments in `accumulator[i]` is passed `aggregate_arguments[i]` + /// + /// The argument to each accumulator is itself a `Vec` because + /// some aggregates such as `CORR` can accept more than one + /// argument. + aggregate_arguments: Vec>>, + + /// Optional filter expression to evaluate, one for each for + /// accumulator. 
If present, only those rows for which the filter + /// evaluate to true should be included in the aggregate results. + /// + /// For example, for an aggregate like `SUM(x FILTER x > 100)`, + /// the filter expression is `x > 100`. + filter_expressions: Vec>>, + + /// Converter for the group values row_converter: RowConverter, - row_aggr_schema: SchemaRef, - row_aggr_layout: Arc, + /// GROUP BY expressions group_by: PhysicalGroupBy, - aggr_state: AggregationState, + /// The memory reservation for this grouping + reservation: MemoryReservation, + + /// Logically maps group values to a group_index in + /// [`Self::group_values`] and in each accumulator + /// + /// Uses the raw API of hashbrown to avoid actually storing the + /// keys (group values) in the table + /// + /// keys: u64 hashes of the GroupValue + /// values: (hash, group_index) + map: RawTable<(u64, usize)>, + + /// The actual group by values, stored in arrow [`Row`] format. + /// `group_values[i]` holds the group value for group_index `i`. + /// + /// The row format is used to compare group keys quickly and store + /// them efficiently in memory. Quick comparison is especially + /// important for multi-column group keys. + /// + /// [`Row`]: arrow::row::Row + group_values: Rows, + + /// scratch space for the current input [`RecordBatch`] being + /// processed. The reason this is a field is so it can be reused + /// for all input batches, avoiding the need to reallocate Vecs on + /// each input. + scratch_space: ScratchSpace, + + /// Tracks if this stream is generating input or output exec_state: ExecutionState, + + /// Execution metrics baseline_metrics: BaselineMetrics, + + /// Random state for creating hashes random_state: RandomState, - /// size to be used for resulting RecordBatches + + /// max rows in output RecordBatches batch_size: usize, - /// threshold for using `ScalarValue`s to update - /// accumulators during high-cardinality aggregations for each input batch. - scalar_update_factor: usize, - /// if the result is chunked into batches, - /// last offset is preserved for continuation. - row_group_skip_position: usize, - /// keeps range for each accumulator in the field - /// first element in the array corresponds to normal accumulators - /// second element in the array corresponds to row accumulators - indices: [Vec>; 2], } impl GroupedHashAggregateStream { @@ -119,71 +208,39 @@ impl GroupedHashAggregateStream { context: Arc, partition: usize, ) -> Result { + debug!("Creating GroupedHashAggregateStream"); let agg_schema = Arc::clone(&agg.schema); let agg_group_by = agg.group_by.clone(); let agg_filter_expr = agg.filter_expr.clone(); let batch_size = context.session_config().batch_size(); - let scalar_update_factor = context.session_config().agg_scalar_update_factor(); let input = agg.input.execute(partition, Arc::clone(&context))?; let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let timer = baseline_metrics.elapsed_compute().timer(); - let mut start_idx = agg_group_by.expr.len(); - let mut row_aggr_expr = vec![]; - let mut row_agg_indices = vec![]; - let mut row_aggregate_expressions = vec![]; - let mut row_filter_expressions = vec![]; - let mut normal_aggr_expr = vec![]; - let mut normal_agg_indices = vec![]; - let mut normal_aggregate_expressions = vec![]; - let mut normal_filter_expressions = vec![]; - // The expressions to evaluate the batch, one vec of expressions per aggregation. 
- // Assuming create_schema() always puts group columns in front of aggregation columns, we set - // col_idx_base to the group expression count. - let all_aggregate_expressions = - aggregates::aggregate_expressions(&agg.aggr_expr, &agg.mode, start_idx)?; + let aggregate_exprs = agg.aggr_expr.clone(); + + // arguments for each aggregate, one vec of expressions per + // aggregate + let aggregate_arguments = aggregates::aggregate_expressions( + &agg.aggr_expr, + &agg.mode, + agg_group_by.expr.len(), + )?; + let filter_expressions = match agg.mode { AggregateMode::Partial | AggregateMode::Single => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { vec![None; agg.aggr_expr.len()] } }; - for ((expr, others), filter) in agg - .aggr_expr - .iter() - .zip(all_aggregate_expressions.into_iter()) - .zip(filter_expressions.into_iter()) - { - let n_fields = match agg.mode { - // In partial aggregation, we keep additional fields in order to successfully - // merge aggregation results downstream. - AggregateMode::Partial => expr.state_fields()?.len(), - _ => 1, - }; - // Stores range of each expression: - let aggr_range = Range { - start: start_idx, - end: start_idx + n_fields, - }; - if expr.row_accumulator_supported() { - row_aggregate_expressions.push(others); - row_filter_expressions.push(filter.clone()); - row_agg_indices.push(aggr_range); - row_aggr_expr.push(expr.clone()); - } else { - normal_aggregate_expressions.push(others); - normal_filter_expressions.push(filter.clone()); - normal_agg_indices.push(aggr_range); - normal_aggr_expr.push(expr.clone()); - } - start_idx += n_fields; - } - let row_accumulators = aggregates::create_row_accumulators(&row_aggr_expr)?; - - let row_aggr_schema = aggr_state_schema(&row_aggr_expr); + // Instantiate the accumulators + let accumulators: Vec<_> = aggregate_exprs + .iter() + .map(create_group_accumulator) + .collect::>()?; let group_schema = group_schema(&agg_schema, agg_group_by.expr.len()); let row_converter = RowConverter::new( @@ -194,14 +251,10 @@ impl GroupedHashAggregateStream { .collect(), )?; - let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema)); - let name = format!("GroupedHashAggregateStream[{partition}]"); - let aggr_state = AggregationState { - reservation: MemoryConsumer::new(name).register(context.memory_pool()), - map: RawTable::with_capacity(0), - group_states: Vec::with_capacity(0), - }; + let reservation = MemoryConsumer::new(name).register(context.memory_pool()); + let map = RawTable::with_capacity(0); + let group_values = row_converter.empty_rows(0, 0); timer.done(); @@ -211,28 +264,43 @@ impl GroupedHashAggregateStream { schema: agg_schema, input, mode: agg.mode, - normal_aggr_expr, - normal_aggregate_expressions, - normal_filter_expressions, - row_aggregate_expressions, - row_filter_expressions, - row_accumulators, + accumulators, + aggregate_arguments, + filter_expressions, row_converter, - row_aggr_schema, - row_aggr_layout, group_by: agg_group_by, - aggr_state, + reservation, + map, + group_values, + scratch_space: ScratchSpace::new(), exec_state, baseline_metrics, random_state: Default::default(), batch_size, - scalar_update_factor, - row_group_skip_position: 0, - indices: [normal_agg_indices, row_agg_indices], }) } } +/// Create an accumulator for `agg_expr` -- a [`GroupsAccumulator`] if +/// that is supported by the aggregate, or a +/// [`GroupsAccumulatorAdapter`] if not. 
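+///
+/// The adapter wraps the row-oriented [`Accumulator`] returned by
+/// `create_accumulator` so it can be driven through the vectorized
+/// [`GroupsAccumulator`] interface; this is the slower fallback path,
+/// which is why its use is noted in the debug log below.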
+fn create_group_accumulator( + agg_expr: &Arc, +) -> Result> { + if agg_expr.groups_accumulator_supported() { + agg_expr.create_groups_accumulator() + } else { + // Note in the log when the slow path is used + debug!( + "Creating GroupsAccumulatorAdapter for {}: {agg_expr:?}", + agg_expr.name() + ); + let agg_expr_captured = agg_expr.clone(); + let factory = move || agg_expr_captured.create_accumulator(); + Ok(Box::new(GroupsAccumulatorAdapter::new(factory))) + } +} + impl Stream for GroupedHashAggregateStream { type Item = Result; @@ -243,7 +311,8 @@ impl Stream for GroupedHashAggregateStream { let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); loop { - match self.exec_state { + let exec_state = self.exec_state.clone(); + match exec_state { ExecutionState::ReadingInput => { match ready!(self.input.poll_next_unpin(cx)) { // new batch to aggregate @@ -252,11 +321,15 @@ impl Stream for GroupedHashAggregateStream { let result = self.group_aggregate_batch(batch); timer.done(); - // allocate memory - // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with - // overshooting a bit. Also this means we either store the whole record batch or not. + // allocate memory AFTER we actually used + // the memory, which simplifies the whole + // accounting and we are OK with + // overshooting a bit. + // + // Also this means we either store the + // whole record batch or not. let result = result.and_then(|allocated| { - self.aggr_state.reservation.try_grow(allocated) + self.reservation.try_grow(allocated) }); if let Err(e) = result { @@ -267,32 +340,36 @@ impl Stream for GroupedHashAggregateStream { Some(Err(e)) => return Poll::Ready(Some(Err(e))), // inner is done, producing output None => { - self.exec_state = ExecutionState::ProducingOutput; + let timer = elapsed_compute.timer(); + match self.create_batch_from_map() { + Ok(batch) => { + self.exec_state = + ExecutionState::ProducingOutput(batch) + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + timer.done(); } } } - ExecutionState::ProducingOutput => { - let timer = elapsed_compute.timer(); - let result = self.create_batch_from_map(); - - timer.done(); - self.row_group_skip_position += self.batch_size; - - match result { - // made output - Ok(Some(result)) => { - let batch = result.record_output(&self.baseline_metrics); - return Poll::Ready(Some(Ok(batch))); - } - // end of output - Ok(None) => { - self.exec_state = ExecutionState::Done; - } - // error making output - Err(error) => return Poll::Ready(Some(Err(error))), - } + ExecutionState::ProducingOutput(batch) => { + // slice off a part of the batch, if needed + let output_batch = if batch.num_rows() <= self.batch_size { + self.exec_state = ExecutionState::Done; + batch + } else { + // output first batch_size rows + let num_remaining = batch.num_rows() - self.batch_size; + let remaining = batch.slice(self.batch_size, num_remaining); + self.exec_state = ExecutionState::ProducingOutput(remaining); + batch.slice(0, self.batch_size) + }; + return Poll::Ready(Some(Ok( + output_batch.record_output(&self.baseline_metrics) + ))); } + ExecutionState::Done => return Poll::Ready(None), } } @@ -306,460 +383,194 @@ impl RecordBatchStream for GroupedHashAggregateStream { } impl GroupedHashAggregateStream { - // Update the row_aggr_state according to groub_by values (result of group_by_expressions) + /// Calculates the group indices for each input row of + /// `group_values`. 
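+    ///
+    /// Group indices are assigned in order of first appearance. An
+    /// illustrative example (assuming no groups have been seen yet):
+    ///
+    /// ```text
+    /// group_values (rows):   "A", "B", "A"
+    /// current_group_indices:  0,   1,   0
+    /// ```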
+ /// + /// At the return of this function, + /// `self.scratch_space.current_group_indices` has the same number + /// of entries as each array in `group_values` and holds the + /// correct group_index for that row. + /// + /// This is one of the core hot loops in the algorithm fn update_group_state( &mut self, group_values: &[ArrayRef], allocated: &mut usize, - ) -> Result> { + ) -> Result<()> { + // Convert the group keys into the row format + // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available let group_rows = self.row_converter.convert_columns(group_values)?; let n_rows = group_rows.num_rows(); - // 1.1 construct the key from the group values - // 1.2 construct the mapping key if it does not exist - // 1.3 add the row' index to `indices` - // track which entries in `aggr_state` have rows in this batch to aggregate - let mut groups_with_rows = vec![]; + // track memory used + let group_values_size_pre = self.group_values.size(); + let scratch_size_pre = self.scratch_space.size(); - // 1.1 Calculate the group keys for the group values - let mut batch_hashes = vec![0; n_rows]; - create_hashes(group_values, &self.random_state, &mut batch_hashes)?; + // tracks to which group each of the input rows belongs + let group_indices = &mut self.scratch_space.current_group_indices; + group_indices.clear(); - let AggregationState { - map, group_states, .. - } = &mut self.aggr_state; + // 1.1 Calculate the group keys for the group values + let batch_hashes = &mut self.scratch_space.hashes_buffer; + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + create_hashes(group_values, &self.random_state, batch_hashes)?; - for (row, hash) in batch_hashes.into_iter().enumerate() { - let entry = map.get_mut(hash, |(_hash, group_idx)| { + for (row, &hash) in batch_hashes.iter().enumerate() { + let entry = self.map.get_mut(hash, |(_hash, group_idx)| { // verify that a group that we are inserting with hash is // actually the same key value as the group in // existing_idx (aka group_values @ row) - let group_state = &group_states[*group_idx]; - - group_rows.row(row) == group_state.group_by_values.row() + group_rows.row(row) == self.group_values.row(*group_idx) }); - match entry { - // Existing entry for this group value - Some((_hash, group_idx)) => { - let group_state = &mut group_states[*group_idx]; - - // 1.3 - if group_state.indices.is_empty() { - groups_with_rows.push(*group_idx); - }; - - group_state.indices.push_accounted(row as u32, allocated); // remember this row - } - // 1.2 Need to create new entry + let group_idx = match entry { + // Existing group_index for this group value + Some((_hash, group_idx)) => *group_idx, + // 1.2 Need to create new entry for the group None => { - let accumulator_set = - aggregates::create_accumulators(&self.normal_aggr_expr)?; - // Add new entry to group_states and save newly created index - let group_state = GroupState { - group_by_values: group_rows.row(row).owned(), - aggregation_buffer: vec![ - 0; - self.row_aggr_layout.fixed_part_width() - ], - accumulator_set, - indices: vec![row as u32], // 1.3 - }; - let group_idx = group_states.len(); - - // NOTE: do NOT include the `GroupState` struct size in here because this is captured by - // `group_states` (see allocation down below) - *allocated += std::mem::size_of_val(&group_state.group_by_values) - + (std::mem::size_of::() - * group_state.aggregation_buffer.capacity()) - + (std::mem::size_of::() * group_state.indices.capacity()); - - // Allocation done by normal accumulators - 
*allocated += (std::mem::size_of::>() - * group_state.accumulator_set.capacity()) - + group_state - .accumulator_set - .iter() - .map(|accu| accu.size()) - .sum::(); + // Add new entry to aggr_state and save newly created index + let group_idx = self.group_values.num_rows(); + self.group_values.push(group_rows.row(row)); // for hasher function, use precomputed hash value - map.insert_accounted( + self.map.insert_accounted( (hash, group_idx), |(hash, _group_index)| *hash, allocated, ); - - group_states.push_accounted(group_state, allocated); - - groups_with_rows.push(group_idx); + group_idx } }; + group_indices.push(group_idx); } - Ok(groups_with_rows) - } - // Update the accumulator results, according to row_aggr_state. - #[allow(clippy::too_many_arguments)] - fn update_accumulators_using_batch( - &mut self, - groups_with_rows: &[usize], - offsets: &[usize], - row_values: &[Vec], - normal_values: &[Vec], - row_filter_values: &[Option], - normal_filter_values: &[Option], - allocated: &mut usize, - ) -> Result<()> { - // 2.1 for each key in this batch - // 2.2 for each aggregation - // 2.3 `slice` from each of its arrays the keys' values - // 2.4 update / merge the accumulator with the values - // 2.5 clear indices - groups_with_rows - .iter() - .zip(offsets.windows(2)) - .try_for_each(|(group_idx, offsets)| { - let group_state = &mut self.aggr_state.group_states[*group_idx]; - // 2.2 - // Process row accumulators - self.row_accumulators - .iter_mut() - .zip(row_values.iter()) - .zip(row_filter_values.iter()) - .try_for_each(|((accumulator, aggr_array), filter_opt)| { - let values = slice_and_maybe_filter( - aggr_array, - filter_opt.as_ref(), - offsets, - )?; - let mut state_accessor = - RowAccessor::new_from_layout(self.row_aggr_layout.clone()); - state_accessor - .point_to(0, group_state.aggregation_buffer.as_mut_slice()); - match self.mode { - AggregateMode::Partial | AggregateMode::Single => { - accumulator.update_batch(&values, &mut state_accessor) - } - AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values, &mut state_accessor) - } - } - })?; - // normal accumulators - group_state - .accumulator_set - .iter_mut() - .zip(normal_values.iter()) - .zip(normal_filter_values.iter()) - .try_for_each(|((accumulator, aggr_array), filter_opt)| { - let values = slice_and_maybe_filter( - aggr_array, - filter_opt.as_ref(), - offsets, - )?; - let size_pre = accumulator.size(); - let res = match self.mode { - AggregateMode::Partial | AggregateMode::Single => { - accumulator.update_batch(&values) - } - AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values) - } - }; - let size_post = accumulator.size(); - *allocated += size_post.saturating_sub(size_pre); - res - }) - // 2.5 - .and({ - group_state.indices.clear(); - Ok(()) - }) - })?; - Ok(()) - } + // account for memory growth in scratch space + *allocated += self.scratch_space.size(); + *allocated -= scratch_size_pre; // subtract after adding to avoid underflow - // Update the accumulator results, according to row_aggr_state. 
- fn update_accumulators_using_scalar( - &mut self, - groups_with_rows: &[usize], - row_values: &[Vec], - row_filter_values: &[Option], - ) -> Result<()> { - let filter_bool_array = row_filter_values - .iter() - .map(|filter_opt| match filter_opt { - Some(f) => Ok(Some(as_boolean_array(f)?)), - None => Ok(None), - }) - .collect::>>()?; - - for group_idx in groups_with_rows { - let group_state = &mut self.aggr_state.group_states[*group_idx]; - let mut state_accessor = - RowAccessor::new_from_layout(self.row_aggr_layout.clone()); - state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice()); - for idx in &group_state.indices { - for (accumulator, values_array, filter_array) in izip!( - self.row_accumulators.iter_mut(), - row_values.iter(), - filter_bool_array.iter() - ) { - if values_array.len() == 1 { - let scalar_value = - col_to_scalar(&values_array[0], filter_array, *idx as usize)?; - accumulator.update_scalar(&scalar_value, &mut state_accessor)?; - } else { - let scalar_values = values_array - .iter() - .map(|array| { - col_to_scalar(array, filter_array, *idx as usize) - }) - .collect::>>()?; - accumulator - .update_scalar_values(&scalar_values, &mut state_accessor)?; - } - } - } - // clear the group indices in this group - group_state.indices.clear(); - } + // account for any memory increase used to store group_values + *allocated += self.group_values.size(); + *allocated -= group_values_size_pre; // subtract after adding to avoid underflow Ok(()) } /// Perform group-by aggregation for the given [`RecordBatch`]. /// - /// If successful, this returns the additional number of bytes that were allocated during this process. - /// + /// If successful, returns the additional amount of memory, in + /// bytes, that were allocated during this process. fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result { - // Evaluate the grouping expressions: + // Evaluate the grouping expressions let group_by_values = evaluate_group_by(&self.group_by, &batch)?; + // Keep track of memory allocated: let mut allocated = 0usize; // Evaluate the aggregation expressions. - // We could evaluate them after the `take`, but since we need to evaluate all - // of them anyways, it is more performant to do it while they are together. - let row_aggr_input_values = - evaluate_many(&self.row_aggregate_expressions, &batch)?; - let normal_aggr_input_values = - evaluate_many(&self.normal_aggregate_expressions, &batch)?; - let row_filter_values = evaluate_optional(&self.row_filter_expressions, &batch)?; - let normal_filter_values = - evaluate_optional(&self.normal_filter_expressions, &batch)?; + let input_values = evaluate_many(&self.aggregate_arguments, &batch)?; + + // Evaluate the filter expressions, if any, against the inputs + let filter_values = evaluate_optional(&self.filter_expressions, &batch)?; let row_converter_size_pre = self.row_converter.size(); + for group_values in &group_by_values { - let groups_with_rows = - self.update_group_state(group_values, &mut allocated)?; - // Decide the accumulators update mode, use scalar value to update the accumulators when all of the conditions are meet: - // 1) The aggregation mode is Partial or Single - // 2) There is not normal aggregation expressions - // 3) The number of affected groups is high (entries in `aggr_state` have rows need to update). 
Usually the high cardinality case - if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single) - && normal_aggr_input_values.is_empty() - && normal_filter_values.is_empty() - && groups_with_rows.len() >= batch.num_rows() / self.scalar_update_factor - { - self.update_accumulators_using_scalar( - &groups_with_rows, - &row_aggr_input_values, - &row_filter_values, - )?; - } else { - // Collect all indices + offsets based on keys in this vec - let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0); - let mut offsets = vec![0]; - let mut offset_so_far = 0; - for &group_idx in groups_with_rows.iter() { - let indices = &self.aggr_state.group_states[group_idx].indices; - batch_indices.append_slice(indices); - offset_so_far += indices.len(); - offsets.push(offset_so_far); + // calculate the group indices for each input row + self.update_group_state(group_values, &mut allocated)?; + let group_indices = &self.scratch_space.current_group_indices; + + // Gather the inputs to call the actual accumulator + let t = self + .accumulators + .iter_mut() + .zip(input_values.iter()) + .zip(filter_values.iter()); + + let total_num_groups = self.group_values.num_rows(); + + for ((acc, values), opt_filter) in t { + let acc_size_pre = acc.size(); + let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean()); + + // Call the appropriate method on each aggregator with + // the entire input row and the relevant group indexes + match self.mode { + AggregateMode::Partial | AggregateMode::Single => { + acc.update_batch( + values, + group_indices, + opt_filter, + total_num_groups, + )?; + } + AggregateMode::FinalPartitioned | AggregateMode::Final => { + // if aggregation is over intermediate states, + // use merge + acc.merge_batch( + values, + group_indices, + opt_filter, + total_num_groups, + )?; + } } - let batch_indices = batch_indices.finish(); - - let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?; - let normal_values = - get_at_indices(&normal_aggr_input_values, &batch_indices)?; - let row_filter_values = - get_optional_filters(&row_filter_values, &batch_indices); - let normal_filter_values = - get_optional_filters(&normal_filter_values, &batch_indices); - self.update_accumulators_using_batch( - &groups_with_rows, - &offsets, - &row_values, - &normal_values, - &row_filter_values, - &normal_filter_values, - &mut allocated, - )?; + + allocated += acc.size(); + allocated -= acc_size_pre; } } - allocated += self - .row_converter - .size() - .saturating_sub(row_converter_size_pre); - Ok(allocated) - } -} - -/// The state of all the groups -pub(crate) struct AggregationState { - pub reservation: MemoryReservation, - - /// Logically maps group values to an index in `group_states` - /// - /// Uses the raw API of hashbrown to avoid actually storing the - /// keys in the table - /// - /// keys: u64 hashes of the GroupValue - /// values: (hash, index into `group_states`) - pub map: RawTable<(u64, usize)>, - - /// State for each group - pub group_states: Vec, -} + allocated += self.row_converter.size(); + allocated -= row_converter_size_pre; -impl std::fmt::Debug for AggregationState { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - // hashes are not store inline, so could only get values - let map_string = "RawTable"; - f.debug_struct("AggregationState") - .field("map", &map_string) - .field("group_states", &self.group_states) - .finish() + Ok(allocated) } -} -impl GroupedHashAggregateStream { - /// Create a RecordBatch with all group keys and 
accumulator' states or values. - fn create_batch_from_map(&mut self) -> Result> { - let skip_items = self.row_group_skip_position; - if skip_items > self.aggr_state.group_states.len() { - return Ok(None); - } - if self.aggr_state.group_states.is_empty() { + /// Create an output RecordBatch with all group keys and accumulator states/values + fn create_batch_from_map(&mut self) -> Result { + if self.group_values.num_rows() == 0 { let schema = self.schema.clone(); - return Ok(Some(RecordBatch::new_empty(schema))); + return Ok(RecordBatch::new_empty(schema)); } - let end_idx = min( - skip_items + self.batch_size, - self.aggr_state.group_states.len(), - ); - let group_state_chunk = &self.aggr_state.group_states[skip_items..end_idx]; - - if group_state_chunk.is_empty() { - let schema = self.schema.clone(); - return Ok(Some(RecordBatch::new_empty(schema))); + // First output rows are the groups + let groups_rows = self.group_values.iter(); + let mut output: Vec = self.row_converter.convert_rows(groups_rows)?; + + // Next output each aggregate value, from the accumulators + for acc in self.accumulators.iter_mut() { + match self.mode { + AggregateMode::Partial => output.extend(acc.state()?), + AggregateMode::Final + | AggregateMode::FinalPartitioned + | AggregateMode::Single => output.push(acc.evaluate()?), + } } - // Buffers for each distinct group (i.e. row accumulator memories) - let mut state_buffers = group_state_chunk - .iter() - .map(|gs| gs.aggregation_buffer.clone()) - .collect::>(); - - let output_fields = self.schema.fields(); - // Store row accumulator results (either final output or intermediate state): - let row_columns = match self.mode { - AggregateMode::Partial => { - read_as_batch(&state_buffers, &self.row_aggr_schema) - } - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single => { - let mut results = vec![]; - for (idx, acc) in self.row_accumulators.iter().enumerate() { - let mut state_accessor = RowAccessor::new(&self.row_aggr_schema); - let current = state_buffers - .iter_mut() - .map(|buffer| { - state_accessor.point_to(0, buffer); - acc.evaluate(&state_accessor) - }) - .collect::>>()?; - // Get corresponding field for row accumulator - let field = &output_fields[self.indices[1][idx].start]; - let result = if current.is_empty() { - Ok(arrow::array::new_empty_array(field.data_type())) - } else { - let item = ScalarValue::iter_to_array(current)?; - // cast output if needed (e.g. for types like Dictionary where - // the intermediate GroupByScalar type was not the same as the - // output - cast(&item, field.data_type()) - }?; - results.push(result); - } - results - } - }; + Ok(RecordBatch::try_new(self.schema.clone(), output)?) 
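+        // note: `output` lines up with `self.schema` by construction --
+        // group columns first, followed by one column per aggregate
+        // (or, in Partial mode, that aggregate's state columns)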
+ } +} - // Store normal accumulator results (either final output or intermediate state): - let mut columns = vec![]; - for (idx, &Range { start, end }) in self.indices[0].iter().enumerate() { - for (field_idx, field) in output_fields[start..end].iter().enumerate() { - let current = match self.mode { - AggregateMode::Partial => ScalarValue::iter_to_array( - group_state_chunk.iter().map(|group_state| { - group_state.accumulator_set[idx] - .state() - .map(|v| v[field_idx].clone()) - .expect("Unexpected accumulator state in hash aggregate") - }), - ), - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single => ScalarValue::iter_to_array( - group_state_chunk.iter().map(|group_state| { - group_state.accumulator_set[idx] - .evaluate() - .expect("Unexpected accumulator state in hash aggregate") - }), - ), - }?; - // Cast output if needed (e.g. for types like Dictionary where - // the intermediate GroupByScalar type was not the same as the - // output - let result = cast(¤t, field.data_type())?; - columns.push(result); - } - } +/// Holds structures used for the current input [`RecordBatch`] being +/// processed. Reused across batches here to avoid reallocations +#[derive(Debug, Default)] +struct ScratchSpace { + /// scratch space for the current input [`RecordBatch`] being + /// processed. Reused across batches here to avoid reallocations + current_group_indices: Vec, + // buffer to be reused to store hashes + hashes_buffer: Vec, +} - // Stores the group by fields - let group_buffers = group_state_chunk - .iter() - .map(|gs| gs.group_by_values.row()) - .collect::>(); - let mut output: Vec = self.row_converter.convert_rows(group_buffers)?; +impl ScratchSpace { + fn new() -> Self { + Default::default() + } - // The size of the place occupied by row and normal accumulators - let extra: usize = self - .indices - .iter() - .flatten() - .map(|Range { start, end }| end - start) - .sum(); - let empty_arr = new_null_array(&DataType::Null, 1); - output.extend(std::iter::repeat(empty_arr).take(extra)); - - // Write results of both accumulator types to the corresponding location in - // the output schema: - let results = [columns.into_iter(), row_columns.into_iter()]; - for (outer, mut current) in results.into_iter().enumerate() { - for &Range { start, end } in self.indices[outer].iter() { - for item in output.iter_mut().take(end).skip(start) { - *item = current.next().expect("Columns cannot be empty"); - } - } - } - Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?)) + /// Return the amount of memory alocated by this structure in bytes + fn size(&self) -> usize { + std::mem::size_of_val(self) + + self.current_group_indices.allocated_size() + + self.hashes_buffer.allocated_size() } } diff --git a/datafusion/core/tests/user_defined/user_defined_window_functions.rs b/datafusion/core/tests/user_defined/user_defined_window_functions.rs index 8736ede690f1..1331347fac80 100644 --- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs @@ -515,7 +515,7 @@ impl PartitionEvaluator for OddCounter { println!("evaluate, values: {values:#?}, range: {range:?}"); self.test_state.inc_evaluate_called(); - let values: &Int64Array = values.get(0).unwrap().as_primitive(); + let values: &Int64Array = values[0].as_primitive(); let values = values.slice(range.start, range.len()); let scalar = ScalarValue::Int64( match (odd_count(&values), self.test_state.null_for_zero) { @@ -534,10 +534,7 @@ impl 
PartitionEvaluator for OddCounter { println!("evaluate_all, values: {values:#?}, num_rows: {num_rows}"); self.test_state.inc_evaluate_all_called(); - Ok(odd_count_arr( - values.get(0).unwrap().as_primitive(), - num_rows, - )) + Ok(odd_count_arr(values[0].as_primitive(), num_rows)) } fn evaluate_all_with_rank( diff --git a/datafusion/execution/src/memory_pool/proxy.rs b/datafusion/execution/src/memory_pool/proxy.rs index 43532f9a81f1..2bf485c6ee76 100644 --- a/datafusion/execution/src/memory_pool/proxy.rs +++ b/datafusion/execution/src/memory_pool/proxy.rs @@ -26,6 +26,11 @@ pub trait VecAllocExt { /// [Push](Vec::push) new element to vector and store additional allocated bytes in `accounting` (additive). fn push_accounted(&mut self, x: Self::T, accounting: &mut usize); + + /// Return the amount of memory allocated by this Vec (not + /// recursively counting any heap allocations contained within the + /// structure). Does not include the size of `self` + fn allocated_size(&self) -> usize; } impl VecAllocExt for Vec { @@ -44,6 +49,9 @@ impl VecAllocExt for Vec { self.push(x); } + fn allocated_size(&self) -> usize { + std::mem::size_of::() * self.capacity() + } } /// Extension trait for [`RawTable`] to account for allocations. diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index d1c2f7bf3377..b7ffa1810cce 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -62,6 +62,7 @@ indexmap = "2.0.0" itertools = { version = "0.11", features = ["use_std"] } lazy_static = { version = "^1.4.0" } libc = "0.2.140" +log = "^0.4" md-5 = { version = "^0.10.0", optional = true } paste = "^1.0" petgraph = "0.6.2" diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 3c76da51a9d4..e95e9fcf877a 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -17,10 +17,14 @@ //! 
Defines physical expressions that can evaluated at runtime during query execution +use arrow::array::{AsArray, PrimitiveBuilder}; +use log::debug; + use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; +use crate::aggregate::groups_accumulator::accumulate::NullState; use crate::aggregate::row_accumulator::{ is_row_accumulator_support_dtype, RowAccumulator, }; @@ -29,19 +33,23 @@ use crate::aggregate::sum::sum_batch; use crate::aggregate::utils::calculate_result_decimal_for_avg; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; -use crate::{AggregateExpr, PhysicalExpr}; +use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; use arrow::compute; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, Decimal128Type, Float64Type, UInt64Type}; use arrow::{ array::{ArrayRef, UInt64Array}, datatypes::Field, }; -use arrow_array::Array; +use arrow_array::{ + Array, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, PrimitiveArray, +}; use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Accumulator; use datafusion_row::accessor::RowAccessor; +use super::utils::{adjust_output_array, Decimal128Averager}; + /// AVG aggregate expression #[derive(Debug, Clone)] pub struct Avg { @@ -155,6 +163,50 @@ impl AggregateExpr for Avg { &self.rt_data_type, )?)) } + + fn groups_accumulator_supported(&self) -> bool { + use DataType::*; + + matches!(&self.rt_data_type, Float64 | Decimal128(_, _)) + } + + fn create_groups_accumulator(&self) -> Result> { + use DataType::*; + // instantiate specialized accumulator based for the type + match (&self.sum_data_type, &self.rt_data_type) { + (Float64, Float64) => { + Ok(Box::new(AvgGroupsAccumulator::::new( + &self.sum_data_type, + &self.rt_data_type, + |sum: f64, count: u64| Ok(sum / count as f64), + ))) + } + ( + Decimal128(_sum_precision, sum_scale), + Decimal128(target_precision, target_scale), + ) => { + let decimal_averager = Decimal128Averager::try_new( + *sum_scale, + *target_precision, + *target_scale, + )?; + + let avg_fn = + move |sum: i128, count: u64| decimal_averager.avg(sum, count as i128); + + Ok(Box::new(AvgGroupsAccumulator::::new( + &self.sum_data_type, + &self.rt_data_type, + avg_fn, + ))) + } + + _ => Err(DataFusionError::NotImplemented(format!( + "AvgGroupsAccumulator for ({} --> {})", + self.sum_data_type, self.rt_data_type, + ))), + } + } } impl PartialEq for Avg { @@ -383,6 +435,190 @@ impl RowAccumulator for AvgRowAccumulator { } } +/// An accumulator to compute the average of `[PrimitiveArray]`. 
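+/// It tracks a running sum and a count per group, and divides the
+/// two (via `avg_fn`) when output is produced.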
+/// Stores values as native types, and does overflow checking +/// +/// F: Function that calculates the average value from a sum of +/// T::Native and a total count +#[derive(Debug)] +struct AvgGroupsAccumulator +where + T: ArrowNumericType + Send, + F: Fn(T::Native, u64) -> Result + Send, +{ + /// The type of the internal sum + sum_data_type: DataType, + + /// The type of the returned sum + return_data_type: DataType, + + /// Count per group (use u64 to make UInt64Array) + counts: Vec, + + /// Sums per group, stored as the native type + sums: Vec, + + /// Track nulls in the input / filters + null_state: NullState, + + /// Function that computes the final average (value / count) + avg_fn: F, +} + +impl AvgGroupsAccumulator +where + T: ArrowNumericType + Send, + F: Fn(T::Native, u64) -> Result + Send, +{ + pub fn new(sum_data_type: &DataType, return_data_type: &DataType, avg_fn: F) -> Self { + debug!( + "AvgGroupsAccumulator ({}, sum type: {sum_data_type:?}) --> {return_data_type:?}", + std::any::type_name::() + ); + + Self { + return_data_type: return_data_type.clone(), + sum_data_type: sum_data_type.clone(), + counts: vec![], + sums: vec![], + null_state: NullState::new(), + avg_fn, + } + } +} + +impl GroupsAccumulator for AvgGroupsAccumulator +where + T: ArrowNumericType + Send, + F: Fn(T::Native, u64) -> Result + Send, +{ + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow_array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "single argument to update_batch"); + let values = values[0].as_primitive::(); + + // increment counts, update sums + self.counts.resize(total_num_groups, 0); + self.sums.resize(total_num_groups, T::default_value()); + self.null_state.accumulate( + group_indices, + values, + opt_filter, + total_num_groups, + |group_index, new_value| { + let sum = &mut self.sums[group_index]; + *sum = sum.add_wrapping(new_value); + + self.counts[group_index] += 1; + }, + ); + + Ok(()) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow_array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 2, "two arguments to merge_batch"); + // first batch is counts, second is partial sums + let partial_counts = values[0].as_primitive::(); + let partial_sums = values[1].as_primitive::(); + // update counts with partial counts + self.counts.resize(total_num_groups, 0); + self.null_state.accumulate( + group_indices, + partial_counts, + opt_filter, + total_num_groups, + |group_index, partial_count| { + self.counts[group_index] += partial_count; + }, + ); + + // update sums + self.sums.resize(total_num_groups, T::default_value()); + self.null_state.accumulate( + group_indices, + partial_sums, + opt_filter, + total_num_groups, + |group_index, new_value: ::Native| { + let sum = &mut self.sums[group_index]; + *sum = sum.add_wrapping(new_value); + }, + ); + + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let counts = std::mem::take(&mut self.counts); + let sums = std::mem::take(&mut self.sums); + let nulls = self.null_state.build(); + + assert_eq!(nulls.len(), sums.len()); + assert_eq!(counts.len(), sums.len()); + + // don't evaluate averages with null inputs to avoid errors on null values + + let array: PrimitiveArray = if nulls.null_count() > 0 { + let mut builder = PrimitiveBuilder::::with_capacity(nulls.len()); + let iter = sums.into_iter().zip(counts.into_iter()).zip(nulls.iter()); + + for 
((sum, count), is_valid) in iter { + if is_valid { + builder.append_value((self.avg_fn)(sum, count)?) + } else { + builder.append_null(); + } + } + builder.finish() + } else { + let averages: Vec = sums + .into_iter() + .zip(counts.into_iter()) + .map(|(sum, count)| (self.avg_fn)(sum, count)) + .collect::>>()?; + PrimitiveArray::new(averages.into(), Some(nulls)) // no copy + }; + + // fix up decimal precision and scale for decimals + let array = adjust_output_array(&self.return_data_type, Arc::new(array))?; + + Ok(array) + } + + // return arrays for sums and counts + fn state(&mut self) -> Result> { + let nulls = Some(self.null_state.build()); + let counts = std::mem::take(&mut self.counts); + let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy + + let sums = std::mem::take(&mut self.sums); + let sums = PrimitiveArray::::new(sums.into(), nulls); // zero copy + let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?; + + Ok(vec![ + Arc::new(counts) as ArrayRef, + Arc::new(sums) as ArrayRef, + ]) + } + + fn size(&self) -> usize { + self.counts.capacity() * std::mem::size_of::() + + self.sums.capacity() * std::mem::size_of::() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs index 4bbe563edce8..ab37e5891e3f 100644 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs @@ -15,15 +15,18 @@ // specific language governing permissions and limitations // under the License. -//! Defines physical expressions that can evaluated at runtime during query execution +//! Defines BitAnd, BitOr, and BitXor Aggregate accumulators use ahash::RandomState; use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; -use crate::{AggregateExpr, PhysicalExpr}; -use arrow::datatypes::DataType; +use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; +use arrow::datatypes::{ + DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, +}; use arrow::{ array::{ ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, @@ -35,6 +38,7 @@ use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; use std::collections::HashSet; +use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; use crate::aggregate::row_accumulator::{ is_row_accumulator_support_dtype, RowAccumulator, }; @@ -44,6 +48,19 @@ use arrow::array::Array; use arrow::compute::{bit_and, bit_or, bit_xor}; use datafusion_row::accessor::RowAccessor; +/// Creates a [`PrimitiveGroupsAccumulator`] with the specified +/// [`ArrowPrimitiveType`] which applies `$FN` to each element +/// +/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType +macro_rules! instantiate_primitive_accumulator { + ($SELF:expr, $PRIMTYPE:ident, $FN:expr) => {{ + Ok(Box::new(PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new( + &$SELF.data_type, + $FN, + ))) + }}; +} + // returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account macro_rules! 
typed_bit_and_or_xor_batch { ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ @@ -254,6 +271,54 @@ impl AggregateExpr for BitAnd { ))) } + fn groups_accumulator_supported(&self) -> bool { + true + } + + fn create_groups_accumulator(&self) -> Result> { + use std::ops::BitAndAssign; + match self.data_type { + DataType::Int8 => { + instantiate_primitive_accumulator!(self, Int8Type, |x, y| x + .bitand_assign(y)) + } + DataType::Int16 => { + instantiate_primitive_accumulator!(self, Int16Type, |x, y| x + .bitand_assign(y)) + } + DataType::Int32 => { + instantiate_primitive_accumulator!(self, Int32Type, |x, y| x + .bitand_assign(y)) + } + DataType::Int64 => { + instantiate_primitive_accumulator!(self, Int64Type, |x, y| x + .bitand_assign(y)) + } + DataType::UInt8 => { + instantiate_primitive_accumulator!(self, UInt8Type, |x, y| x + .bitand_assign(y)) + } + DataType::UInt16 => { + instantiate_primitive_accumulator!(self, UInt16Type, |x, y| x + .bitand_assign(y)) + } + DataType::UInt32 => { + instantiate_primitive_accumulator!(self, UInt32Type, |x, y| x + .bitand_assign(y)) + } + DataType::UInt64 => { + instantiate_primitive_accumulator!(self, UInt64Type, |x, y| x + .bitand_assign(y)) + } + + _ => Err(DataFusionError::NotImplemented(format!( + "GroupsAccumulator not supported for {} with {}", + self.name(), + self.data_type + ))), + } + } + fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } @@ -444,6 +509,54 @@ impl AggregateExpr for BitOr { ))) } + fn groups_accumulator_supported(&self) -> bool { + true + } + + fn create_groups_accumulator(&self) -> Result> { + use std::ops::BitOrAssign; + match self.data_type { + DataType::Int8 => { + instantiate_primitive_accumulator!(self, Int8Type, |x, y| x + .bitor_assign(y)) + } + DataType::Int16 => { + instantiate_primitive_accumulator!(self, Int16Type, |x, y| x + .bitor_assign(y)) + } + DataType::Int32 => { + instantiate_primitive_accumulator!(self, Int32Type, |x, y| x + .bitor_assign(y)) + } + DataType::Int64 => { + instantiate_primitive_accumulator!(self, Int64Type, |x, y| x + .bitor_assign(y)) + } + DataType::UInt8 => { + instantiate_primitive_accumulator!(self, UInt8Type, |x, y| x + .bitor_assign(y)) + } + DataType::UInt16 => { + instantiate_primitive_accumulator!(self, UInt16Type, |x, y| x + .bitor_assign(y)) + } + DataType::UInt32 => { + instantiate_primitive_accumulator!(self, UInt32Type, |x, y| x + .bitor_assign(y)) + } + DataType::UInt64 => { + instantiate_primitive_accumulator!(self, UInt64Type, |x, y| x + .bitor_assign(y)) + } + + _ => Err(DataFusionError::NotImplemented(format!( + "GroupsAccumulator not supported for {} with {}", + self.name(), + self.data_type + ))), + } + } + fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } @@ -635,6 +748,54 @@ impl AggregateExpr for BitXor { ))) } + fn groups_accumulator_supported(&self) -> bool { + true + } + + fn create_groups_accumulator(&self) -> Result> { + use std::ops::BitXorAssign; + match self.data_type { + DataType::Int8 => { + instantiate_primitive_accumulator!(self, Int8Type, |x, y| x + .bitxor_assign(y)) + } + DataType::Int16 => { + instantiate_primitive_accumulator!(self, Int16Type, |x, y| x + .bitxor_assign(y)) + } + DataType::Int32 => { + instantiate_primitive_accumulator!(self, Int32Type, |x, y| x + .bitxor_assign(y)) + } + DataType::Int64 => { + instantiate_primitive_accumulator!(self, Int64Type, |x, y| x + .bitxor_assign(y)) + } + DataType::UInt8 => { + instantiate_primitive_accumulator!(self, UInt8Type, |x, y| x + .bitxor_assign(y)) + } + 
DataType::UInt16 => { + instantiate_primitive_accumulator!(self, UInt16Type, |x, y| x + .bitxor_assign(y)) + } + DataType::UInt32 => { + instantiate_primitive_accumulator!(self, UInt32Type, |x, y| x + .bitxor_assign(y)) + } + DataType::UInt64 => { + instantiate_primitive_accumulator!(self, UInt64Type, |x, y| x + .bitxor_assign(y)) + } + + _ => Err(DataFusionError::NotImplemented(format!( + "GroupsAccumulator not supported for {} with {}", + self.name(), + self.data_type + ))), + } + } + fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs index e444dc61ee1b..6107b0972c81 100644 --- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs +++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs @@ -17,10 +17,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution -use std::any::Any; -use std::sync::Arc; - -use crate::{AggregateExpr, PhysicalExpr}; +use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; use arrow::datatypes::DataType; use arrow::{ array::{ArrayRef, BooleanArray}, @@ -28,7 +25,10 @@ use arrow::{ }; use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; +use std::any::Any; +use std::sync::Arc; +use crate::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; use crate::aggregate::row_accumulator::{ is_row_accumulator_support_dtype, RowAccumulator, }; @@ -193,6 +193,23 @@ impl AggregateExpr for BoolAnd { ))) } + fn groups_accumulator_supported(&self) -> bool { + true + } + + fn create_groups_accumulator(&self) -> Result> { + match self.data_type { + DataType::Boolean => { + Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y))) + } + _ => Err(DataFusionError::NotImplemented(format!( + "GroupsAccumulator not supported for {} with {}", + self.name(), + self.data_type + ))), + } + } + fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } @@ -381,6 +398,23 @@ impl AggregateExpr for BoolOr { ))) } + fn groups_accumulator_supported(&self) -> bool { + true + } + + fn create_groups_accumulator(&self) -> Result> { + match self.data_type { + DataType::Boolean => { + Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y))) + } + _ => Err(DataFusionError::NotImplemented(format!( + "GroupsAccumulator not supported for {} with {}", + self.name(), + self.data_type + ))), + } + } + fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 22cb2512fc42..e0b9ffd81ae5 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -24,11 +24,14 @@ use std::sync::Arc; use crate::aggregate::row_accumulator::RowAccumulator; use crate::aggregate::utils::down_cast_any_ref; -use crate::{AggregateExpr, PhysicalExpr}; +use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; use arrow::array::{Array, Int64Array}; use arrow::compute; use arrow::datatypes::DataType; use arrow::{array::ArrayRef, datatypes::Field}; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::PrimitiveArray; use arrow_buffer::BooleanBuffer; use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; @@ -37,6 +40,8 @@ use datafusion_row::accessor::RowAccessor; use crate::expressions::format_state_name; +use 
super::groups_accumulator::accumulate::accumulate_indices;
+
 /// COUNT aggregate expression
 /// Returns the number of non-null values of the given expression.
 #[derive(Debug, Clone)]
@@ -44,6 +49,10 @@ pub struct Count {
     name: String,
     data_type: DataType,
     nullable: bool,
+    /// Input exprs
+    ///
+    /// For `COUNT(c1)` this is `[c1]`
+    /// For `COUNT(c1, c2)` this is `[c1, c2]`
     exprs: Vec<Arc<dyn PhysicalExpr>>,
 }
@@ -76,6 +85,114 @@ impl Count {
     }
 }
+/// An accumulator to compute the counts of non null values per group.
+/// Stores the counts as native `i64` values.
+///
+/// Unlike most other accumulators, COUNT never produces NULLs. If no
+/// non-null values are seen in any group the output is 0. Thus, this
+/// accumulator has no additional null or seen filter tracking.
+#[derive(Debug)]
+struct CountGroupsAccumulator {
+    /// Count per group.
+    ///
+    /// Note this is an i64 and not a u64 (or usize) because the
+    /// output type of count is `DataType::Int64`. Thus by using `i64`
+    /// for the counts, the output [`Int64Array`] can be created
+    /// without copy.
+    counts: Vec<i64>,
+}
+
+impl CountGroupsAccumulator {
+    pub fn new() -> Self {
+        Self { counts: vec![] }
+    }
+}
+
+impl GroupsAccumulator for CountGroupsAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = &values[0];
+
+        // Add one to each group's counter for each non null, non
+        // filtered value
+        self.counts.resize(total_num_groups, 0);
+        accumulate_indices(
+            group_indices,
+            values.nulls(), // ignore values
+            opt_filter,
+            |group_index| {
+                self.counts[group_index] += 1;
+            },
+        );
+
+        Ok(())
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "one argument to merge_batch");
+        // the input is the partial counts previously produced by `state()`
+        let partial_counts = values[0].as_primitive::<Int64Type>();
+
+        // intermediate counts are always created as non null
+        assert_eq!(partial_counts.null_count(), 0);
+        let partial_counts = partial_counts.values();
+
+        // Add the partial counts to the running counts
+        self.counts.resize(total_num_groups, 0);
+        match opt_filter {
+            Some(filter) => filter
+                .iter()
+                .zip(group_indices.iter())
+                .zip(partial_counts.iter())
+                .for_each(|((filter_value, &group_index), partial_count)| {
+                    if let Some(true) = filter_value {
+                        self.counts[group_index] += partial_count;
+                    }
+                }),
+            None => group_indices.iter().zip(partial_counts.iter()).for_each(
+                |(&group_index, partial_count)| {
+                    self.counts[group_index] += partial_count;
+                },
+            ),
+        }
+
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ArrayRef> {
+        let counts = std::mem::take(&mut self.counts);
+
+        // Count is always non null (null inputs just don't contribute to the overall values)
+        let nulls = None;
+        let array = PrimitiveArray::<Int64Type>::new(counts.into(), nulls);
+
+        Ok(Arc::new(array))
+    }
+
+    // return arrays for counts
+    fn state(&mut self) -> Result<Vec<ArrayRef>> {
+        let counts = std::mem::take(&mut self.counts);
+        let counts: PrimitiveArray<Int64Type> = Int64Array::from(counts); // zero copy, no nulls
+        Ok(vec![Arc::new(counts) as ArrayRef])
+    }
+
+    fn size(&self) -> usize {
+        self.counts.capacity() * std::mem::size_of::<i64>()
+    }
+}
+
 /// count null values for multiple columns
 /// for each row if one column value is null, then null_count + 1
 fn
null_count_for_multiple_cols(values: &[ArrayRef]) -> usize { @@ -133,6 +250,12 @@ impl AggregateExpr for Count { true } + fn groups_accumulator_supported(&self) -> bool { + // groups accumulator only supports `COUNT(c1)`, not + // `COUNT(c1, c2)`, etc + self.exprs.len() == 1 + } + fn create_row_accumulator( &self, start_index: usize, @@ -147,6 +270,11 @@ impl AggregateExpr for Count { fn create_sliding_accumulator(&self) -> Result> { Ok(Box::new(CountAccumulator::new())) } + + fn create_groups_accumulator(&self) -> Result> { + // instantiate specialized accumulator + Ok(Box::new(CountGroupsAccumulator::new())) + } } impl PartialEq for Count { diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs new file mode 100644 index 000000000000..bcc9d30bedd8 --- /dev/null +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs @@ -0,0 +1,854 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`GroupsAccumulator`] helpers: [`NullState`] and [`accumulate_indices`] +//! +//! [`GroupsAccumulator`]: crate::GroupsAccumulator + +use arrow::datatypes::ArrowPrimitiveType; +use arrow_array::{Array, BooleanArray, PrimitiveArray}; +use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; + +/// Track the accumulator null state per row: if any values for that +/// group were null and if any values have been seen at all for that group. +/// +/// This is part of the inner loop for many [`GroupsAccumulator`]s, +/// and thus the performance is critical and so there are multiple +/// specialized implementations, invoked depending on the specific +/// combinations of the input. +/// +/// Typically there are 4 potential combinations of inputs must be +/// special cased for performance: +/// +/// * With / Without filter +/// * With / Without nulls in the input +/// +/// If the input has nulls, then the accumulator must potentially +/// handle each input null value specially (e.g. for `SUM` to mark the +/// corresponding sum as null) +/// +/// If there are filters present, `NullState` tracks if it has seen +/// *any* value for that group (as some values may be filtered +/// out). Without a filter, the accumulator is only passed groups that +/// had at least one value to accumulate so they do not need to track +/// if they have seen values for a particular group. +/// +/// [`GroupsAccumulator`]: crate::GroupsAccumulator +#[derive(Debug)] +pub struct NullState { + /// Have we seen any non-filtered input values for `group_index`? 
+ /// + /// If `seen_values[i]` is true, have seen at least one non null + /// value for group `i` + /// + /// If `seen_values[i]` is false, have not seen any values that + /// pass the filter yet for group `i` + seen_values: BooleanBufferBuilder, +} + +impl NullState { + pub fn new() -> Self { + Self { + seen_values: BooleanBufferBuilder::new(0), + } + } + + /// return the size of all buffers allocated by this null state, not including self + pub fn size(&self) -> usize { + // capacity is in bits, so convert to bytes + self.seen_values.capacity() / 8 + } + + /// Invokes `value_fn(group_index, value)` for each non null, non + /// filtered value of `value`, while tracking which groups have + /// seen null inputs and which groups have seen any inputs if necessary + // + /// # Arguments: + /// + /// * `values`: the input arguments to the accumulator + /// * `group_indices`: To which groups do the rows in `values` belong, (aka group_index) + /// * `opt_filter`: if present, only rows for which is Some(true) are included + /// * `value_fn`: function invoked for (group_index, value) where value is non null + /// + /// # Example + /// + /// ```text + /// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ + /// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ + /// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ + /// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ + /// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ + /// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ + /// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ + /// │ └─────┘ │ │ └─────┘ │ └─────┘ + /// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ + /// + /// group_indices values opt_filter + /// ``` + /// + /// In the example above, `value_fn` is invoked for each (group_index, + /// value) pair where `opt_filter[i]` is true and values is non null + /// + /// ```text + /// value_fn(2, 200) + /// value_fn(0, 200) + /// value_fn(0, 300) + /// ``` + /// + /// It also sets + /// + /// 1. 
`self.seen_values[group_index]` to true for all rows that had a non null value
+    pub fn accumulate<T, F>(
+        &mut self,
+        group_indices: &[usize],
+        values: &PrimitiveArray<T>,
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        mut value_fn: F,
+    ) where
+        T: ArrowPrimitiveType + Send,
+        F: FnMut(usize, T::Native) + Send,
+    {
+        let data: &[T::Native] = values.values();
+        assert_eq!(data.len(), group_indices.len());
+
+        // ensure the seen_values is big enough (start everything at
+        // "not seen" valid)
+        let seen_values =
+            initialize_builder(&mut self.seen_values, total_num_groups, false);
+
+        match (values.null_count() > 0, opt_filter) {
+            // no nulls, no filter,
+            (false, None) => {
+                let iter = group_indices.iter().zip(data.iter());
+                for (&group_index, &new_value) in iter {
+                    seen_values.set_bit(group_index, true);
+                    value_fn(group_index, new_value);
+                }
+            }
+            // nulls, no filter
+            (true, None) => {
+                let nulls = values.nulls().unwrap();
+                // This is based on (ahem, COPY/PASTE) arrow::compute::aggregate::sum
+                // iterate over in chunks of 64 bits for more efficient null checking
+                let group_indices_chunks = group_indices.chunks_exact(64);
+                let data_chunks = data.chunks_exact(64);
+                let bit_chunks = nulls.inner().bit_chunks();
+
+                let group_indices_remainder = group_indices_chunks.remainder();
+                let data_remainder = data_chunks.remainder();
+
+                group_indices_chunks
+                    .zip(data_chunks)
+                    .zip(bit_chunks.iter())
+                    .for_each(|((group_index_chunk, data_chunk), mask)| {
+                        // index_mask has value 1 << i in the loop
+                        let mut index_mask = 1;
+                        group_index_chunk.iter().zip(data_chunk.iter()).for_each(
+                            |(&group_index, &new_value)| {
+                                // valid bit was set, real value
+                                let is_valid = (mask & index_mask) != 0;
+                                if is_valid {
+                                    seen_values.set_bit(group_index, true);
+                                    value_fn(group_index, new_value);
+                                }
+                                index_mask <<= 1;
+                            },
+                        )
+                    });
+
+                // handle any remaining bits (after the initial 64)
+                let remainder_bits = bit_chunks.remainder_bits();
+                group_indices_remainder
+                    .iter()
+                    .zip(data_remainder.iter())
+                    .enumerate()
+                    .for_each(|(i, (&group_index, &new_value))| {
+                        let is_valid = remainder_bits & (1 << i) != 0;
+                        if is_valid {
+                            seen_values.set_bit(group_index, true);
+                            value_fn(group_index, new_value);
+                        }
+                    });
+            }
+            // no nulls, but a filter
+            (false, Some(filter)) => {
+                assert_eq!(filter.len(), group_indices.len());
+                // The performance with a filter could be improved by
+                // iterating over the filter in chunks, rather than a single
+                // iterator. TODO file a ticket
+                group_indices
+                    .iter()
+                    .zip(data.iter())
+                    .zip(filter.iter())
+                    .for_each(|((&group_index, &new_value), filter_value)| {
+                        if let Some(true) = filter_value {
+                            seen_values.set_bit(group_index, true);
+                            value_fn(group_index, new_value);
+                        }
+                    })
+            }
+            // both null values and filters
+            (true, Some(filter)) => {
+                assert_eq!(filter.len(), group_indices.len());
+                // The performance with a filter could be improved by
+                // iterating over the filter in chunks, rather than using
+                // iterators.
TODO file a ticket + filter + .iter() + .zip(group_indices.iter()) + .zip(values.iter()) + .for_each(|((filter_value, &group_index), new_value)| { + if let Some(true) = filter_value { + if let Some(new_value) = new_value { + seen_values.set_bit(group_index, true); + value_fn(group_index, new_value) + } + } + }) + } + } + } + + /// Invokes `value_fn(group_index, value)` for each non null, non + /// filtered value in `values`, while tracking which groups have + /// seen null inputs and which groups have seen any inputs, for + /// [`BooleanArray`]s. + /// + /// Since `BooleanArray` is not a [`PrimitiveArray`] it must be + /// handled specially. + /// + /// See [`Self::accumulate`], which handles `PrimitiveArray`s, for + /// more details on other arguments. + pub fn accumulate_boolean( + &mut self, + group_indices: &[usize], + values: &BooleanArray, + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + mut value_fn: F, + ) where + F: FnMut(usize, bool) + Send, + { + let data = values.values(); + assert_eq!(data.len(), group_indices.len()); + + // ensure the seen_values is big enough (start everything at + // "not seen" valid) + let seen_values = + initialize_builder(&mut self.seen_values, total_num_groups, false); + + // These could be made more performant by iterating in chunks of 64 bits at a time + match (values.null_count() > 0, opt_filter) { + // no nulls, no filter, + (false, None) => { + // if we have previously seen nulls, ensure the null + // buffer is big enough (start everything at valid) + group_indices.iter().zip(data.iter()).for_each( + |(&group_index, new_value)| { + seen_values.set_bit(group_index, true); + value_fn(group_index, new_value) + }, + ) + } + // nulls, no filter + (true, None) => { + let nulls = values.nulls().unwrap(); + group_indices + .iter() + .zip(data.iter()) + .zip(nulls.iter()) + .for_each(|((&group_index, new_value), is_valid)| { + if is_valid { + seen_values.set_bit(group_index, true); + value_fn(group_index, new_value); + } + }) + } + // no nulls, but a filter + (false, Some(filter)) => { + assert_eq!(filter.len(), group_indices.len()); + + group_indices + .iter() + .zip(data.iter()) + .zip(filter.iter()) + .for_each(|((&group_index, new_value), filter_value)| { + if let Some(true) = filter_value { + seen_values.set_bit(group_index, true); + value_fn(group_index, new_value); + } + }) + } + // both null values and filters + (true, Some(filter)) => { + assert_eq!(filter.len(), group_indices.len()); + filter + .iter() + .zip(group_indices.iter()) + .zip(values.iter()) + .for_each(|((filter_value, &group_index), new_value)| { + if let Some(true) = filter_value { + if let Some(new_value) = new_value { + seen_values.set_bit(group_index, true); + value_fn(group_index, new_value) + } + } + }) + } + } + } + + /// Creates the final [`NullBuffer`] representing which + /// group_indices should have null values (because they never saw + /// any values) + /// + /// resets the internal state to empty + pub fn build(&mut self) -> NullBuffer { + NullBuffer::new(self.seen_values.finish()) + } +} + +/// This function is called to update the accumulator state per row +/// when the value is not needed (e.g. COUNT) +/// +/// `F`: Invoked like `value_fn(group_index) for all non null values +/// passing the filter. Note that no tracking is done for null inputs +/// or which groups have seen any values +/// +/// See [`NullState::accumulate`], for more details on other +/// arguments. 
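+///
+/// As an illustrative sketch, a `COUNT` style caller can drive this
+/// helper as `CountGroupsAccumulator` does elsewhere in this patch;
+/// the literal `group_indices` and `counts` values here are
+/// hypothetical:
+///
+/// ```ignore
+/// let group_indices = [0, 1, 0, 1];
+/// let mut counts = vec![0_i64; 2];
+/// // no nulls and no filter: every row contributes to its group
+/// accumulate_indices(&group_indices, None, None, |group_index| {
+///     counts[group_index] += 1;
+/// });
+/// assert_eq!(counts, vec![2, 2]);
+/// ```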
+pub fn accumulate_indices<F>(
+    group_indices: &[usize],
+    nulls: Option<&NullBuffer>,
+    opt_filter: Option<&BooleanArray>,
+    mut index_fn: F,
+) where
+    F: FnMut(usize) + Send,
+{
+    match (nulls, opt_filter) {
+        (None, None) => {
+            for &group_index in group_indices.iter() {
+                index_fn(group_index)
+            }
+        }
+        (None, Some(filter)) => {
+            assert_eq!(filter.len(), group_indices.len());
+            // The performance with a filter could be improved by
+            // iterating over the filter in chunks, rather than a single
+            // iterator. TODO file a ticket
+            let iter = group_indices.iter().zip(filter.iter());
+            for (&group_index, filter_value) in iter {
+                if let Some(true) = filter_value {
+                    index_fn(group_index)
+                }
+            }
+        }
+        (Some(valids), None) => {
+            assert_eq!(valids.len(), group_indices.len());
+            // This is based on (ahem, COPY/PASTA) arrow::compute::aggregate::sum
+            // iterate over in chunks of 64 bits for more efficient null checking
+            let group_indices_chunks = group_indices.chunks_exact(64);
+            let bit_chunks = valids.inner().bit_chunks();
+
+            let group_indices_remainder = group_indices_chunks.remainder();
+
+            group_indices_chunks.zip(bit_chunks.iter()).for_each(
+                |(group_index_chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    group_index_chunk.iter().for_each(|&group_index| {
+                        // valid bit was set, real value
+                        let is_valid = (mask & index_mask) != 0;
+                        if is_valid {
+                            index_fn(group_index);
+                        }
+                        index_mask <<= 1;
+                    })
+                },
+            );
+
+            // handle any remaining bits (after the initial 64)
+            let remainder_bits = bit_chunks.remainder_bits();
+            group_indices_remainder
+                .iter()
+                .enumerate()
+                .for_each(|(i, &group_index)| {
+                    let is_valid = remainder_bits & (1 << i) != 0;
+                    if is_valid {
+                        index_fn(group_index)
+                    }
+                });
+        }
+
+        (Some(valids), Some(filter)) => {
+            assert_eq!(filter.len(), group_indices.len());
+            assert_eq!(valids.len(), group_indices.len());
+            // The performance with a filter could likely be improved by
+            // iterating over the filter in chunks, rather than using
+            // iterators. TODO file a ticket
+            filter
+                .iter()
+                .zip(group_indices.iter())
+                .zip(valids.iter())
+                .for_each(|((filter_value, &group_index), is_valid)| {
+                    if let (Some(true), true) = (filter_value, is_valid) {
+                        index_fn(group_index)
+                    }
+                })
+        }
+    }
+}
+
+/// Ensures that `builder` contains a `BooleanBufferBuilder` with at
+/// least `total_num_groups`.
+/// +/// All new entries are initialized to `default_value` +fn initialize_builder( + builder: &mut BooleanBufferBuilder, + total_num_groups: usize, + default_value: bool, +) -> &mut BooleanBufferBuilder { + if builder.len() < total_num_groups { + let new_groups = total_num_groups - builder.len(); + builder.append_n(new_groups, default_value); + } + builder +} + +#[cfg(test)] +mod test { + use super::*; + + use arrow_array::UInt32Array; + use arrow_buffer::BooleanBuffer; + use hashbrown::HashSet; + use rand::{rngs::ThreadRng, Rng}; + + #[test] + fn accumulate() { + let group_indices = (0..100).collect(); + let values = (0..100).map(|i| (i + 1) * 10).collect(); + let values_with_nulls = (0..100) + .map(|i| if i % 3 == 0 { None } else { Some((i + 1) * 10) }) + .collect(); + + // default to every fifth value being false, every even + // being null + let filter: BooleanArray = (0..100) + .map(|i| { + let is_even = i % 2 == 0; + let is_fifth = i % 5 == 0; + if is_even { + None + } else if is_fifth { + Some(false) + } else { + Some(true) + } + }) + .collect(); + + Fixture { + group_indices, + values, + values_with_nulls, + filter, + } + .run() + } + + #[test] + fn accumulate_fuzz() { + let mut rng = rand::thread_rng(); + for _ in 0..100 { + Fixture::new_random(&mut rng).run(); + } + } + + /// Values for testing (there are enough values to exercise the 64 bit chunks + struct Fixture { + /// 100..0 + group_indices: Vec, + + /// 10, 20, ... 1010 + values: Vec, + + /// same as values, but every third is null: + /// None, Some(20), Some(30), None ... + values_with_nulls: Vec>, + + /// filter (defaults to None) + filter: BooleanArray, + } + + impl Fixture { + fn new_random(rng: &mut ThreadRng) -> Self { + // Number of input values in a batch + let num_values: usize = rng.gen_range(1..200); + // number of distinct groups + let num_groups: usize = rng.gen_range(2..1000); + let max_group = num_groups - 1; + + let group_indices: Vec = (0..num_values) + .map(|_| rng.gen_range(0..max_group)) + .collect(); + + let values: Vec = (0..num_values).map(|_| rng.gen()).collect(); + + // 10% chance of false + // 10% change of null + // 80% chance of true + let filter: BooleanArray = (0..num_values) + .map(|_| { + let filter_value = rng.gen_range(0.0..1.0); + if filter_value < 0.1 { + Some(false) + } else if filter_value < 0.2 { + None + } else { + Some(true) + } + }) + .collect(); + + // random values with random number and location of nulls + // random null percentage + let null_pct: f32 = rng.gen_range(0.0..1.0); + let values_with_nulls: Vec> = (0..num_values) + .map(|_| { + let is_null = null_pct < rng.gen_range(0.0..1.0); + if is_null { + None + } else { + Some(rng.gen()) + } + }) + .collect(); + + Self { + group_indices, + values, + values_with_nulls, + filter, + } + } + + /// returns `Self::values` an Array + fn values_array(&self) -> UInt32Array { + UInt32Array::from(self.values.clone()) + } + + /// returns `Self::values_with_nulls` as an Array + fn values_with_nulls_array(&self) -> UInt32Array { + UInt32Array::from(self.values_with_nulls.clone()) + } + + /// Calls `NullState::accumulate` and `accumulate_indices` + /// with all combinations of nulls and filter values + fn run(&self) { + let total_num_groups = *self.group_indices.iter().max().unwrap() + 1; + + let group_indices = &self.group_indices; + let values_array = self.values_array(); + let values_with_nulls_array = self.values_with_nulls_array(); + let filter = &self.filter; + + // no null, no filters + Self::accumulate_test(group_indices, 
&values_array, None, total_num_groups); + + // nulls, no filters + Self::accumulate_test( + group_indices, + &values_with_nulls_array, + None, + total_num_groups, + ); + + // no nulls, filters + Self::accumulate_test( + group_indices, + &values_array, + Some(filter), + total_num_groups, + ); + + // nulls, filters + Self::accumulate_test( + group_indices, + &values_with_nulls_array, + Some(filter), + total_num_groups, + ); + } + + /// Calls `NullState::accumulate` and `accumulate_indices` to + /// ensure it generates the correct values. + /// + fn accumulate_test( + group_indices: &[usize], + values: &UInt32Array, + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) { + Self::accumulate_values_test( + group_indices, + values, + opt_filter, + total_num_groups, + ); + Self::accumulate_indices_test(group_indices, values.nulls(), opt_filter); + + // Convert values into a boolean array (anything above the + // average is true, otherwise false) + let avg: usize = values.iter().filter_map(|v| v.map(|v| v as usize)).sum(); + let boolean_values: BooleanArray = + values.iter().map(|v| v.map(|v| v as usize > avg)).collect(); + Self::accumulate_boolean_test( + group_indices, + &boolean_values, + opt_filter, + total_num_groups, + ); + } + + /// This is effectively a different implementation of + /// accumulate that we compare with the above implementation + fn accumulate_values_test( + group_indices: &[usize], + values: &UInt32Array, + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) { + let mut accumulated_values = vec![]; + let mut null_state = NullState::new(); + + null_state.accumulate( + group_indices, + values, + opt_filter, + total_num_groups, + |group_index, value| { + accumulated_values.push((group_index, value)); + }, + ); + + // Figure out the expected values + let mut expected_values = vec![]; + let mut mock = MockNullState::new(); + + match opt_filter { + None => group_indices.iter().zip(values.iter()).for_each( + |(&group_index, value)| { + if let Some(value) = value { + mock.saw_value(group_index); + expected_values.push((group_index, value)); + } + }, + ), + Some(filter) => { + group_indices + .iter() + .zip(values.iter()) + .zip(filter.iter()) + .for_each(|((&group_index, value), is_included)| { + // if value passed filter + if let Some(true) = is_included { + if let Some(value) = value { + mock.saw_value(group_index); + expected_values.push((group_index, value)); + } + } + }); + } + } + + assert_eq!(accumulated_values, expected_values, + "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}"); + let seen_values = null_state.seen_values.finish_cloned(); + mock.validate_seen_values(&seen_values); + + // Validate the final buffer (one value per group) + let expected_null_buffer = mock.expected_null_buffer(total_num_groups); + + let null_buffer = null_state.build(); + + assert_eq!(null_buffer, expected_null_buffer); + } + + // Calls `accumulate_indices` + // and opt_filter and ensures it calls the right values + fn accumulate_indices_test( + group_indices: &[usize], + nulls: Option<&NullBuffer>, + opt_filter: Option<&BooleanArray>, + ) { + let mut accumulated_values = vec![]; + + accumulate_indices(group_indices, nulls, opt_filter, |group_index| { + accumulated_values.push(group_index); + }); + + // Figure out the expected values + let mut expected_values = vec![]; + + match (nulls, opt_filter) { + (None, None) => group_indices.iter().for_each(|&group_index| { + expected_values.push(group_index); + }), + (Some(nulls), None) 
=> group_indices.iter().zip(nulls.iter()).for_each( + |(&group_index, is_valid)| { + if is_valid { + expected_values.push(group_index); + } + }, + ), + (None, Some(filter)) => group_indices.iter().zip(filter.iter()).for_each( + |(&group_index, is_included)| { + if let Some(true) = is_included { + expected_values.push(group_index); + } + }, + ), + (Some(nulls), Some(filter)) => { + group_indices + .iter() + .zip(nulls.iter()) + .zip(filter.iter()) + .for_each(|((&group_index, is_valid), is_included)| { + // if value passed filter + if let (true, Some(true)) = (is_valid, is_included) { + expected_values.push(group_index); + } + }); + } + } + + assert_eq!(accumulated_values, expected_values, + "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}"); + } + + /// This is effectively a different implementation of + /// accumulate_boolean that we compare with the above implementation + fn accumulate_boolean_test( + group_indices: &[usize], + values: &BooleanArray, + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) { + let mut accumulated_values = vec![]; + let mut null_state = NullState::new(); + + null_state.accumulate_boolean( + group_indices, + values, + opt_filter, + total_num_groups, + |group_index, value| { + accumulated_values.push((group_index, value)); + }, + ); + + // Figure out the expected values + let mut expected_values = vec![]; + let mut mock = MockNullState::new(); + + match opt_filter { + None => group_indices.iter().zip(values.iter()).for_each( + |(&group_index, value)| { + if let Some(value) = value { + mock.saw_value(group_index); + expected_values.push((group_index, value)); + } + }, + ), + Some(filter) => { + group_indices + .iter() + .zip(values.iter()) + .zip(filter.iter()) + .for_each(|((&group_index, value), is_included)| { + // if value passed filter + if let Some(true) = is_included { + if let Some(value) = value { + mock.saw_value(group_index); + expected_values.push((group_index, value)); + } + } + }); + } + } + + assert_eq!(accumulated_values, expected_values, + "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}"); + + let seen_values = null_state.seen_values.finish_cloned(); + mock.validate_seen_values(&seen_values); + + // Validate the final buffer (one value per group) + let expected_null_buffer = mock.expected_null_buffer(total_num_groups); + + let null_buffer = null_state.build(); + + assert_eq!(null_buffer, expected_null_buffer); + } + } + + /// Parallel implementaiton of NullState to check expected values + #[derive(Debug, Default)] + struct MockNullState { + /// group indices that had values that passed the filter + seen_values: HashSet, + } + + impl MockNullState { + fn new() -> Self { + Default::default() + } + + fn saw_value(&mut self, group_index: usize) { + self.seen_values.insert(group_index); + } + + /// did this group index see any input? 
+        fn expected_seen(&self, group_index: usize) -> bool {
+            self.seen_values.contains(&group_index)
+        }
+
+        /// Validate that the seen_values matches self.seen_values
+        fn validate_seen_values(&self, seen_values: &BooleanBuffer) {
+            for (group_index, is_seen) in seen_values.iter().enumerate() {
+                let expected_seen = self.expected_seen(group_index);
+                assert_eq!(
+                    expected_seen, is_seen,
+                    "mismatch for group {group_index}"
+                );
+            }
+        }
+
+        /// Create the expected null buffer based on if the input had nulls and a filter
+        fn expected_null_buffer(&self, total_num_groups: usize) -> NullBuffer {
+            (0..total_num_groups)
+                .map(|group_index| self.expected_seen(group_index))
+                .collect()
+        }
+    }
+}
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
new file mode 100644
index 000000000000..7b4c61fe7dc4
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`]
+
+use super::GroupsAccumulator;
+use arrow::{
+    array::{AsArray, UInt32Builder},
+    compute,
+    datatypes::UInt32Type,
+};
+use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray};
+use datafusion_common::{
+    utils::get_arrayref_at_indices, DataFusionError, Result, ScalarValue,
+};
+use datafusion_expr::Accumulator;
+
+/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
+///
+/// While [`Accumulator`]s are simpler to implement and can support
+/// more general calculations (like retractable window functions),
+/// they are not as fast as a specialized `GroupsAccumulator`. This
+/// interface bridges the gap so the group by operator only operates
+/// in terms of [`GroupsAccumulator`].
+pub struct GroupsAccumulatorAdapter {
+    factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
+
+    /// state for each group, stored in group_index order
+    states: Vec<AccumulatorState>,
+
+    /// Current memory usage, in bytes.
+    ///
+    /// Note this is incrementally updated to avoid size() being a
+    /// bottleneck, which we saw in earlier implementations.
+    allocation_bytes: usize,
+}
+
+struct AccumulatorState {
+    /// [`Accumulator`] that stores the per-group state
+    accumulator: Box<dyn Accumulator>,
+
+    // scratch space: indexes in the input array that will be fed to
+    // this accumulator. Stores indexes as `u32` to match the arrow
+    // `take` kernel input.
+ indices: Vec, +} + +impl AccumulatorState { + fn new(accumulator: Box) -> Self { + Self { + accumulator, + indices: vec![], + } + } + + /// Returns the amount of memory taken by this structre and its accumulator + fn size(&self) -> usize { + self.accumulator.size() + + std::mem::size_of_val(self) + + std::mem::size_of::() * self.indices.capacity() + } +} + +impl GroupsAccumulatorAdapter { + /// Create a new adapter that will create a new [`Accumulator`] + /// for each group, using the specified factory function + pub fn new(factory: F) -> Self + where + F: Fn() -> Result> + Send + 'static, + { + let mut new_self = Self { + factory: Box::new(factory), + states: vec![], + allocation_bytes: 0, + }; + new_self.reset_allocation(); + new_self + } + + // Reset the allocation bytes to empty state + fn reset_allocation(&mut self) { + assert!(self.states.is_empty()); + self.allocation_bytes = std::mem::size_of::(); + } + + /// Ensure that self.accumulators has total_num_groups + fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> { + // can't shrink + assert!(total_num_groups >= self.states.len()); + let vec_size_pre = + std::mem::size_of::() * self.states.capacity(); + + // instantiate new accumulators + let new_accumulators = total_num_groups - self.states.len(); + for _ in 0..new_accumulators { + let accumulator = (self.factory)()?; + let state = AccumulatorState::new(accumulator); + self.allocation_bytes += state.size(); + self.states.push(state); + } + + self.allocation_bytes += + std::mem::size_of::() * self.states.capacity(); + self.allocation_bytes -= vec_size_pre; + Ok(()) + } + + /// invokes f(accumulator, values) for each group that has values + /// in group_indices. + /// + /// This function first reorders the input and filter so that + /// values for each group_index are contiguous and then invokes f + /// on the contiguous ranges, to minimize per-row overhead + /// + /// ```text + /// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ + /// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ ┏━━━━━┓ │ ┌─────┐ │ ┌─────┐ + /// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 0 ┃ │ │ 200 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ ┃ 0 ┃ │ │ 300 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 1 ┃ │ │ 200 │ │ │ │NULL │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ────────▶ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ ┃ 2 ┃ │ │ 200 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ ┃ 2 ┃ │ │ 100 │ │ │ │ f │ │ + /// │ └─────┘ │ │ └─────┘ │ └─────┘ ┗━━━━━┛ │ └─────┘ │ └─────┘ + /// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ └─────────┘ └ ─ ─ ─ ─ ┘ + /// + /// logical group values opt_filter logical group values opt_filter + /// + /// ``` + fn invoke_per_accumulator( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + f: F, + ) -> Result<()> + where + F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>, + { + self.make_accumulators_if_needed(total_num_groups)?; + + assert_eq!(values[0].len(), group_indices.len()); + + // figure out which input rows correspond to which groups. 
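+        // For example (tracing the diagram above), with
+        // group_indices = [2, 2, 0, 1, 0] the scratch indices become:
+        // group 0 -> [2, 4], group 1 -> [3], group 2 -> [0, 1].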
+ // Note that self.state.indices starts empty for all groups + // (it is cleared out below) + for (idx, group_index) in group_indices.iter().enumerate() { + self.states[*group_index].indices.push(idx as u32); + } + + // groups_with_rows holds a list of group indexes that have + // any rows that need to be accumulated, stored in order of + // group_index + + let mut groups_with_rows = vec![]; + + // batch_indices holds indices into values, each group is contiguous + let mut batch_indices = UInt32Builder::with_capacity(0); + + // offsets[i] is index into batch_indices where the rows for + // group_index i starts + let mut offsets = vec![0]; + + let mut offset_so_far = 0; + for (group_index, state) in self.states.iter_mut().enumerate() { + let indices = &state.indices; + if indices.is_empty() { + continue; + } + + groups_with_rows.push(group_index); + batch_indices.append_slice(indices); + offset_so_far += indices.len(); + offsets.push(offset_so_far); + } + let batch_indices = batch_indices.finish(); + + // reorder the values and opt_filter by batch_indices so that + // all values for each group are contiguous, then invoke the + // accumulator once per group with values + let values = get_arrayref_at_indices(values, &batch_indices)?; + let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; + + // invoke each accumulator with the appropriate rows, first + // pulling the input arguments for this group into their own + // RecordBatch(es) + let iter = groups_with_rows.iter().zip(offsets.windows(2)); + + for (&group_idx, offsets) in iter { + let state = &mut self.states[group_idx]; + let size_pre = state.size(); + + let values_to_accumulate = + slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?; + (f)(state.accumulator.as_mut(), &values_to_accumulate)?; + + // clear out the state so they are empty for next + // iteration + state.indices.clear(); + + self.allocation_bytes += state.size(); + self.allocation_bytes -= size_pre; + } + Ok(()) + } +} + +impl GroupsAccumulator for GroupsAccumulatorAdapter { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator( + values, + group_indices, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.update_batch(values_to_accumulate) + }, + )?; + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let states = std::mem::take(&mut self.states); + + let results: Vec = states + .into_iter() + .map(|state| state.accumulator.evaluate()) + .collect::>()?; + + let result = ScalarValue::iter_to_array(results); + self.reset_allocation(); + result + } + + fn state(&mut self) -> Result> { + let states = std::mem::take(&mut self.states); + + // each accumulator produces a potential vector of values + // which we need to form into columns + let mut results: Vec> = vec![]; + + for state in states { + let accumulator_state = state.accumulator.state()?; + results.resize_with(accumulator_state.len(), Vec::new); + for (idx, state_val) in accumulator_state.into_iter().enumerate() { + results[idx].push(state_val); + } + } + + // create an array for each intermediate column + let arrays = results + .into_iter() + .map(ScalarValue::iter_to_array) + .collect::>>()?; + + // double check each array has the same length (aka the + // accumulator was implemented correctly + if let Some(first_col) = arrays.get(0) { + for arr in &arrays { + assert_eq!(arr.len(), first_col.len()) + } + } + + 
self.reset_allocation(); + Ok(arrays) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator( + values, + group_indices, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.merge_batch(values_to_accumulate) + }, + )?; + Ok(()) + } + + fn size(&self) -> usize { + self.allocation_bytes + } +} + +fn get_filter_at_indices( + opt_filter: Option<&BooleanArray>, + indices: &PrimitiveArray, +) -> Result> { + opt_filter + .map(|filter| { + compute::take( + &filter, indices, None, // None: no index check + ) + }) + .transpose() + .map_err(DataFusionError::ArrowError) +} + +// Copied from physical-plan +pub(crate) fn slice_and_maybe_filter( + aggr_array: &[ArrayRef], + filter_opt: Option<&ArrayRef>, + offsets: &[usize], +) -> Result> { + let (offset, length) = (offsets[0], offsets[1] - offsets[0]); + let sliced_arrays: Vec = aggr_array + .iter() + .map(|array| array.slice(offset, length)) + .collect(); + + if let Some(f) = filter_opt { + let filter_array = f.slice(offset, length); + let filter_array = filter_array.as_boolean(); + + sliced_arrays + .iter() + .map(|array| { + compute::filter(array, filter_array).map_err(DataFusionError::ArrowError) + }) + .collect() + } else { + Ok(sliced_arrays) + } +} diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs new file mode 100644 index 000000000000..83ffc3717b44 --- /dev/null +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::array::AsArray; +use arrow_array::{ArrayRef, BooleanArray}; +use arrow_buffer::BooleanBufferBuilder; +use datafusion_common::Result; + +use crate::GroupsAccumulator; + +use super::accumulate::NullState; + +/// An accumulator that implements a single operation over a +/// [`BooleanArray`] where the accumulated state is also boolean (such +/// as [`BitAndAssign`]) +/// +/// F: The function to apply to two elements. The first argument is +/// the existing value and should be updated with the second value +/// (e.g. [`BitAndAssign`] style). 
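+///
+/// For example, `BoolAnd` in this patch constructs one as follows (a
+/// sketch of that call site, shown here for illustration):
+///
+/// ```ignore
+/// // logical AND of all non null, non filtered values per group
+/// let acc = BooleanGroupsAccumulator::new(|current, new| current && new);
+/// ```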
+/// +/// [`BitAndAssign`]: std::ops::BitAndAssign +#[derive(Debug)] +pub struct BooleanGroupsAccumulator +where + F: Fn(bool, bool) -> bool + Send + Sync, +{ + /// values per group + values: BooleanBufferBuilder, + + /// Track nulls in the input / filters + null_state: NullState, + + /// Function that computes the output + bool_fn: F, +} + +impl BooleanGroupsAccumulator +where + F: Fn(bool, bool) -> bool + Send + Sync, +{ + pub fn new(bitop_fn: F) -> Self { + Self { + values: BooleanBufferBuilder::new(0), + null_state: NullState::new(), + bool_fn: bitop_fn, + } + } +} + +impl GroupsAccumulator for BooleanGroupsAccumulator +where + F: Fn(bool, bool) -> bool + Send + Sync, +{ + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "single argument to update_batch"); + let values = values[0].as_boolean(); + + if self.values.len() < total_num_groups { + let new_groups = total_num_groups - self.values.len(); + self.values.append_n(new_groups, Default::default()); + } + + // NullState dispatches / handles tracking nulls and groups that saw no values + self.null_state.accumulate_boolean( + group_indices, + values, + opt_filter, + total_num_groups, + |group_index, new_value| { + let current_value = self.values.get_bit(group_index); + let value = (self.bool_fn)(current_value, new_value); + self.values.set_bit(group_index, value); + }, + ); + + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let values = self.values.finish(); + let nulls = self.null_state.build(); + let values = BooleanArray::new(values, Some(nulls)); + Ok(Arc::new(values)) + } + + fn state(&mut self) -> Result> { + self.evaluate().map(|arr| vec![arr]) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + // update / merge are the same + self.update_batch(values, group_indices, opt_filter, total_num_groups) + } + + fn size(&self) -> usize { + // capacity is in bits, so convert to bytes + self.values.capacity() / 8 + self.null_state.size() + } +} diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs new file mode 100644 index 000000000000..49d62e7a9394 --- /dev/null +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Vectorized [`GroupsAccumulator`]
+
+pub(crate) mod accumulate;
+mod adapter;
+pub use adapter::GroupsAccumulatorAdapter;
+
+pub(crate) mod bool_op;
+pub(crate) mod prim_op;
+
+use arrow_array::{ArrayRef, BooleanArray};
+use datafusion_common::Result;
+
+/// A `GroupsAccumulator` implements a single aggregate (e.g. AVG) and
+/// stores the state for *all* groups internally.
+///
+/// Each group is assigned a `group_index` by the hash table and each
+/// accumulator manages the specific state, one per group_index.
+///
+/// group_indexes are contiguous (there are no gaps), and thus it is
+/// expected that each `GroupsAccumulator` will use something like `Vec<..>`
+/// to store the group states.
+pub trait GroupsAccumulator: Send {
+    /// Updates the accumulator's state from its arguments, encoded as
+    /// a vector of [`ArrayRef`]s.
+    ///
+    /// * `values`: the input arguments to the accumulator
+    ///
+    /// * `group_indices`: To which groups do the rows in `values`
+    ///   belong (aka the group index)
+    ///
+    /// * `opt_filter`: if present, only update aggregate state using
+    ///   `values[i]` if `opt_filter[i]` is true
+    ///
+    /// * `total_num_groups`: the number of groups (the largest
+    ///   group_index is thus `total_num_groups - 1`).
+    ///
+    /// Note that subsequent calls to update_batch may have larger
+    /// total_num_groups as new groups are seen.
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()>;
+
+    /// Returns the final aggregate value for each group as a single
+    /// `ArrayRef`, resetting the internal state.
+    ///
+    /// The rows returned *must* be in group_index order: the value
+    /// for group_index 0, followed by 1, etc. Any group_index that
+    /// did not have values should be null.
+    ///
+    /// For example, a `SUM` accumulator maintains a running sum for
+    /// each group, and `evaluate` will produce that running sum as
+    /// its output for all groups, in group_index order.
+    ///
+    /// The accumulator is free to release / reset its internal state
+    /// after this call, returning it to the same state it was in when
+    /// initially created.
+    fn evaluate(&mut self) -> Result<ArrayRef>;
+
+    /// Returns the intermediate aggregate state for this accumulator,
+    /// used for multi-phase grouping, resetting its internal state.
+    ///
+    /// The rows returned *must* be in group_index order: the value
+    /// for group_index 0, followed by 1, etc. Any group_index that
+    /// did not have values should be null.
+    ///
+    /// For example, `AVG` might return two arrays: `SUM` and `COUNT`
+    /// but the `MIN` aggregate would just return a single array.
+    ///
+    /// Note more sophisticated internal state can be passed as a
+    /// single `StructArray` rather than multiple arrays.
+    ///
+    /// The accumulator is free to release / reset its internal state
+    /// after this call, returning it to the same state it was in when
+    /// initially created.
+    fn state(&mut self) -> Result<Vec<ArrayRef>>;
+
+    /// Merges intermediate state (the output from [`Self::state`])
+    /// into this accumulator's values.
+    ///
+    /// For some aggregates (such as `SUM`), `merge_batch` is the same
+    /// as `update_batch`, but for some aggregates (such as `COUNT`,
+    /// where the partial counts must be summed) the operations
+    /// differ. See [`Self::state`] for more details on how state is
+    /// used and merged.
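+    ///
+    /// A sketch of the two phases (the `partial_acc` / `merging_acc`
+    /// bindings are hypothetical, not part of this trait):
+    ///
+    /// ```ignore
+    /// // Phase 1: partial aggregation of the input batches
+    /// partial_acc.update_batch(&input_columns, &group_indices, None, num_groups)?;
+    /// let partial_state = partial_acc.state()?;
+    /// // Phase 2: another accumulator merges the partial state columns
+    /// merging_acc.merge_batch(&partial_state, &merged_group_indices, None, num_groups)?;
+    /// ```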
+    ///
+    /// * `values`: arrays produced by a previous call to `state` on
+    ///   another accumulator, one array per intermediate state column
+    ///
+    /// Other arguments are the same as for [`Self::update_batch`].
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()>;
+
+    /// Amount of memory used to store the state of this accumulator,
+    /// in bytes. This function is called once per batch, so it should
+    /// be `O(n)` to compute, not `O(num_groups)`.
+    fn size(&self) -> usize;
+}
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
new file mode 100644
index 000000000000..860301078909
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::{array::AsArray, datatypes::ArrowPrimitiveType};
+use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray};
+use arrow_schema::DataType;
+use datafusion_common::Result;
+
+use crate::{aggregate::utils::adjust_output_array, GroupsAccumulator};
+
+use super::accumulate::NullState;
+
+/// An accumulator that implements a single operation over
+/// [`ArrowPrimitiveType`] where the accumulated state is the same as
+/// the input type (such as `Sum`)
+///
+/// F: The function to apply to two elements. The first argument is
+/// the existing value and should be updated with the second value
+/// (e.g. [`BitAndAssign`] style).
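+///
+/// For example, the `BIT_AND` aggregate in this patch instantiates one
+/// roughly as follows (a sketch; the explicit turbofish and elided
+/// imports are for illustration only):
+///
+/// ```ignore
+/// use std::ops::BitAndAssign;
+/// let acc = PrimitiveGroupsAccumulator::<Int32Type, _>::new(
+///     &DataType::Int32,
+///     |x, y| x.bitand_assign(y),
+/// );
+/// ```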
+/// +/// [`BitAndAssign`]: std::ops::BitAndAssign +#[derive(Debug)] +pub struct PrimitiveGroupsAccumulator +where + T: ArrowPrimitiveType + Send, + F: Fn(&mut T::Native, T::Native) + Send + Sync, +{ + /// values per group, stored as the native type + values: Vec, + + /// The output type (needed for Decimal precision and scale) + data_type: DataType, + + /// Track nulls in the input / filters + null_state: NullState, + + /// Function that computes the primitive result + prim_fn: F, +} + +impl PrimitiveGroupsAccumulator +where + T: ArrowPrimitiveType + Send, + F: Fn(&mut T::Native, T::Native) + Send + Sync, +{ + pub fn new(data_type: &DataType, prim_fn: F) -> Self { + Self { + values: vec![], + data_type: data_type.clone(), + null_state: NullState::new(), + prim_fn, + } + } +} + +impl GroupsAccumulator for PrimitiveGroupsAccumulator +where + T: ArrowPrimitiveType + Send, + F: Fn(&mut T::Native, T::Native) + Send + Sync, +{ + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "single argument to update_batch"); + let values = values[0].as_primitive::(); + + // update values + self.values.resize(total_num_groups, T::default_value()); + + // NullState dispatches / handles tracking nulls and groups that saw no values + self.null_state.accumulate( + group_indices, + values, + opt_filter, + total_num_groups, + |group_index, new_value| { + let value = &mut self.values[group_index]; + (self.prim_fn)(value, new_value); + }, + ); + + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let values = std::mem::take(&mut self.values); + let nulls = self.null_state.build(); + let values = PrimitiveArray::::new(values.into(), Some(nulls)); // no copy + + adjust_output_array(&self.data_type, Arc::new(values)) + } + + fn state(&mut self) -> Result> { + self.evaluate().map(|arr| vec![arr]) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + // update / merge are the same + self.update_batch(values, group_indices, opt_filter, total_num_groups) + } + + fn size(&self) -> usize { + self.values.capacity() * std::mem::size_of::() + self.null_state.size() + } +} diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index e3c061dc1354..ebf317e6d0f3 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -21,9 +21,13 @@ use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; -use crate::{AggregateExpr, PhysicalExpr}; +use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; use arrow::compute; -use arrow::datatypes::{DataType, TimeUnit}; +use arrow::datatypes::{ + DataType, Date32Type, Date64Type, Time32MillisecondType, Time32SecondType, + Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, +}; use arrow::{ array::{ ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, @@ -35,9 +39,16 @@ use arrow::{ }, datatypes::Field, }; +use arrow_array::cast::AsArray; +use arrow_array::types::{ + Decimal128Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + UInt16Type, UInt32Type, UInt64Type, UInt8Type, +}; +use arrow_array::{ArrowNumericType, PrimitiveArray}; use 
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index e3c061dc1354..ebf317e6d0f3 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -21,9 +21,13 @@ use std::any::Any;
 use std::convert::TryFrom;
 use std::sync::Arc;
 
-use crate::{AggregateExpr, PhysicalExpr};
+use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr};
 use arrow::compute;
-use arrow::datatypes::{DataType, TimeUnit};
+use arrow::datatypes::{
+    DataType, Date32Type, Date64Type, Time32MillisecondType, Time32SecondType,
+    Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType,
+    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
+};
 use arrow::{
     array::{
         ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
@@ -35,9 +39,16 @@ use arrow::{
     },
     datatypes::Field,
 };
+use arrow_array::cast::AsArray;
+use arrow_array::types::{
+    Decimal128Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
+    UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+};
+use arrow_array::{ArrowNumericType, PrimitiveArray};
 use datafusion_common::ScalarValue;
 use datafusion_common::{downcast_value, DataFusionError, Result};
 use datafusion_expr::Accumulator;
+use log::debug;
 
 use crate::aggregate::row_accumulator::{
     is_row_accumulator_support_dtype, RowAccumulator,
@@ -48,7 +59,9 @@ use arrow::array::Array;
 use arrow::array::Decimal128Array;
 use datafusion_row::accessor::RowAccessor;
 
+use super::groups_accumulator::accumulate::NullState;
 use super::moving_min_max;
+use super::utils::adjust_output_array;
 
 // Min/max aggregation can take Dictionary encode input but always produces unpacked
 // (aka non Dictionary) output. We need to adjust the output data type to reflect this.
@@ -87,6 +100,15 @@ impl Max {
     }
 }
 
+macro_rules! instantiate_min_max_accumulator {
+    ($SELF:expr, $NUMERICTYPE:ident, $MIN:expr) => {{
+        Ok(Box::new(MinMaxGroupsPrimitiveAccumulator::<
+            $NUMERICTYPE,
+            $MIN,
+        >::new(&$SELF.data_type)))
+    }};
+}
+
 impl AggregateExpr for Max {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -125,6 +147,28 @@ impl AggregateExpr for Max {
         is_row_accumulator_support_dtype(&self.data_type)
     }
 
+    fn groups_accumulator_supported(&self) -> bool {
+        use DataType::*;
+        matches!(
+            self.data_type,
+            Int8 | Int16
+                | Int32
+                | Int64
+                | UInt8
+                | UInt16
+                | UInt32
+                | UInt64
+                | Float32
+                | Float64
+                | Decimal128(_, _)
+                | Date32
+                | Date64
+                | Time32(_)
+                | Time64(_)
+                | Timestamp(_, _)
+        )
+    }
+
     fn create_row_accumulator(
         &self,
         start_index: usize,
@@ -135,6 +179,66 @@ impl AggregateExpr for Max {
         )))
     }
 
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        use DataType::*;
+        use TimeUnit::*;
+
+        match self.data_type {
+            Int8 => instantiate_min_max_accumulator!(self, Int8Type, false),
+            Int16 => instantiate_min_max_accumulator!(self, Int16Type, false),
+            Int32 => instantiate_min_max_accumulator!(self, Int32Type, false),
+            Int64 => instantiate_min_max_accumulator!(self, Int64Type, false),
+            UInt8 => instantiate_min_max_accumulator!(self, UInt8Type, false),
+            UInt16 => instantiate_min_max_accumulator!(self, UInt16Type, false),
+            UInt32 => instantiate_min_max_accumulator!(self, UInt32Type, false),
+            UInt64 => instantiate_min_max_accumulator!(self, UInt64Type, false),
+            Float32 => {
+                instantiate_min_max_accumulator!(self, Float32Type, false)
+            }
+            Float64 => {
+                instantiate_min_max_accumulator!(self, Float64Type, false)
+            }
+            Date32 => instantiate_min_max_accumulator!(self, Date32Type, false),
+            Date64 => instantiate_min_max_accumulator!(self, Date64Type, false),
+            Time32(Second) => {
+                instantiate_min_max_accumulator!(self, Time32SecondType, false)
+            }
+            Time32(Millisecond) => {
+                instantiate_min_max_accumulator!(self, Time32MillisecondType, false)
+            }
+            Time64(Microsecond) => {
+                instantiate_min_max_accumulator!(self, Time64MicrosecondType, false)
+            }
+            Time64(Nanosecond) => {
+                instantiate_min_max_accumulator!(self, Time64NanosecondType, false)
+            }
+            Timestamp(Second, _) => {
+                instantiate_min_max_accumulator!(self, TimestampSecondType, false)
+            }
+            Timestamp(Millisecond, _) => {
+                instantiate_min_max_accumulator!(self, TimestampMillisecondType, false)
+            }
+            Timestamp(Microsecond, _) => {
+                instantiate_min_max_accumulator!(self, TimestampMicrosecondType, false)
+            }
+            Timestamp(Nanosecond, _) => {
+                instantiate_min_max_accumulator!(self, TimestampNanosecondType, false)
+            }
+
+            // It would be nice to have a fast implementation for Strings as well
+            // https://github.com/apache/arrow-datafusion/issues/6906
+            Decimal128(_, _) => Ok(Box::new(MinMaxGroupsPrimitiveAccumulator::<
+                Decimal128Type,
+                false,
+            >::new(&self.data_type))),
+            // This is only reached if groups_accumulator_supported is out of sync
+            _ => Err(DataFusionError::Internal(format!(
+                "MinMaxGroupsPrimitiveAccumulator not supported for max({})",
+                self.data_type
+            ))),
+        }
+    }
+
     fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
         Some(Arc::new(self.clone()))
     }
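`Max` passes `false` and `Min` (in the hunk that follows) passes `true` as the accumulator's `const MIN: bool` parameter, so the min-vs-max branch is resolved at monomorphization time rather than per row. A standalone sketch of that const-generic dispatch pattern (toy function, not the patch's types):

```rust
// Toy illustration of compile-time min/max selection via a const generic:
// the branch on MIN is resolved when fold_extreme is monomorphized, so the
// inner loop carries no runtime flag check.
fn fold_extreme<const MIN: bool>(values: &[i64]) -> Option<i64> {
    values.iter().copied().reduce(|acc, v| {
        let replace = if MIN { v < acc } else { v > acc };
        if replace { v } else { acc }
    })
}

fn main() {
    let data = [3, 1, 4, 1, 5];
    assert_eq!(fold_extreme::<true>(&data), Some(1)); // min
    assert_eq!(fold_extreme::<false>(&data), Some(5)); // max
}
```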
@@ -835,6 +939,84 @@ impl AggregateExpr for Min {
         )))
     }
 
+    fn groups_accumulator_supported(&self) -> bool {
+        use DataType::*;
+        matches!(
+            self.data_type,
+            Int8 | Int16
+                | Int32
+                | Int64
+                | UInt8
+                | UInt16
+                | UInt32
+                | UInt64
+                | Float32
+                | Float64
+                | Decimal128(_, _)
+                | Date32
+                | Date64
+                | Time32(_)
+                | Time64(_)
+                | Timestamp(_, _)
+        )
+    }
+
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        use DataType::*;
+        use TimeUnit::*;
+        match self.data_type {
+            Int8 => instantiate_min_max_accumulator!(self, Int8Type, true),
+            Int16 => instantiate_min_max_accumulator!(self, Int16Type, true),
+            Int32 => instantiate_min_max_accumulator!(self, Int32Type, true),
+            Int64 => instantiate_min_max_accumulator!(self, Int64Type, true),
+            UInt8 => instantiate_min_max_accumulator!(self, UInt8Type, true),
+            UInt16 => instantiate_min_max_accumulator!(self, UInt16Type, true),
+            UInt32 => instantiate_min_max_accumulator!(self, UInt32Type, true),
+            UInt64 => instantiate_min_max_accumulator!(self, UInt64Type, true),
+            Float32 => {
+                instantiate_min_max_accumulator!(self, Float32Type, true)
+            }
+            Float64 => {
+                instantiate_min_max_accumulator!(self, Float64Type, true)
+            }
+            Date32 => instantiate_min_max_accumulator!(self, Date32Type, true),
+            Date64 => instantiate_min_max_accumulator!(self, Date64Type, true),
+            Time32(Second) => {
+                instantiate_min_max_accumulator!(self, Time32SecondType, true)
+            }
+            Time32(Millisecond) => {
+                instantiate_min_max_accumulator!(self, Time32MillisecondType, true)
+            }
+            Time64(Microsecond) => {
+                instantiate_min_max_accumulator!(self, Time64MicrosecondType, true)
+            }
+            Time64(Nanosecond) => {
+                instantiate_min_max_accumulator!(self, Time64NanosecondType, true)
+            }
+            Timestamp(Second, _) => {
+                instantiate_min_max_accumulator!(self, TimestampSecondType, true)
+            }
+            Timestamp(Millisecond, _) => {
+                instantiate_min_max_accumulator!(self, TimestampMillisecondType, true)
+            }
+            Timestamp(Microsecond, _) => {
+                instantiate_min_max_accumulator!(self, TimestampMicrosecondType, true)
+            }
+            Timestamp(Nanosecond, _) => {
+                instantiate_min_max_accumulator!(self, TimestampNanosecondType, true)
+            }
+            Decimal128(_, _) => Ok(Box::new(MinMaxGroupsPrimitiveAccumulator::<
+                Decimal128Type,
+                true,
+            >::new(&self.data_type))),
+            // This is only reached if groups_accumulator_supported is out of sync
+            _ => Err(DataFusionError::Internal(format!(
+                "MinMaxGroupsPrimitiveAccumulator not supported for min({})",
+                self.data_type
+            ))),
+        }
+    }
+
     fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
         Some(Arc::new(self.clone()))
     }
@@ -1022,6 +1204,232 @@ impl RowAccumulator for MinRowAccumulator {
     }
 }
 
+trait MinMax {
+    fn min() -> Self;
+    fn max() -> Self;
+}
+
+impl MinMax for u8 {
+    fn min() -> Self {
+        u8::MIN
+    }
+    fn max() -> Self {
+        u8::MAX
+    }
+}
+impl MinMax for i8 {
+    fn min() -> Self {
+        i8::MIN
+    }
+    fn max() -> Self {
+        i8::MAX
+    }
+}
+impl MinMax for u16 {
+    fn min() -> Self {
+        u16::MIN
+    }
+    fn max() -> Self {
+        u16::MAX
+    }
+}
+impl MinMax for i16 {
+    fn min() -> Self {
+        i16::MIN
+    }
+    fn max() -> Self {
+        i16::MAX
+    }
+}
+impl MinMax for u32 {
+    fn min() -> Self {
+        u32::MIN
+    }
+    fn max() -> Self {
+        u32::MAX
+    }
+}
+impl MinMax for i32 {
+    fn min() -> Self {
+        i32::MIN
+    }
+    fn max() -> Self {
+        i32::MAX
+    }
+}
+impl MinMax for i64 {
+    fn min() -> Self {
+        i64::MIN
+    }
+    fn max() -> Self {
+        i64::MAX
+    }
+}
+impl MinMax for u64 {
+    fn min() -> Self {
+        u64::MIN
+    }
+    fn max() -> Self {
+        u64::MAX
+    }
+}
+impl MinMax for f32 {
+    fn min() -> Self {
+        f32::MIN
+    }
+    fn max() -> Self {
+        f32::MAX
+    }
+}
+impl MinMax for f64 {
+    fn min() -> Self {
+        f64::MIN
+    }
+    fn max() -> Self {
+        f64::MAX
+    }
+}
+impl MinMax for i128 {
+    fn min() -> Self {
+        i128::MIN
+    }
+    fn max() -> Self {
+        i128::MAX
+    }
+}
+
+/// An accumulator to compute the min or max of a [`PrimitiveArray`].
+///
+/// Stores values as native/primitive type
+///
+/// Note this doesn't use [`PrimitiveGroupsAccumulator`] because it
+/// needs to control the default accumulator value (which is not
+/// `default::Default()`)
+///
+/// [`PrimitiveGroupsAccumulator`]: crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator
+#[derive(Debug)]
+struct MinMaxGroupsPrimitiveAccumulator<T, const MIN: bool>
+where
+    T: ArrowNumericType + Send,
+    T::Native: MinMax,
+{
+    /// Min/max per group, stored as the native type
+    min_max: Vec<T::Native>,
+
+    /// Track nulls in the input / filters
+    null_state: NullState,
+
+    /// The output datatype (needed for decimal precision/scale)
+    data_type: DataType,
+}
+
+impl<T, const MIN: bool> MinMaxGroupsPrimitiveAccumulator<T, MIN>
+where
+    T: ArrowNumericType + Send,
+    T::Native: MinMax,
+{
+    pub fn new(data_type: &DataType) -> Self {
+        debug!(
+            "MinMaxGroupsPrimitiveAccumulator ({}, {})",
+            std::any::type_name::<T>(),
+            MIN,
+        );
+
+        Self {
+            min_max: vec![],
+            null_state: NullState::new(),
+            data_type: data_type.clone(),
+        }
+    }
+}
+
+impl<T, const MIN: bool> GroupsAccumulator for MinMaxGroupsPrimitiveAccumulator<T, MIN>
+where
+    T: ArrowNumericType + Send,
+    T::Native: MinMax,
+{
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = values[0].as_primitive::<T>();
+
+        self.min_max.resize(
+            total_num_groups,
+            if MIN {
+                T::Native::max()
+            } else {
+                T::Native::min()
+            },
+        );
+
+        // NullState dispatches / handles tracking nulls and groups that saw no values
+        self.null_state.accumulate(
+            group_indices,
+            values,
+            opt_filter,
+            total_num_groups,
+            |group_index, new_value| {
+                let val = &mut self.min_max[group_index];
+                match MIN {
+                    true => {
+                        if new_value < *val {
+                            *val = new_value;
+                        }
+                    }
+                    false => {
+                        if new_value > *val {
+                            *val = new_value;
+                        }
+                    }
+                }
+            },
+        );
+
+        Ok(())
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        Self::update_batch(self, values, group_indices, opt_filter, total_num_groups)
+    }
+
+    fn evaluate(&mut self) -> Result<ArrayRef> {
+        let min_max = std::mem::take(&mut self.min_max);
+        let nulls = self.null_state.build();
+
+        let min_max = PrimitiveArray::<T>::new(min_max.into(), Some(nulls)); // no copy
+        let min_max = adjust_output_array(&self.data_type, Arc::new(min_max))?;
+
+        Ok(Arc::new(min_max))
+    }
+
+    // return arrays for min/max values
+    fn state(&mut self) -> Result<Vec<ArrayRef>> {
+        let nulls = self.null_state.build();
+
+        let min_max = std::mem::take(&mut self.min_max);
+        let min_max = PrimitiveArray::<T>::new(min_max.into(), Some(nulls)); // zero copy
+
+        let min_max = adjust_output_array(&self.data_type, Arc::new(min_max))?;
+
+        Ok(vec![min_max])
+    }
+
+    fn size(&self) -> usize {
+        self.min_max.capacity() * std::mem::size_of::<T::Native>() + self.null_state.size()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
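The `MinMax` trait exists because new group slots must be seeded with the identity element for the comparison (`T::Native::max()` for min, `T::Native::min()` for max), not with `Default::default()`. A small self-contained illustration of why a zero seed is wrong for `min`:

```rust
// Why the accumulator seeds new groups with MAX (for min) rather than the
// type's default: zero is not an identity element for min/max.
fn main() {
    let values = [5i64, 7, 3];

    // Seeding with 0 silently clamps the result of min():
    let wrong = values.iter().fold(0i64, |acc, &v| acc.min(v));
    assert_eq!(wrong, 0); // should be 3

    // Seeding with the type's MAX is the identity for min():
    let right = values.iter().fold(i64::MAX, |acc, &v| acc.min(v));
    assert_eq!(right, 3);
}
```

Groups that never see a value keep the sentinel, but `NullState` marks them null, so the sentinel never leaks into the output.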
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 9be6d5e1ba12..21efb3c2f91b 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -25,6 +25,8 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use self::groups_accumulator::GroupsAccumulator;
+
 pub(crate) mod approx_distinct;
 pub(crate) mod approx_median;
 pub(crate) mod approx_percentile_cont;
@@ -45,6 +47,7 @@ pub(crate) mod median;
 #[macro_use]
 pub(crate) mod min_max;
 pub mod build_in;
+pub(crate) mod groups_accumulator;
 mod hyperloglog;
 pub mod moving_min_max;
 pub mod row_accumulator;
@@ -118,6 +121,24 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
         )))
     }
 
+    /// Returns true if the aggregate expression has a specialized
+    /// [`GroupsAccumulator`] implementation. If this returns true,
+    /// [`Self::create_groups_accumulator`] will be called.
+    fn groups_accumulator_supported(&self) -> bool {
+        false
+    }
+
+    /// Return a specialized [`GroupsAccumulator`] that manages state
+    /// for all groups.
+    ///
+    /// For maximum performance, a [`GroupsAccumulator`] should be
+    /// implemented in addition to [`Accumulator`].
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        Err(DataFusionError::NotImplemented(format!(
+            "GroupsAccumulator hasn't been implemented for {self:?} yet"
+        )))
+    }
+
     /// Construct an expression that calculates the aggregate in reverse.
     /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
     /// For aggregates that do not support calculation in reverse,
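Callers are expected to probe `groups_accumulator_supported` before calling `create_groups_accumulator`, and otherwise fall back to the row-based `Accumulator` path. A standalone sketch of that dispatch contract, using a toy trait and `String` stand-ins rather than DataFusion's actual types:

```rust
// Toy mirror of the two-method contract: a capability probe with a default
// of false, plus a fallible constructor that only supported types override.
trait ToyAggregateExpr {
    fn groups_accumulator_supported(&self) -> bool {
        false // default: no specialized implementation
    }
    fn create_groups_accumulator(&self) -> Result<String, String> {
        Err("not implemented".to_string())
    }
}

struct ToySum;
impl ToyAggregateExpr for ToySum {
    fn groups_accumulator_supported(&self) -> bool {
        true
    }
    fn create_groups_accumulator(&self) -> Result<String, String> {
        Ok("vectorized groups accumulator".to_string())
    }
}

fn choose_path(expr: &dyn ToyAggregateExpr) -> String {
    if expr.groups_accumulator_supported() {
        // supported() and create() must stay in sync, as the error arms
        // in min_max.rs and sum.rs note
        expr.create_groups_accumulator().expect("supported but failed to create")
    } else {
        "row-based Accumulator fallback".to_string()
    }
}

fn main() {
    assert_eq!(choose_path(&ToySum), "vectorized groups accumulator");
}
```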
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index 29996eaf5cdf..5f00e594fef5 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -19,10 +19,20 @@
 use std::any::Any;
 use std::convert::TryFrom;
+use std::ops::AddAssign;
 use std::sync::Arc;
 
-use crate::{AggregateExpr, PhysicalExpr};
+use super::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr};
+use arrow::array::Array;
+use arrow::array::Decimal128Array;
 use arrow::compute;
+use arrow::compute::kernels::cast;
 use arrow::datatypes::DataType;
 use arrow::{
     array::{
@@ -31,17 +41,12 @@ use arrow::{
     },
     datatypes::Field,
 };
+use arrow_array::types::{
+    Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type,
+    UInt64Type,
+};
 use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
-
-use crate::aggregate::row_accumulator::{
-    is_row_accumulator_support_dtype, RowAccumulator,
-};
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::expressions::format_state_name;
-use arrow::array::Array;
-use arrow::array::Decimal128Array;
-use arrow::compute::cast;
 use datafusion_row::accessor::RowAccessor;
 
 /// SUM aggregate expression
@@ -86,6 +91,19 @@ impl Sum {
     }
 }
 
+/// Creates a [`PrimitiveGroupsAccumulator`] with the specified
+/// [`ArrowPrimitiveType`] which applies `$FN` to each element
+///
+/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
+macro_rules! instantiate_primitive_accumulator {
+    ($SELF:expr, $PRIMTYPE:ident, $FN:expr) => {{
+        Ok(Box::new(PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(
+            &$SELF.data_type,
+            $FN,
+        )))
+    }};
+}
+
 impl AggregateExpr for Sum {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -124,6 +142,10 @@ impl AggregateExpr for Sum {
         is_row_accumulator_support_dtype(&self.data_type)
     }
 
+    fn groups_accumulator_supported(&self) -> bool {
+        true
+    }
+
     fn create_row_accumulator(
         &self,
         start_index: usize,
@@ -134,6 +156,44 @@ impl AggregateExpr for Sum {
         )))
     }
 
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        // instantiate specialized accumulator
+        match self.data_type {
+            DataType::UInt64 => {
+                instantiate_primitive_accumulator!(self, UInt64Type, |x, y| x
+                    .add_assign(y))
+            }
+            DataType::Int64 => {
+                instantiate_primitive_accumulator!(self, Int64Type, |x, y| x
+                    .add_assign(y))
+            }
+            DataType::UInt32 => {
+                instantiate_primitive_accumulator!(self, UInt32Type, |x, y| x
+                    .add_assign(y))
+            }
+            DataType::Int32 => {
+                instantiate_primitive_accumulator!(self, Int32Type, |x, y| x
+                    .add_assign(y))
+            }
+            DataType::Float32 => {
+                instantiate_primitive_accumulator!(self, Float32Type, |x, y| x
+                    .add_assign(y))
+            }
+            DataType::Float64 => {
+                instantiate_primitive_accumulator!(self, Float64Type, |x, y| x
+                    .add_assign(y))
+            }
+            DataType::Decimal128(_, _) => {
+                instantiate_primitive_accumulator!(self, Decimal128Type, |x, y| x
+                    .add_assign(y))
+            }
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "GroupsAccumulator not supported for {}: {}",
+                self.name, self.data_type
+            ))),
+        }
+    }
+
     fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
         Some(Arc::new(self.clone()))
     }
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index dbbe0c3f92c0..aada51c9efcd 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -20,6 +20,8 @@
 use crate::{AggregateExpr, PhysicalSortExpr};
 use arrow::array::ArrayRef;
 use arrow::datatypes::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
+use arrow_array::cast::AsArray;
+use arrow_array::types::Decimal128Type;
 use arrow_schema::{DataType, Field};
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
@@ -145,6 +147,28 @@ pub fn calculate_result_decimal_for_avg(
     }
 }
 
+/// Adjust array type metadata if needed
+///
+/// Since `Decimal128Arrays` created from `Vec<i128>` have
+/// default precision and scale, this function adjusts the output to
+/// match `data_type`, if necessary
+pub fn adjust_output_array(
+    data_type: &DataType,
+    array: ArrayRef,
+) -> Result<ArrayRef> {
+    let array = match data_type {
+        DataType::Decimal128(p, s) => Arc::new(
+            array
+                .as_primitive::<Decimal128Type>()
+                .clone()
+                .with_precision_and_scale(*p, *s)?,
+        ),
+        // no adjustment needed for other arrays
+        _ => array,
+    };
+    Ok(array)
+}
+
 /// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
 /// and return the inner trait object as [`Any`](std::any::Any) so
 /// that it can be downcast to a specific implementation.
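For Decimal128, `adjust_output_array` re-applies the precision and scale from the output `DataType` using arrow's `with_precision_and_scale`. Here is a small sketch of that adjustment in isolation; the claim that a freshly built array carries arrow-rs's default precision/scale (38, 10) is an assumption about the arrow version in use:

```rust
// Illustrative only: rebuild a Decimal128 array's precision/scale metadata,
// as adjust_output_array does for the accumulator outputs above.
use arrow_array::Decimal128Array;
use arrow_schema::ArrowError;

fn main() -> Result<(), ArrowError> {
    // 1.23 and 45.67 stored as unscaled i128 values with an intended scale of 2
    let raw = Decimal128Array::from(vec![123_i128, 4567]);

    // Re-apply the (precision, scale) demanded by the output DataType
    let adjusted = raw.with_precision_and_scale(10, 2)?;
    assert_eq!(adjusted.precision(), 10);
    assert_eq!(adjusted.scale(), 2);
    assert_eq!(adjusted.value_as_string(0), "1.23");
    Ok(())
}
```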
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 1484cf7ff52c..b695ee169eed 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -47,7 +47,9 @@ pub mod var_provider; pub mod window; // reexport this to maintain compatibility with anything that used from_slice previously +pub use aggregate::groups_accumulator::{GroupsAccumulator, GroupsAccumulatorAdapter}; pub use aggregate::AggregateExpr; + pub use equivalence::{ project_equivalence_properties, project_ordering_equivalence_properties, EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,