Skip to content

Commit

Permalink
using as_ref
Browse files Browse the repository at this point in the history
  • Loading branch information
jatin510 committed Nov 2, 2024
1 parent 815068e commit 79861e3
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 20 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl MinMaxStatistics {
Ok(Self {
min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
sort_order: sort_order.clone(),
sort_order: LexOrdering::from_ref(sort_order),
})
}

Expand Down
21 changes: 12 additions & 9 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ fn pushdown_requirement_to_children(
Some(JoinSide::Left) => try_pushdown_requirements_to_join(
smj,
parent_required,
&parent_required_expr,
parent_required_expr.as_ref(),
JoinSide::Left,
),
Some(JoinSide::Right) => {
Expand Down Expand Up @@ -364,29 +364,32 @@ fn try_pushdown_requirements_to_join(
let mut smj_required_orderings = smj.required_input_ordering();
let right_requirement = smj_required_orderings.swap_remove(1);
let left_requirement = smj_required_orderings.swap_remove(0);
let left_ordering = smj.left().output_ordering().cloned().unwrap_or_default();
let right_ordering = smj.right().output_ordering().cloned().unwrap_or_default();
let left_ordering = &smj.left().output_ordering().cloned().unwrap_or_default();
let right_ordering = &smj.right().output_ordering().cloned().unwrap_or_default();

let (new_left_ordering, new_right_ordering) = match push_side {
JoinSide::Left => {
let left_eq_properties =
left_eq_properties.clone().with_reorder(sort_expr.clone());
let left_eq_properties = left_eq_properties
.clone()
.with_reorder(LexOrdering::from_ref(sort_expr));
if left_eq_properties
.ordering_satisfy_requirement(&left_requirement.unwrap_or_default())
{
// After re-ordering requirement is still satisfied
(sort_expr, &right_ordering)
(sort_expr, right_ordering)
} else {
return Ok(None);
}
}
JoinSide::Right => {
let right_eq_properties =
right_eq_properties.clone().with_reorder(sort_expr.clone());
let right_eq_properties = right_eq_properties
.clone()
.with_reorder(LexOrdering::from_ref(sort_expr));
if right_eq_properties
.ordering_satisfy_requirement(&right_requirement.unwrap_or_default())
{
// After re-ordering requirement is still satisfied
(&left_ordering, sort_expr)
(left_ordering, sort_expr)
} else {
return Ok(None);
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl AggregateUDFImpl for ArrayAgg {
OrderSensitiveArrayAggAccumulator::try_new(
&data_type,
&ordering_dtypes,
acc_args.ordering_req.clone(),
LexOrdering::from_ref(acc_args.ordering_req),
acc_args.is_reversed,
)
.map(|acc| Box::new(acc) as _)
Expand Down
6 changes: 3 additions & 3 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl AggregateUDFImpl for FirstValue {
FirstValueAccumulator::try_new(
acc_args.return_type,
&ordering_dtypes,
acc_args.ordering_req.clone(),
LexOrdering::from_ref(acc_args.ordering_req),
acc_args.ignore_nulls,
)
.map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
Expand Down Expand Up @@ -455,7 +455,7 @@ impl AggregateUDFImpl for LastValue {
LastValueAccumulator::try_new(
acc_args.return_type,
&ordering_dtypes,
acc_args.ordering_req.clone(),
LexOrdering::from_ref(acc_args.ordering_req),
acc_args.ignore_nulls,
)
.map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
Expand Down Expand Up @@ -647,7 +647,7 @@ impl Accumulator for LastValueAccumulator {
if compare_rows(
&self.orderings,
orderings,
&get_sort_options(&self.ordering_req),
&get_sort_options(self.ordering_req.as_ref()),
)?
.is_lt()
{
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl AggregateUDFImpl for NthValueAgg {
n,
&data_type,
&ordering_dtypes,
acc_args.ordering_req.clone(),
LexOrdering::from_ref(acc_args.ordering_req),
)
.map(|acc| Box::new(acc) as _)
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl AggregateFunctionExpr {
}

if !self.order_sensitivity().is_insensitive() {
return Some(&self.ordering_req);
return Some(self.ordering_req.as_ref());
}

None
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/built_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl WindowExpr for BuiltInWindowExpr {
Arc::new(BuiltInWindowExpr::new(
reverse_expr,
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
reverse_order_bys(self.order_by.as_ref()).as_ref(),
Arc::new(self.window_frame.reverse()),
)) as _
})
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ pub fn get_finer_aggregate_exprs_requirement(
if let Some(finer_ordering) =
finer_ordering(&requirement, aggr_expr, group_by, eq_properties, agg_mode)
{
if eq_properties.ordering_satisfy(&finer_ordering) {
if eq_properties.ordering_satisfy(finer_ordering.as_ref()) {
// Requirement is satisfied by existing ordering
requirement = finer_ordering;
continue;
Expand All @@ -1033,7 +1033,7 @@ pub fn get_finer_aggregate_exprs_requirement(
eq_properties,
agg_mode,
) {
if eq_properties.ordering_satisfy(&finer_ordering) {
if eq_properties.ordering_satisfy(finer_ordering.as_ref()) {
// Reverse requirement is satisfied by exiting ordering.
// Hence reverse the aggregator
requirement = finer_ordering;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ pub(crate) fn lexsort_to_indices_multi_columns(
///
/// Support sorting datasets that are larger than the memory allotted
/// by the memory manager, by spilling to disk.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SortExec {
/// Input schema
pub(crate) input: Arc<dyn ExecutionPlan>,
Expand Down

0 comments on commit 79861e3

Please sign in to comment.