diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index aac0a5a94091..d14975a68883 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -426,6 +426,7 @@ impl EquivalenceProperties { } (!meet.is_empty()).then_some(meet) } + /// we substitute the ordering according to input expression type, this is a simplified version /// In this case, we just substitute when the expression satisfy the following confition /// I. just have one column and is a CAST expression @@ -435,6 +436,7 @@ impl EquivalenceProperties { pub fn substitute_ordering_component( matching_exprs: Arc>>, sort_expr: &[PhysicalSortExpr], + schema: SchemaRef, ) -> Vec { sort_expr .iter() @@ -454,7 +456,11 @@ impl EquivalenceProperties { let r_expr = referring_exprs[0].clone(); if let Some(cast_expr) = r_expr.as_any().downcast_ref::() { // we need to know whether the Cast Expr matches or not - if cast_expr.expr.eq(&sort_expr.expr) { + let expr_type = + sort_expr.expr.data_type(schema.as_ref()).unwrap(); + if cast_expr.expr.eq(&sort_expr.expr) + && cast_expr.is_bigger_cast(expr_type) + { PhysicalSortExpr { expr: r_expr.clone(), options: sort_expr.options, @@ -480,6 +486,7 @@ impl EquivalenceProperties { &mut self, exprs: &[(Arc, String)], mapping: &ProjectionMapping, + schema: SchemaRef, ) { let matching_exprs: Arc> = Arc::new( exprs @@ -488,17 +495,18 @@ impl EquivalenceProperties { .map(|(source, _)| source) .collect(), ); - //println!("matching_expr is {:?}", matching_exprs); - //println!("self.ordering is {:?}", self.oeq_class); let orderings = std::mem::take(&mut self.oeq_class.orderings); let new_order = orderings .into_iter() .map(move |order| { - Self::substitute_ordering_component(matching_exprs.clone(), &order) + Self::substitute_ordering_component( + matching_exprs.clone(), + &order, + schema.clone(), + ) }) .collect(); self.oeq_class = OrderingEquivalenceClass::new(new_order); - //println!("######## the oeq_class si {:?}", self.oeq_class.orderings); } /// Projects argument `expr` according to `projection_mapping`, taking /// equivalences into account. diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index fb3b2466c746..b0e175e711fe 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. +use crate::physical_expr::down_cast_any_ref; +use crate::sort_properties::SortProperties; +use crate::PhysicalExpr; use std::any::Any; use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::Arc; - -use crate::physical_expr::down_cast_any_ref; -use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; +use DataType::*; use arrow::compute::{can_cast_types, kernels, CastOptions}; use arrow::datatypes::{DataType, Schema}; @@ -76,6 +76,26 @@ impl CastExpr { pub fn cast_options(&self) -> &CastOptions<'static> { &self.cast_options } + pub fn is_bigger_cast(&self, src: DataType) -> bool { + if src == self.cast_type { + return true; + } + matches!( + (src, &self.cast_type), + (Int8, Int16 | Int32 | Int64) + | (Int16, Int32 | Int64) + | (Int32, Int64) + | (UInt8, UInt16 | UInt32 | UInt64) + | (UInt16, UInt32 | UInt64) + | (UInt32, UInt64) + | ( + Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32, + Float32 | Float64 + ) + | (Int64 | UInt64, Float64) + | (Utf8, LargeUtf8) + ) + } } impl fmt::Display for CastExpr { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 068e71a3afca..51423d37e77c 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -70,7 +70,6 @@ impl ProjectionExec { input: Arc, ) -> Result { let input_schema = input.schema(); - let fields: Result> = expr .iter() .map(|(e, name)| { @@ -94,10 +93,11 @@ impl ProjectionExec { // construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?; - //println!("projection_mapping is {:?}", projection_mapping); + let mut input_eqs = input.equivalence_properties(); - input_eqs.substitute_oeq_class(&expr, &projection_mapping); - //println!("input_eqs is {:?}", input_eqs); + + input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone()); + let project_eqs = input_eqs.project(&projection_mapping, schema.clone()); let output_ordering = project_eqs.oeq_class().output_ordering(); @@ -204,7 +204,11 @@ impl ExecutionPlan for ProjectionExec { fn equivalence_properties(&self) -> EquivalenceProperties { let mut equi_properties = self.input.equivalence_properties(); - equi_properties.substitute_oeq_class(&self.expr, &self.projection_mapping); + equi_properties.substitute_oeq_class( + &self.expr, + &self.projection_mapping, + self.input.schema().clone(), + ); equi_properties.project(&self.projection_mapping, self.schema()) } diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index bfe22d6adbc7..57b810673358 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -64,11 +64,23 @@ physical_plan ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk] --CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false -# test for cast different type -query error DataFusion error: This feature is not implemented: Unsupported SQL type Custom\(ObjectName\(\[Ident \{ value: "UTF8", quote_style: None \}\]\), \[\]\) + +# test for cast Utf8 +query TT EXPLAIN SELECT - CAST(c_customer_sk AS UTF8) AS c_customer_sk_big, + CAST(c_customer_sk AS STRING) AS c_customer_sk_big, c_current_cdemo_sk FROM delta_encoding_required_column ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC; +---- +logical_plan +Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST +--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk +----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk] +physical_plan +SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC] +--SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC] +----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false