Skip to content

Commit

Permalink
[Enhancement] optimize buffer strategy of merge-sort (StarRocks#54183)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork authored and magzhu committed Jan 6, 2025
1 parent c6d6ce6 commit d790856
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 19 deletions.
8 changes: 3 additions & 5 deletions be/src/exec/chunks_sorter_full_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
#include "exec/sorting/merge.h"
#include "exec/sorting/sort_permute.h"
#include "exec/sorting/sorting.h"
#include "exprs/column_ref.h"
#include "exprs/expr.h"
#include "gutil/strings/substitute.h"
#include "runtime/current_thread.h"
#include "runtime/runtime_state.h"
#include "util/stopwatch.hpp"

namespace starrocks {

Expand All @@ -43,8 +40,8 @@ void ChunksSorterFullSort::setup_runtime(RuntimeState* state, RuntimeProfile* pr
_runtime_profile = profile;
_parent_mem_tracker = parent_mem_tracker;
_object_pool = std::make_unique<ObjectPool>();
_runtime_profile->add_info_string("MaxBufferedRows", strings::Substitute("$0", max_buffered_rows));
_runtime_profile->add_info_string("MaxBufferedBytes", strings::Substitute("$0", max_buffered_bytes));
_runtime_profile->add_info_string("MaxBufferedRows", std::to_string(max_buffered_rows));
_runtime_profile->add_info_string("MaxBufferedBytes", std::to_string(max_buffered_bytes));
_profiler = _object_pool->add(new ChunksSorterFullSortProfiler(profile, parent_mem_tracker));
}

Expand Down Expand Up @@ -130,6 +127,7 @@ Status ChunksSorterFullSort::_partial_sort(RuntimeState* state, bool done) {
Status ChunksSorterFullSort::_merge_sorted(RuntimeState* state) {
SCOPED_TIMER(_merge_timer);
_profiler->num_sorted_runs->set((int64_t)_sorted_chunks.size());
// TODO: introduce an extra merge before the cascading merge to handle the case with a large number of sorted runs
// In the cascading merge phase, the height of the merging tree is ceiling(log2(num_sorted_runs)) + 1,
// so when num_sorted_runs is 1 or 2, the height of the merging tree is at most 2 and the sorted runs are processed
// in at most one pass. There is no need to enable lazy materialization which eliminates non-order-by output
Expand Down
12 changes: 8 additions & 4 deletions be/src/exec/chunks_sorter_full_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "column/vectorized_fwd.h"
#include "exec/chunks_sorter.h"
#include "exec/sorting/merge.h"
#include "gtest/gtest_prod.h"

namespace starrocks {
class ExprContext;
Expand All @@ -34,6 +33,11 @@ struct ChunksSorterFullSortProfiler {
};
class ChunksSorterFullSort : public ChunksSorter {
public:
static constexpr size_t kDefaultMaxBufferRows =
1 << 30; // 1 billion rows, the number of rows has little impact on performance
static constexpr size_t kDefaultMaxBufferBytes =
256 << 20; // 256MB, a larger limit may improve performance but is not memory allocator friendly

/**
* Constructor.
* @param sort_exprs The order-by columns or columns with expression. This sorter will use but not own the object.
Expand Down Expand Up @@ -90,9 +94,9 @@ class ChunksSorterFullSort : public ChunksSorter {
std::unique_ptr<ObjectPool> _object_pool = nullptr;
ChunksSorterFullSortProfiler* _profiler = nullptr;

// TODO: further tuning of the buffer parameters
const size_t max_buffered_rows; // Max buffer 1024000 rows
const size_t max_buffered_bytes; // Max buffer 16MB bytes
// Parameters to control the Merge-Sort behavior
const size_t max_buffered_rows;
const size_t max_buffered_bytes;

// Only when the order-by columns (_sort_exprs) are all ColumnRefs and the cost of eager materialization of
// the other columns is larger than that of the ordinal column do we materialize order-by columns and ordinal columns eagerly,
Expand Down
11 changes: 5 additions & 6 deletions be/src/exec/topn_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@

#include "exec/topn_node.h"

#include <any>
#include <memory>

#include "exec/chunks_sorter.h"
#include "exec/chunks_sorter_full_sort.h"
#include "exec/chunks_sorter_heap_sort.h"
#include "exec/chunks_sorter_topn.h"
#include "exec/pipeline/limit_operator.h"
#include "exec/pipeline/noop_sink_operator.h"
#include "exec/pipeline/pipeline_builder.h"
#include "exec/pipeline/sort/local_merge_sort_source_operator.h"
#include "exec/pipeline/sort/local_parallel_merge_sort_source_operator.h"
Expand All @@ -34,7 +32,6 @@
#include "exec/pipeline/sort/spillable_partition_sort_sink_operator.h"
#include "exec/pipeline/source_operator.h"
#include "exec/pipeline/spill_process_channel.h"
#include "exec/pipeline/spill_process_operator.h"
#include "gutil/casts.h"
#include "runtime/current_thread.h"

Expand Down Expand Up @@ -322,12 +319,14 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory>> TopNNode::_decompose_to_

OperatorFactoryPtr sink_operator;

int64_t max_buffered_rows = 1024000;
int64_t max_buffered_bytes = 16 * 1024 * 1024;
int64_t max_buffered_rows = ChunksSorterFullSort::kDefaultMaxBufferRows;
int64_t max_buffered_bytes = ChunksSorterFullSort::kDefaultMaxBufferBytes;
if (_tnode.sort_node.__isset.max_buffered_bytes) {
max_buffered_rows = _tnode.sort_node.max_buffered_rows;
max_buffered_bytes = _tnode.sort_node.max_buffered_bytes;
}
if (_tnode.sort_node.__isset.max_buffered_rows) {
max_buffered_rows = _tnode.sort_node.max_buffered_rows;
}

sink_operator = std::make_shared<SinkFactory>(
context->next_operator_id(), id(), context_factory, _sort_exec_exprs, _is_asc_order, _is_null_first,
Expand Down
6 changes: 6 additions & 0 deletions be/src/util/cpu_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class CpuInfo {
return cache_sizes;
}

static long get_l3_cache_size() {
auto& cache_sizes = get_cache_sizes();
return cache_sizes[CacheLevel::L3_CACHE] ? cache_sizes[CacheLevel::L3_CACHE]
: cache_sizes[CacheLevel::L2_CACHE];
}

static std::vector<size_t> get_core_ids();

static bool is_cgroup_with_cpuset() { return is_cgroup_with_cpuset_; }
Expand Down
11 changes: 9 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.starrocks.common.StarRocksException;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.optimizer.operator.TopNType;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.thrift.TNormalPlanNode;
Expand Down Expand Up @@ -220,8 +221,14 @@ protected void toThrift(TPlanNode msg) {
msg.sort_node = new TSortNode(sortInfo, useTopN);
msg.sort_node.setOffset(offset);
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
msg.sort_node.setMax_buffered_rows(sessionVariable.getFullSortMaxBufferedRows());
msg.sort_node.setMax_buffered_bytes(sessionVariable.getFullSortMaxBufferedBytes());
SessionVariable defaultVariable = GlobalStateMgr.getCurrentState().getVariableMgr().getDefaultSessionVariable();
if (sessionVariable.getFullSortMaxBufferedBytes() != defaultVariable.getFullSortMaxBufferedBytes()) {
msg.sort_node.setMax_buffered_bytes(sessionVariable.getFullSortMaxBufferedBytes());
}
if (sessionVariable.getFullSortMaxBufferedRows() != defaultVariable.getFullSortMaxBufferedRows()) {
msg.sort_node.setMax_buffered_rows(sessionVariable.getFullSortMaxBufferedRows());
}

msg.sort_node.setLate_materialization(sessionVariable.isFullSortLateMaterialization());
msg.sort_node.setEnable_parallel_merge(sessionVariable.isEnableParallelMerge());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2040,10 +2040,10 @@ public long getConnectorSinkTargetMaxFileSize() {
private long groupConcatMaxLen = 1024;

@VariableMgr.VarAttr(name = FULL_SORT_MAX_BUFFERED_ROWS, flag = VariableMgr.INVISIBLE)
private long fullSortMaxBufferedRows = 1024000;
private long fullSortMaxBufferedRows = 1 * 1024 * 1024 * 1024;

@VariableMgr.VarAttr(name = FULL_SORT_MAX_BUFFERED_BYTES, flag = VariableMgr.INVISIBLE)
private long fullSortMaxBufferedBytes = 16L * 1024 * 1024;
private long fullSortMaxBufferedBytes = 256L * 1024 * 1024;

@VariableMgr.VarAttr(name = FULL_SORT_LATE_MATERIALIZATION_V2, alias = FULL_SORT_LATE_MATERIALIZATION,
show = FULL_SORT_LATE_MATERIALIZATION)
Expand Down

0 comments on commit d790856

Please sign in to comment.