From cf2a5d4c91597555aab1cf77926381c7de8759af Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 23 Dec 2024 20:39:27 +0800 Subject: [PATCH 1/5] some optimization for full sort --- be/src/vec/common/sort/sorter.cpp | 41 ++++++++++++++++++++++++++----- be/src/vec/common/sort/sorter.h | 10 ++------ be/src/vec/core/sort_cursor.h | 1 + 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 72bf35f3cbac3b..9f6f717d15dff0 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -120,13 +120,14 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!priority_queue_.empty()) { + while (priority_queue_.size() > 1) { auto current = priority_queue_.top(); priority_queue_.pop(); if (offset_ == 0) { - for (size_t i = 0; i < num_columns; ++i) + for (size_t i = 0; i < num_columns; ++i) { merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + } ++merged_rows; } else { offset_--; @@ -141,6 +142,31 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized break; } } + + if (merged_rows < batch_size && priority_queue_.size() == 1) { + auto current = priority_queue_.top(); + priority_queue_.pop(); + if (offset_) { + auto skip_step = std::min(current->remained_rows(), size_t(offset_)); + offset_ -= skip_step; + current->pos += skip_step; + } + + auto step = std::min(current->remained_rows(), size_t(batch_size - merged_rows)); + if (step) { + for (size_t i = 0; i < num_columns; ++i) { + merged_columns[i]->insert_range_from(*current->block->get_columns()[i], + current->pos, step); + } + merged_rows += step; + current->pos += step; + } + + if (current->remained_rows()) { + priority_queue_.push(current); + } + } + block->set_columns(std::move(merged_columns)); if (merged_rows == 0) { @@ -217,9 +243,15 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs Status FullSorter::append_block(Block* block) { DCHECK(block->rows() > 0); + + if (_reach_limit() && block->bytes() > _state->unsorted_block_->allocated_bytes() - + _state->unsorted_block_->bytes()) { + RETURN_IF_ERROR(_do_sort()); + } + { SCOPED_TIMER(_merge_block_timer); - auto& data = _state->unsorted_block_->get_columns_with_type_and_name(); + const auto& data = _state->unsorted_block_->get_columns_with_type_and_name(); const auto& arrival_data = block->get_columns_with_type_and_name(); auto sz = block->rows(); for (int i = 0; i < data.size(); ++i) { @@ -232,9 +264,6 @@ Status FullSorter::append_block(Block* block) { } block->clear_column_data(); } - if (_reach_limit()) { - RETURN_IF_ERROR(_do_sort()); - } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index aa7d88dfbc2a3a..36c535c9101db9 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -177,21 +177,15 @@ class FullSorter final : public Sorter { private: bool _reach_limit() { - return _state->unsorted_block_->rows() > buffered_block_size_ || - _state->unsorted_block_->bytes() > buffered_block_bytes_; + return _state->unsorted_block_->allocated_bytes() >= buffered_block_bytes_; } Status _do_sort(); std::unique_ptr _state; - static constexpr size_t INITIAL_BUFFERED_BLOCK_SIZE = 1024 * 1024; - static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20; + static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 * 1024 * 1024; - static constexpr size_t SPILL_BUFFERED_BLOCK_SIZE = 4 * 1024 * 1024; - static constexpr size_t SPILL_BUFFERED_BLOCK_BYTES = 256 << 20; - - size_t buffered_block_size_ = INITIAL_BUFFERED_BLOCK_SIZE; size_t buffered_block_bytes_ = INITIAL_BUFFERED_BLOCK_BYTES; }; diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index d31767f46e461f..67d603533a102d 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -160,6 +160,7 @@ struct MergeSortCursorImpl { bool is_first() const { return pos == 0; } bool is_last() const { return pos + 1 >= rows; } void next() { ++pos; } + size_t remained_rows() const { return rows - pos; } virtual bool has_next_block() { return false; } virtual Block* block_ptr() { return nullptr; } From a18416c8b770221f2b63e643ccbe6013e1a03436 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 23 Dec 2024 22:18:51 +0800 Subject: [PATCH 2/5] update --- be/src/vec/common/sort/sorter.cpp | 29 +---------------------------- 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 9f6f717d15dff0..34b5ef3dd26621 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -108,10 +108,6 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos) { - if (priority_queue_.empty()) { - *eos = true; - return Status::OK(); - } size_t num_columns = priority_queue_.top().impl->block->columns(); MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( @@ -120,6 +116,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; + // process single element queue on merge_sort_read() while (priority_queue_.size() > 1) { auto current = priority_queue_.top(); priority_queue_.pop(); @@ -143,30 +140,6 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized } } - if (merged_rows < batch_size && priority_queue_.size() == 1) { - auto current = priority_queue_.top(); - priority_queue_.pop(); - if (offset_) { - auto skip_step = std::min(current->remained_rows(), size_t(offset_)); - offset_ -= skip_step; - current->pos += skip_step; - } - - auto step = std::min(current->remained_rows(), size_t(batch_size - merged_rows)); - if (step) { - for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_range_from(*current->block->get_columns()[i], - current->pos, step); - } - merged_rows += step; - current->pos += step; - } - - if (current->remained_rows()) { - priority_queue_.push(current); - } - } - block->set_columns(std::move(merged_columns)); if (merged_rows == 0) { From f58a36ede7ae13e3b1c41c935c1db428750e9dc4 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Tue, 24 Dec 2024 16:28:51 +0800 Subject: [PATCH 3/5] update --- be/src/vec/common/sort/sorter.cpp | 12 +----------- be/src/vec/core/sort_cursor.h | 1 - 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 34b5ef3dd26621..4f7de1d379aea9 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -117,7 +117,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; // process single element queue on merge_sort_read() - while (priority_queue_.size() > 1) { + while (priority_queue_.size() > 1 && merged_rows < batch_size) { auto current = priority_queue_.top(); priority_queue_.pop(); @@ -134,19 +134,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized current->next(); priority_queue_.push(current); } - - if (merged_rows == batch_size) { - break; - } } block->set_columns(std::move(merged_columns)); - - if (merged_rows == 0) { - *eos = true; - return Status::OK(); - } - return Status::OK(); } diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 67d603533a102d..d31767f46e461f 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -160,7 +160,6 @@ struct MergeSortCursorImpl { bool is_first() const { return pos == 0; } bool is_last() const { return pos + 1 >= rows; } void next() { ++pos; } - size_t remained_rows() const { return rows - pos; } virtual bool has_next_block() { return false; } virtual Block* block_ptr() { return nullptr; } From 74d637aa11ae16631e444e35e1886087141268f8 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 25 Dec 2024 11:53:24 +0800 Subject: [PATCH 4/5] update partition_sorter --- be/src/vec/common/sort/partition_sorter.cpp | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 9e2620d64df9fd..2cedc2677c25db 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -66,7 +66,7 @@ Status PartitionSorter::prepare_for_read() { auto& blocks = _state->get_sorted_block(); auto& priority_queue = _state->get_priority_queue(); for (auto& block : blocks) { - priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description)); + priority_queue.emplace(MergeSortCursorImpl::create_shared(block, _sort_description)); } blocks.clear(); return Status::OK(); @@ -102,10 +102,6 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { auto& priority_queue = _state->get_priority_queue(); - if (priority_queue.empty()) { - *eos = true; - return Status::OK(); - } const auto& sorted_block = priority_queue.top().impl->block; size_t num_columns = sorted_block->columns(); MutableBlock m_block = @@ -114,7 +110,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int size_t current_output_rows = 0; bool get_enough_data = false; - while (!priority_queue.empty()) { + while (priority_queue.size() > 1 && current_output_rows < batch_size && !get_enough_data) { auto current = priority_queue.top(); priority_queue.pop(); if (UNLIKELY(_previous_row->impl == nullptr)) { @@ -150,7 +146,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } else { bool cmp_res = _previous_row->compare_two_rows(current); //get a distinct row - if (cmp_res == false) { + if (!cmp_res) { _output_distinct_rows++; //need rows++ firstly if (_output_distinct_rows >= _partition_inner_limit) { get_enough_data = true; @@ -176,7 +172,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } bool cmp_res = _previous_row->compare_two_rows(current); //get a distinct row - if (cmp_res == false) { + if (!cmp_res) { //here must be check distinct of two rows, and then check nums of row if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) { get_enough_data = true; @@ -198,14 +194,10 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int current->next(); priority_queue.push(current); } - - if (current_output_rows == batch_size || get_enough_data == true) { - break; - } } _output_total_rows += output_block->rows(); - if (current_output_rows == 0 || get_enough_data == true) { + if (get_enough_data) { *eos = true; } return Status::OK(); From 3ac70adb8f9e3e26c20a2d364f584242b035ca05 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 25 Dec 2024 12:19:33 +0800 Subject: [PATCH 5/5] Revert "update partition_sorter" This reverts commit 74d637aa11ae16631e444e35e1886087141268f8. --- be/src/vec/common/sort/partition_sorter.cpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 2cedc2677c25db..9e2620d64df9fd 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -66,7 +66,7 @@ Status PartitionSorter::prepare_for_read() { auto& blocks = _state->get_sorted_block(); auto& priority_queue = _state->get_priority_queue(); for (auto& block : blocks) { - priority_queue.emplace(MergeSortCursorImpl::create_shared(block, _sort_description)); + priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description)); } blocks.clear(); return Status::OK(); @@ -102,6 +102,10 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { auto& priority_queue = _state->get_priority_queue(); + if (priority_queue.empty()) { + *eos = true; + return Status::OK(); + } const auto& sorted_block = priority_queue.top().impl->block; size_t num_columns = sorted_block->columns(); MutableBlock m_block = @@ -110,7 +114,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int size_t current_output_rows = 0; bool get_enough_data = false; - while (priority_queue.size() > 1 && current_output_rows < batch_size && !get_enough_data) { + while (!priority_queue.empty()) { auto current = priority_queue.top(); priority_queue.pop(); if (UNLIKELY(_previous_row->impl == nullptr)) { @@ -146,7 +150,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } else { bool cmp_res = _previous_row->compare_two_rows(current); //get a distinct row - if (!cmp_res) { + if (cmp_res == false) { _output_distinct_rows++; //need rows++ firstly if (_output_distinct_rows >= _partition_inner_limit) { get_enough_data = true; @@ -172,7 +176,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } bool cmp_res = _previous_row->compare_two_rows(current); //get a distinct row - if (!cmp_res) { + if (cmp_res == false) { //here must be check distinct of two rows, and then check nums of row if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) { get_enough_data = true; @@ -194,10 +198,14 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int current->next(); priority_queue.push(current); } + + if (current_output_rows == batch_size || get_enough_data == true) { + break; + } } _output_total_rows += output_block->rows(); - if (get_enough_data) { + if (current_output_rows == 0 || get_enough_data == true) { *eos = true; } return Status::OK();