Skip to content

Commit

Permalink
[Enhancement](sort) change priority_queue to ck SortingQueue (apache…
Browse files Browse the repository at this point in the history
…#45952)

1. change priority_queue to ck SortingQueue (a heap which supports modifying the
top element)
2. avoid some convert_if_const usage


![QQ_1735532950407](https://github.com/user-attachments/assets/ec7b52c1-424b-4d7f-993c-64410ec35ba5)
  • Loading branch information
BiteTheDDDDt authored Jan 7, 2025
1 parent c5fce7a commit ac0322d
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 194 deletions.
1 change: 0 additions & 1 deletion be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ class IColumn : public COW<IColumn> {

/// Appends range of elements from other column with the same type.
/// Could be used to concatenate columns.
/// TODO: we need `insert_range_from_const` for every column type.
virtual void insert_range_from(const IColumn& src, size_t start, size_t length) = 0;

/// Appends range of elements from other column with the same type.
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/common/pod_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,14 @@ class PODArray : public PODArrayBase<sizeof(T), initial_bytes, TAllocator, pad_r
/// The index is signed to access -1th element without pointer overflow.
T& operator[](ssize_t n) {
/// <= size, because taking address of one element past memory range is Ok in C++ (expression like &arr[arr.size()] is perfectly valid).
assert((n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) &&
(n <= static_cast<ssize_t>(this->size())));
DCHECK_GE(n, (static_cast<ssize_t>(pad_left_) ? -1 : 0));
DCHECK_LE(n, static_cast<ssize_t>(this->size()));
return t_start()[n];
}

const T& operator[](ssize_t n) const {
assert((n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) &&
(n <= static_cast<ssize_t>(this->size())));
DCHECK_GE(n, (static_cast<ssize_t>(pad_left_) ? -1 : 0));
DCHECK_LE(n, static_cast<ssize_t>(this->size()));
return t_start()[n];
}

Expand Down
197 changes: 97 additions & 100 deletions be/src/vec/common/sort/partition_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ PartitionSorter::PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, in
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
_state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)),
_row_desc(row_desc),
_has_global_limit(has_global_limit),
_partition_inner_limit(partition_inner_limit),
_top_n_algorithm(top_n_algorithm),
_top_n_algorithm(
has_global_limit
? TopNAlgorithm::ROW_NUMBER
: top_n_algorithm), // FE will make this modification, but still maintain this code for compatibility
_previous_row(previous_row) {}

Status PartitionSorter::append_block(Block* input_block) {
Expand All @@ -64,10 +66,13 @@ Status PartitionSorter::append_block(Block* input_block) {

Status PartitionSorter::prepare_for_read() {
auto& blocks = _state->get_sorted_block();
auto& priority_queue = _state->get_priority_queue();
auto& queue = _state->get_queue();
std::vector<MergeSortCursor> cursors;
for (auto& block : blocks) {
priority_queue.emplace(MergeSortCursorImpl::create_shared(block, _sort_description));
cursors.emplace_back(
MergeSortCursorImpl::create_shared(std::move(block), _sort_description));
}
queue = MergeSorterQueue(cursors);
blocks.clear();
return Status::OK();
}
Expand All @@ -88,122 +93,114 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
}

Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
if (_state->get_priority_queue().empty()) {
*eos = true;
} else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
block->swap(*_state->get_priority_queue().top().impl->block);
block->set_num_rows(_partition_inner_limit);
*eos = true;
if (_top_n_algorithm == TopNAlgorithm::ROW_NUMBER) {
return _read_row_num(block, eos, state->batch_size());
} else {
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
return _read_row_rank(block, eos, state->batch_size());
}
return Status::OK();
}

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
auto& priority_queue = _state->get_priority_queue();
const auto& sorted_block = priority_queue.top().impl->block;
size_t num_columns = sorted_block->columns();
Status PartitionSorter::_read_row_num(Block* output_block, bool* eos, int batch_size) {
auto& queue = _state->get_queue();
size_t num_columns = _state->unsorted_block()->columns();

MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_state->unsorted_block());
MutableColumns& merged_columns = m_block.mutable_columns();
size_t current_output_rows = 0;

bool get_enough_data = false;
while (!priority_queue.empty()) {
auto current = priority_queue.top();
priority_queue.pop();
if (UNLIKELY(_previous_row->impl == nullptr)) {
*_previous_row = current;
size_t merged_rows = 0;

Defer defer {[&]() {
if (merged_rows == 0 || _get_enough_data()) {
*eos = true;
}
}};

switch (_top_n_algorithm) {
case TopNAlgorithm::ROW_NUMBER: {
//1 row_number no need to check distinct, just output partition_inner_limit row
if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
}
} else {
//rows has get enough
get_enough_data = true;
while (queue.is_valid() && merged_rows < batch_size && !_get_enough_data()) {
auto [current, current_rows] = queue.current();

// row_number no need to check distinct, just output partition_inner_limit row
size_t needed_rows = _partition_inner_limit - _output_total_rows;
size_t step = std::min(needed_rows, std::min(current_rows, batch_size - merged_rows));

if (current->impl->is_last(step) && current->impl->pos == 0) {
if (merged_rows != 0) {
// return directly for next time's read swap whole block
return Status::OK();
}
current_output_rows++;
break;
// swap and return block directly when we should get all data from cursor
output_block->swap(*current->impl->block);
merged_rows += step;
_output_total_rows += step;
queue.remove_top();
return Status::OK();
}
case TopNAlgorithm::DENSE_RANK: {
// dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2
//3 dense_rank() maybe need distinct rows of partition_inner_limit
//3.1 _has_global_limit = true, so check (current_output_rows + _output_total_rows) >= _partition_inner_limit)
//3.2 _has_global_limit = false. so check have output distinct rows, not _output_total_rows
if (_has_global_limit &&
(current_output_rows + _output_total_rows) >= _partition_inner_limit) {
get_enough_data = true;
break;
}
if (_has_global_limit) {
current_output_rows++;
} else {
bool cmp_res = _previous_row->compare_two_rows(current);
//get a distinct row
if (cmp_res == false) {
_output_distinct_rows++; //need rows++ firstly
if (_output_distinct_rows >= _partition_inner_limit) {
get_enough_data = true;
break;
}
*_previous_row = current;
}
}

if (step) {
merged_rows += step;
_output_total_rows += step;
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_range_from(*current->impl->columns[i], current->impl->pos,
step);
}
break;
}
case TopNAlgorithm::RANK: {
// rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6
//2 rank() maybe need check when have get a distinct row
//2.1 _has_global_limit = true: (current_output_rows + _output_total_rows) >= _partition_inner_limit)
//2.2 _has_global_limit = false: so when the cmp_res is get a distinct row, need check have output all rows num
if (_has_global_limit &&
(current_output_rows + _output_total_rows) >= _partition_inner_limit) {
get_enough_data = true;
break;
}
bool cmp_res = _previous_row->compare_two_rows(current);
//get a distinct row
if (cmp_res == false) {
//here must be check distinct of two rows, and then check nums of row
if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) {
get_enough_data = true;
break;

if (!current->impl->is_last(step)) {
queue.next(step);
} else {
queue.remove_top();
}
}

return Status::OK();
}

Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
auto& queue = _state->get_queue();
size_t num_columns = _state->unsorted_block()->columns();

MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_state->unsorted_block());
MutableColumns& merged_columns = m_block.mutable_columns();
size_t merged_rows = 0;

Defer defer {[&]() {
if (merged_rows == 0 || _get_enough_data()) {
*eos = true;
}
}};

while (queue.is_valid() && merged_rows < batch_size) {
auto [current, current_rows] = queue.current();

for (size_t offset = 0; offset < current_rows && merged_rows < batch_size; offset++) {
bool cmp_res = _previous_row->impl && _previous_row->compare_two_rows(current->impl);
if (!cmp_res) {
// 1. dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2
// dense_rank() maybe need distinct rows of partition_inner_limit
// so check have output distinct rows, not _output_total_rows
// 2. rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6
// rank() maybe need check when have get a distinct row
// so when the cmp_res is get a distinct row, need check have output all rows num
if (_get_enough_data()) {
return Status::OK();
}
*_previous_row = current;
*_previous_row = *current;
_output_distinct_rows++;
}
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->impl->block->get_columns()[i],
current->impl->pos);
}
merged_rows++;
_output_total_rows++;
if (!current->impl->is_last(1)) {
queue.next(1);
} else {
queue.remove_top();
}
current_output_rows++;
break;
}
default:
break;
}

if (!current->is_last()) {
current->next();
priority_queue.push(current);
}

if (current_output_rows == batch_size || get_enough_data == true) {
break;
}
}

_output_total_rows += output_block->rows();
if (current_output_rows == 0 || get_enough_data == true) {
*eos = true;
}
return Status::OK();
}

Expand Down
19 changes: 16 additions & 3 deletions be/src/vec/common/sort/partition_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,30 @@ class PartitionSorter final : public Sorter {
Status get_next(RuntimeState* state, Block* block, bool* eos) override;

size_t data_size() const override { return _state->data_size(); }

Status partition_sort_read(Block* block, bool* eos, int batch_size);
int64 get_output_rows() const { return _output_total_rows; }
void reset_sorter_state(RuntimeState* runtime_state);

private:
Status _read_row_num(Block* block, bool* eos, int batch_size);
Status _read_row_rank(Block* block, bool* eos, int batch_size);
bool _get_enough_data() const {
if (_top_n_algorithm == TopNAlgorithm::DENSE_RANK) {
// dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2
// dense_rank() maybe need distinct rows of partition_inner_limit
// so check have output distinct rows, not _output_total_rows
return _output_distinct_rows >= _partition_inner_limit;
} else {
// rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6
// rank() maybe need check when have get a distinct row
// so when the cmp_res is get a distinct row, need check have output all rows num
return _output_total_rows >= _partition_inner_limit;
}
}

std::unique_ptr<MergeSorterState> _state;
const RowDescriptor& _row_desc;
int64 _output_total_rows = 0;
int64 _output_distinct_rows = 0;
bool _has_global_limit = false;
int _partition_inner_limit = 0;
TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::type::ROW_NUMBER;
SortCursorCmp* _previous_row = nullptr;
Expand Down
Loading

0 comments on commit ac0322d

Please sign in to comment.