Skip to content

Commit

Permalink
some optimization for full sort
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt committed Dec 23, 2024
1 parent a05582f commit 3ee1703
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
41 changes: 35 additions & 6 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,14 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized

/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
while (!priority_queue_.empty()) {
while (priority_queue_.size() > 1) {
auto current = priority_queue_.top();
priority_queue_.pop();

if (offset_ == 0) {
for (size_t i = 0; i < num_columns; ++i)
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
}
++merged_rows;
} else {
offset_--;
Expand All @@ -141,6 +142,31 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
break;
}
}

if (merged_rows < batch_size && priority_queue_.size() == 1) {
auto current = priority_queue_.top();
priority_queue_.pop();
if (offset_) {
auto skip_step = std::min(current->remained_rows(), size_t(offset_));
offset_ -= skip_step;
current->pos += skip_step;
}

auto step = std::min(current->remained_rows(), size_t(batch_size - merged_rows));
if (step) {
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_range_from(*current->block->get_columns()[i],
current->pos, step);
}
merged_rows += step;
current->pos += step;
}

if (current->remained_rows()) {
priority_queue_.push(current);
}
}

block->set_columns(std::move(merged_columns));

if (merged_rows == 0) {
Expand Down Expand Up @@ -217,9 +243,15 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs

Status FullSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);

if (_reach_limit() && block->bytes() > _state->unsorted_block_->allocated_bytes() -
_state->unsorted_block_->bytes()) {
RETURN_IF_ERROR(_do_sort());
}

{
SCOPED_TIMER(_merge_block_timer);
auto& data = _state->unsorted_block_->get_columns_with_type_and_name();
const auto& data = _state->unsorted_block_->get_columns_with_type_and_name();
const auto& arrival_data = block->get_columns_with_type_and_name();
auto sz = block->rows();
for (int i = 0; i < data.size(); ++i) {
Expand All @@ -232,9 +264,6 @@ Status FullSorter::append_block(Block* block) {
}
block->clear_column_data();
}
if (_reach_limit()) {
RETURN_IF_ERROR(_do_sort());
}
return Status::OK();
}

Expand Down
10 changes: 2 additions & 8 deletions be/src/vec/common/sort/sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,21 +177,15 @@ class FullSorter final : public Sorter {

private:
bool _reach_limit() {
return _state->unsorted_block_->rows() > buffered_block_size_ ||
_state->unsorted_block_->bytes() > buffered_block_bytes_;
return _state->unsorted_block_->allocated_bytes() >= buffered_block_bytes_;
}

Status _do_sort();

std::unique_ptr<MergeSorterState> _state;

static constexpr size_t INITIAL_BUFFERED_BLOCK_SIZE = 1024 * 1024;
static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20;
static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 * 1024 * 1024;

static constexpr size_t SPILL_BUFFERED_BLOCK_SIZE = 4 * 1024 * 1024;
static constexpr size_t SPILL_BUFFERED_BLOCK_BYTES = 256 << 20;

size_t buffered_block_size_ = INITIAL_BUFFERED_BLOCK_SIZE;
size_t buffered_block_bytes_ = INITIAL_BUFFERED_BLOCK_BYTES;
};

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/core/sort_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ struct MergeSortCursorImpl {
bool is_first() const { return pos == 0; }
bool is_last() const { return pos + 1 >= rows; }
void next() { ++pos; }
size_t remained_rows() const { return rows - pos; }

virtual bool has_next_block() { return false; }
virtual Block* block_ptr() { return nullptr; }
Expand Down

0 comments on commit 3ee1703

Please sign in to comment.