Skip to content

Commit

Permalink
UCS: Introduce lightweight rwlock
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemy-Mellanox committed Dec 5, 2024
1 parent 66b03be commit 89415df
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/ucs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ noinst_HEADERS = \
time/timerq.h \
time/timer_wheel.h \
type/serialize.h \
type/rwlock.h \
type/float8.h \
async/async.h \
async/pipe.h \
Expand Down
8 changes: 8 additions & 0 deletions src/ucs/arch/cpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <ucs/sys/compiler_def.h>
#include <stddef.h>
#include <sched.h>

BEGIN_C_DECLS

Expand Down Expand Up @@ -176,6 +177,13 @@ static inline int ucs_cpu_prefer_relaxed_order()
const char *ucs_cpu_vendor_name();
const char *ucs_cpu_model_name();

#ifndef UCS_HAS_CPU_RELAX
static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
{
sched_yield();
}
#endif

END_C_DECLS

#endif
11 changes: 11 additions & 0 deletions src/ucs/arch/x86_64/cpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#ifdef __AVX__
# include <immintrin.h>
#endif
#ifdef __SSE2__
# include <emmintrin.h>
#endif

BEGIN_C_DECLS

Expand Down Expand Up @@ -132,6 +135,14 @@ ucs_memcpy_nontemporal(void *dst, const void *src, size_t len)
ucs_x86_memcpy_sse_movntdqa(dst, src, len);
}

#ifdef __SSE2__
static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
{
_mm_pause();
}
#define UCS_HAS_CPU_RELAX
#endif

END_C_DECLS

#endif
Expand Down
120 changes: 120 additions & 0 deletions src/ucs/type/rwlock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2024. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifndef UCS_RWLOCK_H
#define UCS_RWLOCK_H

#include <ucs/arch/cpu.h>
#include <errno.h>

/**
* The ucs_rwlock_t type.
*
* Readers increment the counter by UCS_RWLOCK_READ (4)
* Writers set the UCS_RWLOCK_WRITE bit when lock is held
* and set the UCS_RWLOCK_WAIT bit while waiting.
*
* 31 2 1 0
* +-------------------+-+-+
* | readers | | |
* +-------------------+-+-+
* ^ ^
* | |
* WRITE: lock held ----/ |
* WAIT: writer pending --/
*/

#define UCS_RWLOCK_WAIT 0x1 /* Writer is waiting */
#define UCS_RWLOCK_WRITE 0x2 /* Writer has the lock */
#define UCS_RWLOCK_MASK (UCS_RWLOCK_WAIT | UCS_RWLOCK_WRITE)
#define UCS_RWLOCK_READ 0x4 /* Reader increment */

#define UCS_RWLOCK_STATIC_INITIALIZER {0}


/**
* Read-write lock.
*/
typedef struct {
volatile int l;
} ucs_rwlock_t;


static inline void ucs_rwlock_read_lock(ucs_rwlock_t *lock)
{
int x;

while (1) {
while (lock->l & UCS_RWLOCK_MASK) {
ucs_cpu_relax();
}

x = __atomic_fetch_add(&lock->l, UCS_RWLOCK_READ, __ATOMIC_ACQUIRE);
if (!(x & UCS_RWLOCK_MASK)) {
return;
}

__atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED);
}
}


static inline void ucs_rwlock_read_unlock(ucs_rwlock_t *lock)
{
__atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED);
}


static inline void ucs_rwlock_write_lock(ucs_rwlock_t *lock)
{
int x;

while (1) {
x = lock->l;
if ((x < UCS_RWLOCK_WRITE) &&
(__atomic_compare_exchange_n(&lock->l, &x, UCS_RWLOCK_WRITE, 0,
__ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
return;
}

if (!(x & UCS_RWLOCK_WAIT)) {
__atomic_fetch_or(&lock->l, UCS_RWLOCK_WAIT, __ATOMIC_RELAXED);
}

while (lock->l > UCS_RWLOCK_WAIT) {
ucs_cpu_relax();
}
}
}


static inline int ucs_rwlock_write_trylock(ucs_rwlock_t *lock)
{
int x;

x = lock->l;
if ((x < UCS_RWLOCK_WRITE) &&
(__atomic_compare_exchange_n(&lock->l, &x, x + UCS_RWLOCK_WRITE, 1,
__ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
return 0;
}

return -EBUSY;
}


static inline void ucs_rwlock_write_unlock(ucs_rwlock_t *lock)
{
__atomic_fetch_sub(&lock->l, UCS_RWLOCK_WRITE, __ATOMIC_RELAXED);
}


static inline void ucs_rwlock_init(ucs_rwlock_t *lock)
{
lock->l = 0;
}

#endif
143 changes: 143 additions & 0 deletions test/gtest/ucs/test_type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ extern "C" {
#include <ucs/type/serialize.h>
#include <ucs/type/status.h>
#include <ucs/type/float8.h>
#include <ucs/type/rwlock.h>
}

#include <time.h>
#include <thread>
#include <chrono>
#include <vector>

class test_type : public ucs::test {
};
Expand Down Expand Up @@ -138,6 +142,145 @@ UCS_TEST_F(test_type, pack_float) {
UCS_FP8_PACK_UNPACK(TEST_LATENCY, 200000000));
}

class test_rwlock : public ucs::test {
protected:
void sleep()
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

void measure_one(int num, int writers, const std::function<void()> &r,
const std::function<void()> &w, const std::string &name)
{
std::vector<std::thread> tt;

tt.reserve(num);
auto start = std::chrono::high_resolution_clock::now();
for (int c = 0; c < num; c++) {
tt.emplace_back([&]() {
unsigned seed = time(0);
for (int i = 0; i < 1000000 / num; i++) {
if ((rand_r(&seed) % 256) < writers) {
w();
} else {
r();
}
}
});
}


for (auto &t : tt) {
t.join();
}
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;

UCS_TEST_MESSAGE << elapsed.count() * 1000 << " ms " << name << " "
<< std::to_string(num) << " threads "
<< std::to_string(writers) << " writers per 256 ";
}

void measure(const std::function<void()> &r,
const std::function<void()> &w, const std::string &name)
{
int m = std::thread::hardware_concurrency();
std::vector<int> threads = {1, 2, 4, m};
std::vector<int> writers_per_256 = {1, 25, 128, 250};

for (auto t : threads) {
for (auto writers : writers_per_256) {
measure_one(t, writers, r, w, name);
}
}
}
};

UCS_TEST_F(test_rwlock, lock) {
ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER;

ucs_rwlock_read_lock(&lock);
EXPECT_EQ(-EBUSY, ucs_rwlock_write_trylock(&lock));

ucs_rwlock_read_lock(&lock); /* second read lock should pass */

int write_taken = 0;
std::thread w([&]() {
ucs_rwlock_write_lock(&lock);
write_taken = 1;
ucs_rwlock_write_unlock(&lock);
});
sleep();
EXPECT_FALSE(write_taken); /* write lock should wait for read lock release */

ucs_rwlock_read_unlock(&lock);
sleep();
EXPECT_FALSE(write_taken); /* first read lock still holding lock */

int read_taken = 0;
std::thread r1([&]() {
ucs_rwlock_read_lock(&lock);
read_taken = 1;
ucs_rwlock_read_unlock(&lock);
});
sleep();
EXPECT_FALSE(read_taken); /* read lock should wait while write lock is waiting */

ucs_rwlock_read_unlock(&lock);
sleep();
EXPECT_TRUE(write_taken); /* write lock should be taken */
w.join();

sleep();
EXPECT_TRUE(read_taken); /* read lock should be taken */
r1.join();

EXPECT_EQ(0, ucs_rwlock_write_trylock(&lock));
read_taken = 0;
std::thread r2([&]() {
ucs_rwlock_read_lock(&lock);
read_taken = 1;
ucs_rwlock_read_unlock(&lock);
});
sleep();
EXPECT_FALSE(read_taken); /* read lock should wait for write lock release */

ucs_rwlock_write_unlock(&lock);
sleep();
EXPECT_TRUE(read_taken); /* read lock should be taken */
r2.join();
}

UCS_TEST_F(test_rwlock, perf) {
ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER;
measure(
[&]() {
ucs_rwlock_read_lock(&lock);
ucs_rwlock_read_unlock(&lock);
},
[&]() {
ucs_rwlock_write_lock(&lock);
ucs_rwlock_write_unlock(&lock);
},
"builtin");
}

UCS_TEST_F(test_rwlock, pthread) {
pthread_rwlock_t plock;
pthread_rwlock_init(&plock, NULL);
measure(
[&]() {
pthread_rwlock_rdlock(&plock);
pthread_rwlock_unlock(&plock);
},
[&]() {
pthread_rwlock_wrlock(&plock);
pthread_rwlock_unlock(&plock);
},
"pthread");
pthread_rwlock_destroy(&plock);
}

class test_init_once: public test_type {
protected:
test_init_once() : m_once(INIT_ONCE_INIT), m_count(0) {};
Expand Down

0 comments on commit 89415df

Please sign in to comment.