Skip to content
This repository has been archived by the owner on Sep 27, 2019. It is now read-only.

Commit

Permalink
Modify WorkerPool threads to not sleep for fixed amount of time, but …
Browse files Browse the repository at this point in the history
…rather wake up when something goes into the task_queue for them to work on.
  • Loading branch information
mbutrovich committed Jun 27, 2018
1 parent d22bd24 commit 8aee4ea
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
6 changes: 5 additions & 1 deletion src/include/threadpool/mono_queue_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ inline void MonoQueuePool::SubmitTask(const F &func) {
if (!is_running_) {
Startup();
}
task_queue_.Enqueue(std::move(func));
{
std::lock_guard<std::mutex> lock(worker_pool_.cv_lock);
task_queue_.Enqueue(std::move(func));
}
worker_pool_.not_empty.notify_one();
}

inline MonoQueuePool &MonoQueuePool::GetInstance() {
Expand Down
5 changes: 5 additions & 0 deletions src/include/threadpool/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
#pragma once

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -52,6 +54,9 @@ class WorkerPool {
*/
uint32_t NumWorkers() const { return num_workers_; }

std::mutex cv_lock;
std::condition_variable not_empty;

private:
// The name of this pool
std::string pool_name_;
Expand Down
21 changes: 10 additions & 11 deletions src/threadpool/worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,31 @@
//
//===----------------------------------------------------------------------===//

#include <chrono>

#include "threadpool/worker_pool.h"

#include "common/logger.h"


namespace peloton {
namespace threadpool {

namespace {

void WorkerFunc(std::string thread_name, std::atomic_bool *is_running,
TaskQueue *task_queue) {
constexpr auto kMinPauseTime = std::chrono::microseconds(1);
constexpr auto kMaxPauseTime = std::chrono::microseconds(1000);
TaskQueue *task_queue, std::mutex *cv_lock,
std::condition_variable *not_empty) {

LOG_INFO("Thread %s starting ...", thread_name.c_str());

auto pause_time = kMinPauseTime;
while (is_running->load() || !task_queue->IsEmpty()) {
std::unique_lock<std::mutex> lock(*cv_lock);
not_empty->wait_for(lock, std::chrono::milliseconds(1), [&]{return !task_queue->IsEmpty();});
std::function<void()> task;
if (!task_queue->Dequeue(task)) {
// Polling with exponential back-off
std::this_thread::sleep_for(pause_time);
pause_time = std::min(pause_time * 2, kMaxPauseTime);
} else {
if (task_queue->Dequeue(task)) {
lock.unlock();
task();
pause_time = kMinPauseTime;
}
}

Expand All @@ -56,7 +55,7 @@ void WorkerPool::Startup() {
if (is_running_.compare_exchange_strong(running, true)) {
for (size_t i = 0; i < num_workers_; i++) {
std::string name = pool_name_ + "-worker-" + std::to_string(i);
workers_.emplace_back(WorkerFunc, name, &is_running_, &task_queue_);
workers_.emplace_back(WorkerFunc, name, &is_running_, &task_queue_, &cv_lock, &not_empty);
}
}
}
Expand Down

0 comments on commit 8aee4ea

Please sign in to comment.