Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cacheline-alignment to compiler/threading and switch from volatile to std::atomic #1005

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/cacheline.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@

#include <stdint.h>

#if defined(__ARM_ARCH_7A__) || defined(__aarch64__)
#define KDB_CACHELINE_SIZE 128
#else
#define KDB_CACHELINE_SIZE 64
#endif // __ARM_ARCH_7A__ || __aarch64__
#define KDB_CACHELINE_ALIGNED __attribute__((aligned(KDB_CACHELINE_SIZE)))

#endif // KDB_COMMON_CACHELINE_H
2 changes: 1 addition & 1 deletion compiler/scheduler/scheduler-base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include <cassert>

volatile int tasks_before_sync_node;
std::atomic<int> tasks_before_sync_node;

static SchedulerBase *scheduler;

Expand Down
4 changes: 3 additions & 1 deletion compiler/scheduler/scheduler-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#pragma once

#include <atomic>

class Node;

class Task;
Expand All @@ -22,7 +24,7 @@ SchedulerBase *get_scheduler();
void set_scheduler(SchedulerBase *new_scheduler);
void unset_scheduler(SchedulerBase *old_scheduler);

extern volatile int tasks_before_sync_node;
extern std::atomic<int> tasks_before_sync_node;

inline void register_async_task(Task *task) {
get_scheduler()->add_task(task);
Expand Down
4 changes: 2 additions & 2 deletions compiler/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void Scheduler::execute() {
}

while (true) {
if (tasks_before_sync_node > 0) {
if (tasks_before_sync_node.load(std::memory_order_acquire) > 0) {
usleep(250);
continue;
}
Expand Down Expand Up @@ -101,7 +101,7 @@ bool Scheduler::thread_process_node(Node *node) {
}
task->execute();
delete task;
__sync_fetch_and_sub(&tasks_before_sync_node, 1);
tasks_before_sync_node.fetch_sub(1, std::memory_order_release);
return true;
}

Expand Down
6 changes: 4 additions & 2 deletions compiler/stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Copyright (c) 2020 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include <atomic>

#include "compiler/stage.h"

#include "common/termformat/termformat.h"
Expand Down Expand Up @@ -31,7 +33,7 @@ const char *get_assert_level_desc(AssertLevelT assert_level) {
}
}

volatile int ce_locker;
std::atomic<int> ce_locker;

namespace {
FILE *warning_file{nullptr};
Expand All @@ -44,7 +46,7 @@ void stage::set_warning_file(FILE *file) noexcept {
void on_compilation_error(const char *description __attribute__((unused)), const char *file_name, int line_number,
const char *full_description, AssertLevelT assert_level) {

AutoLocker<volatile int *> locker(&ce_locker);
AutoLocker<std::atomic<int> *> locker(&ce_locker);
FILE *file = stdout;
if (assert_level == WRN_ASSERT_LEVEL && warning_file) {
file = warning_file;
Expand Down
3 changes: 1 addition & 2 deletions compiler/threading/data-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class DataStream {

void operator<<(DataType input) {
if (!is_sink_mode_) {
__sync_fetch_and_add(&tasks_before_sync_node, 1);
tasks_before_sync_node.fetch_add(1, std::memory_order_release);
}
std::lock_guard<std::mutex> lock{mutex_};
queue_.push_front(std::move(input));
Expand All @@ -60,7 +60,6 @@ class DataStream {
const bool is_sink_mode_;
};


struct EmptyStream {
template<size_t stream_id>
using NthDataType = EmptyStream;
Expand Down
20 changes: 11 additions & 9 deletions compiler/threading/hash-table.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ template<class T, int N = 1000000>
class TSHashTable {
public:
struct HTNode : Lockable {
unsigned long long hash;
std::atomic<unsigned long long> hash;
T data;

HTNode() :
Expand All @@ -24,7 +24,8 @@ class TSHashTable {

private:
HTNode *nodes;
int used_size;
std::atomic<int> used_size;

public:
TSHashTable() :
nodes(new HTNode[N]),
Expand All @@ -34,14 +35,15 @@ class TSHashTable {
HTNode *at(unsigned long long hash) {
int i = (unsigned)hash % (unsigned)N;
while (true) {
while (nodes[i].hash != 0 && nodes[i].hash != hash) {
while (nodes[i].hash.load(std::memory_order_acquire) != 0 && nodes[i].hash.load(std::memory_order_relaxed) != hash) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why second read is relaxed?

Copy link
Author

@blonded04 blonded04 May 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it won't be reordered between nodes[i].hash.load(std::memory_order_acquire) before and nodes[i].hash.load(std::memory_order_acquire) after, the only barrier needed here is load-load, which is imposed by reading node hash

i++;
if (i == N) {
i = 0;
}
}
if (nodes[i].hash == 0 && !__sync_bool_compare_and_swap(&nodes[i].hash, 0, hash)) {
int id = __sync_fetch_and_add(&used_size, 1);
unsigned long long expected = 0;
if (nodes[i].hash.load(std::memory_order_acquire) == 0 && !nodes[i].hash.compare_exchange_strong(expected, hash, std::memory_order_acq_rel)) {
int id = used_size.fetch_add(1, std::memory_order_relaxed);
assert(id * 2 < N);
continue;
}
Expand All @@ -52,20 +54,20 @@ class TSHashTable {

const T *find(unsigned long long hash) {
int i = (unsigned)hash % (unsigned)N;
while (nodes[i].hash != 0 && nodes[i].hash != hash) {
while (nodes[i].hash.load(std::memory_order_acquire) != 0 && nodes[i].hash.load(std::memory_order_relaxed) != hash) {
i++;
if (i == N) {
i = 0;
}
}

return nodes[i].hash == hash ? &nodes[i].data : nullptr;
return nodes[i].hash.load(std::memory_order_acquire) == hash ? &nodes[i].data : nullptr;
}

std::vector<T> get_all() {
std::vector<T> res;
for (int i = 0; i < N; i++) {
if (nodes[i].hash != 0) {
if (nodes[i].hash.load(std::memory_order_acquire) != 0) {
res.push_back(nodes[i].data);
}
}
Expand All @@ -76,7 +78,7 @@ class TSHashTable {
std::vector<T> get_all_if(const CondF &callbackF) {
std::vector<T> res;
for (int i = 0; i < N; i++) {
if (nodes[i].hash != 0 && callbackF(nodes[i].data)) {
if (nodes[i].hash.load(std::memory_order_acquire) != 0 && callbackF(nodes[i].data)) {
res.push_back(nodes[i].data);
}
}
Expand Down
36 changes: 28 additions & 8 deletions compiler/threading/locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

#pragma once

#include <atomic>
#include <cassert>
#include <unistd.h>

#include "common/cacheline.h"

enum { LOCKED = 1, UNLOCKED = 0 };

template<class T>
bool try_lock(T);

Expand All @@ -20,28 +25,43 @@ void unlock(T locker) {
locker->unlock();
}

inline bool try_lock(volatile int *locker) {
return __sync_lock_test_and_set(locker, 1) == 0;
inline bool try_lock(std::atomic<int> *locker) {
int expected = UNLOCKED;
return locker->compare_exchange_weak(expected, LOCKED, std::memory_order_acq_rel);
}

inline void lock(volatile int *locker) {
inline void lock(std::atomic<int> *locker) {
while (!try_lock(locker)) {
usleep(250);
}
}

inline void unlock(volatile int *locker) {
assert(*locker == 1);
__sync_lock_release(locker);
inline void unlock(std::atomic<int> *locker) {
assert(locker->load(std::memory_order_relaxed) == LOCKED);
locker->store(UNLOCKED, std::memory_order_release);
}

class Lockable {
class KDB_CACHELINE_ALIGNED Lockable {
private:
volatile int x;
std::atomic<int> x;

public:
Lockable() :
x(0) {}

Lockable(const Lockable &other) noexcept :
x{other.x.load(std::memory_order_relaxed)} {}
Lockable(Lockable &&other) noexcept :
x{other.x.load(std::memory_order_relaxed)} {}
Lockable &operator=(const Lockable &other) noexcept {
x = other.x.load(std::memory_order_relaxed);
return *this;
}
Lockable &operator=(Lockable &&other) noexcept {
x = other.x.load(std::memory_order_relaxed);
return *this;
}

virtual ~Lockable() = default;

void lock() {
Expand Down
2 changes: 1 addition & 1 deletion compiler/threading/thread-id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include "compiler/threading/thread-id.h"

static __thread int bicycle_thread_id;
static thread_local int bicycle_thread_id;

int get_thread_id() {
return bicycle_thread_id;
Expand Down
20 changes: 3 additions & 17 deletions compiler/threading/tls.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <cassert>
#include <thread>

#include "common/cacheline.h"

#include "compiler/threading/locks.h"
#include "compiler/threading/thread-id.h"

Expand All @@ -23,10 +25,8 @@ inline uint32_t get_default_threads_count() noexcept {
template<class T>
struct TLS {
private:
struct TLSRaw {
struct KDB_CACHELINE_ALIGNED TLSRaw {
T data{};
volatile int locker = 0;
char dummy[4096];
};

// The thread with thread_id = 0 is the main thread in which the scheduler's master code is executed.
Expand All @@ -49,7 +49,6 @@ struct TLS {
arr() {
}


T &get() {
return get_raw()->data;
}
Expand All @@ -69,19 +68,6 @@ struct TLS {
int size() {
return MAX_THREADS_COUNT + 1;
}

T *lock_get() {
TLSRaw *raw = get_raw();
bool ok = try_lock(&raw->locker);
assert(ok);
return &raw->data;
}

void unlock_get(T *ptr) {
TLSRaw *raw = get_raw();
assert(&raw->data == ptr);
unlock(&raw->locker);
}
};

#pragma GCC diagnostic pop
4 changes: 2 additions & 2 deletions runtime/critical_section.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ void check_stack_overflow() {

namespace dl {

volatile int in_critical_section = 0;
volatile long long pending_signals = 0;
volatile int in_critical_section;
volatile long long pending_signals;

void enter_critical_section() noexcept {
check_stack_overflow();
Expand Down