Skip to content

Commit

Permalink
Unbounded SortExec (and Top-K) Implementation When Req's Are Satisfied (
Browse files Browse the repository at this point in the history
apache#12174)

* Sort fetch updates execution mode

* Update sort.rs

* Update sort.rs

* Update sort.rs

* Update sort.rs

* Update sort.rs

* Apply suggestions from code review

* Update sort.rs

* Update datafusion/physical-plan/src/sorts/sort.rs

* Update datafusion/physical-plan/src/sorts/sort.rs

* Reuse LimitStream

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
berkaysynnada and ozankabak authored Aug 29, 2024
1 parent bd50698 commit f5dcdf0
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 89 deletions.
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/metrics/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct BaselineMetrics {
}

impl BaselineMetrics {
/// Create a new BaselineMetric structure, and set `start_time` to now
/// Create a new BaselineMetric structure, and set `start_time` to now
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
start_time.record();
Expand Down
311 changes: 244 additions & 67 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;

use crate::common::spawn_buffered;
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
Expand All @@ -51,6 +52,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;

use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};
Expand Down Expand Up @@ -737,9 +739,22 @@ impl SortExec {
/// This can reduce the memory pressure required by the sort
/// operation since rows that are not going to be included
/// can be dropped.
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
self.fetch = fetch;
self
pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
let mut cache = self.cache.clone();
if fetch.is_some() && self.cache.execution_mode == ExecutionMode::Unbounded {
// When a theoretically unnecessary sort becomes a top-K (which
// sometimes arises as an intermediate state before full removal),
// its execution mode should become `Bounded`.
cache.execution_mode = ExecutionMode::Bounded;
}
SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
fetch,
cache,
}
}

/// Input schema
Expand Down Expand Up @@ -775,6 +790,16 @@ impl SortExec {
sort_exprs: LexOrdering,
preserve_partitioning: bool,
) -> PlanProperties {
// Determine execution mode:
let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement(
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(),
);
let mode = match input.execution_mode() {
ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
ExecutionMode::Bounded => ExecutionMode::Bounded,
_ => ExecutionMode::PipelineBreaking,
};

// Calculate equivalence properties; i.e. reset the ordering equivalence
// class with the new ordering:
let eq_properties = input
Expand All @@ -786,14 +811,6 @@ impl SortExec {
let output_partitioning =
Self::output_partitioning_helper(input, preserve_partitioning);

// Determine execution mode:
let mode = match input.execution_mode() {
ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
ExecutionMode::PipelineBreaking
}
ExecutionMode::Bounded => ExecutionMode::Bounded,
};

PlanProperties::new(eq_properties, output_partitioning, mode)
}
}
Expand Down Expand Up @@ -874,53 +891,68 @@ impl ExecutionPlan for SortExec {

trace!("End SortExec's input.execute for partition: {}", partition);

if let Some(fetch) = self.fetch.as_ref() {
let mut topk = TopK::try_new(
partition,
input.schema(),
self.expr.clone(),
*fetch,
context.session_config().batch_size(),
context.runtime_env(),
&self.metrics_set,
partition,
)?;

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
topk.insert_batch(batch)?;
}
topk.emit()
})
.try_flatten(),
)))
} else {
let mut sorter = ExternalSorter::new(
partition,
input.schema(),
self.expr.clone(),
context.session_config().batch_size(),
self.fetch,
execution_options.sort_spill_reservation_bytes,
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
let sort_satisfied = self
.input
.equivalence_properties()
.ordering_satisfy_requirement(
PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(),
);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
sorter.sort()
})
.try_flatten(),
)))
match (sort_satisfied, self.fetch.as_ref()) {
(true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
input,
0,
Some(*fetch),
BaselineMetrics::new(&self.metrics_set, partition),
))),
(true, None) => Ok(input),
(false, Some(fetch)) => {
let mut topk = TopK::try_new(
partition,
input.schema(),
self.expr.clone(),
*fetch,
context.session_config().batch_size(),
context.runtime_env(),
&self.metrics_set,
partition,
)?;
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
topk.insert_batch(batch)?;
}
topk.emit()
})
.try_flatten(),
)))
}
(false, None) => {
let mut sorter = ExternalSorter::new(
partition,
input.schema(),
self.expr.clone(),
context.session_config().batch_size(),
self.fetch,
execution_options.sort_spill_reservation_bytes,
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
sorter.sort()
})
.try_flatten(),
)))
}
}
}

Expand All @@ -933,14 +965,7 @@ impl ExecutionPlan for SortExec {
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
fetch: limit,
cache: self.cache.clone(),
}))
Some(Arc::new(SortExec::with_fetch(self, limit)))
}

fn fetch(&self) -> Option<usize> {
Expand All @@ -951,6 +976,8 @@ impl ExecutionPlan for SortExec {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};

use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
Expand All @@ -965,12 +992,124 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::{assert_batches_eq, Result, ScalarValue};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::EquivalenceProperties;

use futures::{FutureExt, Stream};

#[derive(Debug, Clone)]
pub struct SortedUnboundedExec {
schema: Schema,
batch_size: u64,
cache: PlanProperties,
}

impl DisplayAs for SortedUnboundedExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "UnboundableExec",).unwrap()
}
}
Ok(())
}
}

impl SortedUnboundedExec {
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let mut eq_properties = EquivalenceProperties::new(schema);
eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new(
Arc::new(Column::new("c1", 0)),
SortOptions::default(),
)]]);
let mode = ExecutionMode::Unbounded;
PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode)
}
}

impl ExecutionPlan for SortedUnboundedExec {
fn name(&self) -> &'static str {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &PlanProperties {
&self.cache
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}

fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(SortedUnboundedStream {
schema: Arc::new(self.schema.clone()),
batch_size: self.batch_size,
offset: 0,
}))
}
}

#[derive(Debug)]
pub struct SortedUnboundedStream {
schema: SchemaRef,
batch_size: u64,
offset: u64,
}

use datafusion_common::ScalarValue;
use datafusion_physical_expr::expressions::Literal;
use futures::FutureExt;
impl Stream for SortedUnboundedStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let batch = SortedUnboundedStream::create_record_batch(
Arc::clone(&self.schema),
self.offset,
self.batch_size,
);
self.offset += self.batch_size;
Poll::Ready(Some(Ok(batch)))
}
}

impl RecordBatchStream for SortedUnboundedStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}

impl SortedUnboundedStream {
fn create_record_batch(
schema: SchemaRef,
offset: u64,
batch_size: u64,
) -> RecordBatch {
let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
let array = UInt64Array::from(values);
let array_ref: ArrayRef = Arc::new(array);
RecordBatch::try_new(schema, vec![array_ref]).unwrap()
}
}

#[tokio::test]
async fn test_in_mem_sort() -> Result<()> {
Expand Down Expand Up @@ -1414,4 +1553,42 @@ mod tests {
let result = sort_batch(&batch, &expressions, None).unwrap();
assert_eq!(result.num_rows(), 1);
}

#[tokio::test]
async fn topk_unbounded_source() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
let source = SortedUnboundedExec {
schema: schema.clone(),
batch_size: 2,
cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
};
let mut plan = SortExec::new(
vec![PhysicalSortExpr::new(
Arc::new(Column::new("c1", 0)),
SortOptions::default(),
)],
Arc::new(source),
);
plan = plan.with_fetch(Some(9));

let batches = collect(Arc::new(plan), task_ctx).await?;
#[rustfmt::skip]
let expected = [
"+----+",
"| c1 |",
"+----+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 3 |",
"| 4 |",
"| 5 |",
"| 6 |",
"| 7 |",
"| 8 |",
"+----+",];
assert_batches_eq!(expected, &batches);
Ok(())
}
}
Loading

0 comments on commit f5dcdf0

Please sign in to comment.