From accce9732e26723cab2ffc521edbf5a3fe7460b3 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Tue, 7 May 2024 21:08:45 +0300 Subject: [PATCH] Move transpose code to under common (#10409) --- datafusion/common/src/utils.rs | 25 +++++++++++++++++++ datafusion/physical-plan/src/common.rs | 25 ------------------- .../physical-plan/src/repartition/mod.rs | 2 +- .../src/windows/window_agg_exec.rs | 3 +-- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index 33d7313aa0e0..a8ca283fcdd7 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -651,6 +651,22 @@ pub fn find_indices>( .ok_or_else(|| DataFusionError::Execution("Target not found".to_string())) } +/// Transposes the given vector of vectors. +pub fn transpose(original: Vec>) -> Vec> { + match original.as_slice() { + [] => vec![], + [first, ..] => { + let mut result = (0..first.len()).map(|_| vec![]).collect::>(); + for row in original { + for (item, transposed_row) in row.into_iter().zip(&mut result) { + transposed_row.push(item); + } + } + result + } + } +} + #[cfg(test)] mod tests { use crate::ScalarValue::Null; @@ -990,4 +1006,13 @@ mod tests { assert!(find_indices(&[0, 3, 4], [0, 2]).is_err()); Ok(()) } + + #[test] + fn test_transpose() -> Result<()> { + let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]]; + let transposed = transpose(in_data); + let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]]; + assert_eq!(expected, transposed); + Ok(()) + } } diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index cdd122cf36fe..9e2216ae0a63 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -171,22 +171,6 @@ pub fn compute_record_batch_statistics( } } -/// Transposes the given vector of vectors. -pub fn transpose(original: Vec>) -> Vec> { - match original.as_slice() { - [] => vec![], - [first, ..] => { - let mut result = (0..first.len()).map(|_| vec![]).collect::>(); - for row in original { - for (item, transposed_row) in row.into_iter().zip(&mut result) { - transposed_row.push(item); - } - } - result - } - } -} - /// Calculates the "meet" of given orderings. /// The meet is the finest ordering that satisfied by all the given /// orderings, see . @@ -703,13 +687,4 @@ mod tests { assert_eq!(actual, expected); Ok(()) } - - #[test] - fn test_transpose() -> Result<()> { - let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]]; - let transposed = transpose(in_data); - let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]]; - assert_eq!(expected, transposed); - Ok(()) - } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index b6554f46cf78..e31fdc6ee2c2 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -29,7 +29,6 @@ use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; -use crate::common::transpose; use crate::hash_utils::create_hashes; use crate::metrics::BaselineMetrics; use crate::repartition::distributor_channels::{ @@ -42,6 +41,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Stat use arrow::array::{ArrayRef, UInt64Builder}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use datafusion_common::utils::transpose; use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::MemoryConsumer; diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 46ba21bd797e..1507902c22ea 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -22,7 +22,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::common::transpose; use crate::expressions::PhysicalSortExpr; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ @@ -41,7 +40,7 @@ use arrow::datatypes::{Schema, SchemaBuilder, SchemaRef}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::utils::evaluate_partition_ranges; +use datafusion_common::utils::{evaluate_partition_ranges, transpose}; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalSortRequirement;