diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 729e5470c97ad5..bc8ab101ecf900 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -196,7 +196,6 @@ class IColumn : public COW { /// Appends range of elements from other column with the same type. /// Could be used to concatenate columns. - /// TODO: we need `insert_range_from_const` for every column type. virtual void insert_range_from(const IColumn& src, size_t start, size_t length) = 0; /// Appends range of elements from other column with the same type. diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index f798ca69bd68dd..c55dc0cc33eee3 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -355,14 +355,14 @@ class PODArray : public PODArrayBase= (static_cast(pad_left_) ? -1 : 0)) && - (n <= static_cast(this->size()))); + DCHECK_GE(n, (static_cast(pad_left_) ? -1 : 0)); + DCHECK_LE(n, static_cast(this->size())); return t_start()[n]; } const T& operator[](ssize_t n) const { - assert((n >= (static_cast(pad_left_) ? -1 : 0)) && - (n <= static_cast(this->size()))); + DCHECK_GE(n, (static_cast(pad_left_) ? -1 : 0)); + DCHECK_LE(n, static_cast(this->size())); return t_start()[n]; } diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index c47a8a8895298e..f6a3cb443aae77 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -49,9 +49,11 @@ PartitionSorter::PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, in : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)), _row_desc(row_desc), - _has_global_limit(has_global_limit), _partition_inner_limit(partition_inner_limit), - _top_n_algorithm(top_n_algorithm), + _top_n_algorithm( + has_global_limit + ? TopNAlgorithm::ROW_NUMBER + : top_n_algorithm), // FE will make this modification, but still maintain this code for compatibility _previous_row(previous_row) {} Status PartitionSorter::append_block(Block* input_block) { @@ -64,10 +66,13 @@ Status PartitionSorter::append_block(Block* input_block) { Status PartitionSorter::prepare_for_read() { auto& blocks = _state->get_sorted_block(); - auto& priority_queue = _state->get_priority_queue(); + auto& queue = _state->get_queue(); + std::vector cursors; for (auto& block : blocks) { - priority_queue.emplace(MergeSortCursorImpl::create_shared(block, _sort_description)); + cursors.emplace_back( + MergeSortCursorImpl::create_shared(std::move(block), _sort_description)); } + queue = MergeSorterQueue(cursors); blocks.clear(); return Status::OK(); } @@ -88,122 +93,114 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { } Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - if (_state->get_priority_queue().empty()) { - *eos = true; - } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) { - block->swap(*_state->get_priority_queue().top().impl->block); - block->set_num_rows(_partition_inner_limit); - *eos = true; + if (_top_n_algorithm == TopNAlgorithm::ROW_NUMBER) { + return _read_row_num(block, eos, state->batch_size()); } else { - RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); + return _read_row_rank(block, eos, state->batch_size()); } - return Status::OK(); } -Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { - auto& priority_queue = _state->get_priority_queue(); - const auto& sorted_block = priority_queue.top().impl->block; - size_t num_columns = sorted_block->columns(); +Status PartitionSorter::_read_row_num(Block* output_block, bool* eos, int batch_size) { + auto& queue = _state->get_queue(); + size_t num_columns = _state->unsorted_block()->columns(); + MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block); + VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_state->unsorted_block()); MutableColumns& merged_columns = m_block.mutable_columns(); - size_t current_output_rows = 0; - - bool get_enough_data = false; - while (!priority_queue.empty()) { - auto current = priority_queue.top(); - priority_queue.pop(); - if (UNLIKELY(_previous_row->impl == nullptr)) { - *_previous_row = current; + size_t merged_rows = 0; + + Defer defer {[&]() { + if (merged_rows == 0 || _get_enough_data()) { + *eos = true; } + }}; - switch (_top_n_algorithm) { - case TopNAlgorithm::ROW_NUMBER: { - //1 row_number no need to check distinct, just output partition_inner_limit row - if ((current_output_rows + _output_total_rows) < _partition_inner_limit) { - for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); - } - } else { - //rows has get enough - get_enough_data = true; + while (queue.is_valid() && merged_rows < batch_size && !_get_enough_data()) { + auto [current, current_rows] = queue.current(); + + // row_number no need to check distinct, just output partition_inner_limit row + size_t needed_rows = _partition_inner_limit - _output_total_rows; + size_t step = std::min(needed_rows, std::min(current_rows, batch_size - merged_rows)); + + if (current->impl->is_last(step) && current->impl->pos == 0) { + if (merged_rows != 0) { + // return directly for next time's read swap whole block + return Status::OK(); } - current_output_rows++; - break; + // swap and return block directly when we should get all data from cursor + output_block->swap(*current->impl->block); + merged_rows += step; + _output_total_rows += step; + queue.remove_top(); + return Status::OK(); } - case TopNAlgorithm::DENSE_RANK: { - // dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2 - //3 dense_rank() maybe need distinct rows of partition_inner_limit - //3.1 _has_global_limit = true, so check (current_output_rows + _output_total_rows) >= _partition_inner_limit) - //3.2 _has_global_limit = false. so check have output distinct rows, not _output_total_rows - if (_has_global_limit && - (current_output_rows + _output_total_rows) >= _partition_inner_limit) { - get_enough_data = true; - break; - } - if (_has_global_limit) { - current_output_rows++; - } else { - bool cmp_res = _previous_row->compare_two_rows(current); - //get a distinct row - if (cmp_res == false) { - _output_distinct_rows++; //need rows++ firstly - if (_output_distinct_rows >= _partition_inner_limit) { - get_enough_data = true; - break; - } - *_previous_row = current; - } - } + + if (step) { + merged_rows += step; + _output_total_rows += step; for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + merged_columns[i]->insert_range_from(*current->impl->columns[i], current->impl->pos, + step); } - break; } - case TopNAlgorithm::RANK: { - // rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6 - //2 rank() maybe need check when have get a distinct row - //2.1 _has_global_limit = true: (current_output_rows + _output_total_rows) >= _partition_inner_limit) - //2.2 _has_global_limit = false: so when the cmp_res is get a distinct row, need check have output all rows num - if (_has_global_limit && - (current_output_rows + _output_total_rows) >= _partition_inner_limit) { - get_enough_data = true; - break; - } - bool cmp_res = _previous_row->compare_two_rows(current); - //get a distinct row - if (cmp_res == false) { - //here must be check distinct of two rows, and then check nums of row - if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) { - get_enough_data = true; - break; + + if (!current->impl->is_last(step)) { + queue.next(step); + } else { + queue.remove_top(); + } + } + + return Status::OK(); +} + +Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) { + auto& queue = _state->get_queue(); + size_t num_columns = _state->unsorted_block()->columns(); + + MutableBlock m_block = + VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_state->unsorted_block()); + MutableColumns& merged_columns = m_block.mutable_columns(); + size_t merged_rows = 0; + + Defer defer {[&]() { + if (merged_rows == 0 || _get_enough_data()) { + *eos = true; + } + }}; + + while (queue.is_valid() && merged_rows < batch_size) { + auto [current, current_rows] = queue.current(); + + for (size_t offset = 0; offset < current_rows && merged_rows < batch_size; offset++) { + bool cmp_res = _previous_row->impl && _previous_row->compare_two_rows(current->impl); + if (!cmp_res) { + // 1. dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2 + // dense_rank() maybe need distinct rows of partition_inner_limit + // so check have output distinct rows, not _output_total_rows + // 2. rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6 + // rank() maybe need check when have get a distinct row + // so when the cmp_res is get a distinct row, need check have output all rows num + if (_get_enough_data()) { + return Status::OK(); } - *_previous_row = current; + *_previous_row = *current; + _output_distinct_rows++; } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + merged_columns[i]->insert_from(*current->impl->block->get_columns()[i], + current->impl->pos); + } + merged_rows++; + _output_total_rows++; + if (!current->impl->is_last(1)) { + queue.next(1); + } else { + queue.remove_top(); } - current_output_rows++; - break; - } - default: - break; - } - - if (!current->is_last()) { - current->next(); - priority_queue.push(current); - } - - if (current_output_rows == batch_size || get_enough_data == true) { - break; } } - _output_total_rows += output_block->rows(); - if (current_output_rows == 0 || get_enough_data == true) { - *eos = true; - } return Status::OK(); } diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h index 0939dcd40cdfdb..053b3aa1a29bbd 100644 --- a/be/src/vec/common/sort/partition_sorter.h +++ b/be/src/vec/common/sort/partition_sorter.h @@ -90,17 +90,30 @@ class PartitionSorter final : public Sorter { Status get_next(RuntimeState* state, Block* block, bool* eos) override; size_t data_size() const override { return _state->data_size(); } - - Status partition_sort_read(Block* block, bool* eos, int batch_size); int64 get_output_rows() const { return _output_total_rows; } void reset_sorter_state(RuntimeState* runtime_state); private: + Status _read_row_num(Block* block, bool* eos, int batch_size); + Status _read_row_rank(Block* block, bool* eos, int batch_size); + bool _get_enough_data() const { + if (_top_n_algorithm == TopNAlgorithm::DENSE_RANK) { + // dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2 + // dense_rank() maybe need distinct rows of partition_inner_limit + // so check have output distinct rows, not _output_total_rows + return _output_distinct_rows >= _partition_inner_limit; + } else { + // rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6 + // rank() maybe need check when have get a distinct row + // so when the cmp_res is get a distinct row, need check have output all rows num + return _output_total_rows >= _partition_inner_limit; + } + } + std::unique_ptr _state; const RowDescriptor& _row_desc; int64 _output_total_rows = 0; int64 _output_distinct_rows = 0; - bool _has_global_limit = false; int _partition_inner_limit = 0; TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::type::ROW_NUMBER; SortCursorCmp* _previous_row = nullptr; diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 4f7de1d379aea9..0fddcf011820ef 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -57,13 +57,11 @@ namespace doris::vectorized { // void MergeSorterState::reset() { - auto empty_queue = std::priority_queue(); - priority_queue_.swap(empty_queue); std::vector> empty_cursors(0); std::vector> empty_blocks(0); - sorted_blocks_.swap(empty_blocks); - unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty()); - in_mem_sorted_bocks_size_ = 0; + _sorted_blocks.swap(empty_blocks); + unsorted_block() = Block::create_unique(unsorted_block()->clone_empty()); + _in_mem_sorted_bocks_size = 0; } void MergeSorterState::add_sorted_block(std::shared_ptr block) { @@ -71,72 +69,80 @@ void MergeSorterState::add_sorted_block(std::shared_ptr block) { if (0 == rows) { return; } - in_mem_sorted_bocks_size_ += block->bytes(); - sorted_blocks_.emplace_back(block); - num_rows_ += rows; + _in_mem_sorted_bocks_size += block->bytes(); + _sorted_blocks.emplace_back(block); + _num_rows += rows; } Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { - for (auto& block : sorted_blocks_) { - priority_queue_.emplace( + std::vector cursors; + for (auto& block : _sorted_blocks) { + cursors.emplace_back( MergeSortCursorImpl::create_shared(std::move(block), sort_description)); } + _queue = MergeSorterQueue(cursors); - sorted_blocks_.clear(); + _sorted_blocks.clear(); return Status::OK(); } Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos) { - DCHECK(sorted_blocks_.empty()); - DCHECK(unsorted_block_->empty()); - if (priority_queue_.empty()) { - *eos = true; - } else if (priority_queue_.size() == 1) { - if (offset_ != 0 || priority_queue_.top()->pos != 0) { - // Skip rows already returned or need to be ignored - int64_t offset = offset_ + (int64_t)priority_queue_.top()->pos; - priority_queue_.top().impl->block->skip_num_rows(offset); - } - block->swap(*priority_queue_.top().impl->block); - *eos = true; - } else { - RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos)); - } + DCHECK(_sorted_blocks.empty()); + DCHECK(unsorted_block()->empty()); + RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos)); return Status::OK(); } Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos) { - size_t num_columns = priority_queue_.top().impl->block->columns(); + size_t num_columns = unsorted_block()->columns(); - MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( - block, *priority_queue_.top().impl->block); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, *unsorted_block()); MutableColumns& merged_columns = m_block.mutable_columns(); /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; // process single element queue on merge_sort_read() - while (priority_queue_.size() > 1 && merged_rows < batch_size) { - auto current = priority_queue_.top(); - priority_queue_.pop(); + while (_queue.is_valid() && merged_rows < batch_size) { + auto [current, current_rows] = _queue.current(); + current_rows = std::min(current_rows, batch_size - merged_rows); + + size_t step = std::min(_offset, current_rows); + _offset -= step; + current_rows -= step; + + if (current->impl->is_last(current_rows + step) && current->impl->pos == 0 && step == 0) { + if (merged_rows != 0) { + // return directly for next time's read swap whole block + return Status::OK(); + } + // swap and return block directly when we should get all data from cursor + block->swap(*current->impl->block); + _queue.remove_top(); + return Status::OK(); + } - if (offset_ == 0) { + if (current_rows) { for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + merged_columns[i]->insert_range_from(*current->impl->columns[i], + current->impl->pos + step, current_rows); } - ++merged_rows; - } else { - offset_--; + merged_rows += current_rows; } - if (!current->is_last()) { - current->next(); - priority_queue_.push(current); + if (!current->impl->is_last(current_rows + step)) { + _queue.next(current_rows + step); + } else { + _queue.remove_top(); } } block->set_columns(std::move(merged_columns)); + + if (merged_rows == 0) { + *eos = true; + } return Status::OK(); } @@ -207,23 +213,28 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs Status FullSorter::append_block(Block* block) { DCHECK(block->rows() > 0); - if (_reach_limit() && block->bytes() > _state->unsorted_block_->allocated_bytes() - - _state->unsorted_block_->bytes()) { + if (_reach_limit() && block->bytes() > _state->unsorted_block()->allocated_bytes() - + _state->unsorted_block()->bytes()) { RETURN_IF_ERROR(_do_sort()); } { SCOPED_TIMER(_merge_block_timer); - const auto& data = _state->unsorted_block_->get_columns_with_type_and_name(); + const auto& data = _state->unsorted_block()->get_columns_with_type_and_name(); const auto& arrival_data = block->get_columns_with_type_and_name(); auto sz = block->rows(); for (int i = 0; i < data.size(); ++i) { DCHECK(data[i].type->equals(*(arrival_data[i].type))) << " type1: " << data[i].type->get_name() << " type2: " << arrival_data[i].type->get_name() << " i: " << i; - //TODO: to eliminate unnecessary expansion, we need a `insert_range_from_const` for every column type. - data[i].column->assume_mutable()->insert_range_from( - *arrival_data[i].column->convert_to_full_column_if_const(), 0, sz); + if (is_column_const(*arrival_data[i].column)) { + data[i].column->assume_mutable()->insert_many_from( + assert_cast(arrival_data[i].column.get()) + ->get_data_column(), + 0, sz); + } else { + data[i].column->assume_mutable()->insert_range_from(*arrival_data[i].column, 0, sz); + } } block->clear_column_data(); } @@ -231,7 +242,7 @@ Status FullSorter::append_block(Block* block) { } Status FullSorter::prepare_for_read() { - if (_state->unsorted_block_->rows() > 0) { + if (_state->unsorted_block()->rows() > 0) { RETURN_IF_ERROR(_do_sort()); } return _state->build_merge_tree(_sort_description); @@ -247,7 +258,7 @@ Status FullSorter::merge_sort_read_for_spill(RuntimeState* state, doris::vectori } Status FullSorter::_do_sort() { - Block* src_block = _state->unsorted_block_.get(); + Block* src_block = _state->unsorted_block().get(); Block desc_block = src_block->clone_without_columns(); RETURN_IF_ERROR(partial_sort(*src_block, desc_block)); diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 36c535c9101db9..69e70ab869cd4b 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -43,6 +43,8 @@ class RowDescriptor; namespace doris::vectorized { +using MergeSorterQueue = SortingQueueBatch; + // TODO: now we only use merge sort class MergeSorterState { ENABLE_FACTORY_CREATOR(MergeSorterState); @@ -53,9 +55,9 @@ class MergeSorterState { // create_empty_block should ignore invalid slots, unsorted_block // should be same structure with arrival block from child node // since block from child node may ignored these slots - : unsorted_block_(Block::create_unique( + : _unsorted_block(Block::create_unique( VectorizedUtils::create_empty_block(row_desc, true /*ignore invalid slot*/))), - offset_(offset) {} + _offset(offset) {} ~MergeSorterState() = default; @@ -66,32 +68,33 @@ class MergeSorterState { Status merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos); size_t data_size() const { - size_t size = unsorted_block_->bytes(); - return size + in_mem_sorted_bocks_size_; + size_t size = _unsorted_block->bytes(); + return size + _in_mem_sorted_bocks_size; } - uint64_t num_rows() const { return num_rows_; } + uint64_t num_rows() const { return _num_rows; } - std::shared_ptr last_sorted_block() { return sorted_blocks_.back(); } + std::shared_ptr last_sorted_block() { return _sorted_blocks.back(); } - std::vector>& get_sorted_block() { return sorted_blocks_; } - std::priority_queue& get_priority_queue() { return priority_queue_; } + std::vector>& get_sorted_block() { return _sorted_blocks; } + MergeSorterQueue& get_queue() { return _queue; } void reset(); - std::unique_ptr unsorted_block_; + std::unique_ptr& unsorted_block() { return _unsorted_block; } private: Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos); - std::priority_queue priority_queue_; - std::vector> sorted_blocks_; - size_t in_mem_sorted_bocks_size_ = 0; - uint64_t num_rows_ = 0; + std::unique_ptr _unsorted_block; + MergeSorterQueue _queue; + std::vector> _sorted_blocks; + size_t _in_mem_sorted_bocks_size = 0; + uint64_t _num_rows = 0; - int64_t offset_; + size_t _offset; - Block merge_sorted_block_; - std::unique_ptr merger_; + Block _merge_sorted_block; + std::unique_ptr _merger; }; class Sorter { @@ -177,7 +180,7 @@ class FullSorter final : public Sorter { private: bool _reach_limit() { - return _state->unsorted_block_->allocated_bytes() >= buffered_block_bytes_; + return _state->unsorted_block()->allocated_bytes() >= buffered_block_bytes_; } Status _do_sort(); diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index d31767f46e461f..a37b6feb21ecd6 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -20,6 +20,8 @@ #pragma once +#include + #include "vec/columns/column.h" #include "vec/core/block.h" #include "vec/core/sort_description.h" @@ -48,8 +50,7 @@ struct HeapSortCursorBlockView { void _reset() { sort_columns.clear(); auto columns = block.get_columns_and_convert(); - for (size_t j = 0, size = desc.size(); j < size; ++j) { - auto& column_desc = desc[j]; + for (auto& column_desc : desc) { size_t column_number = !column_desc.column_name.empty() ? block.get_position_by_name(column_desc.column_name) : column_desc.column_number; @@ -63,7 +64,7 @@ using HeapSortCursorBlockSPtr = std::shared_ptr; struct HeapSortCursorImpl { public: HeapSortCursorImpl(int row_id, HeapSortCursorBlockSPtr block_view) - : _row_id(row_id), _block_view(block_view) {} + : _row_id(row_id), _block_view(std::move(block_view)) {} HeapSortCursorImpl(const HeapSortCursorImpl& other) { _row_id = other._row_id; @@ -123,6 +124,7 @@ struct MergeSortCursorImpl { ENABLE_FACTORY_CREATOR(MergeSortCursorImpl); std::shared_ptr block; ColumnRawPtrs sort_columns; + ColumnRawPtrs columns; SortDescription desc; size_t sort_columns_size = 0; size_t pos = 0; @@ -131,26 +133,33 @@ struct MergeSortCursorImpl { MergeSortCursorImpl() = default; virtual ~MergeSortCursorImpl() = default; - MergeSortCursorImpl(std::shared_ptr block_, const SortDescription& desc_) - : block(block_), desc(desc_), sort_columns_size(desc.size()) { + MergeSortCursorImpl(std::shared_ptr block_, SortDescription desc_) + : block(std::move(block_)), desc(std::move(desc_)), sort_columns_size(desc.size()) { reset(); } - MergeSortCursorImpl(const SortDescription& desc_) - : block(Block::create_shared()), desc(desc_), sort_columns_size(desc.size()) {} + MergeSortCursorImpl(SortDescription desc_) + : block(Block::create_shared()), + desc(std::move(desc_)), + sort_columns_size(desc.size()) {} + bool empty() const { return rows == 0; } /// Set the cursor to the beginning of the new block. void reset() { sort_columns.clear(); + columns.clear(); - auto columns = block->get_columns_and_convert(); - for (size_t j = 0, size = desc.size(); j < size; ++j) { - auto& column_desc = desc[j]; + auto tmp_columns = block->get_columns_and_convert(); + columns.reserve(tmp_columns.size()); + for (auto col : tmp_columns) { + columns.push_back(col.get()); + } + for (auto& column_desc : desc) { size_t column_number = !column_desc.column_name.empty() ? block->get_position_by_name(column_desc.column_name) : column_desc.column_number; - sort_columns.push_back(columns[column_number].get()); + sort_columns.push_back(columns[column_number]); } pos = 0; @@ -158,8 +167,9 @@ struct MergeSortCursorImpl { } bool is_first() const { return pos == 0; } - bool is_last() const { return pos + 1 >= rows; } - void next() { ++pos; } + bool is_last(size_t size = 1) const { return pos + size >= rows; } + void next(size_t size = 1) { pos += size; } + size_t get_size() const { return rows; } virtual bool has_next_block() { return false; } virtual Block* block_ptr() { return nullptr; } @@ -169,11 +179,11 @@ using BlockSupplier = std::function; struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl); - BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, + BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const VExprContextSPtrs& ordering_expr, const std::vector& is_asc_order, const std::vector& nulls_first) - : _ordering_expr(ordering_expr), _block_supplier(block_supplier) { + : _ordering_expr(ordering_expr), _block_supplier(std::move(block_supplier)) { block = Block::create_shared(); sort_columns_size = ordering_expr.size(); @@ -185,8 +195,8 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { _is_eof = !has_next_block(); } - BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const SortDescription& desc_) - : MergeSortCursorImpl(desc_), _block_supplier(block_supplier) { + BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const SortDescription& desc_) + : MergeSortCursorImpl(desc_), _block_supplier(std::move(block_supplier)) { _is_eof = !has_next_block(); } @@ -202,7 +212,7 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { // If status not ok, upper callers could not detect whether it is eof or error. // So that fatal here, and should throw exception in the future. if (status.ok() && !block->empty()) { - if (_ordering_expr.size() > 0) { + if (!_ordering_expr.empty()) { for (int i = 0; status.ok() && i < desc.size(); ++i) { // TODO yiguolei: throw exception if status not ok in the future status = _ordering_expr[i]->execute(block.get(), &desc[i].column_number); @@ -233,7 +243,7 @@ struct MergeSortCursor { ENABLE_FACTORY_CREATOR(MergeSortCursor); std::shared_ptr impl; - MergeSortCursor(std::shared_ptr impl_) : impl(impl_) {} + MergeSortCursor(std::shared_ptr impl_) : impl(std::move(impl_)) {} MergeSortCursorImpl* operator->() const { return impl.get(); } /// The specified row of this cursor is greater than the specified row of another cursor. @@ -264,6 +274,21 @@ struct MergeSortCursor { return greater_at(rhs, impl->rows - 1, 0) == -1; } + /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor. + bool totally_less_or_equals(const MergeSortCursor& rhs) const { + if (impl->rows == 0 || rhs.impl->rows == 0) { + return false; + } + + /// The last row of this cursor is no larger than the first row of the another cursor. + return greater_at(rhs, impl->rows - 1, rhs->pos) <= 0; + } + + bool greater_with_offset(const MergeSortCursor& rhs, size_t lhs_offset, + size_t rhs_offset) const { + return greater_at(rhs, impl->pos + lhs_offset, rhs.impl->pos + rhs_offset) > 0; + } + bool greater(const MergeSortCursor& rhs) const { return !impl->empty() && greater_at(rhs, impl->pos, rhs.impl->pos) > 0; } @@ -277,7 +302,7 @@ struct MergeSortBlockCursor { ENABLE_FACTORY_CREATOR(MergeSortBlockCursor); std::shared_ptr impl = nullptr; - MergeSortBlockCursor(std::shared_ptr impl_) : impl(impl_) {} + MergeSortBlockCursor(std::shared_ptr impl_) : impl(std::move(impl_)) {} MergeSortCursorImpl* operator->() const { return impl.get(); } /// The specified row of this cursor is greater than the specified row of another cursor. @@ -314,4 +339,219 @@ struct MergeSortBlockCursor { } }; +enum class SortingQueueStrategy : uint8_t { Default, Batch }; + +/// Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams). +template +class SortingQueueImpl { +public: + SortingQueueImpl() = default; + + template + explicit SortingQueueImpl(Cursors& cursors) { + size_t size = cursors.size(); + _queue.reserve(size); + + for (size_t i = 0; i < size; ++i) { + _queue.emplace_back(cursors[i]); + } + + std::make_heap(_queue.begin(), _queue.end()); + + if constexpr (strategy == SortingQueueStrategy::Batch) { + if (!_queue.empty()) { + update_batch_size(); + } + } + } + + bool is_valid() const { return !_queue.empty(); } + + Cursor& current() + requires(strategy == SortingQueueStrategy::Default) + { + return &_queue.front(); + } + + std::pair current() + requires(strategy == SortingQueueStrategy::Batch) + { + return {&_queue.front(), batch_size}; + } + + size_t size() { return _queue.size(); } + + Cursor& next_child() { return _queue[next_child_index()]; } + + void ALWAYS_INLINE next() + requires(strategy == SortingQueueStrategy::Default) + { + assert(is_valid()); + + if (!_queue.front()->is_last()) { + _queue.front()->next(); + update_top(true); + } else { + remove_top(); + } + } + + void ALWAYS_INLINE next(size_t batch_size_value) + requires(strategy == SortingQueueStrategy::Batch) + { + assert(is_valid()); + assert(batch_size_value <= batch_size); + assert(batch_size_value > 0); + + batch_size -= batch_size_value; + if (batch_size > 0) { + _queue.front()->next(batch_size_value); + return; + } + + if (!_queue.front()->is_last(batch_size_value)) { + _queue.front()->next(batch_size_value); + update_top(false); + } else { + remove_top(); + } + } + + void remove_top() { + std::pop_heap(_queue.begin(), _queue.end()); + _queue.pop_back(); + next_child_idx = 0; + + if constexpr (strategy == SortingQueueStrategy::Batch) { + if (_queue.empty()) { + batch_size = 0; + } else { + update_batch_size(); + } + } + } + + void push(MergeSortCursorImpl& cursor) { + _queue.emplace_back(&cursor); + std::push_heap(_queue.begin(), _queue.end()); + next_child_idx = 0; + + if constexpr (strategy == SortingQueueStrategy::Batch) { + update_batch_size(); + } + } + +private: + using Container = std::vector; + Container _queue; + + /// Cache comparison between first and second child if the order in queue has not been changed. + size_t next_child_idx = 0; + size_t batch_size = 0; + + size_t ALWAYS_INLINE next_child_index() { + if (next_child_idx == 0) { + next_child_idx = 1; + + if (_queue.size() > 2 && _queue[1].greater(_queue[2])) { + ++next_child_idx; + } + } + + return next_child_idx; + } + + /// This is adapted version of the function __sift_down from libc++. + /// Why cannot simply use std::priority_queue? + /// - because it doesn't support updating the top element and requires pop and push instead. + /// Also look at "Boost.Heap" library. + void ALWAYS_INLINE update_top(bool check_in_order) { + size_t size = _queue.size(); + if (size < 2) { + return; + } + + auto begin = _queue.begin(); + + size_t child_idx = next_child_index(); + auto child_it = begin + child_idx; + + /// Check if we are in order. + if (check_in_order && (*child_it).greater(*begin)) { + if constexpr (strategy == SortingQueueStrategy::Batch) { + update_batch_size(); + } + return; + } + + next_child_idx = 0; + + auto curr_it = begin; + auto top(std::move(*begin)); + do { + /// We are not in heap-order, swap the parent with it's largest child. + *curr_it = std::move(*child_it); + curr_it = child_it; + + // recompute the child based off of the updated parent + child_idx = 2 * child_idx + 1; + + if (child_idx >= size) { + break; + } + + child_it = begin + child_idx; + + if ((child_idx + 1) < size && (*child_it).greater(*(child_it + 1))) { + /// Right child exists and is greater than left child. + ++child_it; + ++child_idx; + } + + /// Check if we are in order. + } while (!((*child_it).greater(top))); + *curr_it = std::move(top); + + if constexpr (strategy == SortingQueueStrategy::Batch) { + update_batch_size(); + } + } + + /// Update batch size of elements that client can extract from current cursor + void update_batch_size() { + DCHECK(!_queue.empty()); + + auto& begin_cursor = *_queue.begin(); + size_t min_cursor_size = begin_cursor->get_size(); + size_t min_cursor_pos = begin_cursor->pos; + + if (_queue.size() == 1) { + batch_size = min_cursor_size - min_cursor_pos; + return; + } + + batch_size = 1; + size_t child_idx = next_child_index(); + auto& next_child_cursor = *(_queue.begin() + child_idx); + if (min_cursor_pos + batch_size < min_cursor_size && + next_child_cursor.greater_with_offset(begin_cursor, 0, batch_size)) { + ++batch_size; + } else { + return; + } + if (begin_cursor.totally_less_or_equals(next_child_cursor)) { + batch_size = min_cursor_size - min_cursor_pos; + return; + } + + while (min_cursor_pos + batch_size < min_cursor_size && + next_child_cursor.greater_with_offset(begin_cursor, 0, batch_size)) { + ++batch_size; + } + } +}; +template +using SortingQueue = SortingQueueImpl; +template +using SortingQueueBatch = SortingQueueImpl; } // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java index 20142e380ce710..69a1b871d1ba66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java @@ -142,7 +142,10 @@ protected void toThrift(TPlanNode msg) { Preconditions.checkState(tupleIds.size() == 1, "Incorrect size for tupleIds in PartitionSortNode"); TopNAlgorithm topNAlgorithm; - if (function == WindowFuncType.ROW_NUMBER) { + if (hasGlobalLimit) { + // only need row number if has global limit, so we change algorithm directly + topNAlgorithm = TopNAlgorithm.ROW_NUMBER; + } else if (function == WindowFuncType.ROW_NUMBER) { topNAlgorithm = TopNAlgorithm.ROW_NUMBER; } else if (function == WindowFuncType.RANK) { topNAlgorithm = TopNAlgorithm.RANK;