Skip to content

Commit

Permalink
UCS: Introduce lightweight rwlock
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemy-Mellanox committed Dec 5, 2024
1 parent 66b03be commit 24a457b
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/ucs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ nobase_dist_libucs_la_HEADERS = \
type/param.h \
type/init_once.h \
type/spinlock.h \
type/rwlock.h \
type/status.h \
type/thread_mode.h \
type/cpu_set.h \
Expand Down
8 changes: 8 additions & 0 deletions src/ucs/arch/cpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <ucs/sys/compiler_def.h>
#include <stddef.h>
#include <sched.h>

BEGIN_C_DECLS

Expand Down Expand Up @@ -176,6 +177,13 @@ static inline int ucs_cpu_prefer_relaxed_order()
const char *ucs_cpu_vendor_name();
const char *ucs_cpu_model_name();

#ifndef UCS_HAS_CPU_RELAX
static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
{
sched_yield();
}
#endif

END_C_DECLS

#endif
11 changes: 11 additions & 0 deletions src/ucs/arch/x86_64/cpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#ifdef __AVX__
# include <immintrin.h>
#endif
#ifdef __SSE2__
# include <emmintrin.h>
#endif

BEGIN_C_DECLS

Expand Down Expand Up @@ -132,6 +135,14 @@ ucs_memcpy_nontemporal(void *dst, const void *src, size_t len)
ucs_x86_memcpy_sse_movntdqa(dst, src, len);
}

#ifdef __SSE2__
static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
{
_mm_pause();
}
#define UCS_HAS_CPU_RELAX
#endif

END_C_DECLS

#endif
Expand Down
114 changes: 114 additions & 0 deletions src/ucs/type/rwlock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2024. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifndef UCS_RWLOCK_H
#define UCS_RWLOCK_H

#include <ucs/arch/cpu.h>
#include <errno.h>

/**
* The ucs_rwlock_t type.
*
* Readers increment the counter by UCS_RWLOCK_READ (4)
* Writers set the UCS_RWLOCK_WRITE bit when lock is held
* and set the UCS_RWLOCK_WAIT bit while waiting.
*
* 31 2 1 0
* +-------------------+-+-+
* | readers | | |
* +-------------------+-+-+
* ^ ^
* | |
* WRITE: lock held ----/ |
* WAIT: writer pending --/
*/

#define UCS_RWLOCK_WAIT 0x1 /* Writer is waiting */
#define UCS_RWLOCK_WRITE 0x2 /* Writer has the lock */
#define UCS_RWLOCK_MASK (UCS_RWLOCK_WAIT | UCS_RWLOCK_WRITE)
#define UCS_RWLOCK_READ 0x4 /* Reader increment */

#define UCS_RWLOCK_STATIC_INITIALIZER {0}


/**
* Read-write lock.
*/
typedef struct {
volatile int l;
} ucs_rwlock_t;


static inline void ucs_rwlock_read_lock(ucs_rwlock_t *lock) {
int x;

while (1) {
while (lock->l & UCS_RWLOCK_MASK) {
ucs_cpu_relax();
}

x = __atomic_fetch_add(&lock->l, UCS_RWLOCK_READ, __ATOMIC_ACQUIRE);
if (!(x & UCS_RWLOCK_MASK)) {
return;
}

__atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED);
}
}


static inline void ucs_rwlock_read_unlock(ucs_rwlock_t *lock) {
__atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED);
}


static inline void ucs_rwlock_write_lock(ucs_rwlock_t *lock) {
int x;

while (1) {
x = lock->l;
if ((x < UCS_RWLOCK_WRITE) &&
(__atomic_compare_exchange_n(&lock->l, &x, UCS_RWLOCK_WRITE, 0,
__ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
return;
}

if (!(x & UCS_RWLOCK_WAIT)) {
__atomic_fetch_or(&lock->l, UCS_RWLOCK_WAIT, __ATOMIC_RELAXED);
}

while (lock->l > UCS_RWLOCK_WAIT) {
ucs_cpu_relax();
}
}
}


static inline int ucs_rwlock_write_trylock(ucs_rwlock_t *lock) {
int x;

x = lock->l;
if ((x < UCS_RWLOCK_WRITE) &&
(__atomic_compare_exchange_n(&lock->l, &x, x + UCS_RWLOCK_WRITE, 1,
__ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
return 0;
}

return -EBUSY;
}


static inline void ucs_rwlock_write_unlock(ucs_rwlock_t *lock) {
__atomic_fetch_sub(&lock->l, UCS_RWLOCK_WRITE, __ATOMIC_RELAXED);
}


static inline void ucs_rwlock_init(ucs_rwlock_t *lock) {
lock->l = 0;
}

#endif
146 changes: 146 additions & 0 deletions test/gtest/ucs/test_type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ extern "C" {
#include <ucs/type/serialize.h>
#include <ucs/type/status.h>
#include <ucs/type/float8.h>
#include <ucs/type/rwlock.h>
}

#include <time.h>
#include <thread>
#include <chrono>
#include <vector>

class test_type : public ucs::test {
};
Expand Down Expand Up @@ -138,6 +142,148 @@ UCS_TEST_F(test_type, pack_float) {
UCS_FP8_PACK_UNPACK(TEST_LATENCY, 200000000));
}

class test_rwlock : public ucs::test {
protected:
static void sleep() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

static void measure_one(int read_num, const std::function<void()>& r,
int write_num, const std::function<void()>& w,
const std::string& name) {
volatile int done = 0;
volatile int read_count = 0;
volatile int write_count = 0;
std::vector<std::thread> tt;

tt.reserve(read_num + write_num);
for (int c = 0; c < read_num; c++) {
tt.emplace_back([&]() {
while (!done) {
r();
read_count++;
}
});
}

for (int c = 0; c < write_num; c++) {
tt.emplace_back([&]() {
while (!done) {
w();
write_count++;
}
});
}

int ms = 100;
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
done = 1;

for (auto& t : tt) {
t.join();
}

UCS_TEST_MESSAGE << name << ": "
<< (read_count/1000.0/ms) << "/"
<< (write_count/1000.0/ms) << " Mops/s";
}

static void measure(const std::function<void()>& r,
const std::function<void()>& w,
const std::string& name) {
int m = std::thread::hardware_concurrency();

measure_one(1, r, 0, w, name + " read");
measure_one(2, r, 0, w, name + " read cont2");
measure_one(m, r, 0, w, name + " read cont");
measure_one(0, r, 1, w, name + " write");
measure_one(0, r, 2, w, name + " write cont2");
measure_one(0, r, m, w, name + " write cont");
measure_one(1, r, 1, w, name + " mix1");
measure_one(2, r, 1, w, name + " mix2");
measure_one(m, r, 1, w, name + " mix");
}
};

UCS_TEST_F(test_rwlock, lock) {
ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER;

ucs_rwlock_read_lock(&lock);
EXPECT_EQ(-EBUSY, ucs_rwlock_write_trylock(&lock));

ucs_rwlock_read_lock(&lock); /* second read lock should pass */

int write_taken = 0;
std::thread w([&]() {
ucs_rwlock_write_lock(&lock);
write_taken = 1;
ucs_rwlock_write_unlock(&lock);
});
sleep();
EXPECT_FALSE(write_taken); /* write lock should wait for read lock release */

ucs_rwlock_read_unlock(&lock);
sleep();
EXPECT_FALSE(write_taken); /* first read lock still holding lock */

int read_taken = 0;
std::thread r1([&]() {
ucs_rwlock_read_lock(&lock);
read_taken = 1;
ucs_rwlock_read_unlock(&lock);
});
sleep();
EXPECT_FALSE(read_taken); /* read lock should wait while write lock is waiting */

ucs_rwlock_read_unlock(&lock);
sleep();
EXPECT_TRUE(write_taken); /* write lock should be taken */
w.join();

sleep();
EXPECT_TRUE(read_taken); /* read lock should be taken */
r1.join();

EXPECT_EQ(0, ucs_rwlock_write_trylock(&lock));
read_taken = 0;
std::thread r2([&]() {
ucs_rwlock_read_lock(&lock);
read_taken = 1;
ucs_rwlock_read_unlock(&lock);
});
sleep();
EXPECT_FALSE(read_taken); /* read lock should wait for write lock release */

ucs_rwlock_write_unlock(&lock);
sleep();
EXPECT_TRUE(read_taken); /* read lock should be taken */
r2.join();
}

UCS_TEST_F(test_rwlock, perf) {
ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER;
measure([&]() {
ucs_rwlock_read_lock(&lock);
ucs_rwlock_read_unlock(&lock);
}, [&]() {
ucs_rwlock_write_lock(&lock);
ucs_rwlock_write_unlock(&lock);
}, "builtin");
}

UCS_TEST_F(test_rwlock, pthread) {
pthread_rwlock_t plock;
pthread_rwlock_init(&plock, NULL);
measure([&]() {
pthread_rwlock_rdlock(&plock);
pthread_rwlock_unlock(&plock);
}, [&]() {
pthread_rwlock_wrlock(&plock);
pthread_rwlock_unlock(&plock);
}, "pthread");
pthread_rwlock_destroy(&plock);
}

class test_init_once: public test_type {
protected:
test_init_once() : m_once(INIT_ONCE_INIT), m_count(0) {};
Expand Down

0 comments on commit 24a457b

Please sign in to comment.