diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 2c1470a1d6ec..0082ed6eb9a9 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -25,9 +25,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::common::DFSchema; use datafusion::error::Result; use datafusion::optimizer::simplify_expressions::ExprSimplifier; -use datafusion::physical_expr::{ - analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr, -}; +use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries}; use datafusion::prelude::*; use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; @@ -92,7 +90,8 @@ fn evaluate_demo() -> Result<()> { let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8))); // First, you make a "physical expression" from the logical `Expr` - let physical_expr = physical_expr(&batch.schema(), expr)?; + let df_schema = DFSchema::try_from(batch.schema())?; + let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?; // Now, you can evaluate the expression against the RecordBatch let result = physical_expr.evaluate(&batch)?; @@ -213,7 +212,7 @@ fn range_analysis_demo() -> Result<()> { // `date < '2020-10-01' AND date > '2020-09-01'` // As always, we need to tell DataFusion the type of column "date" - let schema = Schema::new(vec![make_field("date", DataType::Date32)]); + let schema = Arc::new(Schema::new(vec![make_field("date", DataType::Date32)])); // You can provide DataFusion any known boundaries on the values of `date` // (for example, maybe you know you only have data up to `2020-09-15`), but @@ -222,9 +221,13 @@ fn range_analysis_demo() -> Result<()> { let boundaries = ExprBoundaries::try_new_unbounded(&schema)?; // Now, we invoke the analysis code to perform the range analysis - let physical_expr = physical_expr(&schema, expr)?; - let analysis_result = - analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?; + let df_schema = DFSchema::try_from(schema)?; + let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?; + let analysis_result = analyze( + &physical_expr, + AnalysisContext::new(boundaries), + df_schema.as_ref(), + )?; // The results of the analysis is an range, encoded as an `Interval`, for // each column in the schema, that must be true in order for the predicate @@ -248,21 +251,6 @@ fn make_ts_field(name: &str) -> Field { make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz)) } -/// Build a physical expression from a logical one, after applying simplification and type coercion -pub fn physical_expr(schema: &Schema, expr: Expr) -> Result> { - let df_schema = schema.clone().to_dfschema_ref()?; - - // Simplify - let props = ExecutionProps::new(); - let simplifier = - ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone())); - - // apply type coercion here to ensure types match - let expr = simplifier.coerce(expr, &df_schema)?; - - create_physical_expr(&expr, df_schema.as_ref(), &props) -} - /// This function shows how to use `Expr::get_type` to retrieve the DataType /// of an expression fn expression_type_demo() -> Result<()> { diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index b2a3de72356c..3686af90db17 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -125,6 +125,20 @@ impl DFSchema { } } + /// Return a reference to the inner Arrow [`Schema`] + /// + /// Note this does not have the qualifier information + pub fn as_arrow(&self) -> &Schema { + self.inner.as_ref() + } + + /// Return a reference to the inner Arrow [`SchemaRef`] + /// + /// Note this does not have the qualifier information + pub fn inner(&self) -> &SchemaRef { + &self.inner + } + /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier pub fn new_with_metadata( qualified_fields: Vec<(Option, Arc)>, @@ -806,6 +820,21 @@ impl From<&DFSchema> for Schema { } } +/// Allow DFSchema to be converted into an Arrow `&Schema` +impl AsRef for DFSchema { + fn as_ref(&self) -> &Schema { + self.as_arrow() + } +} + +/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for +/// example) +impl AsRef for DFSchema { + fn as_ref(&self) -> &SchemaRef { + self.inner() + } +} + /// Create a `DFSchema` from an Arrow schema impl TryFrom for DFSchema { type Error = DataFusionError; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d84983f08ec6..f3af31f895f9 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -71,13 +71,13 @@ use datafusion_common::{ config::{ConfigExtension, TableOptions}, exec_err, not_impl_err, plan_datafusion_err, plan_err, tree_node::{TreeNodeRecursion, TreeNodeVisitor}, - SchemaReference, TableReference, + DFSchema, SchemaReference, TableReference, }; use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, var_provider::is_system_variables, - Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, + Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, }; use datafusion_sql::{ parser::{CopyToSource, CopyToStatement, DFParser}, @@ -87,15 +87,20 @@ use datafusion_sql::{ use async_trait::async_trait; use chrono::{DateTime, Utc}; +use datafusion_common::tree_node::TreeNode; use parking_lot::RwLock; use sqlparser::dialect::dialect_from_str; use url::Url; use uuid::Uuid; +use crate::physical_expr::PhysicalExpr; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr_rewriter::FunctionRewrite; +use datafusion_expr::simplify::SimplifyInfo; +use datafusion_optimizer::simplify_expressions::ExprSimplifier; +use datafusion_physical_expr::create_physical_expr; mod avro; mod csv; @@ -523,6 +528,41 @@ impl SessionContext { } } + /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type + /// coercion and function rewrites. + /// + /// Note: The expression is not [simplified] or otherwise optimized: `a = 1 + /// + 2` will not be simplified to `a = 3` as this is a more involved process. + /// See the [expr_api] example for how to simplify expressions. + /// + /// # Example + /// ``` + /// # use std::sync::Arc; + /// # use arrow::datatypes::{DataType, Field, Schema}; + /// # use datafusion::prelude::*; + /// # use datafusion_common::DFSchema; + /// // a = 1 (i64) + /// let expr = col("a").eq(lit(1i64)); + /// // provide type information that `a` is an Int32 + /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + /// let df_schema = DFSchema::try_from(schema).unwrap(); + /// // Create a PhysicalExpr. Note DataFusion automatically coerces (casts) `1i64` to `1i32` + /// let physical_expr = SessionContext::new() + /// .create_physical_expr(expr, &df_schema).unwrap(); + /// ``` + /// # See Also + /// * [`SessionState::create_physical_expr`] for a lower level API + /// + /// [simplified]: datafusion_optimizer::simplify_expressions + /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs + pub fn create_physical_expr( + &self, + expr: Expr, + df_schema: &DFSchema, + ) -> Result> { + self.state.read().create_physical_expr(expr, df_schema) + } + // return an empty dataframe fn return_empty_dataframe(&self) -> Result { let plan = LogicalPlanBuilder::empty(false).build()?; @@ -1946,13 +1986,14 @@ impl SessionState { } } - /// Creates a physical plan from a logical plan. + /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`]. /// /// Note: this first calls [`Self::optimize`] on the provided /// plan. /// - /// This function will error for [`LogicalPlan`]s such as catalog - /// DDL `CREATE TABLE` must be handled by another layer. + /// This function will error for [`LogicalPlan`]s such as catalog DDL like + /// `CREATE TABLE`, which do not have corresponding physical plans and must + /// be handled by another layer, typically [`SessionContext`]. pub async fn create_physical_plan( &self, logical_plan: &LogicalPlan, @@ -1963,6 +2004,39 @@ impl SessionState { .await } + /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type + /// coercion, and function rewrites. + /// + /// Note: The expression is not [simplified] or otherwise optimized: `a = 1 + /// + 2` will not be simplified to `a = 3` as this is a more involved process. + /// See the [expr_api] example for how to simplify expressions. + /// + /// # See Also: + /// * [`SessionContext::create_physical_expr`] for a higher-level API + /// * [`create_physical_expr`] for a lower-level API + /// + /// [simplified]: datafusion_optimizer::simplify_expressions + /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs + pub fn create_physical_expr( + &self, + expr: Expr, + df_schema: &DFSchema, + ) -> Result> { + let simplifier = + ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema)); + // apply type coercion here to ensure types match + let mut expr = simplifier.coerce(expr, df_schema)?; + + // rewrite Exprs to functions if necessary + let config_options = self.config_options(); + for rewrite in self.analyzer.function_rewrites() { + expr = expr + .transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))? + .data; + } + create_physical_expr(&expr, df_schema, self.execution_props()) + } + /// Return the session ID pub fn session_id(&self) -> &str { &self.session_id @@ -2040,6 +2114,35 @@ impl SessionState { } } +struct SessionSimplifyProvider<'a> { + state: &'a SessionState, + df_schema: &'a DFSchema, +} + +impl<'a> SessionSimplifyProvider<'a> { + fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self { + Self { state, df_schema } + } +} + +impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> { + fn is_boolean_type(&self, expr: &Expr) -> Result { + Ok(expr.get_type(self.df_schema)? == DataType::Boolean) + } + + fn nullable(&self, expr: &Expr) -> Result { + expr.nullable(self.df_schema) + } + + fn execution_props(&self) -> &ExecutionProps { + self.state.execution_props() + } + + fn get_data_type(&self, expr: &Expr) -> Result { + expr.get_type(self.df_schema) + } +} + struct SessionContextProvider<'a> { state: &'a SessionState, tables: HashMap>, diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs index befefb1d7ec5..f8ad8f1554b2 100644 --- a/datafusion/core/tests/core_integration.rs +++ b/datafusion/core/tests/core_integration.rs @@ -24,6 +24,9 @@ mod dataframe; /// Run all tests that are found in the `macro_hygiene` directory mod macro_hygiene; +/// Run all tests that are found in the `expr_api` directory +mod expr_api; + #[cfg(test)] #[ctor::ctor] fn init() { diff --git a/datafusion/core/tests/expr_api/mod.rs b/datafusion/core/tests/expr_api/mod.rs new file mode 100644 index 000000000000..0dde7604cce2 --- /dev/null +++ b/datafusion/core/tests/expr_api/mod.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::util::pretty::pretty_format_columns; +use arrow_array::builder::{ListBuilder, StringBuilder}; +use arrow_array::{ArrayRef, RecordBatch, StringArray, StructArray}; +use arrow_schema::{DataType, Field}; +use datafusion::prelude::*; +use datafusion_common::DFSchema; +/// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan +use std::sync::{Arc, OnceLock}; + +#[test] +fn test_eq() { + // id = '2' + evaluate_expr_test( + col("id").eq(lit("2")), + vec![ + "+-------+", + "| expr |", + "+-------+", + "| false |", + "| true |", + "| false |", + "+-------+", + ], + ); +} + +#[test] +fn test_eq_with_coercion() { + // id = 2 (need to coerce the 2 to '2' to evaluate) + evaluate_expr_test( + col("id").eq(lit(2i32)), + vec![ + "+-------+", + "| expr |", + "+-------+", + "| false |", + "| true |", + "| false |", + "+-------+", + ], + ); +} + +#[test] +fn test_get_field() { + // field access Expr::field() requires a rewrite to work + evaluate_expr_test( + col("props").field("a"), + vec![ + "+------------+", + "| expr |", + "+------------+", + "| 2021-02-01 |", + "| 2021-02-02 |", + "| 2021-02-03 |", + "+------------+", + ], + ); +} + +#[test] +fn test_nested_get_field() { + // field access Expr::field() requires a rewrite to work, test when it is + // not the root expression + evaluate_expr_test( + col("props") + .field("a") + .eq(lit("2021-02-02")) + .or(col("id").eq(lit(1))), + vec![ + "+-------+", + "| expr |", + "+-------+", + "| true |", + "| true |", + "| false |", + "+-------+", + ], + ); +} + +#[test] +fn test_list() { + // list access also requires a rewrite to work + evaluate_expr_test( + col("list").index(lit(1i64)), + vec![ + "+------+", "| expr |", "+------+", "| one |", "| two |", "| five |", + "+------+", + ], + ); +} + +#[test] +fn test_list_range() { + // range access also requires a rewrite to work + evaluate_expr_test( + col("list").range(lit(1i64), lit(2i64)), + vec![ + "+--------------+", + "| expr |", + "+--------------+", + "| [one] |", + "| [two, three] |", + "| [five] |", + "+--------------+", + ], + ); +} + +/// Converts the `Expr` to a `PhysicalExpr`, evaluates it against the provided +/// `RecordBatch` and compares the result to the expected result. +fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) { + let batch = test_batch(); + let df_schema = DFSchema::try_from(batch.schema()).unwrap(); + let physical_expr = SessionContext::new() + .create_physical_expr(expr, &df_schema) + .unwrap(); + + let result = physical_expr.evaluate(&batch).unwrap(); + let array = result.into_array(1).unwrap(); + let result = pretty_format_columns("expr", &[array]).unwrap().to_string(); + let actual_lines = result.lines().collect::>(); + + assert_eq!( + expected_lines, actual_lines, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected_lines, actual_lines + ); +} + +static TEST_BATCH: OnceLock = OnceLock::new(); + +fn test_batch() -> RecordBatch { + TEST_BATCH + .get_or_init(|| { + let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"])); + + // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" } + let struct_array: ArrayRef = Arc::from(StructArray::from(vec![( + Arc::new(Field::new("a", DataType::Utf8, false)), + Arc::new(StringArray::from(vec![ + "2021-02-01", + "2021-02-02", + "2021-02-03", + ])) as _, + )])); + + // ["one"] ["two", "three", "four"] ["five"] + let mut builder = ListBuilder::new(StringBuilder::new()); + builder.append_value([Some("one")]); + builder.append_value([Some("two"), Some("three"), Some("four")]); + builder.append_value([Some("five")]); + let list_array: ArrayRef = Arc::new(builder.finish()); + + RecordBatch::try_from_iter(vec![ + ("id", string_array), + ("props", struct_array), + ("list", list_array), + ]) + .unwrap() + }) + .clone() +} diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 1553f95266c8..121e46cc950f 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -111,6 +111,11 @@ impl Analyzer { self.function_rewrites.push(rewrite); } + /// return the list of function rewrites in this analyzer + pub fn function_rewrites(&self) -> &[Arc] { + &self.function_rewrites + } + /// Analyze the logical plan by applying analyzer rules, and /// do necessary check and fail the invalid plans pub fn execute_and_check(