From cc7319028607df2df4f45aaf17471c9d6868c6ad Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 24 Dec 2024 14:24:55 +0300 Subject: [PATCH 1/3] Update partial_sort.rs --- .../physical-plan/src/sorts/partial_sort.rs | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index f14ba6606e89..425cb2282fc7 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -366,7 +366,7 @@ impl PartialSortStream { return Poll::Ready(None); } loop { - return Poll::Ready(Some(match ready!(self.input.poll_next_unpin(cx)) { + return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { if let Some(slice_point) = self.get_slice_point(self.common_prefix_length, &batch)? @@ -375,20 +375,33 @@ impl PartialSortStream { let remaining_batch = batch.slice(slice_point, batch.num_rows() - slice_point); let sorted_batch = self.sort_in_mem_batches(); - self.in_mem_batches.push(remaining_batch); - sorted_batch + if sorted_batch + .as_ref() + .map(|batch| batch.num_rows() > 0) + .unwrap_or(true) + { + self.in_mem_batches.push(remaining_batch); + Some(sorted_batch) + } else { + None + } } else { self.in_mem_batches.push(batch); continue; } } - Some(Err(e)) => Err(e), + Some(Err(e)) => Some(Err(e)), None => { self.is_closed = true; // once input is consumed, sort the rest of the inserted batches - self.sort_in_mem_batches() + let remaining_batch = self.sort_in_mem_batches()?; + if remaining_batch.num_rows() > 0 { + Some(Ok(remaining_batch)) + } else { + None + } } - })); + }); } } @@ -409,9 +422,6 @@ impl PartialSortStream { self.is_closed = true; } } - // Empty record batches should not be emitted. - // They need to be treated as [`Option`]es and handle separately - debug_assert!(result.num_rows() > 0); Ok(result) } From 4d87779472956fdc2a13ad8eca8c7dae631c1112 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 24 Dec 2024 14:44:07 +0300 Subject: [PATCH 2/3] Update partial_sort.rs --- datafusion/physical-plan/src/sorts/partial_sort.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 425cb2282fc7..d3276e66964c 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -374,17 +374,14 @@ impl PartialSortStream { self.in_mem_batches.push(batch.slice(0, slice_point)); let remaining_batch = batch.slice(slice_point, batch.num_rows() - slice_point); + self.in_mem_batches.push(remaining_batch); + let sorted_batch = self.sort_in_mem_batches(); - if sorted_batch + debug_assert!(sorted_batch .as_ref() .map(|batch| batch.num_rows() > 0) - .unwrap_or(true) - { - self.in_mem_batches.push(remaining_batch); - Some(sorted_batch) - } else { - None - } + .unwrap_or(true)); + Some(sorted_batch) } else { self.in_mem_batches.push(batch); continue; From 2d7d793f540b156f57c6ece26f30a7aaa03a58c9 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 24 Dec 2024 15:04:54 +0300 Subject: [PATCH 3/3] Update partial_sort.rs --- datafusion/physical-plan/src/sorts/partial_sort.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index d3276e66964c..c838376a482e 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -374,9 +374,11 @@ impl PartialSortStream { self.in_mem_batches.push(batch.slice(0, slice_point)); let remaining_batch = batch.slice(slice_point, batch.num_rows() - slice_point); + // Extract the sorted batch + let sorted_batch = self.sort_in_mem_batches(); + // Refill with the remaining batch self.in_mem_batches.push(remaining_batch); - let sorted_batch = self.sort_in_mem_batches(); debug_assert!(sorted_batch .as_ref() .map(|batch| batch.num_rows() > 0)