From 8bb26ae17af05d58c17fd2a5e47ead6041d2a24f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 21 Jul 2024 10:03:25 -0600 Subject: [PATCH 1/6] add criterion benchmarks for IsNullExpr and IsNotNullExpr --- datafusion/physical-expr/Cargo.toml | 4 + datafusion/physical-expr/benches/is_null.rs | 95 +++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 datafusion/physical-expr/benches/is_null.rs diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 067617a697a9..8436b5279bd7 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -81,3 +81,7 @@ name = "in_list" [[bench]] harness = false name = "case_when" + +[[bench]] +harness = false +name = "is_null" diff --git a/datafusion/physical-expr/benches/is_null.rs b/datafusion/physical-expr/benches/is_null.rs new file mode 100644 index 000000000000..3dad8e9b456a --- /dev/null +++ b/datafusion/physical-expr/benches/is_null.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow_array::builder::Int32Builder; +use arrow_schema::DataType; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr}; +use datafusion_physical_expr_common::expressions::column::Column; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use std::sync::Arc; + +fn criterion_benchmark(c: &mut Criterion) { + // create input data + let mut c1 = Int32Builder::new(); + let mut c2 = Int32Builder::new(); + let mut c3 = Int32Builder::new(); + for i in 0..1000 { + // c1 is always null + c1.append_null(); + // c2 is never null + c2.append_value(i); + // c3 is a mix of values and nulls + if i % 7 == 0 { + c3.append_null(); + } else { + c3.append_value(i); + } + } + let c1 = Arc::new(c1.finish()); + let c2 = Arc::new(c2.finish()); + let c3 = Arc::new(c3.finish()); + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Int32, false), + Field::new("c3", DataType::Int32, true), + ]); + let batch = RecordBatch::try_new(Arc::new(schema), vec![c1, c2, c3]).unwrap(); + + c.bench_function("is_null: column is all nulls", |b| { + let expr = is_null("c1", 0); + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + + c.bench_function("is_null: column is never null", |b| { + let expr = is_null("c2", 1); + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + + c.bench_function("is_null: column is mix of values and nulls", |b| { + let expr = is_null("c3", 2); + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + + c.bench_function("is_not_null: column is all nulls", |b| { + let expr = is_not_null("c1", 0); + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + + c.bench_function("is_not_null: column is never null", |b| { + let expr = is_not_null("c2", 1); + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + + c.bench_function("is_not_null: column is mix of values and nulls", |b| { + let expr = is_not_null("c3", 2); + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); +} + +fn is_null(name: &str, index: usize) -> Arc { + Arc::new(IsNullExpr::new(Arc::new(Column::new(name, index)))) +} + +fn is_not_null(name: &str, index: usize) -> Arc { + Arc::new(IsNotNullExpr::new(Arc::new(Column::new(name, index)))) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From bf7cf9779ce13ae5011c96dbc22333b980618981 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 21 Jul 2024 10:11:48 -0600 Subject: [PATCH 2/6] Improve IsNotNull performance by avoiding calling is_null then not and just calling is_not_null kernel directly --- .../physical-expr/src/expressions/is_not_null.rs | 4 +--- datafusion/physical-expr/src/expressions/is_null.rs | 11 +++++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 9f7438d13e05..58559352d44c 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -22,7 +22,6 @@ use std::{any::Any, sync::Arc}; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; -use arrow::compute; use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, @@ -74,8 +73,7 @@ impl PhysicalExpr for IsNotNullExpr { let arg = self.arg.evaluate(batch)?; match arg { ColumnarValue::Array(array) => { - let is_null = super::is_null::compute_is_null(array)?; - let is_not_null = compute::not(&is_null)?; + let is_not_null = super::is_null::compute_is_not_null(array)?; Ok(ColumnarValue::Array(Arc::new(is_not_null))) } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index e2dc941e26bc..9e9209d1a6c8 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -117,6 +117,17 @@ pub(crate) fn compute_is_null(array: ArrayRef) -> Result { } } +/// workaround , +/// this can be replaced with a direct call to `arrow::compute::is_not_null` once it's fixed. +pub(crate) fn compute_is_not_null(array: ArrayRef) -> Result { + if array.as_any().is::() { + let is_null = compute_is_null(array)?; + compute::not(&is_null).map_err(Into::into) + } else { + compute::is_not_null(array.as_ref()).map_err(Into::into) + } +} + fn dense_union_is_null( union_array: &UnionArray, offsets: &ScalarBuffer, From 70d254f5b97acc74cc5ac10e500398bde1d9f48e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 21 Jul 2024 10:25:15 -0600 Subject: [PATCH 3/6] fast path if input array is all nulls or no nulls --- .../src/expressions/is_not_null.rs | 16 ++++++++++++++-- .../physical-expr/src/expressions/is_null.rs | 19 ++++++++++++++++--- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 58559352d44c..e303772102ed 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -26,6 +26,7 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; +use arrow_array::UnionArray; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; @@ -72,9 +73,20 @@ impl PhysicalExpr for IsNotNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { + ColumnarValue::Array(array) if array.as_any().is::() => { + Ok(ColumnarValue::Array(Arc::new( + super::is_null::compute_is_not_null(array)?, + ))) + } ColumnarValue::Array(array) => { - let is_not_null = super::is_null::compute_is_not_null(array)?; - Ok(ColumnarValue::Array(Arc::new(is_not_null))) + if array.null_count() == 0 { + Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)))) + } else if array.null_count() == array.len() { + Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)))) + } else { + let is_not_null = super::is_null::compute_is_not_null(array)?; + Ok(ColumnarValue::Array(Arc::new(is_not_null))) + } } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(!scalar.is_null())), diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 9e9209d1a6c8..117605a08d22 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -77,9 +77,18 @@ impl PhysicalExpr for IsNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { - ColumnarValue::Array(array) => { + ColumnarValue::Array(array) if array.as_any().is::() => { Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?))) } + ColumnarValue::Array(array) => { + if array.null_count() == 0 { + Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)))) + } else if array.null_count() == array.len() { + Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)))) + } else { + Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?))) + } + } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(scalar.is_null())), )), @@ -120,8 +129,12 @@ pub(crate) fn compute_is_null(array: ArrayRef) -> Result { /// workaround , /// this can be replaced with a direct call to `arrow::compute::is_not_null` once it's fixed. pub(crate) fn compute_is_not_null(array: ArrayRef) -> Result { - if array.as_any().is::() { - let is_null = compute_is_null(array)?; + if let Some(union_array) = array.as_any().downcast_ref::() { + let is_null = if let Some(offsets) = union_array.offsets() { + dense_union_is_null(union_array, offsets)? + } else { + sparse_union_is_null(union_array)? + }; compute::not(&is_null).map_err(Into::into) } else { compute::is_not_null(array.as_ref()).map_err(Into::into) From 34b0b7f0638cfd5acb329b07b272315dbd4549c5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 21 Jul 2024 10:45:34 -0600 Subject: [PATCH 4/6] revert experimental change --- .../physical-expr/src/expressions/is_not_null.rs | 15 ++------------- .../physical-expr/src/expressions/is_null.rs | 11 +---------- 2 files changed, 3 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index e303772102ed..85c6f37476cc 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -73,20 +73,9 @@ impl PhysicalExpr for IsNotNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { - ColumnarValue::Array(array) if array.as_any().is::() => { - Ok(ColumnarValue::Array(Arc::new( - super::is_null::compute_is_not_null(array)?, - ))) - } ColumnarValue::Array(array) => { - if array.null_count() == 0 { - Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)))) - } else if array.null_count() == array.len() { - Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)))) - } else { - let is_not_null = super::is_null::compute_is_not_null(array)?; - Ok(ColumnarValue::Array(Arc::new(is_not_null))) - } + let is_not_null = super::is_null::compute_is_not_null(array)?; + Ok(ColumnarValue::Array(Arc::new(is_not_null))) } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(!scalar.is_null())), diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 117605a08d22..03932c79f50c 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -77,17 +77,8 @@ impl PhysicalExpr for IsNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { - ColumnarValue::Array(array) if array.as_any().is::() => { - Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?))) - } ColumnarValue::Array(array) => { - if array.null_count() == 0 { - Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)))) - } else if array.null_count() == array.len() { - Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)))) - } else { - Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?))) - } + Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?))) } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(scalar.is_null())), From 75caa4526b48b576332239c44953db2a866ea9d3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 21 Jul 2024 13:21:59 -0600 Subject: [PATCH 5/6] remove unused import --- datafusion/physical-expr/src/expressions/is_not_null.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 85c6f37476cc..58559352d44c 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -26,7 +26,6 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use arrow_array::UnionArray; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; From 2b7613dc0e8084bb650d05779f8a713487a6eec1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 22 Jul 2024 15:32:17 -0600 Subject: [PATCH 6/6] simplify PR --- datafusion/physical-expr/src/expressions/is_null.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 03932c79f50c..3cdb49bcab42 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -120,13 +120,8 @@ pub(crate) fn compute_is_null(array: ArrayRef) -> Result { /// workaround , /// this can be replaced with a direct call to `arrow::compute::is_not_null` once it's fixed. pub(crate) fn compute_is_not_null(array: ArrayRef) -> Result { - if let Some(union_array) = array.as_any().downcast_ref::() { - let is_null = if let Some(offsets) = union_array.offsets() { - dense_union_is_null(union_array, offsets)? - } else { - sparse_union_is_null(union_array)? - }; - compute::not(&is_null).map_err(Into::into) + if array.as_any().is::() { + compute::not(&compute_is_null(array)?).map_err(Into::into) } else { compute::is_not_null(array.as_ref()).map_err(Into::into) }