From 2fe21c8370812a8f5cebd2750c7561bd31763f4b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 20 Sep 2024 17:03:18 -0400 Subject: [PATCH] Account for constant equivalence properties in union, tests --- .../src/equivalence/properties.rs | 728 +++++++++--------- datafusion/sqllogictest/test_files/order.slt | 59 ++ 2 files changed, 437 insertions(+), 350 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index c260a8314f972..2e47edcb3278f 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -2707,379 +2707,397 @@ mod tests { )) } - #[tokio::test] - async fn test_union_equivalence_properties_multi_children() -> Result<()> { - let schema = create_test_schema()?; + #[test] + fn test_union_equivalence_properties_multi_children_1() { + let schema = create_test_schema().unwrap(); let schema2 = append_fields(&schema, "1"); let schema3 = append_fields(&schema, "2"); - let test_cases = vec![ - // --------- TEST CASE 1 ---------- - ( + UnionEquivalenceTest::new(&schema) + // Children 1 + .with_child_sort(vec![vec!["a", "b", "c"]], &schema) + // Children 2 + .with_child_sort(vec![vec!["a1", "b1", "c1"]], &schema2) + // Children 3 + .with_child_sort(vec![vec!["a2", "b2"]], &schema3) + .with_expected_sort(vec![vec!["a", "b"]]) + .run() + } + + #[test] + fn test_union_equivalence_properties_multi_children_2() { + let schema = create_test_schema().unwrap(); + let schema2 = append_fields(&schema, "1"); + let schema3 = append_fields(&schema, "2"); + UnionEquivalenceTest::new(&schema) + // Children 1 + .with_child_sort(vec![vec!["a", "b", "c"]], &schema) + // Children 2 + .with_child_sort(vec![vec!["a1", "b1", "c1"]], &schema2) + // Children 3 + .with_child_sort(vec![vec!["a2", "b2", "c2"]], &schema3) + .with_expected_sort(vec![vec!["a", "b", "c"]]) + .run() + } + + #[test] + fn test_union_equivalence_properties_multi_children_3() { + let schema = create_test_schema().unwrap(); + let schema2 = append_fields(&schema, "1"); + let schema3 = append_fields(&schema, "2"); + UnionEquivalenceTest::new(&schema) + // Children 1 + .with_child_sort(vec![vec!["a", "b"]], &schema) + // Children 2 + .with_child_sort(vec![vec!["a1", "b1", "c1"]], &schema2) + // Children 3 + .with_child_sort(vec![vec!["a2", "b2", "c2"]], &schema3) + .with_expected_sort(vec![vec!["a", "b"]]) + .run() + } + + #[test] + fn test_union_equivalence_properties_multi_children_4() { + let schema = create_test_schema().unwrap(); + let schema2 = append_fields(&schema, "1"); + let schema3 = append_fields(&schema, "2"); + UnionEquivalenceTest::new(&schema) + // Children 1 + .with_child_sort(vec![vec!["a", "b"]], &schema) + // Children 2 + .with_child_sort(vec![vec!["a1", "b1"]], &schema2) + // Children 3 + .with_child_sort(vec![vec!["b2", "c2"]], &schema3) + .with_expected_sort(vec![]) + .run() + } + + #[test] + fn test_union_equivalence_properties_multi_children_5() { + let schema = create_test_schema().unwrap(); + let schema2 = append_fields(&schema, "1"); + UnionEquivalenceTest::new(&schema) + // Children 1 + .with_child_sort(vec![vec!["a", "b"], vec!["c"]], &schema) + // Children 2 + .with_child_sort(vec![vec!["a1", "b1"], vec!["c1"]], &schema2) + .with_expected_sort(vec![vec!["a", "b"], vec!["c"]]) + .run() + } + + #[test] + fn test_union_equivalence_properties_constants_1() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + .with_child_sort_and_const_exprs( + // First child orderings: vec![ - // Children 1 - ( - // Orderings - vec![vec!["a", "b", "c"]], - Arc::clone(&schema), - ), - // Children 2 - ( - // Orderings - vec![vec!["a1", "b1", "c1"]], - Arc::clone(&schema2), - ), - // Children 3 - ( - // Orderings - vec![vec!["a2", "b2"]], - Arc::clone(&schema3), - ), + // [a ASC] + vec!["a"], ], - // Expected - vec![vec!["a", "b"]], - ), - // --------- TEST CASE 2 ---------- - ( + // First child constants + vec!["b", "c"], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child orderings vec![ - // Children 1 - ( - // Orderings - vec![vec!["a", "b", "c"]], - Arc::clone(&schema), - ), - // Children 2 - ( - // Orderings - vec![vec!["a1", "b1", "c1"]], - Arc::clone(&schema2), - ), - // Children 3 - ( - // Orderings - vec![vec!["a2", "b2", "c2"]], - Arc::clone(&schema3), - ), + // [b ASC] + vec!["b"], ], - // Expected - vec![vec!["a", "b", "c"]], - ), - // --------- TEST CASE 3 ---------- - ( + // Second child constants + vec!["a", "c"], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union expected orderings vec![ - // Children 1 - ( - // Orderings - vec![vec!["a", "b"]], - Arc::clone(&schema), - ), - // Children 2 - ( - // Orderings - vec![vec!["a1", "b1", "c1"]], - Arc::clone(&schema2), - ), - // Children 3 - ( - // Orderings - vec![vec!["a2", "b2", "c2"]], - Arc::clone(&schema3), - ), + // [a ASC] + vec!["a"], + // [b ASC] + vec!["b"], ], - // Expected - vec![vec!["a", "b"]], - ), - // --------- TEST CASE 4 ---------- - ( + // Union + vec!["c"], + ) + .run() + } + + #[test] + fn test_union_equivalence_properties_constants_2() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + // Meet ordering between [a ASC], [a ASC, b ASC] should be [a ASC] + .with_child_sort_and_const_exprs( + // First child orderings vec![ - // Children 1 - ( - // Orderings - vec![vec!["a", "b"]], - Arc::clone(&schema), - ), - // Children 2 - ( - // Orderings - vec![vec!["a1", "b1"]], - Arc::clone(&schema2), - ), - // Children 3 - ( - // Orderings - vec![vec!["b2", "c2"]], - Arc::clone(&schema3), - ), + // [a ASC] + vec!["a"], ], - // Expected + // No constant vec![], - ), - // --------- TEST CASE 5 ---------- - ( + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child orderings vec![ - // Children 1 - ( - // Orderings - vec![vec!["a", "b"], vec!["c"]], - Arc::clone(&schema), - ), - // Children 2 - ( - // Orderings - vec![vec!["a1", "b1"], vec!["c1"]], - Arc::clone(&schema2), - ), + // [a ASC, b ASC] + vec!["a", "b"], ], - // Expected - vec![vec!["a", "b"], vec!["c"]], - ), - ]; - for (children, expected) in test_cases { - let children_eqs = children - .iter() - .map(|(orderings, schema)| { - let orderings = orderings - .iter() - .map(|ordering| { - ordering - .iter() - .map(|name| PhysicalSortExpr { - expr: col(name, schema).unwrap(), - options: SortOptions::default(), - }) - .collect::>() - }) - .collect::>(); - EquivalenceProperties::new_with_orderings( - Arc::clone(schema), - &orderings, - ) - }) - .collect::>(); - let actual = calculate_union(children_eqs, Arc::clone(&schema))?; + // No constant + vec![], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union orderings + vec![ + // [a ASC] + vec!["a"], + ], + // No constant + vec![], + ) + .run() + } - let expected_ordering = expected - .into_iter() - .map(|ordering| { - ordering - .into_iter() - .map(|name| PhysicalSortExpr { - expr: col(name, &schema).unwrap(), - options: SortOptions::default(), - }) - .collect::>() - }) - .collect::>(); - let expected = EquivalenceProperties::new_with_orderings( - Arc::clone(&schema), - &expected_ordering, - ); - assert_eq_properties_same( - &actual, - &expected, - format!("expected: {:?}, actual: {:?}", expected, actual), - ); - } - Ok(()) + #[test] + fn test_union_equivalence_properties_constants_3() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + // Meet ordering between [a ASC], [a DESC] should be [] + .with_child_sort_and_const_exprs( + // First child orderings + vec![ + // [a ASC] + vec!["a"], + ], + // No constant + vec![], + &schema, + ) + // switch to DESC + .with_sort_options(!SortOptions::default()) + .with_child_sort_and_const_exprs( + // Second child orderings + vec![ + // [a DESC] + vec!["a"], + ], + // No constant + vec![], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union doesn't have any ordering + vec![], + // No constant + vec![], + ) + .run() } - #[tokio::test] - async fn test_union_equivalence_properties_binary() -> Result<()> { - let schema = create_test_schema()?; + #[test] + fn test_union_equivalence_properties_constants_4() { + let schema = create_test_schema().unwrap(); let schema2 = append_fields(&schema, "1"); - let col_a = &col("a", &schema)?; - let col_b = &col("b", &schema)?; - let col_c = &col("c", &schema)?; - let col_a1 = &col("a1", &schema2)?; - let col_b1 = &col("b1", &schema2)?; - let options = SortOptions::default(); - let options_desc = !SortOptions::default(); - let test_cases = [ - //-----------TEST CASE 1----------// - ( - ( - // First child orderings - vec![ - // [a ASC] - (vec![(col_a, options)]), - ], - // First child constants - vec![col_b, col_c], - Arc::clone(&schema), - ), - ( - // Second child orderings - vec![ - // [b ASC] - (vec![(col_b, options)]), - ], - // Second child constants - vec![col_a, col_c], - Arc::clone(&schema), - ), - ( - // Union expected orderings - vec![ - // [a ASC] - vec![(col_a, options)], - // [b ASC] - vec![(col_b, options)], - ], - // Union - vec![col_c], - ), - ), - //-----------TEST CASE 2----------// - // Meet ordering between [a ASC], [a ASC, b ASC] should be [a ASC] - ( - ( - // First child orderings - vec![ - // [a ASC] - vec![(col_a, options)], - ], - // No constant - vec![], - Arc::clone(&schema), - ), - ( - // Second child orderings - vec![ - // [a ASC, b ASC] - vec![(col_a, options), (col_b, options)], - ], - // No constant - vec![], - Arc::clone(&schema), - ), - ( - // Union orderings - vec![ - // [a ASC] - vec![(col_a, options)], - ], - // No constant - vec![], - ), - ), - //-----------TEST CASE 3----------// - // Meet ordering between [a ASC], [a DESC] should be [] - ( - ( - // First child orderings - vec![ - // [a ASC] - vec![(col_a, options)], - ], - // No constant - vec![], - Arc::clone(&schema), - ), - ( - // Second child orderings - vec![ - // [a DESC] - vec![(col_a, options_desc)], - ], - // No constant - vec![], - Arc::clone(&schema), - ), - ( - // Union doesn't have any ordering - vec![], - // No constant - vec![], - ), - ), - //-----------TEST CASE 4----------// + UnionEquivalenceTest::new(&schema) // Meet ordering between [a ASC], [a1 ASC, b1 ASC] should be [a ASC] // Where a, and a1 ath the same index for their corresponding schemas. - ( - ( - // First child orderings - vec![ - // [a ASC] - vec![(col_a, options)], - ], - // No constant - vec![], - Arc::clone(&schema), - ), - ( - // Second child orderings - vec![ - // [a1 ASC, b1 ASC] - vec![(col_a1, options), (col_b1, options)], - ], - // No constant - vec![], - Arc::clone(&schema2), - ), - ( - // Union orderings - vec![ - // [a ASC] - vec![(col_a, options)], - ], - // No constant - vec![], - ), - ), - ]; + .with_child_sort_and_const_exprs( + // First child orderings + vec![ + // [a ASC] + vec!["a"], + ], + // No constant + vec![], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child orderings + vec![ + // [a1 ASC, b1 ASC] + vec!["a1", "b1"], + ], + // No constant + vec![], + &schema2, + ) + .with_expected_sort_and_const_exprs( + // Union orderings + vec![ + // [a ASC] + vec!["a"], + ], + // No constant + vec![], + ) + .run() + } - for ( - test_idx, - ( - (first_child_orderings, first_child_constants, first_schema), - (second_child_orderings, second_child_constants, second_schema), - (union_orderings, union_constants), - ), - ) in test_cases.iter().enumerate() - { - let first_orderings = first_child_orderings - .iter() - .map(|ordering| convert_to_sort_exprs(ordering)) - .collect::>(); - let first_constants = first_child_constants - .iter() - .map(|expr| ConstExpr::new(Arc::clone(expr))) - .collect::>(); - let mut lhs = EquivalenceProperties::new(Arc::clone(first_schema)); - lhs = lhs.with_constants(first_constants); - lhs.add_new_orderings(first_orderings); + #[test] + fn test_union_equivalence_properties_constants_5() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + // Meet ordering for [a, c] (b constant) + // and [b, c] (a constant) + .with_child_sort_and_const_exprs( + // First child orderings + vec![ + // [a ASC, c ASC] + vec!["a", "c"], + ], + // constants + vec!["b"], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child orderings + vec![ + // [b ASC, c ASC] + vec!["b", "c"], + ], + // Constant "a" + vec!["a"], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union orderings + vec![ + // [a ASC, b ASC, c ASC] + vec!["a", "b", "c"], + // [b ASC, a ASC, c ASC] + ], + // No constant + vec![], + ) + .run() + } - let second_orderings = second_child_orderings - .iter() - .map(|ordering| convert_to_sort_exprs(ordering)) - .collect::>(); - let second_constants = second_child_constants - .iter() - .map(|expr| ConstExpr::new(Arc::clone(expr))) - .collect::>(); - let mut rhs = EquivalenceProperties::new(Arc::clone(second_schema)); - rhs = rhs.with_constants(second_constants); - rhs.add_new_orderings(second_orderings); + #[derive(Debug)] + struct UnionEquivalenceTest { + /// The schema of the output of the Union + output_schema: SchemaRef, + /// The sort options used to create any subsequent Sort expressions + current_sort_options: SortOptions, + /// The equivalence properties of each child to the union + child_properties: Vec, + /// The expected output properties of the union. Must be set before + /// running `build` + expected_properties: Option, + } + + impl UnionEquivalenceTest { + fn new(output_schema: &SchemaRef) -> Self { + Self { + output_schema: Arc::clone(output_schema), + current_sort_options: SortOptions::default(), + child_properties: vec![], + expected_properties: None, + } + } + + /// Override the sort options to use when creating subsequent sort expressions + fn with_sort_options(mut self, sort_options: SortOptions) -> Self { + self.current_sort_options = sort_options; + self + } + + /// Add a union input with the specified orderings + fn with_child_sort( + mut self, + orderings: Vec>, + schema: &SchemaRef, + ) -> Self { + let properties = self.make_props(orderings, vec![], schema); + self.child_properties.push(properties); + self + } + + /// Add a union input with the specified orderings and constant + /// equivalences + fn with_child_sort_and_const_exprs( + mut self, + orderings: Vec>, + constants: Vec<&'static str>, + schema: &SchemaRef, + ) -> Self { + let properties = self.make_props(orderings, constants, schema); + self.child_properties.push(properties); + self + } + + /// Set the expected output sort order for the union of the children + fn with_expected_sort(mut self, orderings: Vec>) -> Self { + let properties = self.make_props(orderings, vec![], &self.output_schema); + self.expected_properties = Some(properties); + self + } + + /// Set the expected output sort order and constant expressions for the + /// union of the children + fn with_expected_sort_and_const_exprs( + mut self, + orderings: Vec>, + constants: Vec<&'static str>, + ) -> Self { + let properties = self.make_props(orderings, constants, &self.output_schema); + self.expected_properties = Some(properties); + self + } + + /// compute the union's output equivalence properties from the child + /// properties, and compare them to the expected properties + fn run(self) { + let Self { + output_schema, + current_sort_options: _, + child_properties, + expected_properties, + } = self; + let expected_properties = + expected_properties.expect("expected_properties not set"); + let actual_properties = + calculate_union(child_properties, Arc::clone(&output_schema)) + .expect("failed to calculate union equivalence properties"); + assert_eq_properties_same( + &actual_properties, + &expected_properties, + format!( + "expected: {expected_properties:?}, actual: {actual_properties:?}" + ), + ); + } - let union_expected_orderings = union_orderings + /// Make equivalence properties for the specified columns named in orderings and constnats + fn make_props( + &self, + orderings: Vec>, + constants: Vec<&'static str>, + schema: &SchemaRef, + ) -> EquivalenceProperties { + let orderings = orderings .iter() - .map(|ordering| convert_to_sort_exprs(ordering)) + .map(|ordering| { + ordering + .iter() + .map(|name| { + PhysicalSortExpr::new( + col(name, schema).unwrap(), + self.current_sort_options, + ) + }) + .collect::>() + }) .collect::>(); - let union_constants = union_constants + + let constants = constants .iter() - .map(|expr| ConstExpr::new(Arc::clone(expr))) + .map(|col_name| ConstExpr::new(col(col_name, schema).unwrap())) .collect::>(); - let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema)); - union_expected_eq = union_expected_eq.with_constants(union_constants); - union_expected_eq.add_new_orderings(union_expected_orderings); - let actual_union_eq = calculate_union_binary(lhs, rhs)?; - let err_msg = format!( - "Error in test id: {:?}, test case: {:?}", - test_idx, test_cases[test_idx] - ); - assert_eq_properties_same(&actual_union_eq, &union_expected_eq, err_msg); + EquivalenceProperties::new_with_orderings(Arc::clone(schema), &orderings) + .with_constants(constants) } - Ok(()) } fn assert_eq_properties_same( @@ -3090,21 +3108,31 @@ mod tests { // Check whether constants are same let lhs_constants = lhs.constants(); let rhs_constants = rhs.constants(); - assert_eq!(lhs_constants.len(), rhs_constants.len(), "{}", err_msg); + assert_eq!( + lhs_constants.len(), + rhs_constants.len(), + "{err_msg}\nlhs: {lhs}\nrhs: {rhs}" + ); for rhs_constant in rhs_constants { assert!( const_exprs_contains(lhs_constants, rhs_constant.expr()), - "{}", - err_msg + "{err_msg}\nlhs: {lhs}\nrhs: {rhs}" ); } // Check whether orderings are same. let lhs_orderings = lhs.oeq_class(); let rhs_orderings = &rhs.oeq_class.orderings; - assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg); + assert_eq!( + lhs_orderings.len(), + rhs_orderings.len(), + "{err_msg}\nlhs: {lhs}\nrhs: {rhs}" + ); for rhs_ordering in rhs_orderings { - assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg); + assert!( + lhs_orderings.contains(rhs_ordering), + "{err_msg}\nlhs: {lhs}\nrhs: {rhs}" + ); } } } diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 7bb872e5a48f5..a85d79544b5a4 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -965,6 +965,21 @@ drop table foo; statement ok drop table ambiguity_test; +## reproducer for https://github.com/apache/datafusion/issues/12446 +# Ensure union ordering calculations with constants can be optimized + +statement ok +create table t(a0 int, a int, b int, c int) as values (1, 2, 3, 4), (5, 6, 7, 8); + +# expect this query to run successfully, not error +query II +select * from (select c, a, NULL::int as a0 from t order by a, c) t1 +union all +select * from (select c, NULL::int as a, a0 from t order by a0, c) t2 +order by c, a, a0, b +limit 2; + + # Casting from numeric to string types breaks the ordering statement ok CREATE EXTERNAL TABLE ordered_table ( @@ -1196,3 +1211,47 @@ physical_plan 02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true + + +# Test: inputs into union with different orderings +query TT +explain select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1 +union all +select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2 +order by d, c, a, a0, b +limit 2; +---- +logical_plan +01)Projection: t1.b, t1.c, t1.a, t1.a0 +02)--Sort: t1.d ASC NULLS LAST, t1.c ASC NULLS LAST, t1.a ASC NULLS LAST, t1.a0 ASC NULLS LAST, t1.b ASC NULLS LAST, fetch=2 +03)----Union +04)------SubqueryAlias: t1 +05)--------Projection: ordered_table.b, ordered_table.c, ordered_table.a, Int32(NULL) AS a0, ordered_table.d +06)----------TableScan: ordered_table projection=[a, b, c, d] +07)------SubqueryAlias: t2 +08)--------Projection: ordered_table.b, ordered_table.c, Int32(NULL) AS a, ordered_table.a0, ordered_table.d +09)----------TableScan: ordered_table projection=[a0, b, c, d] +physical_plan +01)ProjectionExec: expr=[b@0 as b, c@1 as c, a@2 as a, a0@3 as a0] +02)--SortPreservingMergeExec: [d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], fetch=2 +03)----UnionExec +04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d] +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true +07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +08)--------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] +09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true + +# Test: run the query from above +query IIII +select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1 +union all +select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2 +order by d, c, a, a0, b +limit 2; +---- +0 0 0 NULL +0 0 NULL 1 + +statement ok +drop table ordered_table; \ No newline at end of file