diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 6ad0f94e032a7..a1d77a2d8849e 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -25,9 +25,6 @@ use std::convert::TryFrom; use std::sync::Arc; use crate::aggregate::groups_accumulator::accumulate::NullState; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::sum; use crate::aggregate::sum::sum_batch; use crate::aggregate::utils::calculate_result_decimal_for_avg; @@ -46,7 +43,6 @@ use arrow_array::{ use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Accumulator; -use datafusion_row::accessor::RowAccessor; use super::groups_accumulator::EmitTo; use super::utils::{adjust_output_array, Decimal128Averager}; @@ -139,21 +135,6 @@ impl AggregateExpr for Avg { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.sum_data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(AvgRowAccumulator::new( - start_index, - &self.sum_data_type, - &self.rt_data_type, - ))) - } - fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } @@ -321,121 +302,6 @@ impl Accumulator for AvgAccumulator { } } -#[derive(Debug)] -struct AvgRowAccumulator { - state_index: usize, - sum_datatype: DataType, - return_data_type: DataType, -} - -impl AvgRowAccumulator { - pub fn new( - start_index: usize, - sum_datatype: &DataType, - return_data_type: &DataType, - ) -> Self { - Self { - state_index: start_index, - sum_datatype: sum_datatype.clone(), - return_data_type: return_data_type.clone(), - } - } -} - -impl RowAccumulator for AvgRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - // count - let delta = (values.len() - values.null_count()) as u64; - accessor.add_u64(self.state_index(), delta); - - // sum - sum::add_to_row( - self.state_index() + 1, - accessor, - &sum::sum_batch(values, &self.sum_datatype)?, - ) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - sum::update_avg_to_row(self.state_index(), accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - sum::update_avg_to_row(self.state_index(), accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let counts = downcast_value!(states[0], UInt64Array); - // count - let delta = compute::sum(counts).unwrap_or(0); - accessor.add_u64(self.state_index(), delta); - - // sum - let difference = sum::sum_batch(&states[1], &self.sum_datatype)?; - sum::add_to_row(self.state_index() + 1, accessor, &difference) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - match self.sum_datatype { - DataType::Decimal128(p, s) => { - match accessor.get_u64_opt(self.state_index()) { - None => Ok(ScalarValue::Decimal128(None, p, s)), - Some(0) => Ok(ScalarValue::Decimal128(None, p, s)), - Some(n) => { - // now the sum_type and return type is not the same, need to convert the sum type to return type - accessor.get_i128_opt(self.state_index() + 1).map_or_else( - || Ok(ScalarValue::Decimal128(None, p, s)), - |f| { - calculate_result_decimal_for_avg( - f, - n as i128, - s, - &self.return_data_type, - ) - }, - ) - } - } - } - DataType::Float64 => Ok(match accessor.get_u64_opt(self.state_index()) { - None => ScalarValue::Float64(None), - Some(0) => ScalarValue::Float64(None), - Some(n) => ScalarValue::Float64( - accessor - .get_f64_opt(self.state_index() + 1) - .map(|f| f / n as f64), - ), - }), - _ => Err(DataFusionError::Internal( - "Sum should be f64 or decimal128 on average".to_string(), - )), - } - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.state_index - } -} - /// An accumulator to compute the average of `[PrimitiveArray]`. /// Stores values as native types, and does overflow checking /// diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs index 4c7733520e0c6..c9e3e73357123 100644 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs @@ -39,14 +39,10 @@ use datafusion_expr::Accumulator; use std::collections::HashSet; use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::compute::{bit_and, bit_or, bit_xor}; -use datafusion_row::accessor::RowAccessor; /// Creates a [`PrimitiveGroupsAccumulator`] with the specified /// [`ArrowPrimitiveType`] that initailizes each accumulator to $START @@ -123,81 +119,7 @@ fn bit_xor_batch(values: &ArrayRef) -> Result { bit_and_or_xor_batch!(values, bit_xor) } -// bit_and/bit_or/bit_xor of two scalar values. -macro_rules! typed_bit_and_or_xor_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{ - paste::item! { - match $SCALAR { - None => {} - Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE) - } - } - }}; -} - -macro_rules! bit_and_or_xor_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{ - Ok(match $SCALAR { - ScalarValue::UInt64(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u64, $OP) - } - ScalarValue::UInt32(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u32, $OP) - } - ScalarValue::UInt16(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u16, $OP) - } - ScalarValue::UInt8(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u8, $OP) - } - ScalarValue::Int64(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i64, $OP) - } - ScalarValue::Int32(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i32, $OP) - } - ScalarValue::Int16(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i16, $OP) - } - ScalarValue::Int8(rhs) => { - typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i8, $OP) - } - ScalarValue::Null => { - // do nothing - } - e => { - return Err(DataFusionError::Internal(format!( - "BIT AND/BIT OR/BIT XOR is not expected to receive scalars of incompatible types {:?}", - e - ))) - } - }) - }}; -} - -pub fn bit_and_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bit_and_or_xor_v2!(index, accessor, s, bitand) -} - -pub fn bit_or_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bit_and_or_xor_v2!(index, accessor, s, bitor) -} -pub fn bit_xor_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bit_and_or_xor_v2!(index, accessor, s, bitxor) -} /// BIT_AND aggregate expression #[derive(Debug, Clone)] @@ -258,19 +180,6 @@ impl AggregateExpr for BitAnd { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BitAndRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } fn groups_accumulator_supported(&self) -> bool { true @@ -378,64 +287,6 @@ impl Accumulator for BitAndAccumulator { } } -#[derive(Debug)] -struct BitAndRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BitAndRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BitAndRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bit_and_batch(values)?; - bit_and_row(self.index, accessor, delta) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bit_and_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bit_and_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// BIT_OR aggregate expression #[derive(Debug, Clone)] pub struct BitOr { @@ -495,19 +346,6 @@ impl AggregateExpr for BitOr { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BitOrRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } fn groups_accumulator_supported(&self) -> bool { true @@ -608,65 +446,6 @@ impl Accumulator for BitOrAccumulator { } } -#[derive(Debug)] -struct BitOrRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BitOrRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BitOrRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bit_or_batch(values)?; - bit_or_row(self.index, accessor, delta)?; - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bit_or_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bit_or_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// BIT_XOR aggregate expression #[derive(Debug, Clone)] pub struct BitXor { @@ -726,19 +505,6 @@ impl AggregateExpr for BitXor { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BitXorRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } fn groups_accumulator_supported(&self) -> bool { true @@ -839,65 +605,6 @@ impl Accumulator for BitXorAccumulator { } } -#[derive(Debug)] -struct BitXorRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BitXorRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BitXorRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bit_xor_batch(values)?; - bit_xor_row(self.index, accessor, delta)?; - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bit_xor_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bit_xor_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// Expression for a BIT_XOR(DISTINCT) aggregation. #[derive(Debug, Clone)] pub struct DistinctBitXor { diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs index 6107b0972c81d..57a0ca2373b50 100644 --- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs +++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs @@ -29,14 +29,10 @@ use std::any::Any; use std::sync::Arc; use crate::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::compute::{bool_and, bool_or}; -use datafusion_row::accessor::RowAccessor; // returns the new value after bool_and/bool_or with the new values, taking nullability into account macro_rules! typed_bool_and_or_batch { @@ -73,52 +69,8 @@ fn bool_or_batch(values: &ArrayRef) -> Result { bool_and_or_batch!(values, bool_or) } -// bool_and/bool_or of two scalar values. -macro_rules! typed_bool_and_or_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{ - paste::item! { - match $SCALAR { - None => {} - Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE) - } - } - }}; -} - -macro_rules! bool_and_or_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{ - Ok(match $SCALAR { - ScalarValue::Boolean(rhs) => { - typed_bool_and_or_v2!($INDEX, $ACC, rhs, bool, $OP) - } - ScalarValue::Null => { - // do nothing - } - e => { - return Err(DataFusionError::Internal(format!( - "BOOL AND/BOOL OR is not expected to receive scalars of incompatible types {:?}", - e - ))) - } - }) - }}; -} -pub fn bool_and_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bool_and_or_v2!(index, accessor, s, bitand) -} -pub fn bool_or_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - bool_and_or_v2!(index, accessor, s, bitor) -} /// BOOL_AND aggregate expression #[derive(Debug, Clone)] @@ -179,19 +131,6 @@ impl AggregateExpr for BoolAnd { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BoolAndRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } fn groups_accumulator_supported(&self) -> bool { true @@ -267,64 +206,6 @@ impl Accumulator for BoolAndAccumulator { } } -#[derive(Debug)] -struct BoolAndRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BoolAndRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BoolAndRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bool_and_batch(values)?; - bool_and_row(self.index, accessor, delta) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bool_and_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bool_and_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// BOOL_OR aggregate expression #[derive(Debug, Clone)] pub struct BoolOr { @@ -384,19 +265,6 @@ impl AggregateExpr for BoolOr { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(BoolOrRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } fn groups_accumulator_supported(&self) -> bool { true @@ -472,65 +340,6 @@ impl Accumulator for BoolOrAccumulator { } } -#[derive(Debug)] -struct BoolOrRowAccumulator { - index: usize, - datatype: DataType, -} - -impl BoolOrRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for BoolOrRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &bool_or_batch(values)?; - bool_or_row(self.index, accessor, delta)?; - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - bool_or_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - bool_or_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 60e15a673a0c2..3743b36b4d2bf 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -22,7 +22,6 @@ use std::fmt::Debug; use std::ops::BitAnd; use std::sync::Arc; -use crate::aggregate::row_accumulator::RowAccumulator; use crate::aggregate::utils::down_cast_any_ref; use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; use arrow::array::{Array, Int64Array}; @@ -36,7 +35,6 @@ use arrow_buffer::BooleanBuffer; use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Accumulator; -use datafusion_row::accessor::RowAccessor; use crate::expressions::format_state_name; @@ -247,9 +245,6 @@ impl AggregateExpr for Count { &self.name } - fn row_accumulator_supported(&self) -> bool { - true - } fn groups_accumulator_supported(&self) -> bool { // groups accumulator only supports `COUNT(c1)`, not @@ -257,13 +252,6 @@ impl AggregateExpr for Count { self.exprs.len() == 1 } - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(CountRowAccumulator::new(start_index))) - } - fn reverse_expr(&self) -> Option> { Some(Arc::new(self.clone())) } @@ -348,79 +336,6 @@ impl Accumulator for CountAccumulator { } } -#[derive(Debug)] -struct CountRowAccumulator { - state_index: usize, -} - -impl CountRowAccumulator { - pub fn new(index: usize) -> Self { - Self { state_index: index } - } -} - -impl RowAccumulator for CountRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let array = &values[0]; - let delta = (array.len() - null_count_for_multiple_cols(values)) as u64; - accessor.add_u64(self.state_index, delta); - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - if !values.iter().any(|s| matches!(s, ScalarValue::Null)) { - accessor.add_u64(self.state_index, 1) - } - Ok(()) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - match value { - ScalarValue::Null => { - // do not update the accumulator - } - _ => accessor.add_u64(self.state_index, 1), - } - Ok(()) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let counts = downcast_value!(states[0], Int64Array); - let delta = &compute::sum(counts); - if let Some(d) = delta { - accessor.add_i64(self.state_index, *d); - } - Ok(()) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(ScalarValue::Int64(Some( - accessor.get_u64_opt(self.state_index()).unwrap_or(0) as i64, - ))) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.state_index - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index cc230c174b4f9..b9d57673d87a0 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -48,14 +48,10 @@ use datafusion_common::ScalarValue; use datafusion_common::{downcast_value, DataFusionError, Result}; use datafusion_expr::Accumulator; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::array::Decimal128Array; -use datafusion_row::accessor::RowAccessor; use super::moving_min_max; @@ -172,9 +168,6 @@ impl AggregateExpr for Max { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } fn groups_accumulator_supported(&self) -> bool { use DataType::*; @@ -198,16 +191,6 @@ impl AggregateExpr for Max { ) } - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(MaxRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn create_groups_accumulator(&self) -> Result> { use DataType::*; use TimeUnit::*; @@ -457,18 +440,6 @@ macro_rules! typed_min_max { }}; } -// min/max of two non-string scalar values. -macro_rules! typed_min_max_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{ - paste::item! { - match $SCALAR { - None => {} - Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE) - } - } - }}; -} - // min/max of two scalar string values. macro_rules! typed_min_max_string { ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{ @@ -666,77 +637,16 @@ macro_rules! min_max { }}; } -// min/max of two scalar values of the same type -macro_rules! min_max_v2 { - ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{ - Ok(match $SCALAR { - ScalarValue::Boolean(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, bool, $OP) - } - ScalarValue::Float64(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, f64, $OP) - } - ScalarValue::Float32(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, f32, $OP) - } - ScalarValue::UInt64(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, u64, $OP) - } - ScalarValue::UInt32(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, u32, $OP) - } - ScalarValue::UInt16(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, u16, $OP) - } - ScalarValue::UInt8(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, u8, $OP) - } - ScalarValue::Int64(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i64, $OP) - } - ScalarValue::Int32(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i32, $OP) - } - ScalarValue::Int16(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i16, $OP) - } - ScalarValue::Int8(rhs) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i8, $OP) - } - ScalarValue::Decimal128(rhs, ..) => { - typed_min_max_v2!($INDEX, $ACC, rhs, i128, $OP) - } - ScalarValue::Null => { - // do nothing - } - e => { - return Err(DataFusionError::Internal(format!( - "MIN/MAX is not expected to receive scalars of incompatible types {:?}", - e - ))) - } - }) - }}; -} - /// the minimum of two scalar values pub fn min(lhs: &ScalarValue, rhs: &ScalarValue) -> Result { min_max!(lhs, rhs, min) } -pub fn min_row(index: usize, accessor: &mut RowAccessor, s: &ScalarValue) -> Result<()> { - min_max_v2!(index, accessor, s, min) -} - /// the maximum of two scalar values pub fn max(lhs: &ScalarValue, rhs: &ScalarValue) -> Result { min_max!(lhs, rhs, max) } -pub fn max_row(index: usize, accessor: &mut RowAccessor, s: &ScalarValue) -> Result<()> { - min_max_v2!(index, accessor, s, max) -} - /// An accumulator to compute the maximum value #[derive(Debug)] pub struct MaxAccumulator { @@ -837,64 +747,6 @@ impl Accumulator for SlidingMaxAccumulator { } } -#[derive(Debug)] -struct MaxRowAccumulator { - index: usize, - data_type: DataType, -} - -impl MaxRowAccumulator { - pub fn new(index: usize, data_type: DataType) -> Self { - Self { index, data_type } - } -} - -impl RowAccumulator for MaxRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &max_batch(values)?; - max_row(self.index, accessor, delta) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - max_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - max_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.data_type, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - /// MIN aggregate expression #[derive(Debug, Clone)] pub struct Min { @@ -954,19 +806,6 @@ impl AggregateExpr for Min { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } - - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(MinRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } fn groups_accumulator_supported(&self) -> bool { use DataType::*; @@ -1173,65 +1012,6 @@ impl Accumulator for SlidingMinAccumulator { } } -#[derive(Debug)] -struct MinRowAccumulator { - index: usize, - data_type: DataType, -} - -impl MinRowAccumulator { - pub fn new(index: usize, data_type: DataType) -> Self { - Self { index, data_type } - } -} - -impl RowAccumulator for MinRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = &min_batch(values)?; - min_row(self.index, accessor, delta)?; - Ok(()) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - min_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - min_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.data_type, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 21efb3c2f91bc..5490b875763a3 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use crate::aggregate::row_accumulator::RowAccumulator; use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg}; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::Field; @@ -50,7 +49,6 @@ pub mod build_in; pub(crate) mod groups_accumulator; mod hyperloglog; pub mod moving_min_max; -pub mod row_accumulator; pub(crate) mod stats; pub(crate) mod stddev; pub(crate) mod sum; @@ -102,25 +100,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { "AggregateExpr: default name" } - /// If the aggregate expression is supported by row format - fn row_accumulator_supported(&self) -> bool { - false - } - - /// RowAccumulator to access/update row-based aggregation state in-place. - /// Currently, row accumulator only supports states of fixed-sized type. - /// - /// We recommend implementing `RowAccumulator` along with the standard `Accumulator`, - /// when its state is of fixed size, as RowAccumulator is more memory efficient and CPU-friendly. - fn create_row_accumulator( - &self, - _start_index: usize, - ) -> Result> { - Err(DataFusionError::NotImplemented(format!( - "RowAccumulator hasn't been implemented for {self:?} yet" - ))) - } - /// If the aggregate expression has a specialized /// [`GroupsAccumulator`] implementation. If this returns true, /// `[Self::create_groups_accumulator`] will be called. diff --git a/datafusion/physical-expr/src/aggregate/row_accumulator.rs b/datafusion/physical-expr/src/aggregate/row_accumulator.rs deleted file mode 100644 index e5282629220f7..0000000000000 --- a/datafusion/physical-expr/src/aggregate/row_accumulator.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Accumulator over row format - -use arrow::array::ArrayRef; -use arrow_schema::DataType; -use datafusion_common::{Result, ScalarValue}; -use datafusion_row::accessor::RowAccessor; -use std::fmt::Debug; - -/// Row-based accumulator where the internal aggregate state(s) are stored using row format. -/// -/// Unlike the [`datafusion_expr::Accumulator`], the [`RowAccumulator`] does not store the state internally. -/// Instead, it knows how to access/update the state stored in a row via the the provided accessor and -/// its state's starting field index in the row. -/// -/// For example, we are evaluating `SELECT a, sum(b), avg(c), count(d) from GROUP BY a;`, we would have one row used as -/// aggregation state for each distinct `a` value, the index of the first and the only state of `sum(b)` would be 0, -/// the index of the first state of `avg(c)` would be 1, and the index of the first and only state of `cound(d)` would be 3: -/// -/// sum(b) state_index = 0 count(d) state_index = 3 -/// | | -/// v v -/// +--------+----------+--------+----------+ -/// | sum(b) | count(c) | sum(c) | count(d) | -/// +--------+----------+--------+----------+ -/// ^ -/// | -/// avg(c) state_index = 1 -/// -pub trait RowAccumulator: Send + Sync + Debug { - /// updates the accumulator's state from a vector of arrays. - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()>; - - /// updates the accumulator's state from a vector of Scalar value. - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()>; - - /// updates the accumulator's state from a Scalar value. - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()>; - - /// updates the accumulator's state from a vector of states. - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()>; - - /// returns its value based on its current state. - fn evaluate(&self, accessor: &RowAccessor) -> Result; - - /// State's starting field index in the row. - fn state_index(&self) -> usize; -} - -/// Returns if `data_type` is supported with `RowAccumulator` -pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool { - matches!( - data_type, - DataType::Boolean - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::Float32 - | DataType::Float64 - | DataType::Decimal128(_, _) - ) -} diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 5f00e594fef57..db649bf35b4cb 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -23,9 +23,6 @@ use std::ops::AddAssign; use std::sync::Arc; use super::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use crate::aggregate::row_accumulator::{ - is_row_accumulator_support_dtype, RowAccumulator, -}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; @@ -47,7 +44,6 @@ use arrow_array::types::{ }; use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; -use datafusion_row::accessor::RowAccessor; /// SUM aggregate expression #[derive(Debug, Clone)] @@ -138,24 +134,11 @@ impl AggregateExpr for Sum { &self.name } - fn row_accumulator_supported(&self) -> bool { - is_row_accumulator_support_dtype(&self.data_type) - } fn groups_accumulator_supported(&self) -> bool { true } - fn create_row_accumulator( - &self, - start_index: usize, - ) -> Result> { - Ok(Box::new(SumRowAccumulator::new( - start_index, - self.data_type.clone(), - ))) - } - fn create_groups_accumulator(&self) -> Result> { // instantiate specialized accumulator match self.data_type { @@ -299,100 +282,7 @@ pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> Result {{ - paste::item! { - if let Some(v) = $DELTA { - $ACC.[]($INDEX, *v) - } - } - }}; -} - -macro_rules! avg_row { - ($INDEX:ident, $ACC:ident, $DELTA:expr, $TYPE:ident) => {{ - paste::item! { - if let Some(v) = $DELTA { - $ACC.add_u64($INDEX, 1); - $ACC.[]($INDEX + 1, *v) - } - } - }}; -} - -pub(crate) fn add_to_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - match s { - ScalarValue::Null => { - // do nothing - } - ScalarValue::Float64(rhs) => { - sum_row!(index, accessor, rhs, f64) - } - ScalarValue::Float32(rhs) => { - sum_row!(index, accessor, rhs, f32) - } - ScalarValue::UInt64(rhs) => { - sum_row!(index, accessor, rhs, u64) - } - ScalarValue::Int64(rhs) => { - sum_row!(index, accessor, rhs, i64) - } - ScalarValue::Decimal128(rhs, _, _) => { - sum_row!(index, accessor, rhs, i128) - } - ScalarValue::Dictionary(_, value) => { - let value = value.as_ref(); - return add_to_row(index, accessor, value); - } - _ => { - let msg = - format!("Row sum updater is not expected to receive a scalar {s:?}"); - return Err(DataFusionError::Internal(msg)); - } - } - Ok(()) -} -pub(crate) fn update_avg_to_row( - index: usize, - accessor: &mut RowAccessor, - s: &ScalarValue, -) -> Result<()> { - match s { - ScalarValue::Null => { - // do nothing - } - ScalarValue::Float64(rhs) => { - avg_row!(index, accessor, rhs, f64) - } - ScalarValue::Float32(rhs) => { - avg_row!(index, accessor, rhs, f32) - } - ScalarValue::UInt64(rhs) => { - avg_row!(index, accessor, rhs, u64) - } - ScalarValue::Int64(rhs) => { - avg_row!(index, accessor, rhs, i64) - } - ScalarValue::Decimal128(rhs, _, _) => { - avg_row!(index, accessor, rhs, i128) - } - ScalarValue::Dictionary(_, value) => { - let value = value.as_ref(); - return update_avg_to_row(index, accessor, value); - } - _ => { - let msg = - format!("Row avg updater is not expected to receive a scalar {s:?}"); - return Err(DataFusionError::Internal(msg)); - } - } - Ok(()) -} impl Accumulator for SumAccumulator { fn state(&self) -> Result> { @@ -467,73 +357,14 @@ impl Accumulator for SlidingSumAccumulator { } } -#[derive(Debug)] -struct SumRowAccumulator { - index: usize, - datatype: DataType, -} - -impl SumRowAccumulator { - pub fn new(index: usize, datatype: DataType) -> Self { - Self { index, datatype } - } -} - -impl RowAccumulator for SumRowAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - let values = &values[0]; - let delta = sum_batch(values, &self.datatype)?; - add_to_row(self.index, accessor, &delta) - } - - fn update_scalar_values( - &mut self, - values: &[ScalarValue], - accessor: &mut RowAccessor, - ) -> Result<()> { - let value = &values[0]; - add_to_row(self.index, accessor, value) - } - - fn update_scalar( - &mut self, - value: &ScalarValue, - accessor: &mut RowAccessor, - ) -> Result<()> { - add_to_row(self.index, accessor, value) - } - - fn merge_batch( - &mut self, - states: &[ArrayRef], - accessor: &mut RowAccessor, - ) -> Result<()> { - self.update_batch(states, accessor) - } - - fn evaluate(&self, accessor: &RowAccessor) -> Result { - Ok(accessor.get_as_scalar(&self.datatype, self.index)) - } - - #[inline(always)] - fn state_index(&self) -> usize { - self.index - } -} - #[cfg(test)] mod tests { use super::*; use crate::expressions::tests::aggregate; - use crate::expressions::{col, Avg}; + use crate::expressions::{col}; use crate::generic_test_op; use arrow::datatypes::*; use arrow::record_batch::RecordBatch; - use arrow_array::DictionaryArray; use datafusion_common::Result; #[test] @@ -656,75 +487,4 @@ mod tests { Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); generic_test_op!(a, DataType::Float64, Sum, ScalarValue::from(15_f64)) } - - fn row_aggregate( - array: &ArrayRef, - agg: Arc, - row_accessor: &mut RowAccessor, - row_indexs: Vec, - ) -> Result { - let mut accum = agg.create_row_accumulator(0)?; - - for row_index in row_indexs { - let scalar_value = ScalarValue::try_from_array(array, row_index)?; - accum.update_scalar(&scalar_value, row_accessor)?; - } - accum.evaluate(row_accessor) - } - - #[test] - fn sum_dictionary_f64() -> Result<()> { - let keys = Int32Array::from(vec![2, 3, 1, 0, 1]); - let values = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64])); - - let a: ArrayRef = Arc::new(DictionaryArray::try_new(keys, values).unwrap()); - - let row_schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]); - let mut row_accessor = RowAccessor::new(&row_schema); - let mut buffer: Vec = vec![0; 16]; - row_accessor.point_to(0, &mut buffer); - - let expected = ScalarValue::from(9_f64); - - let agg = Arc::new(Sum::new( - col("a", &row_schema)?, - "bla".to_string(), - expected.get_datatype(), - )); - - let actual = row_aggregate(&a, agg, &mut row_accessor, vec![0, 1, 2])?; - assert_eq!(expected, actual); - - Ok(()) - } - - #[test] - fn avg_dictionary_f64() -> Result<()> { - let keys = Int32Array::from(vec![2, 1, 1, 3, 0]); - let values = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64])); - - let a: ArrayRef = Arc::new(DictionaryArray::try_new(keys, values).unwrap()); - - let row_schema = Schema::new(vec![ - Field::new("count", DataType::UInt64, true), - Field::new("a", DataType::Float64, true), - ]); - let mut row_accessor = RowAccessor::new(&row_schema); - let mut buffer: Vec = vec![0; 24]; - row_accessor.point_to(0, &mut buffer); - - let expected = ScalarValue::from(2.3333333333333335_f64); - - let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]); - let agg = Arc::new(Avg::new( - col("a", &schema)?, - "bla".to_string(), - expected.get_datatype(), - )); - - let actual = row_aggregate(&a, agg, &mut row_accessor, vec![0, 1, 2])?; - assert_eq!(expected, actual); - - Ok(()) - } }