Skip to content

Commit

Permalink
Minor: Avoid emitting empty batches in partial sort (#13895)
Browse files (browse the repository at this point in the history)
* Update partial_sort.rs

* Update partial_sort.rs

* Update partial_sort.rs
  • Loading branch information
berkaysynnada authored Dec 25, 2024
1 parent 3864b11 commit 7b4e559
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions datafusion/physical-plan/src/sorts/partial_sort.rs
Original file line number | Diff line number | Diff line change
@@ -366,29 +366,41 @@ impl PartialSortStream {
return Poll::Ready(None);
}
loop {
return Poll::Ready(Some(match ready!(self.input.poll_next_unpin(cx)) {
return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
if let Some(slice_point) =
self.get_slice_point(self.common_prefix_length, &batch)?
{
self.in_mem_batches.push(batch.slice(0, slice_point));
let remaining_batch =
batch.slice(slice_point, batch.num_rows() - slice_point);
// Extract the sorted batch
let sorted_batch = self.sort_in_mem_batches();
// Refill with the remaining batch
self.in_mem_batches.push(remaining_batch);
sorted_batch

debug_assert!(sorted_batch
.as_ref()
.map(|batch| batch.num_rows() > 0)
.unwrap_or(true));
Some(sorted_batch)
} else {
self.in_mem_batches.push(batch);
continue;
}
}
Some(Err(e)) => Err(e),
Some(Err(e)) => Some(Err(e)),
None => {
self.is_closed = true;
// once input is consumed, sort the rest of the inserted batches
self.sort_in_mem_batches()
let remaining_batch = self.sort_in_mem_batches()?;
if remaining_batch.num_rows() > 0 {
Some(Ok(remaining_batch))
} else {
None
}
}
}));
});
}
}

@@ -409,9 +421,6 @@ impl PartialSortStream {
self.is_closed = true;
}
}
// Empty record batches should not be emitted.
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
debug_assert!(result.num_rows() > 0);
Ok(result)
}

Expand Down

0 comments on commit 7b4e559

Please sign in to comment.