Skip to content

Commit

Permalink
Move transpose code to under common (#10409)
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo authored May 7, 2024
1 parent 161d0f2 commit accce97
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
25 changes: 25 additions & 0 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,22 @@ pub fn find_indices<T: PartialEq, S: Borrow<T>>(
.ok_or_else(|| DataFusionError::Execution("Target not found".to_string()))
}

/// Transposes the given vector of vectors.
pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
match original.as_slice() {
[] => vec![],
[first, ..] => {
let mut result = (0..first.len()).map(|_| vec![]).collect::<Vec<_>>();
for row in original {
for (item, transposed_row) in row.into_iter().zip(&mut result) {
transposed_row.push(item);
}
}
result
}
}
}

#[cfg(test)]
mod tests {
use crate::ScalarValue::Null;
Expand Down Expand Up @@ -990,4 +1006,13 @@ mod tests {
assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
Ok(())
}

#[test]
fn test_transpose() -> Result<()> {
let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
let transposed = transpose(in_data);
let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
assert_eq!(expected, transposed);
Ok(())
}
}
25 changes: 0 additions & 25 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,22 +171,6 @@ pub fn compute_record_batch_statistics(
}
}

/// Transposes the given vector of vectors.
pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
match original.as_slice() {
[] => vec![],
[first, ..] => {
let mut result = (0..first.len()).map(|_| vec![]).collect::<Vec<_>>();
for row in original {
for (item, transposed_row) in row.into_iter().zip(&mut result) {
transposed_row.push(item);
}
}
result
}
}
}

/// Calculates the "meet" of given orderings.
/// The meet is the finest ordering that satisfied by all the given
/// orderings, see <https://en.wikipedia.org/wiki/Join_and_meet>.
Expand Down Expand Up @@ -703,13 +687,4 @@ mod tests {
assert_eq!(actual, expected);
Ok(())
}

#[test]
fn test_transpose() -> Result<()> {
let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
let transposed = transpose(in_data);
let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
assert_eq!(expected, transposed);
Ok(())
}
}
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{
DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use crate::common::transpose;
use crate::hash_utils::create_hashes;
use crate::metrics::BaselineMetrics;
use crate::repartition::distributor_channels::{
Expand All @@ -42,6 +41,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Stat
use arrow::array::{ArrayRef, UInt64Builder};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::transpose;
use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::MemoryConsumer;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::common::transpose;
use crate::expressions::PhysicalSortExpr;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
Expand All @@ -41,7 +40,7 @@ use arrow::datatypes::{Schema, SchemaBuilder, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::utils::{evaluate_partition_ranges, transpose};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalSortRequirement;
Expand Down

0 comments on commit accce97

Please sign in to comment.