Change count() return type to UInt64
The count is never negative, so this can be reflected in a more
appropriate, unsigned return type.
findepi committed Oct 25, 2024
1 parent 813220d commit b013dc4
Showing 10 changed files with 65 additions and 68 deletions.
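
For context, here is a minimal caller-side sketch (not part of this commit) of what the change means for code that reads a count() result out of a RecordBatch. It mirrors the DataFrame::count() change in datafusion/core/src/dataframe/mod.rs below; the helper name read_count is hypothetical.

use arrow::array::{Array, UInt64Array};
use arrow::record_batch::RecordBatch;

// After this change, the single value produced by a count() aggregation is
// read back as a UInt64Array (previously Int64Array).
fn read_count(batches: &[RecordBatch]) -> Option<u64> {
    batches
        .first()
        .and_then(|batch| batch.columns().first())
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| arr.values().first().copied())
}

Test helpers that assert on the value change in the same way, e.g. expected counts move from i64 to u64 (see assert_provider_row_count further down).
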
4 changes: 2 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
@@ -44,7 +44,7 @@ use crate::physical_plan::{
};
use crate::prelude::SessionContext;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::array::{Array, ArrayRef, UInt64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use arrow_schema::{Schema, SchemaRef};
@@ -1171,7 +1171,7 @@ impl DataFrame {
let len = *rows
.first()
.and_then(|r| r.columns().first())
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.and_then(|a| a.values().first())
.ok_or(DataFusionError::Internal(
"Unexpected output when collecting for count()".to_string(),
34 changes: 17 additions & 17 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -845,17 +845,17 @@ mod tests {

let physical_plan = bounded_window_exec("non_nullable_col", sort_exprs, filter);

let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" FilterExec: NOT non_nullable_col@1",
" SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];

let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]",
let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]",
" FilterExec: NOT non_nullable_col@1",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];
@@ -1722,15 +1722,15 @@ mod tests {
// corresponding SortExecs together. Also, the inputs of these `SortExec`s
// are not necessarily the same to be able to remove them.
let expected_input = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
" UnionExec",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"];
let expected_optimized = [
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]",
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
@@ -1760,14 +1760,14 @@ mod tests {

// The `WindowAggExec` can get its required sorting from the leaf nodes directly.
// The unnecessary SortExecs should be removed
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"];
let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
@@ -2060,15 +2060,15 @@ mod tests {
let physical_plan =
bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);

let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];

let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
@@ -2134,7 +2134,7 @@ mod tests {
let expected_input = vec![
"SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_eq!(
@@ -2386,15 +2386,15 @@ mod tests {
let physical_plan = bounded_window_exec("a", sort_exprs, spm);

let expected_input = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
let expected_optimized = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
@@ -442,7 +442,7 @@ mod tests {
let sort = sort_exec(sort_exprs.clone(), source);
let bw = bounded_window_exec("c9", sort_exprs, sort);
assert_plan(bw.as_ref(), vec![
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"
]);
@@ -465,7 +465,7 @@ mod tests {
)];
let bw = bounded_window_exec("c9", sort_exprs, source);
assert_plan(bw.as_ref(), vec![
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" MemoryExec: partitions=1, partition_sizes=[0]"
]);
// Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check.
6 changes: 3 additions & 3 deletions datafusion/core/tests/custom_sources_cases/mod.rs
@@ -20,7 +20,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::array::{Int32Array, Int64Array};
use arrow::array::{Int32Array, UInt64Array};
use arrow::compute::kernels::aggregate;
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
@@ -285,12 +285,12 @@ async fn optimizers_catch_all_statistics() {

let expected = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("count(*)", DataType::Int64, false),
Field::new("count(*)", DataType::UInt64, false),
Field::new("min(test.c1)", DataType::Int32, false),
Field::new("max(test.c1)", DataType::Int32, false),
])),
vec![
Arc::new(Int64Array::from(vec![4])),
Arc::new(UInt64Array::from(vec![4])),
Arc::new(Int32Array::from(vec![1])),
Arc::new(Int32Array::from(vec![100])),
],
@@ -18,7 +18,7 @@
use std::ops::Deref;
use std::sync::Arc;

use arrow::array::{Int32Builder, Int64Array};
use arrow::array::{Int32Builder, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::catalog::TableProvider;
@@ -229,7 +229,7 @@ impl TableProvider for CustomProvider {
}
}

async fn assert_provider_row_count(value: i64, expected_count: i64) -> Result<()> {
async fn assert_provider_row_count(value: i64, expected_count: u64) -> Result<()> {
let provider = CustomProvider {
zero_batch: create_batch(0, 10)?,
one_batch: create_batch(1, 5)?,
@@ -242,7 +242,7 @@ async fn assert_provider_row_count(value: i64, expected_count: i64) -> Result<()
.aggregate(vec![], vec![count(col("flag"))])?;

let results = df.collect().await?;
let result_col: &Int64Array = as_primitive_array(results[0].column(0))?;
let result_col: &UInt64Array = as_primitive_array(results[0].column(0))?;
assert_eq!(result_col.value(0), expected_count);

ctx.register_table("data", Arc::new(provider))?;
@@ -252,7 +252,7 @@ async fn assert_provider_row_count(value: i64, expected_count: i64) -> Result<()
.collect()
.await?;

let sql_result_col: &Int64Array = as_primitive_array(sql_results[0].column(0))?;
let sql_result_col: &UInt64Array = as_primitive_array(sql_results[0].column(0))?;
assert_eq!(sql_result_col.value(0), expected_count);

Ok(())
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
@@ -76,7 +76,7 @@ async fn explain_analyze_baseline_metrics() {
);
assert_metrics!(
&formatted,
"ProjectionExec: expr=[count(*)",
"ProjectionExec: expr=[CAST(count(*)@0 AS Int64)",
"metrics=[output_rows=1, elapsed_compute="
);
assert_metrics!(
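
The explain_analyze.rs expectation above also indicates that the planner now wraps count(*) in a cast back to Int64 for the SQL-level projection, so the query output column appears to keep its signed type. As a hedged sketch, the same conversion can be done by hand with the arrow cast kernel; the helper name count_as_int64 is hypothetical, and it assumes the input column is the UInt64 count array.

use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;

// Hypothetical helper: convert a UInt64 count column back to Int64 when a
// consumer still expects the old signed representation.
fn count_as_int64(count_col: &ArrayRef) -> Result<Int64Array, arrow::error::ArrowError> {
    let casted = cast(count_col.as_ref(), &DataType::Int64)?;
    Ok(casted
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("cast to Int64 yields an Int64Array")
        .clone())
}

This is only illustrative; as the plan snippet above shows, DataFusion inserts the equivalent cast itself where needed.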