diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 9e2620d64df9fd..2cedc2677c25db 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -66,7 +66,7 @@ Status PartitionSorter::prepare_for_read() { auto& blocks = _state->get_sorted_block(); auto& priority_queue = _state->get_priority_queue(); for (auto& block : blocks) { - priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description)); + priority_queue.emplace(MergeSortCursorImpl::create_shared(block, _sort_description)); } blocks.clear(); return Status::OK(); @@ -102,10 +102,6 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { auto& priority_queue = _state->get_priority_queue(); - if (priority_queue.empty()) { - *eos = true; - return Status::OK(); - } const auto& sorted_block = priority_queue.top().impl->block; size_t num_columns = sorted_block->columns(); MutableBlock m_block = @@ -114,7 +110,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int size_t current_output_rows = 0; bool get_enough_data = false; - while (!priority_queue.empty()) { + while (priority_queue.size() > 1 && current_output_rows < batch_size && !get_enough_data) { auto current = priority_queue.top(); priority_queue.pop(); if (UNLIKELY(_previous_row->impl == nullptr)) { @@ -150,7 +146,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } else { bool cmp_res = _previous_row->compare_two_rows(current); //get a distinct row - if (cmp_res == false) { + if (!cmp_res) { _output_distinct_rows++; //need rows++ firstly if (_output_distinct_rows >= _partition_inner_limit) { get_enough_data = true; @@ -176,7 +172,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } bool cmp_res = _previous_row->compare_two_rows(current); //get a distinct row - if (cmp_res == false) { + if (!cmp_res) { //here must be check distinct of two rows, and then check nums of row if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) { get_enough_data = true; @@ -198,14 +194,10 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int current->next(); priority_queue.push(current); } - - if (current_output_rows == batch_size || get_enough_data == true) { - break; - } } _output_total_rows += output_block->rows(); - if (current_output_rows == 0 || get_enough_data == true) { + if (get_enough_data) { *eos = true; } return Status::OK();