From 89415df60f7de60b774528cb3e7e83233a894792 Mon Sep 17 00:00:00 2001 From: Artemy Kovalyov Date: Thu, 5 Dec 2024 06:58:33 +0000 Subject: [PATCH 1/2] UCS: Introduce lightweight rwlock --- src/ucs/Makefile.am | 1 + src/ucs/arch/cpu.h | 8 ++ src/ucs/arch/x86_64/cpu.h | 11 +++ src/ucs/type/rwlock.h | 120 ++++++++++++++++++++++++++++++ test/gtest/ucs/test_type.cc | 143 ++++++++++++++++++++++++++++++++++++ 5 files changed, 283 insertions(+) create mode 100644 src/ucs/type/rwlock.h diff --git a/src/ucs/Makefile.am b/src/ucs/Makefile.am index 9367835a9cb..d5ee6a9c7ff 100644 --- a/src/ucs/Makefile.am +++ b/src/ucs/Makefile.am @@ -135,6 +135,7 @@ noinst_HEADERS = \ time/timerq.h \ time/timer_wheel.h \ type/serialize.h \ + type/rwlock.h \ type/float8.h \ async/async.h \ async/pipe.h \ diff --git a/src/ucs/arch/cpu.h b/src/ucs/arch/cpu.h index 857b8b804cf..2adf4462d95 100644 --- a/src/ucs/arch/cpu.h +++ b/src/ucs/arch/cpu.h @@ -17,6 +17,7 @@ #include #include +#include BEGIN_C_DECLS @@ -176,6 +177,13 @@ static inline int ucs_cpu_prefer_relaxed_order() const char *ucs_cpu_vendor_name(); const char *ucs_cpu_model_name(); +#ifndef UCS_HAS_CPU_RELAX +static UCS_F_ALWAYS_INLINE void ucs_cpu_relax() +{ + sched_yield(); +} +#endif + END_C_DECLS #endif diff --git a/src/ucs/arch/x86_64/cpu.h b/src/ucs/arch/x86_64/cpu.h index 5fe5ada190a..cfc00ca77d2 100644 --- a/src/ucs/arch/x86_64/cpu.h +++ b/src/ucs/arch/x86_64/cpu.h @@ -23,6 +23,9 @@ #ifdef __AVX__ # include #endif +#ifdef __SSE2__ +# include +#endif BEGIN_C_DECLS @@ -132,6 +135,14 @@ ucs_memcpy_nontemporal(void *dst, const void *src, size_t len) ucs_x86_memcpy_sse_movntdqa(dst, src, len); } +#ifdef __SSE2__ +static UCS_F_ALWAYS_INLINE void ucs_cpu_relax() +{ + _mm_pause(); +} +#define UCS_HAS_CPU_RELAX +#endif + END_C_DECLS #endif diff --git a/src/ucs/type/rwlock.h b/src/ucs/type/rwlock.h new file mode 100644 index 00000000000..9060e1786e6 --- /dev/null +++ b/src/ucs/type/rwlock.h @@ -0,0 +1,120 @@ +/* +* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2024. ALL RIGHTS RESERVED. +* +* See file LICENSE for terms. +*/ + +#ifndef UCS_RWLOCK_H +#define UCS_RWLOCK_H + +#include +#include + +/** + * The ucs_rwlock_t type. + * + * Readers increment the counter by UCS_RWLOCK_READ (4) + * Writers set the UCS_RWLOCK_WRITE bit when lock is held + * and set the UCS_RWLOCK_WAIT bit while waiting. + * + * 31 2 1 0 + * +-------------------+-+-+ + * | readers | | | + * +-------------------+-+-+ + * ^ ^ + * | | + * WRITE: lock held ----/ | + * WAIT: writer pending --/ + */ + +#define UCS_RWLOCK_WAIT 0x1 /* Writer is waiting */ +#define UCS_RWLOCK_WRITE 0x2 /* Writer has the lock */ +#define UCS_RWLOCK_MASK (UCS_RWLOCK_WAIT | UCS_RWLOCK_WRITE) +#define UCS_RWLOCK_READ 0x4 /* Reader increment */ + +#define UCS_RWLOCK_STATIC_INITIALIZER {0} + + +/** + * Read-write lock. + */ +typedef struct { + volatile int l; +} ucs_rwlock_t; + + +static inline void ucs_rwlock_read_lock(ucs_rwlock_t *lock) +{ + int x; + + while (1) { + while (lock->l & UCS_RWLOCK_MASK) { + ucs_cpu_relax(); + } + + x = __atomic_fetch_add(&lock->l, UCS_RWLOCK_READ, __ATOMIC_ACQUIRE); + if (!(x & UCS_RWLOCK_MASK)) { + return; + } + + __atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED); + } +} + + +static inline void ucs_rwlock_read_unlock(ucs_rwlock_t *lock) +{ + __atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED); +} + + +static inline void ucs_rwlock_write_lock(ucs_rwlock_t *lock) +{ + int x; + + while (1) { + x = lock->l; + if ((x < UCS_RWLOCK_WRITE) && + (__atomic_compare_exchange_n(&lock->l, &x, UCS_RWLOCK_WRITE, 0, + __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) { + return; + } + + if (!(x & UCS_RWLOCK_WAIT)) { + __atomic_fetch_or(&lock->l, UCS_RWLOCK_WAIT, __ATOMIC_RELAXED); + } + + while (lock->l > UCS_RWLOCK_WAIT) { + ucs_cpu_relax(); + } + } +} + + +static inline int ucs_rwlock_write_trylock(ucs_rwlock_t *lock) +{ + int x; + + x = lock->l; + if ((x < UCS_RWLOCK_WRITE) && + (__atomic_compare_exchange_n(&lock->l, &x, x + UCS_RWLOCK_WRITE, 1, + __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) { + return 0; + } + + return -EBUSY; +} + + +static inline void ucs_rwlock_write_unlock(ucs_rwlock_t *lock) +{ + __atomic_fetch_sub(&lock->l, UCS_RWLOCK_WRITE, __ATOMIC_RELAXED); +} + + +static inline void ucs_rwlock_init(ucs_rwlock_t *lock) +{ + lock->l = 0; +} + +#endif diff --git a/test/gtest/ucs/test_type.cc b/test/gtest/ucs/test_type.cc index 082aeb988e7..0bf3fcd9710 100644 --- a/test/gtest/ucs/test_type.cc +++ b/test/gtest/ucs/test_type.cc @@ -11,9 +11,13 @@ extern "C" { #include #include #include +#include } #include +#include +#include +#include class test_type : public ucs::test { }; @@ -138,6 +142,145 @@ UCS_TEST_F(test_type, pack_float) { UCS_FP8_PACK_UNPACK(TEST_LATENCY, 200000000)); } +class test_rwlock : public ucs::test { +protected: + void sleep() + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + void measure_one(int num, int writers, const std::function &r, + const std::function &w, const std::string &name) + { + std::vector tt; + + tt.reserve(num); + auto start = std::chrono::high_resolution_clock::now(); + for (int c = 0; c < num; c++) { + tt.emplace_back([&]() { + unsigned seed = time(0); + for (int i = 0; i < 1000000 / num; i++) { + if ((rand_r(&seed) % 256) < writers) { + w(); + } else { + r(); + } + } + }); + } + + + for (auto &t : tt) { + t.join(); + } + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed = end - start; + + UCS_TEST_MESSAGE << elapsed.count() * 1000 << " ms " << name << " " + << std::to_string(num) << " threads " + << std::to_string(writers) << " writers per 256 "; + } + + void measure(const std::function &r, + const std::function &w, const std::string &name) + { + int m = std::thread::hardware_concurrency(); + std::vector threads = {1, 2, 4, m}; + std::vector writers_per_256 = {1, 25, 128, 250}; + + for (auto t : threads) { + for (auto writers : writers_per_256) { + measure_one(t, writers, r, w, name); + } + } + } +}; + +UCS_TEST_F(test_rwlock, lock) { + ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER; + + ucs_rwlock_read_lock(&lock); + EXPECT_EQ(-EBUSY, ucs_rwlock_write_trylock(&lock)); + + ucs_rwlock_read_lock(&lock); /* second read lock should pass */ + + int write_taken = 0; + std::thread w([&]() { + ucs_rwlock_write_lock(&lock); + write_taken = 1; + ucs_rwlock_write_unlock(&lock); + }); + sleep(); + EXPECT_FALSE(write_taken); /* write lock should wait for read lock release */ + + ucs_rwlock_read_unlock(&lock); + sleep(); + EXPECT_FALSE(write_taken); /* first read lock still holding lock */ + + int read_taken = 0; + std::thread r1([&]() { + ucs_rwlock_read_lock(&lock); + read_taken = 1; + ucs_rwlock_read_unlock(&lock); + }); + sleep(); + EXPECT_FALSE(read_taken); /* read lock should wait while write lock is waiting */ + + ucs_rwlock_read_unlock(&lock); + sleep(); + EXPECT_TRUE(write_taken); /* write lock should be taken */ + w.join(); + + sleep(); + EXPECT_TRUE(read_taken); /* read lock should be taken */ + r1.join(); + + EXPECT_EQ(0, ucs_rwlock_write_trylock(&lock)); + read_taken = 0; + std::thread r2([&]() { + ucs_rwlock_read_lock(&lock); + read_taken = 1; + ucs_rwlock_read_unlock(&lock); + }); + sleep(); + EXPECT_FALSE(read_taken); /* read lock should wait for write lock release */ + + ucs_rwlock_write_unlock(&lock); + sleep(); + EXPECT_TRUE(read_taken); /* read lock should be taken */ + r2.join(); +} + +UCS_TEST_F(test_rwlock, perf) { + ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER; + measure( + [&]() { + ucs_rwlock_read_lock(&lock); + ucs_rwlock_read_unlock(&lock); + }, + [&]() { + ucs_rwlock_write_lock(&lock); + ucs_rwlock_write_unlock(&lock); + }, + "builtin"); +} + +UCS_TEST_F(test_rwlock, pthread) { + pthread_rwlock_t plock; + pthread_rwlock_init(&plock, NULL); + measure( + [&]() { + pthread_rwlock_rdlock(&plock); + pthread_rwlock_unlock(&plock); + }, + [&]() { + pthread_rwlock_wrlock(&plock); + pthread_rwlock_unlock(&plock); + }, + "pthread"); + pthread_rwlock_destroy(&plock); +} + class test_init_once: public test_type { protected: test_init_once() : m_once(INIT_ONCE_INIT), m_count(0) {}; From 2c3a0e8c0bdaef24f1ebfa84238738c65611d0bf Mon Sep 17 00:00:00 2001 From: Artemy Kovalyov Date: Tue, 10 Dec 2024 15:36:31 +0200 Subject: [PATCH 2/2] UCS: Introduce lightweight rwlock - 2 --- src/ucs/arch/aarch64/cpu.h | 5 ++ src/ucs/arch/atomic.h | 56 +++++++++++++ src/ucs/arch/cpu.h | 7 -- src/ucs/arch/ppc64/cpu.h | 7 ++ src/ucs/arch/rv64/cpu.h | 5 ++ src/ucs/arch/x86_64/cpu.h | 5 +- src/ucs/type/rwlock.h | 79 +++++++++++------- test/gtest/ucs/test_type.cc | 162 +++++++++++++++++++++++++----------- 8 files changed, 237 insertions(+), 89 deletions(-) diff --git a/src/ucs/arch/aarch64/cpu.h b/src/ucs/arch/aarch64/cpu.h index 213ea6d1f24..45a05dcb2de 100644 --- a/src/ucs/arch/aarch64/cpu.h +++ b/src/ucs/arch/aarch64/cpu.h @@ -301,6 +301,11 @@ static inline ucs_status_t ucs_arch_get_cache_size(size_t *cache_sizes) return UCS_ERR_UNSUPPORTED; } +static UCS_F_ALWAYS_INLINE void ucs_cpu_relax() +{ + asm volatile ("yield" ::: "memory"); +} + END_C_DECLS #endif diff --git a/src/ucs/arch/atomic.h b/src/ucs/arch/atomic.h index 849647902fa..eb019114efa 100644 --- a/src/ucs/arch/atomic.h +++ b/src/ucs/arch/atomic.h @@ -8,6 +8,7 @@ #ifndef UCS_ARCH_ATOMIC_H #define UCS_ARCH_ATOMIC_H +#include #include #if defined(__x86_64__) @@ -138,4 +139,59 @@ UCS_DEFINE_ATOMIC_BOOL_CSWAP(16, w); UCS_DEFINE_ATOMIC_BOOL_CSWAP(32, l); UCS_DEFINE_ATOMIC_BOOL_CSWAP(64, q); + +#define UCS_ATOMIC_WEAK UCS_BIT(0) +#define UCS_ATOMIC_FENCE_LOCK UCS_BIT(1) +#define UCS_ATOMIC_FENCE_UNLOCK UCS_BIT(2) + + +static UCS_F_ALWAYS_INLINE int ucs_atomic_memorder(unsigned flags) +{ + if (flags & UCS_ATOMIC_FENCE_LOCK) { + return __ATOMIC_ACQUIRE; + } + + if (flags & UCS_ATOMIC_FENCE_UNLOCK) { + return __ATOMIC_RELEASE; + } + + return __ATOMIC_RELAXED; +} + + +static UCS_F_ALWAYS_INLINE int ucs_atomic_get(int *ptr, unsigned flags) +{ + return __atomic_load_n(ptr, ucs_atomic_memorder(flags)); +} + + +static UCS_F_ALWAYS_INLINE int +ucs_atomic_fadd(int *ptr, int val, unsigned flags) +{ + return __atomic_fetch_add(ptr, val, ucs_atomic_memorder(flags)); +} + + +static UCS_F_ALWAYS_INLINE void +ucs_atomic_sub(int *ptr, int val, unsigned flags) +{ + __atomic_fetch_sub(ptr, val, ucs_atomic_memorder(flags)); +} + + +static UCS_F_ALWAYS_INLINE void ucs_atomic_or(int *ptr, int val, unsigned flags) +{ + __atomic_fetch_or(ptr, val, ucs_atomic_memorder(flags)); +} + + +static UCS_F_ALWAYS_INLINE int +ucs_atomic_cswap(int *ptr, int old_val, int new_val, unsigned flags) +{ + return __atomic_compare_exchange_n(ptr, &old_val, new_val, + flags & UCS_ATOMIC_WEAK, + ucs_atomic_memorder(flags), + __ATOMIC_RELAXED); +} + #endif diff --git a/src/ucs/arch/cpu.h b/src/ucs/arch/cpu.h index 2adf4462d95..f75fc32f5be 100644 --- a/src/ucs/arch/cpu.h +++ b/src/ucs/arch/cpu.h @@ -177,13 +177,6 @@ static inline int ucs_cpu_prefer_relaxed_order() const char *ucs_cpu_vendor_name(); const char *ucs_cpu_model_name(); -#ifndef UCS_HAS_CPU_RELAX -static UCS_F_ALWAYS_INLINE void ucs_cpu_relax() -{ - sched_yield(); -} -#endif - END_C_DECLS #endif diff --git a/src/ucs/arch/ppc64/cpu.h b/src/ucs/arch/ppc64/cpu.h index c973004c608..73220b7b5a7 100644 --- a/src/ucs/arch/ppc64/cpu.h +++ b/src/ucs/arch/ppc64/cpu.h @@ -99,6 +99,13 @@ static inline ucs_status_t ucs_arch_get_cache_size(size_t *cache_sizes) return UCS_ERR_UNSUPPORTED; } +static UCS_F_ALWAYS_INLINE void ucs_cpu_relax() +{ + asm volatile ("or 1, 1, 1 \n"); /* hw threading low priority */ + asm volatile ("or 2, 2, 2 \n"); /* hw threading normal priority */ + asm volatile ("" ::: "memory"); +} + END_C_DECLS #endif diff --git a/src/ucs/arch/rv64/cpu.h b/src/ucs/arch/rv64/cpu.h index 1a83fc2dc93..8c03c386897 100644 --- a/src/ucs/arch/rv64/cpu.h +++ b/src/ucs/arch/rv64/cpu.h @@ -111,6 +111,11 @@ ucs_memcpy_nontemporal(void *dst, const void *src, size_t len) memcpy(dst, src, len); } +static UCS_F_ALWAYS_INLINE void ucs_cpu_relax() +{ + asm volatile ("" ::: "memory"); +} + END_C_DECLS #endif diff --git a/src/ucs/arch/x86_64/cpu.h b/src/ucs/arch/x86_64/cpu.h index cfc00ca77d2..3e9b9f4d77b 100644 --- a/src/ucs/arch/x86_64/cpu.h +++ b/src/ucs/arch/x86_64/cpu.h @@ -135,13 +135,12 @@ ucs_memcpy_nontemporal(void *dst, const void *src, size_t len) ucs_x86_memcpy_sse_movntdqa(dst, src, len); } -#ifdef __SSE2__ static UCS_F_ALWAYS_INLINE void ucs_cpu_relax() { +#ifdef __SSE2__ _mm_pause(); -} -#define UCS_HAS_CPU_RELAX #endif +} END_C_DECLS diff --git a/src/ucs/type/rwlock.h b/src/ucs/type/rwlock.h index 9060e1786e6..5543b0d4412 100644 --- a/src/ucs/type/rwlock.h +++ b/src/ucs/type/rwlock.h @@ -7,11 +7,14 @@ #ifndef UCS_RWLOCK_H #define UCS_RWLOCK_H +#include #include +#include +#include #include /** - * The ucs_rwlock_t type. + * The ucs_rw_spinlock_t type. * * Readers increment the counter by UCS_RWLOCK_READ (4) * Writers set the UCS_RWLOCK_WRITE bit when lock is held @@ -27,94 +30,108 @@ * WAIT: writer pending --/ */ -#define UCS_RWLOCK_WAIT 0x1 /* Writer is waiting */ -#define UCS_RWLOCK_WRITE 0x2 /* Writer has the lock */ +#define UCS_RWLOCK_WAIT UCS_BIT(0) /* Writer is waiting */ +#define UCS_RWLOCK_WRITE UCS_BIT(1) /* Writer has the lock */ #define UCS_RWLOCK_MASK (UCS_RWLOCK_WAIT | UCS_RWLOCK_WRITE) -#define UCS_RWLOCK_READ 0x4 /* Reader increment */ +#define UCS_RWLOCK_READ UCS_BIT(2) /* Reader increment */ #define UCS_RWLOCK_STATIC_INITIALIZER {0} /** - * Read-write lock. + * Reader-writer spin lock. */ typedef struct { - volatile int l; -} ucs_rwlock_t; + int state; +} ucs_rw_spinlock_t; -static inline void ucs_rwlock_read_lock(ucs_rwlock_t *lock) +static UCS_F_ALWAYS_INLINE void +ucs_rw_spinlock_read_lock(ucs_rw_spinlock_t *lock) { int x; - while (1) { - while (lock->l & UCS_RWLOCK_MASK) { + for (;;) { + while (ucs_atomic_get(&lock->state, 0) & UCS_RWLOCK_MASK) { ucs_cpu_relax(); } - x = __atomic_fetch_add(&lock->l, UCS_RWLOCK_READ, __ATOMIC_ACQUIRE); + x = ucs_atomic_fadd(&lock->state, UCS_RWLOCK_READ, + UCS_ATOMIC_FENCE_LOCK); if (!(x & UCS_RWLOCK_MASK)) { return; } - __atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED); + ucs_atomic_sub(&lock->state, UCS_RWLOCK_READ, 0); } } -static inline void ucs_rwlock_read_unlock(ucs_rwlock_t *lock) +static UCS_F_ALWAYS_INLINE void +ucs_rw_spinlock_read_unlock(ucs_rw_spinlock_t *lock) { - __atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED); + ucs_assert(lock->state >= UCS_RWLOCK_READ); + ucs_atomic_sub(&lock->state, UCS_RWLOCK_READ, UCS_ATOMIC_FENCE_UNLOCK); } -static inline void ucs_rwlock_write_lock(ucs_rwlock_t *lock) +static UCS_F_ALWAYS_INLINE void +ucs_rw_spinlock_write_lock(ucs_rw_spinlock_t *lock) { int x; - while (1) { - x = lock->l; + for (;;) { + x = ucs_atomic_get(&lock->state, 0); if ((x < UCS_RWLOCK_WRITE) && - (__atomic_compare_exchange_n(&lock->l, &x, UCS_RWLOCK_WRITE, 0, - __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) { + ucs_atomic_cswap(&lock->state, x, UCS_RWLOCK_WRITE, + UCS_ATOMIC_FENCE_LOCK)) { return; } if (!(x & UCS_RWLOCK_WAIT)) { - __atomic_fetch_or(&lock->l, UCS_RWLOCK_WAIT, __ATOMIC_RELAXED); + ucs_atomic_or(&lock->state, UCS_RWLOCK_WAIT, 0); } - while (lock->l > UCS_RWLOCK_WAIT) { + while (ucs_atomic_get(&lock->state, 0) > UCS_RWLOCK_WAIT) { ucs_cpu_relax(); } } } -static inline int ucs_rwlock_write_trylock(ucs_rwlock_t *lock) +static UCS_F_ALWAYS_INLINE int +ucs_rw_spinlock_write_trylock(ucs_rw_spinlock_t *lock) { int x; - x = lock->l; + x = ucs_atomic_get(&lock->state, 0); if ((x < UCS_RWLOCK_WRITE) && - (__atomic_compare_exchange_n(&lock->l, &x, x + UCS_RWLOCK_WRITE, 1, - __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) { - return 0; + ucs_atomic_cswap(&lock->state, x, x + UCS_RWLOCK_WRITE, + UCS_ATOMIC_FENCE_LOCK | UCS_ATOMIC_WEAK)) { + return 1; } - return -EBUSY; + return 0; } -static inline void ucs_rwlock_write_unlock(ucs_rwlock_t *lock) +static UCS_F_ALWAYS_INLINE void +ucs_rw_spinlock_write_unlock(ucs_rw_spinlock_t *lock) { - __atomic_fetch_sub(&lock->l, UCS_RWLOCK_WRITE, __ATOMIC_RELAXED); + ucs_assert(lock->state >= UCS_RWLOCK_WRITE); + ucs_atomic_sub(&lock->state, UCS_RWLOCK_WRITE, UCS_ATOMIC_FENCE_UNLOCK); } -static inline void ucs_rwlock_init(ucs_rwlock_t *lock) +static UCS_F_ALWAYS_INLINE void ucs_rw_spinlock_init(ucs_rw_spinlock_t *lock) { - lock->l = 0; + lock->state = 0; +} + + +static UCS_F_ALWAYS_INLINE void ucs_rw_spinlock_cleanup(ucs_rw_spinlock_t *lock) +{ + ucs_assert(lock->state == 0); } #endif diff --git a/test/gtest/ucs/test_type.cc b/test/gtest/ucs/test_type.cc index 0bf3fcd9710..389e20874b4 100644 --- a/test/gtest/ucs/test_type.cc +++ b/test/gtest/ucs/test_type.cc @@ -18,6 +18,7 @@ extern "C" { #include #include #include +#include class test_type : public ucs::test { }; @@ -144,89 +145,91 @@ UCS_TEST_F(test_type, pack_float) { class test_rwlock : public ucs::test { protected: + using run_func_t = const std::function&; + void sleep() { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - void measure_one(int num, int writers, const std::function &r, - const std::function &w, const std::string &name) + void measure_one(int iter_count, int thread_count, int writers, + run_func_t reader, run_func_t writer, + const std::string &name) { - std::vector tt; + std::vector threads; - tt.reserve(num); + threads.reserve(thread_count); auto start = std::chrono::high_resolution_clock::now(); - for (int c = 0; c < num; c++) { - tt.emplace_back([&]() { + for (int c = 0; c < thread_count; c++) { + threads.emplace_back([&]() { unsigned seed = time(0); - for (int i = 0; i < 1000000 / num; i++) { - if ((rand_r(&seed) % 256) < writers) { - w(); + for (int i = 0; i < iter_count / thread_count; i++) { + if ((rand_r(&seed) % 100) < writers) { + writer(); } else { - r(); + reader(); } } }); } - for (auto &t : tt) { + for (auto &t : threads) { t.join(); } auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration elapsed = end - start; UCS_TEST_MESSAGE << elapsed.count() * 1000 << " ms " << name << " " - << std::to_string(num) << " threads " - << std::to_string(writers) << " writers per 256 "; + << std::to_string(thread_count) << " threads " + << std::to_string(writers) << "% of writers"; } - void measure(const std::function &r, - const std::function &w, const std::string &name) + void measure(int iter_count, std::vector &threads, run_func_t reader, + run_func_t writer, const std::string &name) { - int m = std::thread::hardware_concurrency(); - std::vector threads = {1, 2, 4, m}; - std::vector writers_per_256 = {1, 25, 128, 250}; + std::vector writers_percent = {0, 1, 10, 50, 98}; - for (auto t : threads) { - for (auto writers : writers_per_256) { - measure_one(t, writers, r, w, name); + for (auto thread_count : threads) { + for (auto writers : writers_percent) { + measure_one(iter_count, thread_count, writers, reader, writer, + name); } } } }; UCS_TEST_F(test_rwlock, lock) { - ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER; + ucs_rw_spinlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER; - ucs_rwlock_read_lock(&lock); - EXPECT_EQ(-EBUSY, ucs_rwlock_write_trylock(&lock)); + ucs_rw_spinlock_read_lock(&lock); + EXPECT_FALSE(ucs_rw_spinlock_write_trylock(&lock)); - ucs_rwlock_read_lock(&lock); /* second read lock should pass */ + ucs_rw_spinlock_read_lock(&lock); /* second read lock should pass */ - int write_taken = 0; + bool write_taken = 0; std::thread w([&]() { - ucs_rwlock_write_lock(&lock); + ucs_rw_spinlock_write_lock(&lock); write_taken = 1; - ucs_rwlock_write_unlock(&lock); + ucs_rw_spinlock_write_unlock(&lock); }); sleep(); EXPECT_FALSE(write_taken); /* write lock should wait for read lock release */ - ucs_rwlock_read_unlock(&lock); + ucs_rw_spinlock_read_unlock(&lock); sleep(); EXPECT_FALSE(write_taken); /* first read lock still holding lock */ - int read_taken = 0; + bool read_taken = false; std::thread r1([&]() { - ucs_rwlock_read_lock(&lock); - read_taken = 1; - ucs_rwlock_read_unlock(&lock); + ucs_rw_spinlock_read_lock(&lock); + read_taken = true; + ucs_rw_spinlock_read_unlock(&lock); }); sleep(); EXPECT_FALSE(read_taken); /* read lock should wait while write lock is waiting */ - ucs_rwlock_read_unlock(&lock); + ucs_rw_spinlock_read_unlock(&lock); sleep(); EXPECT_TRUE(write_taken); /* write lock should be taken */ w.join(); @@ -235,40 +238,47 @@ UCS_TEST_F(test_rwlock, lock) { EXPECT_TRUE(read_taken); /* read lock should be taken */ r1.join(); - EXPECT_EQ(0, ucs_rwlock_write_trylock(&lock)); - read_taken = 0; + EXPECT_TRUE(ucs_rw_spinlock_write_trylock(&lock)); + read_taken = false; std::thread r2([&]() { - ucs_rwlock_read_lock(&lock); - read_taken = 1; - ucs_rwlock_read_unlock(&lock); + ucs_rw_spinlock_read_lock(&lock); + read_taken = true; + ucs_rw_spinlock_read_unlock(&lock); }); sleep(); EXPECT_FALSE(read_taken); /* read lock should wait for write lock release */ - ucs_rwlock_write_unlock(&lock); + ucs_rw_spinlock_write_unlock(&lock); sleep(); EXPECT_TRUE(read_taken); /* read lock should be taken */ r2.join(); + + ucs_rw_spinlock_cleanup(&lock); } UCS_TEST_F(test_rwlock, perf) { - ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER; - measure( + int m = std::thread::hardware_concurrency(); + std::vector threads = {1, 2, 4, m}; + ucs_rw_spinlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER; + measure(1000000, threads, [&]() { - ucs_rwlock_read_lock(&lock); - ucs_rwlock_read_unlock(&lock); + ucs_rw_spinlock_read_lock(&lock); + ucs_rw_spinlock_read_unlock(&lock); }, [&]() { - ucs_rwlock_write_lock(&lock); - ucs_rwlock_write_unlock(&lock); + ucs_rw_spinlock_write_lock(&lock); + ucs_rw_spinlock_write_unlock(&lock); }, - "builtin"); + "rw_spinlock"); + ucs_rw_spinlock_cleanup(&lock); } UCS_TEST_F(test_rwlock, pthread) { + int m = std::thread::hardware_concurrency(); + std::vector threads = {1, 2, 4, m}; pthread_rwlock_t plock; pthread_rwlock_init(&plock, NULL); - measure( + measure(1000000, threads, [&]() { pthread_rwlock_rdlock(&plock); pthread_rwlock_unlock(&plock); @@ -281,6 +291,62 @@ UCS_TEST_F(test_rwlock, pthread) { pthread_rwlock_destroy(&plock); } +UCS_TEST_F(test_rwlock, shared_state) { + int m = std::thread::hardware_concurrency(); + std::vector threads = {16, m, m * 4}; + ucs_rw_spinlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER; + int x1 = 1; + int x2 = 2; + measure(1000000, threads, + [&]() { + ucs_rw_spinlock_read_lock(&lock); + EXPECT_EQ(x1 * 2, x2); + ucs_rw_spinlock_read_unlock(&lock); + }, + [&]() { + ucs_rw_spinlock_write_lock(&lock); + x1 += 1; + x2 += 2; + ucs_rw_spinlock_write_unlock(&lock); + }, + "shared_state"); + ucs_rw_spinlock_cleanup(&lock); +} + +typedef struct { + int64_t counter[128]; +} data_t; + +UCS_TEST_F(test_rwlock, memory_barriers) { + int m = std::thread::hardware_concurrency(); + std::vector threads = {16, m, m * 4}; + ucs_rw_spinlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER; + data_t data1 = {0}; + data_t data2 = {0}; + measure(10000000, threads, + [&]() { + ucs_rw_spinlock_read_lock(&lock); + data_t d1 = data1; + data_t d2 = data2; + ucs_rw_spinlock_read_unlock(&lock); + + int64_t x1 = d1.counter[ucs::rand() % 128]; + int64_t x2 = d2.counter[ucs::rand() % 128]; + EXPECT_LE(x1, x2); + }, + [&]() { + ucs_rw_spinlock_write_lock(&lock); + for (int i = 0; i < 128; ++i) { + data1.counter[i]++; + } + std::atomic_thread_fence(std::memory_order_seq_cst); + data2 = data1; + ucs_rw_spinlock_write_unlock(&lock); + }, + "membariers"); + ucs_rw_spinlock_cleanup(&lock); +} + class test_init_once: public test_type { protected: test_init_once() : m_once(INIT_ONCE_INIT), m_count(0) {};