Skip to content

Commit

Permalink
adding cast UTF8 type and diable scalarfunction situation
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Feb 19, 2024
1 parent 3f80656 commit 9b912a7
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 17 deletions.
18 changes: 13 additions & 5 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ impl EquivalenceProperties {
}
(!meet.is_empty()).then_some(meet)
}

/// we substitute the ordering according to input expression type, this is a simplified version
/// In this case, we just substitute when the expression satisfy the following confition
/// I. just have one column and is a CAST expression
Expand All @@ -435,6 +436,7 @@ impl EquivalenceProperties {
pub fn substitute_ordering_component(
matching_exprs: Arc<Vec<&Arc<dyn PhysicalExpr>>>,
sort_expr: &[PhysicalSortExpr],
schema: SchemaRef,
) -> Vec<PhysicalSortExpr> {
sort_expr
.iter()
Expand All @@ -454,7 +456,11 @@ impl EquivalenceProperties {
let r_expr = referring_exprs[0].clone();
if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
// we need to know whether the Cast Expr matches or not
if cast_expr.expr.eq(&sort_expr.expr) {
let expr_type =
sort_expr.expr.data_type(schema.as_ref()).unwrap();
if cast_expr.expr.eq(&sort_expr.expr)
&& cast_expr.is_bigger_cast(expr_type)
{
PhysicalSortExpr {
expr: r_expr.clone(),
options: sort_expr.options,
Expand All @@ -480,6 +486,7 @@ impl EquivalenceProperties {
&mut self,
exprs: &[(Arc<dyn PhysicalExpr>, String)],
mapping: &ProjectionMapping,
schema: SchemaRef,
) {
let matching_exprs: Arc<Vec<_>> = Arc::new(
exprs
Expand All @@ -488,17 +495,18 @@ impl EquivalenceProperties {
.map(|(source, _)| source)
.collect(),
);
//println!("matching_expr is {:?}", matching_exprs);
//println!("self.ordering is {:?}", self.oeq_class);
let orderings = std::mem::take(&mut self.oeq_class.orderings);
let new_order = orderings
.into_iter()
.map(move |order| {
Self::substitute_ordering_component(matching_exprs.clone(), &order)
Self::substitute_ordering_component(
matching_exprs.clone(),
&order,
schema.clone(),
)
})
.collect();
self.oeq_class = OrderingEquivalenceClass::new(new_order);
//println!("######## the oeq_class si {:?}", self.oeq_class.orderings);
}
/// Projects argument `expr` according to `projection_mapping`, taking
/// equivalences into account.
Expand Down
28 changes: 24 additions & 4 deletions datafusion/physical-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use std::any::Any;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use DataType::*;

use arrow::compute::{can_cast_types, kernels, CastOptions};
use arrow::datatypes::{DataType, Schema};
Expand Down Expand Up @@ -76,6 +76,26 @@ impl CastExpr {
pub fn cast_options(&self) -> &CastOptions<'static> {
&self.cast_options
}
pub fn is_bigger_cast(&self, src: DataType) -> bool {
if src == self.cast_type {
return true;
}
matches!(
(src, &self.cast_type),
(Int8, Int16 | Int32 | Int64)
| (Int16, Int32 | Int64)
| (Int32, Int64)
| (UInt8, UInt16 | UInt32 | UInt64)
| (UInt16, UInt32 | UInt64)
| (UInt32, UInt64)
| (
Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32,
Float32 | Float64
)
| (Int64 | UInt64, Float64)
| (Utf8, LargeUtf8)
)
}
}

impl fmt::Display for CastExpr {
Expand Down
14 changes: 9 additions & 5 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl ProjectionExec {
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let input_schema = input.schema();

let fields: Result<Vec<Field>> = expr
.iter()
.map(|(e, name)| {
Expand All @@ -94,10 +93,11 @@ impl ProjectionExec {

// construct a map from the input expressions to the output expression of the Projection
let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;
//println!("projection_mapping is {:?}", projection_mapping);

let mut input_eqs = input.equivalence_properties();
input_eqs.substitute_oeq_class(&expr, &projection_mapping);
//println!("input_eqs is {:?}", input_eqs);

input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone());

let project_eqs = input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = project_eqs.oeq_class().output_ordering();

Expand Down Expand Up @@ -204,7 +204,11 @@ impl ExecutionPlan for ProjectionExec {

fn equivalence_properties(&self) -> EquivalenceProperties {
let mut equi_properties = self.input.equivalence_properties();
equi_properties.substitute_oeq_class(&self.expr, &self.projection_mapping);
equi_properties.substitute_oeq_class(
&self.expr,
&self.projection_mapping,
self.input.schema().clone(),
);
equi_properties.project(&self.projection_mapping, self.schema())
}

Expand Down
18 changes: 15 additions & 3 deletions datafusion/sqllogictest/test_files/monotonic_projection_test.slt
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,23 @@ physical_plan
ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false

# test for cast different type
query error DataFusion error: This feature is not implemented: Unsupported SQL type Custom\(ObjectName\(\[Ident \{ value: "UTF8", quote_style: None \}\]\), \[\]\)

# test for cast Utf8
query TT
EXPLAIN
SELECT
CAST(c_customer_sk AS UTF8) AS c_customer_sk_big,
CAST(c_customer_sk AS STRING) AS c_customer_sk_big,
c_current_cdemo_sk
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
----
logical_plan
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
physical_plan
SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
--SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false

0 comments on commit 9b912a7

Please sign in to comment.