Skip to content

Commit

Permalink
[Opt] (multi-catalog) opt max scanner thread number in batch split mo…
Browse files Browse the repository at this point in the history
…de. (apache#44635)

### What problem does this PR solve?

Problem Summary:

There's only one scan range for each backend in batch split mode. Each
backend only starts up one ScanNode instance. However, when calculating
the concurrency of scanners in the scan operator in batch split mode, it
is not divided by 1, but by `query_parallel_instance_num`, resulting in
poor performance of batch split mode.
  • Loading branch information
kaka11chen authored Jan 22, 2025
1 parent 13e6b97 commit d7c99f8
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 21 deletions.
45 changes: 31 additions & 14 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/scan_operator.h"
#include "vec/exec/format/format_common.h"
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vfile_scanner.h"

namespace doris::pipeline {
Expand All @@ -37,9 +38,10 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
}

auto& p = _parent->cast<FileScanOperatorX>();
uint32_t shard_num = std::min(
config::doris_scanner_thread_pool_thread_num / state()->query_parallel_instance_num(),
_max_scanners);
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
uint32_t shard_num =
std::min(config::doris_scanner_thread_pool_thread_num / p.query_parallel_instance_num(),
_max_scanners);
shard_num = std::max(shard_num, 1U);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
for (int i = 0; i < _max_scanners; ++i) {
Expand All @@ -60,28 +62,43 @@ std::string FileScanLocalState::name_suffix() const {

void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
_max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
_max_scanners = std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1);
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
_max_scanners = 1;
}
auto& p = _parent->cast<FileScanOperatorX>();

auto calc_max_scanners = [&](int parallel_instance_num) -> int {
int max_scanners = config::doris_scanner_thread_pool_thread_num / parallel_instance_num;
max_scanners =
std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1);
if (should_run_serial()) {
max_scanners = 1;
}
return max_scanners;
};

if (scan_ranges.size() == 1) {
auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
if (scan_range.__isset.split_source) {
p._batch_split_mode = true;
auto split_source = scan_range.split_source;
RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime");

_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
_split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
state, get_split_timer, split_source.split_source_id, split_source.num_splits,
_max_scanners);
}
}
if (_split_source == nullptr) {
_split_source =
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, _max_scanners);

if (!p._batch_split_mode) {
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
if (_split_source == nullptr) {
_split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
_max_scanners);
}
// currently the total number of splits in the bach split mode cannot be accurately obtained,
// so we don't do it in the batch split mode.
_max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
}
_max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());

if (scan_ranges.size() > 0 &&
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
// for compatibility.
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,16 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {

bool is_file_scan_operator() const override { return true; }

// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
int query_parallel_instance_num() const override {
return _batch_split_mode ? 1 : _query_parallel_instance_num;
}

private:
friend class FileScanLocalState;

const std::string _table_name;
bool _batch_split_mode = false;
};

#include "common/compile_check_end.h"
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,8 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.is_serial_operator(), p.is_file_scan_operator());
_scan_dependency, p.is_serial_operator(), p.is_file_scan_operator(),
p.query_parallel_instance_num());
return Status::OK();
}

Expand Down Expand Up @@ -1205,6 +1206,8 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
}
}

_query_parallel_instance_num = state->query_parallel_instance_num();

return Status::OK();
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ class ScanOperatorX : public OperatorX<LocalStateType> {

[[nodiscard]] virtual bool is_file_scan_operator() const { return false; }

[[nodiscard]] virtual int query_parallel_instance_num() const {
return _query_parallel_instance_num;
}

const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
return _runtime_filter_descs;
}
Expand Down Expand Up @@ -434,6 +438,8 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
int64_t _push_down_count = -1;
const int _parallel_tasks = 0;

int _query_parallel_instance_num = 0;

std::vector<int> topn_filter_source_node_ids;
};

Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ScannerContext::ScannerContext(
const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution,
bool is_file_scan_operator)
bool is_file_scan_operator, int num_parallel_instances)
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
Expand All @@ -60,7 +60,8 @@ ScannerContext::ScannerContext(
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_ignore_data_distribution(ignore_data_distribution),
_is_file_scan_operator(is_file_scan_operator) {
_is_file_scan_operator(is_file_scan_operator),
_num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
Expand Down Expand Up @@ -105,8 +106,6 @@ Status ScannerContext::init() {
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");

const int num_parallel_instances = _state->query_parallel_instance_num();

// _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
// scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
// is larger than 10MB.
Expand Down Expand Up @@ -176,7 +175,7 @@ Status ScannerContext::init() {
} else {
const size_t factor = _is_file_scan_operator ? 1 : 4;
_max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num /
num_parallel_instances);
_num_parallel_instances);
// In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed
// in parallel. We need to make sure the _max_thread_num is smaller than previous value.
_max_thread_num =
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
bool ignore_data_distribution, bool is_file_scan_operator);
bool ignore_data_distribution, bool is_file_scan_operator,
int num_parallel_instances);

~ScannerContext() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
Expand Down Expand Up @@ -213,6 +214,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
bool _ignore_data_distribution = false;
bool _is_file_scan_operator = false;
int _num_parallel_instances;

// for scaling up the running scanners
size_t _estimated_block_size = 0;
Expand Down

0 comments on commit d7c99f8

Please sign in to comment.