Skip to content

Commit

Permalink
ThreadBack: crash fix
Browse files Browse the repository at this point in the history
  • Loading branch information
fchn289 committed Aug 6, 2024
1 parent cb28719 commit 73829cb
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/thread/AsyncBack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bool AsyncBack::newTaskOK(const MT_TaskEntryFN& mt_aEntryFN, const TaskBackFN& a
[mt_aEntryFN, this]() // must cp than ref, otherwise dead loop
{
auto ret = mt_aEntryFN();
this->nDoneTh_.fetch_add(1, std::memory_order_relaxed); // fastest +1
this->nDoneFut_.fetch_add(1, std::memory_order_relaxed); // fastest +1
mt_pingMainTH();
return ret;
}
Expand Down
8 changes: 0 additions & 8 deletions src/thread/AsyncBack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,3 @@ class AsyncBack : public ThreadBack
// . thread pool?
// . then eg future.wait_for() can't be used, may high-risk to self-impl
// * thread pool costs more time than async (eg 0.3s:0.1s of 100 threads on belinb03)
// * hdlFinishedTasks()'s wait_for() each thread, may spend too long?
// . ThreadBack is to use for time-consuming tasks
// * should be not too many threads exist simultaneously
// * so should not spend too long (wait till new req appears)
// . mt_nFinishedThread_:
// . can reduce about half iterations with 15 more LOC
// . atomic is lightweight & fast than mutex
// * since little benefit, decide rm it
2 changes: 1 addition & 1 deletion src/thread/ThPoolBack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ ThPoolBack::ThPoolBack(size_t aMaxThread)
this->taskQ_.pop_front();
}
task();
this->nDoneTh_.fetch_add(1, std::memory_order_relaxed); // fastest +1
this->nDoneFut_.fetch_add(1, std::memory_order_relaxed); // fastest +1
mt_pingMainTH(); // notify mainTH 1 task done
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/thread/ThPoolBack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// . code more complex than AsyncBack
//
// - Use-safe: yes with condition:
// . not support TOO many tasks that exceeds fut_backFN_S_/nDoneTh_/.. (impossible in most/normal cases)
// . not support TOO many tasks that exceeds fut_backFN_S_/nDoneFut_/.. (impossible in most/normal cases)
// . destructor will FOREVER wait all thread finished
// - MT safe: NO (can be used in main thread only)
// - Exception-safe: NO
Expand Down
20 changes: 11 additions & 9 deletions src/thread/ThreadBack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,31 @@ using namespace std;
namespace rlib
{
// ***********************************************************************************************
size_t ThreadBack::hdlFinishedTasks(UniLog& oneLog)
size_t ThreadBack::hdlDoneFut(UniLog& oneLog)
{
size_t nHandledTask = 0;
const auto nFinishedTask = nDoneTh_.exchange(0, memory_order_relaxed);
// impossible fut_backFN == fut_backFN_S_.end() if nHandledTask < nFinishedTask
for (auto&& fut_backFN = fut_backFN_S_.begin(); nHandledTask < nFinishedTask;)
size_t nHandledFut = 0;
const auto nDoneFut = nDoneFut_.load(memory_order_relaxed); // since possible nDoneFut_+1 before future::ready
// HID("(ThreadBack) nHandled=" << nHandledFut << '/' << nDoneFut << '|' << nFut());

// bugFix: if nDoneFut_+1 before future::ready, must check "fut_backFN != fut_backFN_S_.end()"
for (auto&& fut_backFN = fut_backFN_S_.begin(); nHandledFut < nDoneFut && fut_backFN != fut_backFN_S_.end();)
{
// - async() failure will throw exception -> terminate since compiling forbid exception
// - valid async()'s future never invalid
// - valid packaged_task's get_future() never invalid
auto& fut = fut_backFN->first;
// HID("(ThreadBack) nHandled=" << nHandledTask << '/' << nFinishedTask
// << ", valid=" << fut.valid() << ", backFn=" << &(fut_backFN->second));
if (fut.wait_for(0s) == future_status::ready)
{
fut_backFN->second(fut.get()); // callback
fut_backFN = fut_backFN_S_.erase(fut_backFN);
++nHandledTask;
++nHandledFut;
}
else
++fut_backFN;
} // 1 loop, simple & safe
return nHandledTask;

nDoneFut_.fetch_sub(nHandledFut, memory_order_relaxed);
return nHandledFut;
}

// ***********************************************************************************************
Expand Down
14 changes: 10 additions & 4 deletions src/thread/ThreadBack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class ThreadBack
// @brief: Processes completed threads and invokes their callbacks.
// @param UniLog: The logger to use for this operation.
// @return: The number of completed tasks processed.
size_t hdlFinishedTasks(UniLog& = UniLog::defaultUniLog_);
size_t hdlDoneFut(UniLog& = UniLog::defaultUniLog_);

auto nThread() { return fut_backFN_S_.size(); }
auto nFut() { return fut_backFN_S_.size(); }

static bool inMyMainTH()
{
Expand All @@ -57,15 +57,21 @@ class ThreadBack
protected:
// -------------------------------------------------------------------------------------------
StoreThreadBack fut_backFN_S_; // must save future till thread end
std::atomic<size_t> nDoneTh_ = 0; // improve main thread to search done thread(s)
std::atomic<size_t> nDoneFut_ = 0; // improve main thread to search done thread(s)

// -------------------------------------------------------------------------------------------
#ifdef RLIB_UT
public:
auto& nDoneFut() { return nDoneFut_; }
#endif
};

} // namespace
// ***********************************************************************************************
// YYYY-MM-DD Who v)Modification Description
// .......... ......... .......................................................................
// 2024-07-09 CSZ 1)create
// 2024-08-05 CSZ - nDoneTh_ to improve iteration of fut_backFN_S_
// 2024-08-05 CSZ - nDoneFut_ to improve iteration of fut_backFN_S_
// - MT_TaskEntryFN ret SafePtr<void> instead of bool
// ***********************************************************************************************
// - why SafePtr
Expand Down
6 changes: 3 additions & 3 deletions ut/thread/ThPoolBackTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ TEST_F(ThPoolBackTest, invalid_maxThread)
[] { return make_safe<bool>(true); }, // entryFn
[](SafePtr<void>) {} // backFn
)) << "REQ: can create new task";
while (threadBack_.hdlFinishedTasks() == 0)
while (threadBack_.hdlDoneFut() == 0)
timedwait(); // REQ: wait new task done
}

Expand All @@ -34,7 +34,7 @@ TEST_F(ThPoolBackTest, performance)
},
[](SafePtr<void>) {} // backFn
));
for (size_t nHandled = 0; nHandled < maxThread; nHandled += thPoolBack.hdlFinishedTasks())
for (size_t nHandled = 0; nHandled < maxThread; nHandled += thPoolBack.hdlDoneFut())
timedwait();
auto dur = duration_cast<chrono::microseconds>(high_resolution_clock::now() - start);
HID("ThPoolBack cost=" << dur.count() << "us");
Expand All @@ -52,7 +52,7 @@ TEST_F(ThPoolBackTest, performance)
},
[](SafePtr<void>) {} // backFn
));
for (size_t nHandled = 0; nHandled < maxThread; nHandled += asyncBack.hdlFinishedTasks())
for (size_t nHandled = 0; nHandled < maxThread; nHandled += asyncBack.hdlDoneFut())
timedwait();
dur = duration_cast<chrono::microseconds>(high_resolution_clock::now() - start);
HID("AsyncBack cost=" << dur.count() << "us");
Expand Down
60 changes: 43 additions & 17 deletions ut/thread/ThreadBackTest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include <chrono>
#include <gtest/gtest.h>

#define RLIB_UT
#include "ThreadBack.hpp"
#undef RLIB_UT

#include "AsyncBack.hpp"
#include "MT_PingMainTH.hpp"
#include "MtInQueue.hpp"
Expand All @@ -26,12 +30,12 @@ struct THREAD_BACK_TEST : public Test, public UniLog
{
THREAD_BACK_TEST() : UniLog(UnitTest::GetInstance()->current_test_info()->name())
{
EXPECT_EQ(0, threadBack_.nThread()) << "REQ: clear env";
EXPECT_EQ(0, threadBack_.nFut()) << "REQ: clear env";
}

~THREAD_BACK_TEST()
{
EXPECT_EQ(0, threadBack_.nThread()) << "REQ: handle all";
EXPECT_EQ(0, threadBack_.nFut()) << "REQ: handle all";
mt_getQ().clearHdlrPool();
GTEST_LOG_FAIL
}
Expand All @@ -51,7 +55,7 @@ struct THREAD_BACK_TEST : public Test, public UniLog
// | |
// | |
// |<.....................| future<>
// ThreadBack::hdlFinishedTasks() |
// ThreadBack::hdlDoneFut() |
// |
TEST_F(THREAD_BACK_TEST, GOLD_entryFn_inNewThread_thenBackFn_inMainThread_withTimedWait)
{
Expand All @@ -70,10 +74,10 @@ TEST_F(THREAD_BACK_TEST, GOLD_entryFn_inNewThread_thenBackFn_inMainThread_withTi
}
));

while (threadBack_.hdlFinishedTasks() == 0)
while (threadBack_.hdlDoneFut() == 0)
{
INF("new thread not end yet...");
timedwait(); // REQ: timedwait() is more efficient than keep hdlFinishedTasks()
timedwait(); // REQ: timedwait() is more efficient than keep hdlDoneFut()
}
}
TEST_F(THREAD_BACK_TEST, GOLD_entryFnResult_toBackFn_withoutTimedWait)
Expand All @@ -97,7 +101,7 @@ TEST_F(THREAD_BACK_TEST, GOLD_entryFnResult_toBackFn_withoutTimedWait)
}

// REQ: no timedwait() but keep asking; cost most time in CI
for (size_t nHandled = 0; nHandled < maxThread; nHandled += threadBack_.hdlFinishedTasks())
for (size_t nHandled = 0; nHandled < maxThread; nHandled += threadBack_.hdlDoneFut())
{
INF("nHandled=" << nHandled);
}
Expand Down Expand Up @@ -127,14 +131,14 @@ TEST_F(THREAD_BACK_TEST, canHandle_someThreadDone_whileOtherRunning)
[](SafePtr<void>) {}
);

while (threadBack_.hdlFinishedTasks() == 0)
while (threadBack_.hdlDoneFut() == 0)
{
INF("both threads not end yet, wait...");
timedwait();
}

canEnd = true; // 1st thread keep running while 2nd is done
while (threadBack_.hdlFinishedTasks() == 0) // no timedwait() so keep occupy cpu
while (threadBack_.hdlDoneFut() == 0) // no timedwait() so keep occupy cpu
{
INF("2nd thread done, wait 1st done...")
timedwait();
Expand All @@ -154,15 +158,15 @@ TEST_F(THREAD_BACK_TEST, GOLD_entryFn_notify_insteadof_timeout)
auto dur = duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - start);
EXPECT_LT(dur.count(), 500) << "REQ: entryFn end shall notify g_semToMainTH instead of timeout";

while (threadBack_.hdlFinishedTasks() == 0) // clear all threads
while (threadBack_.hdlDoneFut() == 0) // clear all threads
timedwait();
}

#define ABNORMAL
// ***********************************************************************************************
TEST_F(THREAD_BACK_TEST, emptyThreadList_ok)
{
size_t nHandled = threadBack_.hdlFinishedTasks();
size_t nHandled = threadBack_.hdlDoneFut();
EXPECT_EQ(0u, nHandled);
}
TEST_F(THREAD_BACK_TEST, invalid_msgSelf_entryFN_backFN)
Expand All @@ -179,7 +183,29 @@ TEST_F(THREAD_BACK_TEST, invalid_msgSelf_entryFN_backFN)
MT_TaskEntryFN(nullptr), // invalid since entryFn==nullptr
[](SafePtr<void>) {} // backFn
));
EXPECT_EQ(0, threadBack_.nThread());
EXPECT_EQ(0, threadBack_.nFut());
}
TEST_F(THREAD_BACK_TEST, bugFix_nDoneFut_before_futureReady)
{
atomic<bool> canEnd(false);
threadBack_.newTaskOK(
// MT_TaskEntryFN
[&canEnd]()
{
while (not canEnd)
this_thread::yield(); // not end until instruction
return make_safe<bool>(true);
},
// TaskBackFN
[](SafePtr<void>) {}
);

threadBack_.nDoneFut()++; // force +1 before future ready, threadBack_ shall not crash

// clean
canEnd = true;
while (threadBack_.hdlDoneFut() == 0)
timedwait();
}

#define SEM_TEST
Expand Down Expand Up @@ -253,21 +279,21 @@ TEST_F(THREAD_BACK_TEST, GOLD_integrate_MsgSelf_ThreadBack_MtInQueue) // simula
for (;;)
{
// handle all done Thread
INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nThread());
threadBack_.hdlFinishedTasks();
INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nFut());
threadBack_.hdlDoneFut();

// handle all existing in mt_getQ()
INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nThread());
INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nFut());
mt_getQ().handleAllEle();

INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nThread());
INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nFut());
msgSelf_->handleAllMsg();

INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nThread());
INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nFut());
if (expect == cb_info)
return;

INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nThread());
INF("nMsg=" << msgSelf_->nMsg() << ", nQ=" << mt_getQ().mt_size(true) << ", nTh=" << threadBack_.nFut());
timedwait();
}
}
Expand Down

0 comments on commit 73829cb

Please sign in to comment.