Skip to content

Commit

Permalink
fix: missed notification bug in atomics (#334)
Browse files Browse the repository at this point in the history
Also fix a warning in gcs code.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Oct 31, 2024
1 parent ec95de6 commit 3d011e3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
14 changes: 8 additions & 6 deletions util/cloud/gcp/gcs_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ string BuildGetObjUrl(absl::string_view bucket, absl::string_view obj_path) {
return read_obj_url;
}

#if 0
inline void SetRange(size_t from, size_t to, h2::fields* flds) {
string tmp = absl::StrCat("bytes=", from, "-");
if (to < kuint64max) {
absl::StrAppend(&tmp, to - 1);
}
flds->set(h2::field::range, std::move(tmp));
}
#endif

// File handle that writes to GCS.
//
Expand Down Expand Up @@ -97,26 +99,26 @@ class GcsWriteFile : public io::WriteFile {
GcsWriteFileOptions opts_;
};

class GcsReadFile : public io::ReadonlyFile {
class GcsReadFile final : public io::ReadonlyFile {
public:
// does not own gcs object, only wraps it with ReadonlyFile interface.
GcsReadFile(string read_obj_url, const GcsReadFileOptions& opts)
: read_obj_url_(read_obj_url), opts_(opts) {
}

virtual ~GcsReadFile() final;
virtual ~GcsReadFile();

error_code Close() final {
error_code Close() {
return {};
}

io::SizeOrError Read(size_t offset, const iovec* v, uint32_t len) final;
io::SizeOrError Read(size_t offset, const iovec* v, uint32_t len);

size_t Size() const final {
size_t Size() const {
return size_;
}

int Handle() const final {
int Handle() const {
return -1;
};

Expand Down
8 changes: 6 additions & 2 deletions util/fibers/proactor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,16 @@ class ProactorDispatcher : public DispatchPolicy {
//

inline void ProactorBase::WakeupIfNeeded() {
auto current = tq_seq_.fetch_add(2, std::memory_order_relaxed);
// memory_order_relaxed does not provide the "latest" value guarantees! If we do not use
// memory_order_acquire, it may happen that a store in the other thread is not yet visible here,
// and we will miss WAIT_SECTION_STATE.
// Similarly, we use release because we want to make sure that the notification is
// visible to the other thread before it decides to suspend.
auto current = tq_seq_.fetch_add(2, std::memory_order_acq_rel);
if (current == WAIT_SECTION_STATE) {
// We protect WakeRing using tq_seq_. That means only one thread at a time
// can enter here. Moreover tq_seq_ == WAIT_SECTION_STATE only when
// proactor enters WAIT section, therefore we do not race over SQE ring with proactor thread.
std::atomic_thread_fence(std::memory_order_acquire);
WakeRing();
} else {
tq_wakeup_skipped_ev_.fetch_add(1, std::memory_order_relaxed);
Expand Down
22 changes: 16 additions & 6 deletions util/fibers/uring_proactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -900,13 +900,23 @@ void UringProactor::MainLoop(detail::Scheduler* scheduler) {
* If tq_seq_ has changed since it was cached into tq_seq, then
* EmplaceTaskQueue succeeded and we might have more tasks to execute - lets
* run the loop again. Otherwise, set tq_seq_ to WAIT_SECTION_STATE, hinting that
* we are going to stall now. Other threads will need to wake-up the ring
* (see WakeRing()) but only one will actually call the syscall.
* we are going to stall now. Other threads will have to wake up the ring
* (see WakeRing()) but only one will actually call WakeRing.
* Important:
*
* 1. task_queue_.empty() check is an optimization to avoid unnecessary CAS.
*
* 2. We must use acquire because we check for notifications coming from WakeupIfNeeded
* and we can not miss them. In addition, we must userelease order to make sure
* the value of tq_seq_ is visible in WakeupIfNeeded, so it won't miss WAIT_SECTION_STATE.
* 3. In case compare failed, we do not care about the ordering since we load tq_seq_ again
* at the beginning of the loop.
*/
if (task_queue_.empty() &&
tq_seq_.compare_exchange_weak(tq_seq, WAIT_SECTION_STATE, std::memory_order_acquire)) {
tq_seq_.compare_exchange_weak(tq_seq, WAIT_SECTION_STATE, memory_order_acq_rel,
memory_order_relaxed)) {
if (is_stopped_) {
tq_seq_.store(0, std::memory_order_release); // clear WAIT section
tq_seq_.store(0, memory_order_release); // clear WAIT section
break;
}

Expand All @@ -916,8 +926,8 @@ void UringProactor::MainLoop(detail::Scheduler* scheduler) {
uint64_t start_cycle = GetCPUCycleCount();
wait_for_cqe(&ring_, 1, ts_arg);
IdleEnd(start_cycle);
VPRO(2) << "Woke up after wait_for_cqe, tq_seq_: " << tq_seq_.load()
<< " tasks:" << stats_.num_task_runs;
VPRO(2) << "Woke up after wait_for_cqe, tq_seq_: " << tq_seq_.load(memory_order_relaxed)
<< " tasks:" << stats_.num_task_runs << " " << stats_.loop_cnt;

++stats_.num_stalls;
tq_seq = 0;
Expand Down

0 comments on commit 3d011e3

Please sign in to comment.