Skip to content

Commit

Permalink
Fix fmt and logical merge
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 28, 2023
1 parent 579cbd2 commit d819ee6
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 6 deletions.
11 changes: 9 additions & 2 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,14 +576,21 @@ impl ExecutionPlan for RepartitionExec {

// Get existing ordering:
let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
// Merge streams (while preserving ordering) coming from input partitions to this partition:

// Merge streams (while preserving ordering) coming from
// input partitions to this partition:
let fetch = None;
let merge_reservation =
MemoryConsumer::new(format!("{}[Merge {partition}]", self.name()))
.register(context.memory_pool());
streaming_merge(
input_streams,
self.schema(),
sort_exprs,
BaselineMetrics::new(&self.metrics, partition),
context.session_config().batch_size(),
None,
fetch,
merge_reservation,
)
} else {
Ok(Box::pin(RepartitionStream {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/sorts/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::physical_plan::{
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::*;
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use futures::Stream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use std::any::Any;
use std::sync::Arc;

use datafusion_execution::memory_pool::MemoryConsumer;
use crate::physical_plan::common::spawn_buffered;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
Expand All @@ -31,6 +30,7 @@ use crate::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
SendableRecordBatchStream, Statistics,
};
use datafusion_execution::memory_pool::MemoryConsumer;

use arrow::datatypes::SchemaRef;
use datafusion_common::{DataFusionError, Result};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/sorts/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use arrow::array::Array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use futures::stream::{Fuse, StreamExt};
use std::marker::PhantomData;
use std::sync::Arc;
Expand Down

0 comments on commit d819ee6

Please sign in to comment.