Skip to content

Commit

Permalink
fix(core): fix task not run bug when no right notify_one
Browse files Browse the repository at this point in the history
  • Loading branch information
etkmao authored and siguangli committed Jan 7, 2025
1 parent 501e45a commit dfee909
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 13 deletions.
3 changes: 1 addition & 2 deletions modules/footstone/include/footstone/cv_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ class CVDriver: public Driver {
virtual ~CVDriver() = default;

virtual void Notify() override;
virtual void WaitFor(const TimeDelta& delta) override;
virtual void WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) override;
virtual void Start() override;
virtual void Terminate() override;

private:
std::condition_variable cv_;
std::mutex mutex_;
};

}
Expand Down
7 changes: 5 additions & 2 deletions modules/footstone/include/footstone/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#pragma once

#include <functional>

#include <mutex>
#include "footstone/time_delta.h"

namespace footstone {
Expand All @@ -35,7 +35,8 @@ class Driver {
virtual ~Driver() = default;

virtual void Notify() = 0;
virtual void WaitFor(const TimeDelta& delta) = 0;
std::mutex& Mutex() { return mutex_; }
virtual void WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) = 0;
virtual void Start() = 0;
virtual void Terminate() = 0;

Expand All @@ -60,6 +61,8 @@ class Driver {
*
*/
bool is_exit_immediately_;

std::mutex mutex_;
};

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class LooperDriver: public Driver {
virtual ~LooperDriver();

virtual void Notify() override;
virtual void WaitFor(const TimeDelta& delta) override;
virtual void WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) override;
virtual void Start() override;
virtual void Terminate() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ class LooperDriver: public Driver {
virtual ~LooperDriver();

virtual void Notify() override;
virtual void WaitFor(const TimeDelta& delta) override;
virtual void WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) override;
virtual void Start() override;
virtual void Terminate() override;

void OnTimerFire(CFRunLoopTimerRef timer);

private:
Expand Down
2 changes: 2 additions & 0 deletions modules/footstone/include/footstone/task_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class TaskRunner {
std::unique_ptr<IdleTask> PopIdleTask();
std::unique_ptr<Task> GetTopDelayTask();
std::unique_ptr<Task> GetNext();
bool HasTask();
bool HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now);

std::queue<std::unique_ptr<Task>> task_queue_;
std::mutex queue_mutex_;
Expand Down
3 changes: 3 additions & 0 deletions modules/footstone/include/footstone/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class Worker {
void BalanceNoLock();
void SortNoLock();

bool HasTask();
bool HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now);

int32_t WorkerKeyCreate(uint32_t task_runner_id, const std::function<void(void *)>& destruct);
bool WorkerKeyDelete(uint32_t task_runner_id, int32_t key);
bool WorkerSetSpecific(uint32_t task_runner_id, int32_t key, void *p);
Expand Down
5 changes: 2 additions & 3 deletions modules/footstone/src/cv_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ namespace footstone {
inline namespace runner {

void CVDriver::Notify() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.notify_one();
}

void CVDriver::WaitFor(const TimeDelta& delta) {
std::unique_lock<std::mutex> lock(mutex_);

void CVDriver::WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) {
if (delta != TimeDelta::Max() && delta >= TimeDelta::Zero()) {
cv_.wait_for(lock, std::chrono::nanoseconds(delta.ToNanoseconds()));
} else {
Expand Down
2 changes: 1 addition & 1 deletion modules/footstone/src/platform/adr/looper_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void LooperDriver::Notify() {
timerfd_settime(fd_, TFD_TIMER_ABSTIME, &spec, nullptr);
}

void LooperDriver::WaitFor(const TimeDelta& delta) {
void LooperDriver::WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) {
auto nano_secs = delta.ToNanoseconds();
if (nano_secs < 1) {
nano_secs = 1;
Expand Down
2 changes: 1 addition & 1 deletion modules/footstone/src/platform/ios/looper_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void LooperDriver::Notify() {
CFAbsoluteTimeGetCurrent());
}

void LooperDriver::WaitFor(const TimeDelta& delta) {
void LooperDriver::WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) {
CFRunLoopTimerSetNextFireDate(
delayed_wake_timer_,
CFAbsoluteTimeGetCurrent() + delta.ToSecondsF());
Expand Down
48 changes: 48 additions & 0 deletions modules/footstone/src/task_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,5 +266,53 @@ void TaskRunner::RunnerDestroySpecifics() {
return worker->WorkerDestroySpecific(task_runner_id);
}

bool TaskRunner::HasTask() {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if (!task_queue_.empty()) {
return true;
}
}
{
std::lock_guard<std::mutex> lock(delay_mutex_);
if (!delayed_task_queue_.empty()) {
return true;
}
}
{
std::lock_guard<std::mutex> lock(idle_mutex_);
if (!idle_task_queue_.empty()) {
return true;
}
}
return false;
}

bool TaskRunner::HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now) {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if (!task_queue_.empty()) {
return true;
}
}
{
std::lock_guard<std::mutex> lock(idle_mutex_);
if (!idle_task_queue_.empty()) {
return true;
}
}
{
std::lock_guard<std::mutex> lock(delay_mutex_);
if (!delayed_task_queue_.empty()) {
const DelayedEntry& delayed_task = delayed_task_queue_.top();
if(delayed_task.first - now < min_wait_time) {
return true;
}
}
}

return false;
}

} // namespace runner
} // namespace footstone
70 changes: 69 additions & 1 deletion modules/footstone/src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,20 @@ std::unique_ptr<Task> Worker::GetNextTask() {
}));
return wrapper_idle_task;
}
std::unique_lock<std::mutex> lock(driver_->Mutex());
if (driver_->IsTerminated()) {
return nullptr;
}
driver_->WaitFor(min_wait_time_);
if (min_wait_time_ == TimeDelta::Max()) {
if (HasTask()) {
return nullptr;
}
} else {
if (HasMoreUrgentTask(min_wait_time_, now)) {
return nullptr;
}
}
driver_->WaitFor(min_wait_time_, lock);
return nullptr;
}

Expand Down Expand Up @@ -532,5 +542,63 @@ void Worker::UpdateSpecific(uint32_t task_runner_id,
specific_map_[task_runner_id] = array; // insert or update
}

bool Worker::HasTask() {
bool has_found = false;
for (auto &group : running_group_list_) {
for (auto &runner : group) {
if (runner->HasTask()) {
has_found = true;
break;
}
}
if (has_found) {
break;
}
}
if (!has_found) {
for (auto &group : pending_group_list_) {
for (auto &runner : group) {
if (runner->HasTask()) {
has_found = true;
break;
}
}
if (has_found) {
break;
}
}
}
return has_found;
}

bool Worker::HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now) {
bool has_found = false;
for (auto &group : running_group_list_) {
for (auto &runner : group) {
if (runner->HasMoreUrgentTask(min_wait_time, now)) {
has_found = true;
break;
}
}
if (has_found) {
break;
}
}
if (!has_found) {
for (auto &group : pending_group_list_) {
for (auto &runner : group) {
if (runner->HasMoreUrgentTask(min_wait_time, now)) {
has_found = true;
break;
}
}
if (has_found) {
break;
}
}
}
return has_found;
}

} // namespace runner
} // namespace footstone

0 comments on commit dfee909

Please sign in to comment.