Merge branch 'master' into enhancement_nereids_kill-query-support-union
Yao-MR authored Nov 21, 2024
2 parents 4b2170f + bdef601 commit 3147d7e
Showing 730 changed files with 30,970 additions and 5,536 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/auto-cherry-pick.yml
@@ -21,6 +21,7 @@ on:
pull_request_target:
types:
- closed
+ - labeled
branches:
- master
permissions:
@@ -30,7 +31,7 @@ permissions:
jobs:
auto_cherry_pick:
runs-on: ubuntu-latest
- if: ${{ (contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') || contains(github.event.pull_request.labels.*.name, 'dev/2.1.x')) && github.event.pull_request.merged == true }}
+ if: ${{(contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') || contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') ||github.event.label.name == 'dev/3.0.x' || github.event.label.name == 'dev/2.1.x') && github.event.pull_request.merged == true }}
steps:
- name: Checkout repository
uses: actions/checkout@v3
@@ -54,18 +55,18 @@ jobs:
echo "SHA matches: $calculated_sha"
fi
- name: Auto cherry-pick to branch-3.0
- if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') }}
+ if: ${{ ((github.event.action == 'labeled' && github.event.label.name == 'dev/3.0.x'))|| ((github.event_name == 'pull_request_target' && github.event.action == 'closed') && contains(github.event.pull_request.labels.*.name, 'dev/3.0.x')) }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
- CONFLICT_LABEL: cherry-pick-conflict-in-3.0
+ CONFLICT_LABEL: dev/3.0.x-conflict
run: |
python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-3.0
- name: Auto cherry-pick to branch-2.1
- if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') }}
+ if: ${{ ((github.event.action == 'labeled' && github.event.label.name == 'dev/2.1.x'))|| ((github.event_name == 'pull_request_target' && github.event.action == 'closed') && contains(github.event.pull_request.labels.*.name, 'dev/2.1.x')) }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
- CONFLICT_LABEL: cherry-pick-conflict-in-2.1.x
+ CONFLICT_LABEL: dev/2.1.x-conflict
run: |
python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-2.1
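Note: with the labeled trigger added above, applying the dev/3.0.x or dev/2.1.x label to an already-merged pull request now starts the corresponding cherry-pick step; previously the pick only ran when the PR was closed with the label already attached. The conflict labels are also renamed from cherry-pick-conflict-in-3.0 and cherry-pick-conflict-in-2.1.x to dev/3.0.x-conflict and dev/2.1.x-conflict.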
61 changes: 38 additions & 23 deletions be/CMakeLists.txt
@@ -130,6 +130,8 @@ message(STATUS "THIRDPARTY_DIR is ${THIRDPARTY_DIR}")

option(MAKE_TEST "ON for make unit test or OFF for not" OFF)
message(STATUS "make test: ${MAKE_TEST}")
+ option(BUILD_BENCHMARK "ON for make google benchmark or OFF for not" OFF)
+ message(STATUS "make benchmark: ${BUILD_BENCHMARK}")

option(WITH_MYSQL "Support access MySQL" ON)

@@ -568,7 +570,7 @@ if (OS_MACOSX)
)
endif()

- if (MAKE_TEST)
+ if (BUILD_BENCHMARK)
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
benchmark
@@ -708,6 +710,11 @@ if (MAKE_TEST)
endif()
endif ()

+ # use this to avoid some runtime tracker. reuse BE_TEST symbol, no need another.
+ if (BUILD_BENCHMARK)
+ add_definitions(-DBE_TEST)
+ endif()

get_directory_property(COMPILER_FLAGS COMPILE_OPTIONS)
get_directory_property(COMPILER_DEFINES COMPILE_DEFINITIONS)
message(STATUS "Compiler: ${CMAKE_CXX_COMPILER_ID}-${CMAKE_CXX_COMPILER_VERSION}")
@@ -754,7 +761,7 @@ add_subdirectory(${SRC_DIR}/http)
add_subdirectory(${SRC_DIR}/io)
add_subdirectory(${SRC_DIR}/olap)
add_subdirectory(${SRC_DIR}/runtime)
- add_subdirectory(${SRC_DIR}/service)
+ add_subdirectory(${SRC_DIR}/service) # this include doris_be
add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/cloud)

@@ -772,36 +779,44 @@ add_subdirectory(${SRC_DIR}/util)
add_subdirectory(${SRC_DIR}/vec)
add_subdirectory(${SRC_DIR}/pipeline)

+ # this include doris_be_test
if (MAKE_TEST)
add_subdirectory(${TEST_DIR})
endif ()

add_subdirectory(${COMMON_SRC_DIR}/cpp ${BUILD_DIR}/src/common_cpp)

- # Install be
- install(DIRECTORY DESTINATION ${OUTPUT_DIR})
- install(DIRECTORY DESTINATION ${OUTPUT_DIR}/bin)
- install(DIRECTORY DESTINATION ${OUTPUT_DIR}/conf)
-
- install(FILES
- ${BASE_DIR}/../bin/start_be.sh
- ${BASE_DIR}/../bin/stop_be.sh
- ${BASE_DIR}/../tools/jeprof
- PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE
- GROUP_READ GROUP_WRITE GROUP_EXECUTE
- WORLD_READ WORLD_EXECUTE
- DESTINATION ${OUTPUT_DIR}/bin)
-
- install(FILES
- ${BASE_DIR}/../conf/be.conf
- ${BASE_DIR}/../conf/odbcinst.ini
- ${BASE_DIR}/../conf/asan_suppr.conf
- ${BASE_DIR}/../conf/lsan_suppr.conf
- DESTINATION ${OUTPUT_DIR}/conf)
+ if(NOT BUILD_BENCHMARK)
+ # Install be
+ install(DIRECTORY DESTINATION ${OUTPUT_DIR})
+ install(DIRECTORY DESTINATION ${OUTPUT_DIR}/bin)
+ install(DIRECTORY DESTINATION ${OUTPUT_DIR}/conf)
+
+ install(FILES
+ ${BASE_DIR}/../bin/start_be.sh
+ ${BASE_DIR}/../bin/stop_be.sh
+ ${BASE_DIR}/../tools/jeprof
+ PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE
+ GROUP_READ GROUP_WRITE GROUP_EXECUTE
+ WORLD_READ WORLD_EXECUTE
+ DESTINATION ${OUTPUT_DIR}/bin)
+
+ install(FILES
+ ${BASE_DIR}/../conf/be.conf
+ ${BASE_DIR}/../conf/odbcinst.ini
+ ${BASE_DIR}/../conf/asan_suppr.conf
+ ${BASE_DIR}/../conf/lsan_suppr.conf
+ DESTINATION ${OUTPUT_DIR}/conf)
+ endif()

get_property(dirs DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES)
foreach(dir ${dirs})
message(STATUS "dir='${dir}'")
endforeach()


+ if (BUILD_BENCHMARK)
+ add_executable(benchmark_test ${BASE_DIR}/benchmark/benchmark_main.cpp)
+ target_link_libraries(benchmark_test ${DORIS_LINK_LIBS})
+ message(STATUS "Add benchmark to build")
+ install(TARGETS benchmark_test DESTINATION ${OUTPUT_DIR}/lib)
+ endif()
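Note: benchmark compilation now hangs off the dedicated BUILD_BENCHMARK option instead of MAKE_TEST. When the option is enabled (presumably via -DBUILD_BENCHMARK=ON at configure time), the normal BE install steps are skipped and only the benchmark_test binary is installed, into ${OUTPUT_DIR}/lib; the BE_TEST define is reused so the benchmark build bypasses the runtime trackers.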
52 changes: 52 additions & 0 deletions be/benchmark/benchmark_main.cpp
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <benchmark/benchmark.h>

#include <string>

#include "vec/columns/column_string.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_string.h"

namespace doris::vectorized { // change if need

static void Example1(benchmark::State& state) {
// init. dont time it.
state.PauseTiming();
Block block;
DataTypePtr str_type = std::make_shared<DataTypeString>();
std::vector<std::string> vals {100, "content"};
state.ResumeTiming();

// do test
for (auto _ : state) {
auto str_col = ColumnString::create();
for (auto& v : vals) {
str_col->insert_data(v.data(), v.size());
}
block.insert({std::move(str_col), str_type, "col"});
benchmark::DoNotOptimize(block); // mark the watched target
}
}
// could BENCHMARK many functions to compare them together.
BENCHMARK(Example1);

} // namespace doris::vectorized

BENCHMARK_MAIN();
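As the closing comment notes, several functions can be registered in the same binary and compared in one run. A minimal sketch of a second case, sitting next to Example1 in the same namespace and using only the APIs already included above plus Google Benchmark's argument mechanism (the Example2 name and the chosen sizes are illustrative, not part of this commit):

static void Example2(benchmark::State& state) {
    const std::string val = "content";
    for (auto _ : state) {
        auto str_col = ColumnString::create();
        // the row count comes from the registered argument, so several sizes
        // can be compared in a single run
        for (int64_t i = 0; i < state.range(0); ++i) {
            str_col->insert_data(val.data(), val.size());
        }
        benchmark::DoNotOptimize(str_col);
    }
}
BENCHMARK(Example2)->Arg(100)->Arg(10000);

Individual cases can then be selected at run time with the binary's standard --benchmark_filter flag.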
6 changes: 3 additions & 3 deletions be/src/agent/cgroup_cpu_ctl.cpp
@@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
return _is_enable_cgroup_v2_in_env ? 100 : 1024;
}

- std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
+ std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
if (_is_enable_cgroup_v2_in_env) {
- return std::make_unique<CgroupV2CpuCtl>(wg_id);
+ return std::make_shared<CgroupV2CpuCtl>(wg_id);
} else if (_is_enable_cgroup_v1_in_env) {
- return std::make_unique<CgroupV1CpuCtl>(wg_id);
+ return std::make_shared<CgroupV1CpuCtl>(wg_id);
}
return nullptr;
}
2 changes: 1 addition & 1 deletion be/src/agent/cgroup_cpu_ctl.h
@@ -52,7 +52,7 @@ class CgroupCpuCtl {

static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);

- static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
+ static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);

static bool is_a_valid_cgroup_path(std::string cg_path);

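The switch from std::unique_ptr to std::shared_ptr lets the controller created for a workload group have more than one owner; in this commit the compaction thread pools (cloud_storage_engine.cpp below) reach it through a weak_ptr taken from the workload group. A minimal sketch of that ownership pattern, with illustrative variable names:

std::shared_ptr<CgroupCpuCtl> ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(wg_id);
std::weak_ptr<CgroupCpuCtl> ctl_wptr = ctl; // handed out to other components
if (auto locked = ctl_wptr.lock()) {
    // still alive here: safe to use *locked
}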
6 changes: 2 additions & 4 deletions be/src/agent/topic_subscriber.cpp
@@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
// eg, update workload info may delay other listener, then we need add a thread here
// to handle_topic_info asynchronous
std::shared_lock lock(_listener_mtx);
LOG(INFO) << "[topic_publish]begin handle topic info";
for (auto& listener_pair : _registered_listeners) {
if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
listener_pair.second->handle_topic_info(
topic_request.topic_map.at(listener_pair.first));
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
}
}
}
2 changes: 1 addition & 1 deletion be/src/agent/workload_group_listener.cpp
@@ -59,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
workload_group_info.enable_cpu_hard_limit);

// 4 create and update task scheduler
- wg->upsert_task_scheduler(&workload_group_info, _exec_env);
+ wg->upsert_task_scheduler(&workload_group_info);

// 5 upsert io throttle
wg->upsert_scan_io_throttle(&workload_group_info);
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
@@ -363,7 +363,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
// If there are historical versions of rowsets, we need to recalculate their delete
// bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets
int64_t start_calc_delete_bitmap_version =
- already_exist_any_version ? 0 : sc_job->alter_version() + 1;
+ // [0-1] is a placeholder rowset, start from 2.
+ already_exist_any_version ? 2 : sc_job->alter_version() + 1;
RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
start_calc_delete_bitmap_version, initiator));
sc_job->set_delete_bitmap_lock_initiator(initiator);
31 changes: 22 additions & 9 deletions be/src/cloud/cloud_storage_engine.cpp
@@ -231,7 +231,7 @@ Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id) {
});
}

- Status CloudStorageEngine::start_bg_threads() {
+ Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
RETURN_IF_ERROR(Thread::create(
"CloudStorageEngine", "refresh_s3_info_thread",
[this]() { this->_refresh_storage_vault_info_thread_callback(); },
@@ -266,14 +266,27 @@ Status CloudStorageEngine::start_bg_threads() {
// compaction tasks producer thread
int base_thread_num = get_base_thread_num();
int cumu_thread_num = get_cumu_thread_num();
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_thread_num)
.set_max_threads(base_thread_num)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(cumu_thread_num)
.set_max_threads(cumu_thread_num)
.build(&_cumu_compaction_thread_pool));
+ if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+ RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(base_thread_num)
+ .set_max_threads(base_thread_num)
+ .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+ .build(&_base_compaction_thread_pool));
+ RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(cumu_thread_num)
+ .set_max_threads(cumu_thread_num)
+ .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+ .build(&_cumu_compaction_thread_pool));
+ } else {
+ RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(base_thread_num)
+ .set_max_threads(base_thread_num)
+ .build(&_base_compaction_thread_pool));
+ RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(cumu_thread_num)
+ .set_max_threads(cumu_thread_num)
+ .build(&_cumu_compaction_thread_pool));
+ }
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "compaction_tasks_producer_thread",
[this]() { this->_compaction_tasks_producer_callback(); },
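The two branches above differ only in whether set_cgroup_cpu_ctl is applied, so the builder chains are written out twice. A possible way to fold them, relying only on the chained-setter style visible in the diff; build_pool is a hypothetical local helper, and the pools are assumed to be the std::unique_ptr<ThreadPool> members used above:

auto build_pool = [&](const std::string& name, int threads,
                      std::unique_ptr<ThreadPool>* pool) -> Status {
    ThreadPoolBuilder builder(name);
    builder.set_min_threads(threads).set_max_threads(threads);
    // attach the cgroup controller only when the workload group has one;
    // wg_sptr defaults to nullptr in the new signature, so guard it too
    if (wg_sptr != nullptr && wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
        builder.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr());
    }
    return builder.build(pool);
};
RETURN_IF_ERROR(build_pool("BaseCompactionTaskThreadPool", base_thread_num,
                           &_base_compaction_thread_pool));
RETURN_IF_ERROR(build_pool("CumuCompactionTaskThreadPool", cumu_thread_num,
                           &_cumu_compaction_thread_pool));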
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_storage_engine.h
@@ -57,7 +57,7 @@ class CloudStorageEngine final : public BaseStorageEngine {

Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) override;

- Status start_bg_threads() override;
+ Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;

Status set_cluster_id(int32_t cluster_id) override {
_effective_cluster_id = cluster_id;
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
@@ -449,6 +449,12 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset

if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
+ if (rs.use_count() >= 1) {
+ LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
+ << rs.use_count()
+ << " references. File Cache won't be recycled when query is using it.";
+ continue;
+ }
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
// TODO: Segment::file_cache_key
auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
32 changes: 26 additions & 6 deletions be/src/common/config.cpp
@@ -63,8 +63,29 @@ DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_sql_port, "-1");

- DEFINE_mString(public_access_ip, "");
- DEFINE_Int32(public_access_port, "-1");
+ // If the external client cannot directly access priority_networks, set public_host to be accessible
+ // to external client.
+ // There are usually two usage scenarios:
+ // 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network.
+ // However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be
+ // randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx.
+ // 2. if priority_networks is an internal network IP, and BE node has its own independent external IP,
+ // but Doris currently does not support modifying priority_networks, setting public_host to the real external IP.
+ DEFINE_mString(public_host, "");

+ // If the BE node is connected to the external network through a reverse proxy like Nginx
+ // and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy
+ // `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example:
+ // upstream arrowflight {
+ // server 10.16.10.8:8069;
+ // server 10.16.10.8:8068;
+ //}
+ // server {
+ // listen 8167 http2;
+ // listen [::]:8167 http2;
+ // server_name doris.arrowflight.com;
+ // }
+ DEFINE_Int32(arrow_flight_sql_proxy_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
@@ -1004,7 +1025,7 @@ DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
DEFINE_Bool(enable_file_cache, "false");
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format: {"path": "/path/to/file_cache", "total_size":53687091200, "normal_percent":85, "disposable_percent":10, "index_percent":5}
// format: {"path": "/path/to/file_cache", "total_size":53687091200, "ttl_percent":50, "normal_percent":40, "disposable_percent":5, "index_percent":5}
// format: [{"path": "xxx", "total_size":53687091200, "storage": "memory"}]
// Note1: storage is "disk" by default
// Note2: when the storage is "memory", the path is ignored. So you can set xxx to anything you like
@@ -1020,7 +1041,7 @@ DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB

DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
@@ -1301,8 +1322,6 @@ DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
- // The max ratio for ttl cache's size
- DEFINE_mInt64(max_ttl_cache_ratio, "50");
// The maximum jvm heap usage ratio for hdfs write workload
DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5");
// The sleep milliseconds duration when hdfs write exceeds the maximum usage
@@ -1370,6 +1389,7 @@ DEFINE_Int32(query_cache_size, "512");
DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
// Enable validation to check the correctness of table size.
DEFINE_Bool(enable_table_size_correctness_check, "false");
+ DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");

// clang-format off
#ifdef BE_TEST
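Note: the old public_access_ip / public_access_port settings are removed in favour of public_host and arrow_flight_sql_proxy_port, so deployments that set the former would presumably need to update be.conf. max_ttl_cache_ratio is dropped as well; judging by the updated file_cache_path format comment, the TTL share of the file cache is now expressed through its ttl_percent field.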
(The remaining changed files in this commit are not shown here.)
