Skip to content

Commit

Permalink
update partition_sorter
Browse files (browse the repository at this point in the history)
  • Loading branch information
BiteTheDDDDt committed Dec 25, 2024
1 parent f58a36e commit 74d637a
Showing 1 changed file with 5 additions and 13 deletions.
18 changes: 5 additions & 13 deletions be/src/vec/common/sort/partition_sorter.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -66,7 +66,7 @@ Status PartitionSorter::prepare_for_read() {
auto& blocks = _state->get_sorted_block();
auto& priority_queue = _state->get_priority_queue();
for (auto& block : blocks) {
- priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description));
+ priority_queue.emplace(MergeSortCursorImpl::create_shared(block, _sort_description));
}
blocks.clear();
return Status::OK();
Expand Down Expand Up @@ -102,10 +102,6 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
auto& priority_queue = _state->get_priority_queue();
- if (priority_queue.empty()) {
- *eos = true;
- return Status::OK();
- }
const auto& sorted_block = priority_queue.top().impl->block;
size_t num_columns = sorted_block->columns();
MutableBlock m_block =
Expand All @@ -114,7 +110,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
size_t current_output_rows = 0;

bool get_enough_data = false;
- while (!priority_queue.empty()) {
+ while (priority_queue.size() > 1 && current_output_rows < batch_size && !get_enough_data) {
auto current = priority_queue.top();
priority_queue.pop();
if (UNLIKELY(_previous_row->impl == nullptr)) {
Expand Down Expand Up @@ -150,7 +146,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
} else {
bool cmp_res = _previous_row->compare_two_rows(current);
//get a distinct row
- if (cmp_res == false) {
+ if (!cmp_res) {
_output_distinct_rows++; //need rows++ firstly
if (_output_distinct_rows >= _partition_inner_limit) {
get_enough_data = true;
Expand All @@ -176,7 +172,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
}
bool cmp_res = _previous_row->compare_two_rows(current);
//get a distinct row
- if (cmp_res == false) {
+ if (!cmp_res) {
//here must be check distinct of two rows, and then check nums of row
if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) {
get_enough_data = true;
Expand All @@ -198,14 +194,10 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
current->next();
priority_queue.push(current);
}

- if (current_output_rows == batch_size || get_enough_data == true) {
- break;
- }
}

_output_total_rows += output_block->rows();
- if (current_output_rows == 0 || get_enough_data == true) {
+ if (get_enough_data) {
*eos = true;
}
return Status::OK();
Expand Down

0 comments on commit 74d637a

Please sign in to comment.