diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 72bf35f3cbac3b0..9f6f717d15dff0f 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -120,13 +120,14 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!priority_queue_.empty()) { + while (priority_queue_.size() > 1) { auto current = priority_queue_.top(); priority_queue_.pop(); if (offset_ == 0) { - for (size_t i = 0; i < num_columns; ++i) + for (size_t i = 0; i < num_columns; ++i) { merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + } ++merged_rows; } else { offset_--; @@ -141,6 +142,31 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized break; } } + + if (merged_rows < batch_size && priority_queue_.size() == 1) { + auto current = priority_queue_.top(); + priority_queue_.pop(); + if (offset_) { + auto skip_step = std::min(current->remained_rows(), size_t(offset_)); + offset_ -= skip_step; + current->pos += skip_step; + } + + auto step = std::min(current->remained_rows(), size_t(batch_size - merged_rows)); + if (step) { + for (size_t i = 0; i < num_columns; ++i) { + merged_columns[i]->insert_range_from(*current->block->get_columns()[i], + current->pos, step); + } + merged_rows += step; + current->pos += step; + } + + if (current->remained_rows()) { + priority_queue_.push(current); + } + } + block->set_columns(std::move(merged_columns)); if (merged_rows == 0) { @@ -217,9 +243,15 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs Status FullSorter::append_block(Block* block) { DCHECK(block->rows() > 0); + + if (_reach_limit() && block->bytes() > _state->unsorted_block_->allocated_bytes() - + _state->unsorted_block_->bytes()) { + RETURN_IF_ERROR(_do_sort()); + } + { SCOPED_TIMER(_merge_block_timer); - auto& data = _state->unsorted_block_->get_columns_with_type_and_name(); + const auto& data = _state->unsorted_block_->get_columns_with_type_and_name(); const auto& arrival_data = block->get_columns_with_type_and_name(); auto sz = block->rows(); for (int i = 0; i < data.size(); ++i) { @@ -232,9 +264,6 @@ Status FullSorter::append_block(Block* block) { } block->clear_column_data(); } - if (_reach_limit()) { - RETURN_IF_ERROR(_do_sort()); - } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index aa7d88dfbc2a3a1..36c535c9101db93 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -177,21 +177,15 @@ class FullSorter final : public Sorter { private: bool _reach_limit() { - return _state->unsorted_block_->rows() > buffered_block_size_ || - _state->unsorted_block_->bytes() > buffered_block_bytes_; + return _state->unsorted_block_->allocated_bytes() >= buffered_block_bytes_; } Status _do_sort(); std::unique_ptr _state; - static constexpr size_t INITIAL_BUFFERED_BLOCK_SIZE = 1024 * 1024; - static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20; + static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 * 1024 * 1024; - static constexpr size_t SPILL_BUFFERED_BLOCK_SIZE = 4 * 1024 * 1024; - static constexpr size_t SPILL_BUFFERED_BLOCK_BYTES = 256 << 20; - - size_t buffered_block_size_ = INITIAL_BUFFERED_BLOCK_SIZE; size_t buffered_block_bytes_ = INITIAL_BUFFERED_BLOCK_BYTES; }; diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index d31767f46e461ff..67d603533a102de 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -160,6 +160,7 @@ struct MergeSortCursorImpl { bool is_first() const { return pos == 0; } bool is_last() const { return pos + 1 >= rows; } void next() { ++pos; } + size_t remained_rows() const { return rows - pos; } virtual bool has_next_block() { return false; } virtual Block* block_ptr() { return nullptr; }