Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example of using Expr::field in 37.1.0 #10183

Closed
wants to merge 3 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 69 additions & 4 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{BooleanArray, Int32Array};
use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray, StructArray};
use arrow::record_batch::RecordBatch;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
Expand All @@ -29,12 +29,15 @@ use datafusion::physical_expr::{
analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
};
use datafusion::prelude::*;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
use datafusion_expr::{
ColumnarValue, ExprSchemable, GetFieldAccess, GetIndexedField, Operator,
};

/// This example demonstrates the DataFusion [`Expr`] API.
///
Expand Down Expand Up @@ -68,6 +71,9 @@ async fn main() -> Result<()> {
// See how to evaluate expressions
evaluate_demo()?;

// See how to evaluate expressions on structured types
evaluate_structured_demo()?;

// See how to simplify expressions
simplify_demo()?;

Expand Down Expand Up @@ -110,6 +116,48 @@ fn evaluate_demo() -> Result<()> {
Ok(())
}

/// Shows that DataFusion expressions can reach into structured (nested)
/// values such as `{"a": 5, "b": "foo"}`.
///
/// Builds a single-column [`RecordBatch`] holding a struct array, evaluates
/// `struct_col.a < 6` against it and asserts the resulting boolean mask.
fn evaluate_structured_demo() -> Result<()> {
    // The input is an array of structs:
    // [
    //   {"a": 4, "b": "foo"},
    //   {"a": 5, "b": "bar"},
    //   {"a": 6, "b": "baz"}
    // ]
    // A struct array stores each field as its own child array.
    let a_field = Arc::new(Field::new("a", DataType::Int32, false));
    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
    let b_field = Arc::new(Field::new("b", DataType::Utf8, false));
    let b_values: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
    let structs = StructArray::from(vec![(a_field, a_values), (b_field, b_values)]);

    // Wrap the struct array in a batch with one column named `struct_col`
    let struct_col: ArrayRef = Arc::new(structs);
    let batch = RecordBatch::try_from_iter([("struct_col", struct_col)])?;

    // `struct_col.a < 6`: extract field `a` from each struct and compare it
    // with the literal 6
    let predicate = col("struct_col").field("a").lt(lit(6));

    // Logical `Expr`s cannot be evaluated directly; first turn the predicate
    // into a "physical expression" bound to the batch schema
    let evaluator = physical_expr(&batch.schema(), predicate)?;

    // Then evaluate the physical expression against the RecordBatch
    let result = evaluator.evaluate(&batch)?;

    // The result contains an array that is true only where `a < 6`
    let expected: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false]));
    let is_expected =
        matches!(&result, ColumnarValue::Array(actual) if actual == &expected);
    assert!(is_expected, "result: {:?}", result);

    Ok(())
}

/// In addition to easy construction, DataFusion exposes APIs for simplifying
/// such expression so they are more efficient to evaluate. This code is also
/// used by the query engine to optimize queries.
Expand Down Expand Up @@ -248,18 +296,35 @@ fn make_ts_field(name: &str) -> Field {
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
}

/// Build a physical expression from a logical one, after applying simplification and type coercion
/// Build a physical expression from a logical one, after applying
/// simplification, type coercion, and function rewrites
pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
let df_schema = schema.clone().to_dfschema_ref()?;

// Simplify
// register the standard DataFusion function library
let props = ExecutionProps::new();
let simplifier =
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));

// apply type coercion here to ensure types match
let expr = simplifier.coerce(expr, df_schema.clone())?;

// Support `Expr::field` by rewriting named struct field accesses
// (`Expr::GetIndexedField`) into calls to the `get_field` function
let expr = expr
.transform_up(&|expr| {
Ok(match expr {
Expr::GetIndexedField(GetIndexedField {
Copy link
Contributor Author

@alamb alamb Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This basically inlines the necessary code from

Expr::GetIndexedField(GetIndexedField {
expr,
field: GetFieldAccess::NamedStructField { name },
}) => {
let expr = *expr.clone();
let name = Expr::Literal(name);
Transformed::yes(get_field(expr, name.clone()))

expr,
field: GetFieldAccess::NamedStructField { name },
}) => {
let name = Expr::Literal(name);
Transformed::yes(get_field(*expr, name))
}
_ => Transformed::no(expr),
})
})?
.data;

create_physical_expr(&expr, df_schema.as_ref(), &props)
}

Expand Down
Loading