Skip to content

Commit

Permalink
Account for constant equivalence properties in union, tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 30, 2024
1 parent f1aa27f commit 23cfc58
Show file tree
Hide file tree
Showing 7 changed files with 691 additions and 392 deletions.
32 changes: 20 additions & 12 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,26 +96,34 @@ impl PhysicalSortExpr {
}

/// Set the sort options to ASC
pub fn asc(mut self) -> Self {
self.options.descending = false;
self
pub fn asc(self) -> Self {
self.with_descending(false)
}

/// Set the sort options to DESC
pub fn desc(mut self) -> Self {
self.options.descending = true;
self
pub fn desc(self) -> Self {
self.with_descending(true)
}

/// Set the sort sort options to NULLS FIRST
pub fn nulls_first(mut self) -> Self {
self.options.nulls_first = true;
/// Set the `descending` flag of the sort options to the given value,
/// returning the updated expression.
pub fn with_descending(mut self, descending: bool) -> Self {
self.options.descending = descending;
self
}

/// Set the sort sort options to NULLS LAST
pub fn nulls_last(mut self) -> Self {
self.options.nulls_first = false;
/// Set the sort options to NULLS FIRST.
///
/// Shorthand for `with_nulls_first(true)`.
pub fn nulls_first(self) -> Self {
self.with_nulls_first(true)
}

/// Set the sort options to NULLS LAST.
///
/// Shorthand for `with_nulls_first(false)`.
pub fn nulls_last(self) -> Self {
self.with_nulls_first(false)
}

/// Set the `nulls_first` flag of the sort options to the given value,
/// returning the updated expression.
pub fn with_nulls_first(mut self, nulls_first: bool) -> Self {
self.options.nulls_first = nulls_first;
self
}
}
Expand Down
25 changes: 21 additions & 4 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::JoinType;
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;

#[derive(Debug, Clone)]
/// A structure representing an expression known to be constant in a physical execution plan.
///
/// The `ConstExpr` struct encapsulates an expression that is constant during the execution
Expand All @@ -41,9 +40,10 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
///
/// - `expr`: Constant expression for a node in the physical plan.
///
/// - `across_partitions`: A boolean flag indicating whether the constant expression is
/// valid across partitions. If set to `true`, the constant expression has same value for all partitions.
/// If set to `false`, the constant expression may have different values for different partitions.
/// - `across_partitions`: A boolean flag indicating whether the constant
/// expression is the same across partitions. If set to `true`, the constant
/// expression has the same value for all partitions. If set to `false`, the
/// constant expression may have different values for different partitions.
///
/// # Example
///
Expand All @@ -56,11 +56,22 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
/// // create a constant expression from a physical expression
/// let const_expr = ConstExpr::from(col);
/// ```
#[derive(Debug, Clone)]
pub struct ConstExpr {
/// The expression that is known to be constant (e.g. a `Column`)
expr: Arc<dyn PhysicalExpr>,
/// Does the constant have the same value across all partitions? `true`
/// means the value is the same for every partition; `false` means it may
/// differ from partition to partition. See struct docs for more details
across_partitions: bool,
}

impl PartialEq for ConstExpr {
    /// Two `ConstExpr`s are equal when their `across_partitions` flags match
    /// and their wrapped expressions compare equal.
    fn eq(&self, other: &Self) -> bool {
        // Compare the flag first; the expression comparison is only
        // reached when the flags agree.
        if self.across_partitions != other.across_partitions {
            return false;
        }
        self.expr.eq(other.expr.as_any())
    }
}

impl ConstExpr {
/// Create a new constant expression from a physical expression.
///
Expand All @@ -74,11 +85,17 @@ impl ConstExpr {
}
}

/// Set the `across_partitions` flag to the given value, returning the
/// updated `ConstExpr`.
///
/// See struct docs for more details
pub fn with_across_partitions(mut self, across_partitions: bool) -> Self {
self.across_partitions = across_partitions;
self
}

/// Is the expression the same across all partitions?
///
/// Returns the current value of the `across_partitions` flag. See struct
/// docs for more details
pub fn across_partitions(&self) -> bool {
self.across_partitions
}
Expand Down
25 changes: 17 additions & 8 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Display;
use std::hash::Hash;
use std::sync::Arc;

use crate::equivalence::add_offset_to_expr;
use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
use arrow_schema::SortOptions;
use std::fmt::Display;
use std::hash::Hash;
use std::sync::Arc;
use std::vec::IntoIter;

/// An `OrderingEquivalenceClass` object keeps track of different alternative
/// orderings that can describe a schema. For example, consider the following table:
Expand All @@ -36,15 +36,15 @@ use arrow_schema::SortOptions;
///
/// Here, both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the table
/// ordering. In this case, we say that these orderings are equivalent.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Eq, PartialEq, Hash, Default)]
pub struct OrderingEquivalenceClass {
/// The alternative orderings tracked by this class.
pub orderings: Vec<LexOrdering>,
}

impl OrderingEquivalenceClass {
/// Creates new empty ordering equivalence class.
pub fn empty() -> Self {
Self { orderings: vec![] }
Default::default()
}

/// Clears (empties) this ordering equivalence class.
Expand Down Expand Up @@ -197,6 +197,15 @@ impl OrderingEquivalenceClass {
}
}

/// Consuming iteration over the orderings held by an
/// `OrderingEquivalenceClass`.
impl IntoIterator for OrderingEquivalenceClass {
    type Item = LexOrdering;
    type IntoIter = IntoIter<LexOrdering>;

    /// Consumes the class and yields each of its orderings by value.
    fn into_iter(self) -> Self::IntoIter {
        let Self { orderings } = self;
        orderings.into_iter()
    }
}

/// This function constructs a duplicate-free `LexOrdering` by filtering out
/// duplicate entries that have the same physical expression inside. For example,
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
Expand Down Expand Up @@ -229,10 +238,10 @@ impl Display for OrderingEquivalenceClass {
write!(f, "[")?;
let mut iter = self.orderings.iter();
if let Some(ordering) = iter.next() {
write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
write!(f, "[{}]", PhysicalSortExpr::format_list(ordering))?;
}
for ordering in iter {
write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
write!(f, ", [{}]", PhysicalSortExpr::format_list(ordering))?;
}
write!(f, "]")?;
Ok(())
Expand Down
Loading

0 comments on commit 23cfc58

Please sign in to comment.