Skip to content

Commit

Permalink
[fix][common] Implement pthread PriorWorkerSet.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <[email protected]>
  • Loading branch information
ketor committed Mar 15, 2024
1 parent 6048417 commit 39c7420
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 209 deletions.
1 change: 1 addition & 0 deletions conf/coordinator-gflags.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
-dingo_log_switch_coor_watch=true
-dingo_log_switch_coor_lease=true
-default_replica_num=3
-use_pthread_prior_worker_set=true
3 changes: 2 additions & 1 deletion conf/coordinator.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ cluster:
server:
host: $SERVER_HOST$
port: $SERVER_PORT$
worker_thread_num: 128 # must be > 4; worker_thread_num takes priority over worker_thread_ratio
# worker_thread_num: 128 # must be > 4; worker_thread_num takes priority over worker_thread_ratio
# worker_thread_ratio: 1.0 # cpu core * ratio
brpc_common_worker_num: 32 # must be > 4; the total bthread_concurrency of brpc is the sum of all other worker_num values + brpc_common_worker_num
coordinator_service_worker_num: 32 # must < server.worker_thread_num
coordinator_service_worker_max_pending_num: 1024 # 0 is unlimited
meta_service_worker_num: 32 # must < server.worker_thread_num
Expand Down
1 change: 1 addition & 0 deletions conf/index-gflags.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
-enable_dir_service=false
-enable_threads_service=false
-dingo_log_switch_txn_detail=true
-use_pthread_prior_worker_set=true
10 changes: 4 additions & 6 deletions conf/index.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ server:
metrics_collect_interval_s: 300
approximate_size_metrics_collect_interval_s: 300
scrub_vector_index_interval_s: 60
# for vector index, there is a limit:
# read_worker_num + write_worker_num + raft_apply_worker_num + background_thread_num + fast_background_thread_num
# must < server.worker_thread_num
worker_thread_num: 128 # must be > 4; worker_thread_num takes priority over worker_thread_ratio
# worker_thread_num: 128 # must be > 4; worker_thread_num takes priority over worker_thread_ratio
# worker_thread_ratio: 2 # cpu core * ratio
brpc_common_worker_num: 32 # must be > 4; the total bthread_concurrency of brpc is the sum of all other worker_num values + brpc_common_worker_num
read_worker_num: 32 # the number of read worker used by index_service
read_worker_max_pending_num: 1024 # 0 is unlimited
write_worker_num: 24 # the number of write worker used by index_service
write_worker_num: 32 # the number of write worker used by index_service
write_worker_max_pending_num: 1024 # 0 is unlimited
raft_apply_worker_num: 24 # the number of raft apply worker used by store_state_machine
raft_apply_worker_num: 16 # the number of raft apply worker used by store_state_machine
raft_apply_worker_max_pending_num: 1024 # 0 is unlimited
region:
region_max_size: 536870912 # 512MB
Expand Down
1 change: 1 addition & 0 deletions conf/store-gflags.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
-enable_dir_service=false
-enable_threads_service=false
-dingo_log_switch_txn_detail=true
-use_pthread_prior_worker_set=true
12 changes: 5 additions & 7 deletions conf/store.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ server:
heartbeat_interval_s: 6
metrics_collect_interval_s: 300
approximate_size_metrics_collect_interval_s: 300
# for store, there is a limit:
# read_worker_num + write_worker_num + raft_apply_worker_num
# must < server.worker_thread_num
worker_thread_num: 128 # must be > 4; worker_thread_num takes priority over worker_thread_ratio
# worker_thread_num: 128 # must be > 4; worker_thread_num takes priority over worker_thread_ratio
# worker_thread_ratio: 1 # cpu core * ratio
read_worker_num: 48 # the number of read workers used by store_service
brpc_common_worker_num: 32 # must be > 4; the total bthread_concurrency of brpc is the sum of all other worker_num values + brpc_common_worker_num
read_worker_num: 32 # the number of read workers used by store_service
read_worker_max_pending_num: 1024 # 0 is unlimited
write_worker_num: 32 # the number of write worker used by store_service
write_worker_max_pending_num: 1024 # 0 is unlimited
raft_apply_worker_num: 32 # the number of raft apply worker used by store_state_machine
raft_apply_worker_max_pending_num: 1024 # 0 is unlimited
raft_apply_worker_num: 16 # the number of raft apply worker used by store_state_machine
raft_apply_worker_max_pending_num: 1024 # 0 is unlimited; this is a soft limit for raft_apply, but a hard limit for client writes
region:
region_max_size: 268435456 # 256MB
enable_auto_split: true
Expand Down
58 changes: 30 additions & 28 deletions src/common/runnable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <atomic>
#include <cstdint>
#include <string>

#include "butil/compiler_specific.h"
#include "common/helper.h"
Expand Down Expand Up @@ -291,9 +292,10 @@ std::vector<std::vector<std::string>> WorkerSet::GetPendingTaskTrace() {
return traces;
}

PriorWorkerSet::PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count)
PriorWorkerSet::PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count, bool use_pthread)
: name_(name),
worker_num_(worker_num),
use_pthread_(use_pthread),
max_pending_task_count_(max_pending_task_count),
total_task_count_metrics_(fmt::format("dingo_prior_worker_set_{}_total_task_count", name)),
pending_task_count_metrics_(fmt::format("dingo_prior_worker_set_{}_pending_task_count", name)),
Expand All @@ -309,11 +311,13 @@ PriorWorkerSet::~PriorWorkerSet() {
}

bool PriorWorkerSet::Init() {
for (uint32_t i = 0; i < worker_num_; ++i) {
workers_.push_back(Bthread());
}
uint32_t i = 0;

auto worker_function = [this, &i]() {
if (use_pthread_) {
pthread_setname_np(pthread_self(), (name_ + ":" + std::to_string(i)).c_str());
}

auto worker_function = [this]() {
while (true) {
bthread_mutex_lock(&mutex_);
while (pending_task_count_.load(std::memory_order_relaxed) == 0) {
Expand All @@ -333,7 +337,7 @@ bool PriorWorkerSet::Init() {

bthread_mutex_unlock(&mutex_);

if (task != nullptr) {
if (BAIDU_UNLIKELY(task != nullptr)) {
task->Run();
queue_run_metrics_ << Helper::TimestampUs() - now_time_us;
DecPendingTaskCount();
Expand All @@ -342,16 +346,28 @@ bool PriorWorkerSet::Init() {
}
};

for (auto& bthread : workers_) {
bthread.Run(worker_function);
if (use_pthread_) {
for (i = 0; i < worker_num_; ++i) {
pthread_workers_.push_back(std::thread(worker_function));
}
} else {
for (i = 0; i < worker_num_; ++i) {
bthread_workers_.push_back(Bthread(worker_function));
}
}

return true;
}

void PriorWorkerSet::Destroy() {
for (const auto& bthread : workers_) {
bthread.Join();
if (use_pthread_) {
for (auto& std_thread : pthread_workers_) {
std_thread.join();
}
} else {
for (auto& bthread : bthread_workers_) {
bthread.Join();
}
}
}

Expand All @@ -367,24 +383,10 @@ bool PriorWorkerSet::Execute(TaskRunnablePtr task) {
IncPendingTaskCount();
IncTotalTaskCount();

// if the pending task count is less than the worker number, execute the task directly
// else push the task to the task queue
// the total count of pending task will be decreased in the worker function
// and the total concurrency is limited by the worker number
if (pend_task_count < worker_num_) {
int64_t now_time_us = Helper::TimestampUs();
task->Run();
queue_run_metrics_ << Helper::TimestampUs() - now_time_us;

DecPendingTaskCount();
Notify(WorkerEventType::kFinishTask);

} else {
bthread_mutex_lock(&mutex_);
tasks_.push(task);
bthread_cond_signal(&cond_);
bthread_mutex_unlock(&mutex_);
}
bthread_mutex_lock(&mutex_);
tasks_.push(task);
bthread_cond_signal(&cond_);
bthread_mutex_unlock(&mutex_);

return true;
}
Expand Down
14 changes: 9 additions & 5 deletions src/common/runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include "bthread/execution_queue.h"
Expand Down Expand Up @@ -66,7 +67,7 @@ using TaskRunnablePtr = std::shared_ptr<TaskRunnable>;

// Custom Comparator for priority_queue
struct CompareTaskRunnable {
bool operator()(const TaskRunnablePtr& lhs, TaskRunnablePtr& rhs) const { return lhs < rhs; }
bool operator()(const TaskRunnablePtr& lhs, TaskRunnablePtr& rhs) const { return lhs.get() < rhs.get(); }
};

int ExecuteRoutine(void*, bthread::TaskIterator<TaskRunnablePtr>& iter);
Expand Down Expand Up @@ -171,11 +172,12 @@ using WorkerSetPtr = std::shared_ptr<WorkerSet>;

class PriorWorkerSet {
public:
PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count);
PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count, bool use_pthread);
~PriorWorkerSet();

static std::shared_ptr<PriorWorkerSet> New(std::string name, uint32_t worker_num, uint32_t max_pending_task_count) {
return std::make_shared<PriorWorkerSet>(name, worker_num, max_pending_task_count);
static std::shared_ptr<PriorWorkerSet> New(std::string name, uint32_t worker_num, uint32_t max_pending_task_count,
bool use_pthead) {
return std::make_shared<PriorWorkerSet>(name, worker_num, max_pending_task_count, use_pthead);
}

bool Init();
Expand Down Expand Up @@ -206,7 +208,9 @@ class PriorWorkerSet {
bthread_cond_t cond_;
std::priority_queue<TaskRunnablePtr, std::vector<TaskRunnablePtr>, CompareTaskRunnable> tasks_;

std::vector<Bthread> workers_;
bool use_pthread_;
std::vector<Bthread> bthread_workers_;
std::vector<std::thread> pthread_workers_;

int64_t max_pending_task_count_;
uint32_t worker_num_;
Expand Down
5 changes: 3 additions & 2 deletions src/common/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/threadpool.h"

#include <exception>
#include <string>

#include "fmt/core.h"

Expand All @@ -26,8 +27,8 @@ ThreadPool::ThreadPool(const std::string &thread_name, uint32_t thread_num)
total_task_count_metrics_(fmt::format("dingo_threadpool_{}_total_task_count", thread_name)),
pending_task_count_metrics_(fmt::format("dingo_threadpool_{}_pending_task_count", thread_name)) {
for (size_t i = 0; i < thread_num; ++i)
workers_.emplace_back([this] {
pthread_setname_np(pthread_self(), this->thread_name_.c_str());
workers_.emplace_back([this, &i] {
pthread_setname_np(pthread_self(), (thread_name_ + ":" + std::to_string(i)).c_str());

for (;;) {
TaskPtr task;
Expand Down
6 changes: 5 additions & 1 deletion src/engine/bdb_raw_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <random>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -2113,7 +2114,10 @@ bool BdbRawEngine::Init(std::shared_ptr<Config> config, const std::vector<std::s
writer_ = std::make_shared<bdb::Writer>(GetSelfPtr());
DINGO_LOG(INFO) << fmt::format("[bdb] db path: {}", db_path_);

std::thread checkpoint_thread([this]() { bdb::BdbHelper::CheckpointThread(envp_, db_, is_close_); });
std::thread checkpoint_thread([this]() {
pthread_setname_np(pthread_self(), "bdb_chkpt");
bdb::BdbHelper::CheckpointThread(envp_, db_, is_close_);
});
checkpoint_thread.detach();

DINGO_LOG(INFO) << fmt::format("[bdb] db path: {}", db_path_);
Expand Down
1 change: 0 additions & 1 deletion src/engine/bdb_raw_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <atomic>
#include <cstdint>
#include <memory>
#include <thread>
#include <unordered_map>
#include <vector>

Expand Down
8 changes: 4 additions & 4 deletions src/raft/store_state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ void StoreStateMachine::on_apply(braft::Iterator& iter) {
bool need_apply = true;
// Check region state
auto region_state = region_->State();
if (region_state == pb::common::StoreRegionState::DELETING ||
region_state == pb::common::StoreRegionState::DELETED ||
region_state == pb::common::StoreRegionState::TOMBSTONE) {
if (BAIDU_UNLIKELY(region_state == pb::common::StoreRegionState::DELETING ||
region_state == pb::common::StoreRegionState::DELETED ||
region_state == pb::common::StoreRegionState::TOMBSTONE)) {
std::string s = fmt::format("Region({}) is {} state, abandon apply log", region_->Id(),
pb::common::StoreRegionState_Name(region_state));
DINGO_LOG(WARNING) << fmt::format("[raft.sm][region({})] {}", region_->Id(), s);
Expand All @@ -142,7 +142,7 @@ void StoreStateMachine::on_apply(braft::Iterator& iter) {
}

// Check region epoch
if (need_apply && !Helper::IsEqualRegionEpoch(raft_cmd->header().epoch(), region_->Epoch())) {
if (BAIDU_UNLIKELY(need_apply && !Helper::IsEqualRegionEpoch(raft_cmd->header().epoch(), region_->Epoch()))) {
std::string s = fmt::format("Region({}) epoch is not match, region_epoch({}) raft_cmd_epoch({})", region_->Id(),
region_->EpochToString(), Helper::RegionEpochToString(raft_cmd->header().epoch()));
DINGO_LOG(WARNING) << fmt::format("[raft.sm][region({})] {}", region_->Id(), s);
Expand Down
Loading

0 comments on commit 39c7420

Please sign in to comment.