Skip to content

Commit

Permalink
Change count() return type to UInt64
Browse files Browse the repository at this point in the history
The count is never negative and this can be reflected in a more
appropriate return type.
  • Loading branch information
findepi committed Oct 22, 2024
1 parent c22abb4 commit de57e83
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 56 deletions.
4 changes: 2 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::physical_plan::{
};
use crate::prelude::SessionContext;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::array::{Array, ArrayRef, UInt64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use arrow_schema::{Schema, SchemaRef};
Expand Down Expand Up @@ -1194,7 +1194,7 @@ impl DataFrame {
let len = *rows
.first()
.and_then(|r| r.columns().first())
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.and_then(|a| a.values().first())
.ok_or(DataFusionError::Internal(
"Unexpected output when collecting for count()".to_string(),
Expand Down
34 changes: 17 additions & 17 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,17 +845,17 @@ mod tests {

let physical_plan = bounded_window_exec("non_nullable_col", sort_exprs, filter);

let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" FilterExec: NOT non_nullable_col@1",
" SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];

let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]",
let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]",
" FilterExec: NOT non_nullable_col@1",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];
Expand Down Expand Up @@ -1722,15 +1722,15 @@ mod tests {
// corresponding SortExecs together. Also, the inputs of these `SortExec`s
// are not necessarily the same to be able to remove them.
let expected_input = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
" UnionExec",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"];
let expected_optimized = [
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]",
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
Expand Down Expand Up @@ -1760,14 +1760,14 @@ mod tests {

// The `WindowAggExec` can get its required sorting from the leaf nodes directly.
// The unnecessary SortExecs should be removed
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"];
let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
Expand Down Expand Up @@ -2060,15 +2060,15 @@ mod tests {
let physical_plan =
bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);

let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];

let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Expand Down Expand Up @@ -2134,7 +2134,7 @@ mod tests {
let expected_input = vec![
"SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_eq!(
Expand Down Expand Up @@ -2386,15 +2386,15 @@ mod tests {
let physical_plan = bounded_window_exec("a", sort_exprs, spm);

let expected_input = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
let expected_optimized = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ mod tests {
let sort = sort_exec(sort_exprs.clone(), source);
let bw = bounded_window_exec("c9", sort_exprs, sort);
assert_plan(bw.as_ref(), vec![
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"
]);
Expand All @@ -465,7 +465,7 @@ mod tests {
)];
let bw = bounded_window_exec("c9", sort_exprs, source);
assert_plan(bw.as_ref(), vec![
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" MemoryExec: partitions=1, partition_sizes=[0]"
]);
// Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
Ok(ScalarValue::UInt64(Some(self.0.non_null_len() as u64)))
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -142,7 +142,7 @@ impl Accumulator for BytesViewDistinctCountAccumulator {
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
Ok(ScalarValue::UInt64(Some(self.0.non_null_len() as u64)))
}

fn size(&self) -> usize {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ where
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
Ok(ScalarValue::UInt64(Some(self.values.len() as u64)))
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -201,7 +201,7 @@ where
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
Ok(ScalarValue::UInt64(Some(self.values.len() as u64)))
}

fn size(&self) -> usize {
Expand Down
Loading

0 comments on commit de57e83

Please sign in to comment.