diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index c47c9926819b3..c94d2f9b115fa 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -576,14 +576,21 @@ impl ExecutionPlan for RepartitionExec { // Get existing ordering: let sort_exprs = self.input.output_ordering().unwrap_or(&[]); - // Merge streams (while preserving ordering) coming from input partitions to this partition: + + // Merge streams (while preserving ordering) coming from + // input partitions to this partition: + let fetch = None; + let merge_reservation = + MemoryConsumer::new(format!("{}[Merge {partition}]", self.name())) + .register(context.memory_pool()); streaming_merge( input_streams, self.schema(), sort_exprs, BaselineMetrics::new(&self.metrics, partition), context.session_config().batch_size(), - None, + fetch, + merge_reservation, ) } else { Ok(Box::pin(RepartitionStream { diff --git a/datafusion/core/src/physical_plan/sorts/builder.rs b/datafusion/core/src/physical_plan/sorts/builder.rs index 9deb161a84e16..95975b6e95743 100644 --- a/datafusion/core/src/physical_plan/sorts/builder.rs +++ b/datafusion/core/src/physical_plan/sorts/builder.rs @@ -18,8 +18,8 @@ use arrow::compute::interleave; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_execution::memory_pool::MemoryReservation; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; #[derive(Debug, Copy, Clone, Default)] struct BatchCursor { diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs index ba52afa385021..ecd74f3a33421 100644 --- a/datafusion/core/src/physical_plan/sorts/merge.rs +++ b/datafusion/core/src/physical_plan/sorts/merge.rs @@ -30,8 +30,8 @@ use crate::physical_plan::{ use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::*; -use datafusion_execution::memory_pool::MemoryReservation; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; use futures::Stream; use std::pin::Pin; use std::task::{ready, Context, Poll}; diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index d85858c03b455..6b978b5ee753d 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -20,7 +20,6 @@ use std::any::Any; use std::sync::Arc; -use datafusion_execution::memory_pool::MemoryConsumer; use crate::physical_plan::common::spawn_buffered; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ @@ -31,6 +30,7 @@ use crate::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; +use datafusion_execution::memory_pool::MemoryConsumer; use arrow::datatypes::SchemaRef; use datafusion_common::{DataFusionError, Result}; diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs index f2d94fe583275..a2fa5a91569db 100644 --- a/datafusion/core/src/physical_plan/sorts/stream.rs +++ b/datafusion/core/src/physical_plan/sorts/stream.rs @@ -22,8 +22,8 @@ use arrow::array::Array; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; -use datafusion_execution::memory_pool::MemoryReservation; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; use futures::stream::{Fuse, StreamExt}; use std::marker::PhantomData; use std::sync::Arc;