Skip to content

Commit

Permalink
msg/async/dpdk: exit condition waiting when DPDKStack is destructed
Browse files Browse the repository at this point in the history
exit() will call pthread_cond_destroy attempting to destroy dpdk::eal::cond
upon which other threads are currently blocked results in undefine
behavior. Link different libc version test, libc-2.17 can exit,
libc-2.27 will deadlock, the call stack is as follows:

Thread 3 (Thread 0xffff7e5749f0 (LWP 62213)):
 #0  0x0000ffff7f3c422c in futex_wait_cancelable (private=<optimized out>, expected=0,
    futex_word=0xaaaadc0e30f4 <dpdk::eal::cond+44>) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
 #1  __pthread_cond_wait_common (abstime=0x0, mutex=0xaaaadc0e30f8 <dpdk::eal::lock>, cond=0xaaaadc0e30c8 <dpdk::eal::cond>)
    at pthread_cond_wait.c:502
 #2  __pthread_cond_wait (cond=0xaaaadc0e30c8 <dpdk::eal::cond>, mutex=0xaaaadc0e30f8 <dpdk::eal::lock>)
    at pthread_cond_wait.c:655
 ceph#3  0x0000ffff7f1f1f80 in std::condition_variable::wait(std::unique_lock<std::mutex>&) ()
   from /usr/lib/aarch64-linux-gnu/libstdc++.so.6
 ceph#4  0x0000aaaad37f5078 in dpdk::eal::<lambda()>::operator()(void) const (__closure=<optimized out>, __closure=<optimized out>)
    at ./src/msg/async/dpdk/dpdk_rte.cc:136
 ceph#5  0x0000ffff7f1f7ed4 in ?? () from /usr/lib/aarch64-linux-gnu/libstdc++.so.6
 ceph#6  0x0000ffff7f3be088 in start_thread (arg=0xffffe73e197f) at pthread_create.c:463
 ceph#7  0x0000ffff7efc74ec in thread_start () at ../sysdeps/unix/sysv/linux/aarch64/clone.S:78

Thread 1 (Thread 0xffff7ee3b010 (LWP 62200)):
 #0  0x0000ffff7f3c3c38 in futex_wait (private=<optimized out>, expected=12, futex_word=0xaaaadc0e30ec <dpdk::eal::cond+36>)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:61
 #1  futex_wait_simple (private=<optimized out>, expected=12, futex_word=0xaaaadc0e30ec <dpdk::eal::cond+36>)
    at ../sysdeps/nptl/futex-internal.h:135
 #2  __pthread_cond_destroy (cond=0xaaaadc0e30c8 <dpdk::eal::cond>) at pthread_cond_destroy.c:54
 ceph#3  0x0000ffff7ef2be34 in __run_exit_handlers (status=-6, listp=0xffff7f04a5a0 <__exit_funcs>, run_list_atexit=255,
    run_list_atexit@entry=true, run_dtors=run_dtors@entry=true) at exit.c:108
 ceph#4  0x0000ffff7ef2bf6c in __GI_exit (status=<optimized out>) at exit.c:139
 ceph#5  0x0000ffff7ef176e4 in __libc_start_main (main=0x0, argc=0, argv=0x0, init=<optimized out>, fini=<optimized out>,
    rtld_fini=<optimized out>, stack_end=<optimized out>) at ../csu/libc-start.c:344
 ceph#6  0x0000aaaad2939db0 in _start () at ./src/include/buffer.h:642

Fixes: https://tracker.ceph.com/issues/42890
Signed-off-by: Chunsong Feng <[email protected]>
Signed-off-by: luo rixin <[email protected]>
  • Loading branch information
Chunsong Feng authored and rosinL committed Oct 15, 2021
1 parent ec4fbe9 commit 2c49202
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 38 deletions.
12 changes: 7 additions & 5 deletions src/msg/async/dpdk/DPDKStack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ void DPDKStack::spawn_worker(std::function<void ()> &&func)
//
funcs.push_back(std::move(func));
int r = 0;
r = dpdk::eal::init(cct);
r = eal.start();
if (r < 0) {
lderr(cct) << __func__ << " init dpdk rte failed, r=" << r << dendl;
lderr(cct) << __func__ << " start dpdk rte failed, r=" << r << dendl;
ceph_abort();
}
// if dpdk::eal::init already called by NVMEDevice, we will select 1..n
// if eal.start already called by NVMEDevice, we will select 1..n
// cores
unsigned nr_worker = funcs.size();
ceph_assert(rte_lcore_count() >= nr_worker);
Expand All @@ -265,7 +265,7 @@ void DPDKStack::spawn_worker(std::function<void ()> &&func)
}
}
void *adapted_func = static_cast<void*>(&funcs.back());
dpdk::eal::execute_on_master([adapted_func, core_id, this]() {
eal.execute_on_master([adapted_func, core_id, this]() {
int r = rte_eal_remote_launch(dpdk_thread_adaptor, adapted_func, core_id);
if (r < 0) {
lderr(cct) << __func__ << " remote launch failed, r=" << r << dendl;
Expand All @@ -276,7 +276,9 @@ void DPDKStack::spawn_worker(std::function<void ()> &&func)

void DPDKStack::join_worker(unsigned i)
{
dpdk::eal::execute_on_master([&]() {
eal.execute_on_master([&]() {
rte_eal_wait_lcore(i+1);
});
if (i+1 == get_num_worker())
eal.stop();
}
6 changes: 5 additions & 1 deletion src/msg/async/dpdk/DPDKStack.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "const.h"
#include "IP.h"
#include "Packet.h"
#include "dpdk_rte.h"

class interface;

Expand Down Expand Up @@ -246,6 +247,7 @@ class DPDKWorker : public Worker {
friend class DPDKServerSocketImpl<tcp4>;
};

using namespace dpdk;
class DPDKStack : public NetworkStack {
std::vector<std::function<void()> > funcs;

Expand All @@ -254,13 +256,15 @@ class DPDKStack : public NetworkStack {
}

public:
explicit DPDKStack(CephContext *cct): NetworkStack(cct) {
explicit DPDKStack(CephContext *cct): NetworkStack(cct), eal(cct) {
funcs.reserve(cct->_conf->ms_async_op_threads);
}
virtual bool support_local_listen_table() const override { return true; }

virtual void spawn_worker(std::function<void ()> &&func) override;
virtual void join_worker(unsigned i) override;
private:
dpdk::eal eal;
};

#endif
52 changes: 29 additions & 23 deletions src/msg/async/dpdk/dpdk_rte.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ namespace dpdk {
return v;
}

bool eal::initialized = false;
std::thread eal::t;
std::mutex eal::lock;
std::condition_variable eal::cond;
std::list<std::function<void()>> eal::funcs;

static int bitcount(unsigned long long n)
{
return std::bitset<CHAR_BIT * sizeof(n)>{n}.count();
Expand Down Expand Up @@ -75,31 +69,33 @@ namespace dpdk {
return count;
}

int eal::init(CephContext *c)
bool eal::rte_initialized = false;

int eal::start()
{
if (initialized) {
return 1;
}

bool done = false;
auto coremask = c->_conf.get_val<std::string>("ms_dpdk_coremask");
auto coremask = cct->_conf.get_val<std::string>("ms_dpdk_coremask");
int coremaskbit = coremask_bitcount(coremask.c_str());

if (coremaskbit <= 0
|| static_cast<uint64_t>(coremaskbit) < c->_conf->ms_async_op_threads)
|| static_cast<uint64_t>(coremaskbit) < cct->_conf->ms_async_op_threads)
return -EINVAL;

t = std::thread([&]() {
// TODO: Inherit these from the app parameters - "opts"
std::vector<std::vector<char>> args {
string2vector("ceph"),
string2vector("-c"), string2vector(c->_conf.get_val<std::string>("ms_dpdk_coremask")),
string2vector("-n"), string2vector(c->_conf->ms_dpdk_memory_channel),
string2vector("-c"), string2vector(cct->_conf.get_val<std::string>("ms_dpdk_coremask")),
string2vector("-n"), string2vector(cct->_conf->ms_dpdk_memory_channel),
};

std::optional<std::string> hugepages_path;
if (!c->_conf->ms_dpdk_hugepages.empty()) {
hugepages_path.emplace(c->_conf->ms_dpdk_hugepages);
if (!cct->_conf->ms_dpdk_hugepages.empty()) {
hugepages_path.emplace(cct->_conf->ms_dpdk_hugepages);
}

// If "hugepages" is not provided and DPDK PMD drivers mode is requested -
Expand All @@ -123,13 +119,13 @@ namespace dpdk {

args.push_back(string2vector("-m"));
args.push_back(string2vector(size_MB_str.str()));
} else if (!c->_conf->ms_dpdk_pmd.empty()) {
} else if (!cct->_conf->ms_dpdk_pmd.empty()) {
args.push_back(string2vector("--no-huge"));
}

std::string rte_file_prefix;
rte_file_prefix = "rte_";
rte_file_prefix += c->_conf->name.to_str();
rte_file_prefix += cct->_conf->name.to_str();
args.push_back(string2vector("--file-prefix"));
args.push_back(string2vector(rte_file_prefix));

Expand All @@ -138,27 +134,28 @@ namespace dpdk {
for (auto&& a: args) {
cargs.push_back(a.data());
}
/* initialise the EAL for all */
int ret = rte_eal_init(cargs.size(), cargs.data());
if (ret < 0)
return ret;
if (!rte_initialized) {
/* initialise the EAL for all */
int ret = rte_eal_init(cargs.size(), cargs.data());
if (ret < 0)
return;
rte_initialized = true;
}

std::unique_lock<std::mutex> l(lock);
initialized = true;
done = true;
cond.notify_all();
while (true) {
while (!stopped) {
cond.wait(l, [this] { return !funcs.empty() || stopped; });
if (!funcs.empty()) {
auto f = std::move(funcs.front());
funcs.pop_front();
f();
cond.notify_all();
} else {
cond.wait(l);
}
}
});
t.detach();
std::unique_lock<std::mutex> l(lock);
while (!done)
cond.wait(l);
Expand All @@ -182,4 +179,13 @@ namespace dpdk {
return memsize;
}

void eal::stop()
{
assert(initialized);
assert(!stopped);
stopped = true;
cond.notify_all();
t.join();
}

} // namespace dpdk
23 changes: 14 additions & 9 deletions src/msg/async/dpdk/dpdk_rte.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ namespace dpdk {
class eal {
public:
using cpuset = std::bitset<RTE_MAX_LCORE>;

static std::mutex lock;
static std::condition_variable cond;
static std::list<std::function<void()>> funcs;
static int init(CephContext *c);
static void execute_on_master(std::function<void()> &&f) {
explicit eal(CephContext *cct) : cct(cct) {}
int start();
void stop();
void execute_on_master(std::function<void()> &&f) {
bool done = false;
std::unique_lock<std::mutex> l(lock);
funcs.emplace_back([&]() { f(); done = true; });
Expand All @@ -65,9 +63,16 @@ class eal {
*
* @return
*/
static size_t mem_size(int num_cpus);
static bool initialized;
static std::thread t;
size_t mem_size(int num_cpus);
static bool rte_initialized;
private:
CephContext *cct;
bool initialized = false;
bool stopped = false;
std::thread t;
std::mutex lock;
std::condition_variable cond;
std::list<std::function<void()>> funcs;
};

} // namespace dpdk
Expand Down

0 comments on commit 2c49202

Please sign in to comment.