From 6eedf8e65ee4050a62ab3daa004fef2948171127 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Sun, 3 Nov 2024 18:00:09 +0200
Subject: [PATCH] fix: provide a more precise detection of empty state in
 MPSCIntrusiveQueue

The scenario is around remote fiber activation, and it goes like this:

1. A fiber is blocked on EventCount::wait_until (i.e. a timed wait).
2. The fiber times out, but in parallel another thread sends it a notification
   via the remote queue.
3. EventCount::wait_until tries to pull the fiber from the remote queue but does
   not find it there, even though the notification was pushed STRICTLY BEFORE the
   `active->PullMyselfFromRemoteReadyQueue()` call. We know this because the notify
   call runs under the EventCount spinlock, and PullMyselfFromRemoteReadyQueue runs
   after that spinlock was grabbed and released.

The scenario happens when the remote queue is empty, but something else tries to
notify another, unrelated fiber in the same thread and has not yet finished
crossing the blocking point (*). Then a notification for our fiber is pushed into
the queue. Finally, our fiber is woken by the timeout and tries to pull itself
from the queue, only to find the queue reported as empty, which contradicts the
BEFORE relationship between the notify and the PullMyselfFromRemoteReadyQueue call
that the EventCount spinlock establishes.

This fix recognizes the stuck-in-the-middle push by loading the tail and comparing
it with the head.

Signed-off-by: Roman Gershman
---
 base/mpsc_intrusive_queue.h          | 46 +++++++++++++++++++++++++---
 base/mpsc_intrusive_queue_test.cc    |  2 +-
 util/fibers/detail/fiber_interface.h |  2 +-
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/base/mpsc_intrusive_queue.h b/base/mpsc_intrusive_queue.h
index e12dad5d..e289b586 100644
--- a/base/mpsc_intrusive_queue.h
+++ b/base/mpsc_intrusive_queue.h
@@ -52,12 +52,18 @@ template <typename T> class MPSCIntrusiveQueue {
   void Push(T* item) noexcept {
     // item becomes a new tail.
     MPSC_intrusive_store_next(item, nullptr);
+
+    // This store must use at least `release` MO, so that it is not reordered before
+    // resetting item.next above. Otherwise, another producer could push its own item
+    // after this exchange, and its item.next would be overridden.
     T* prev = tail_.exchange(item, std::memory_order_acq_rel);
 
     // link the previous tail to the new tail.
     // (*) Also a potential blocking point!
     // For more details see the linked article above!
-    MPSC_intrusive_store_next(prev, item);
+    // Until (*) completes, the chain is cut at `prev`, and Pop cannot reach the item
+    // and its subsequent items.
+    MPSC_intrusive_store_next(prev, item);  // release (*)
   }
 
   // Pops the first item at the head or returns nullptr if the queue is empty.
@@ -91,9 +97,35 @@ template <typename T> std::pair<T*, bool> MPSCIntrusiveQueue<T>::PopWeak() noexcept {
   T* next = MPSC_intrusive_load_next(*head);  // load(std::memory_order_acquire)
   if (stub() == head) {
     if (nullptr == next) {
-      // empty
-      return {nullptr, true};
+      // Empty state.
+      // Caveat: if a Push() called on an empty queue has not crossed the blocking point
+      // yet, we may reach this condition because head_ is the stub and stub.next is
+      // nullptr. Unfortunately, this can lead to a bizarre scenario where an arbitrary
+      // number of subsequent pushes fully complete, yet the consumer still observes the
+      // queue as empty, because the chain is cut by the Push that is stuck updating the
+      // stub.
+      //
+      // More comments: if we had a single Push that has not completed yet, then returning
+      // an empty state is fine.
+      // The problem arises when we have multiple pushes in parallel, and the first one
+      // has not completed yet while the others did.
+      // Moreover, one of the pushes could even be linearized with this PopWeak call
+      // using locks/barriers, and still would not be observed.
+      //
+      // To disambiguate, we load tail_ and check whether it is the same as the head.
+      // The MO is not important, because for operations that are externally linearized
+      // the order is already guaranteed.
+      // To sum up:
+      // 1. If tail is not head, the queue is guaranteed to be non-empty.
+      // 2. If Push(item) has finished and was linearized with this PopWeak somehow,
+      //    we are guaranteed to observe the updated tail_.
+      // 3. Otherwise, it is just an optimization that may or may not return the updated
+      //    tail. Remember that the standard guarantees that stores eventually become
+      //    visible with any MO, so we may see here either the stale value (head) or the
+      //    updated tail, even if Push has finished.
+      T* tail = tail_.load(std::memory_order_relaxed);
+      return {nullptr, tail == head};
     }
+
+    // skip the stub if needed and continue.
     head_ = next;
     head = next;
     next = MPSC_intrusive_load_next(*next);
@@ -105,16 +137,20 @@ template <typename T> std::pair<T*, bool> MPSCIntrusiveQueue<T>::PopWeak() noexcept {
     return {head, false};
   }
 
-  T* tail = tail_.load(std::memory_order_acquire);
+  T* tail = tail_.load(std::memory_order_relaxed);
   if (tail != head) {
     // non-empty, we are in the middle of push - see a blocking point above.
     return {nullptr, false};
   }
 
   // tail and head are the same, pointing to the last element in the queue.
-  // Link stub to the tail to introduce an empty state.
+  // Link stub to the tail to introduce an empty state. Before the tail_.load above:
+  // head -> item, tail -> item
   Push(stub());
+  // Unless we had concurrent pushes, now:
+  // head -> item, item.next = stub
+  // tail -> stub, stub.next = nullptr
   next = MPSC_intrusive_load_next(*head);
   if (nullptr != next) {
     head_ = next;
diff --git a/base/mpsc_intrusive_queue_test.cc b/base/mpsc_intrusive_queue_test.cc
index 30aa1518..f2526adb 100644
--- a/base/mpsc_intrusive_queue_test.cc
+++ b/base/mpsc_intrusive_queue_test.cc
@@ -17,7 +17,7 @@ struct TestNode {
 };
 
 void MPSC_intrusive_store_next(TestNode* dest, TestNode* next_node) {
-  dest->next.store(next_node, std::memory_order_relaxed);
+  dest->next.store(next_node, std::memory_order_release);
 }
 
 TestNode* MPSC_intrusive_load_next(const TestNode& src) {
diff --git a/util/fibers/detail/fiber_interface.h b/util/fibers/detail/fiber_interface.h
index 4ba4c257..da31ed28 100644
--- a/util/fibers/detail/fiber_interface.h
+++ b/util/fibers/detail/fiber_interface.h
@@ -175,7 +175,7 @@ class FiberInterface {
 #endif
 
   friend void MPSC_intrusive_store_next(FiberInterface* dest, FiberInterface* next_node) {
-    dest->remote_next_.store(next_node, std::memory_order_relaxed);
+    dest->remote_next_.store(next_node, std::memory_order_release);
  }
 
   friend FiberInterface* MPSC_intrusive_load_next(const FiberInterface& src) {
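
Reviewer note, not part of the patch itself: below is a minimal, self-contained sketch
of a stub-based MPSC queue with the tail == head check applied, so the empty-state logic
can be experimented with in isolation. The names Node and TinyMpscQueue are illustrative
and do not exist in the codebase; the memory orders and the head_/tail_ handling are
simplified to mirror the patch, not to be an authoritative implementation. The main()
below only exercises single-threaded behavior and does not reproduce the multi-producer
race described in the commit message.

#include <atomic>
#include <cassert>
#include <utility>

// Toy node with an intrusive "next" pointer (illustrative only).
struct Node {
  std::atomic<Node*> next{nullptr};
  int payload = 0;
};

// Simplified single-consumer / multi-producer queue mirroring the stub-based design.
class TinyMpscQueue {
 public:
  TinyMpscQueue() : head_(&stub_), tail_(&stub_) {}

  void Push(Node* item) {
    item->next.store(nullptr, std::memory_order_release);
    Node* prev = tail_.exchange(item, std::memory_order_acq_rel);
    // (*) Between the exchange above and the store below the chain is "cut":
    // the consumer cannot reach `item` through head_ yet.
    prev->next.store(item, std::memory_order_release);
  }

  // Returns {item, is_empty}. {nullptr, false} means "retry later": a producer is
  // stuck between the exchange at (*) and the link store.
  std::pair<Node*, bool> PopWeak() {
    Node* head = head_.load(std::memory_order_relaxed);
    Node* next = head->next.load(std::memory_order_acquire);

    if (head == &stub_) {
      if (next == nullptr) {
        // Pre-patch logic returned {nullptr, true} here unconditionally. With the
        // patch, emptiness is reported only if tail_ still points at the stub as
        // well; otherwise some Push already moved the tail and the queue is merely
        // unreachable at the moment.
        Node* tail = tail_.load(std::memory_order_relaxed);
        return {nullptr, tail == head};
      }
      // Skip the stub and continue.
      head_.store(next, std::memory_order_relaxed);
      head = next;
      next = head->next.load(std::memory_order_acquire);
    }

    if (next != nullptr) {
      head_.store(next, std::memory_order_relaxed);
      return {head, false};
    }

    Node* tail = tail_.load(std::memory_order_relaxed);
    if (tail != head) {
      // A producer is in the middle of Push - see the blocking point (*).
      return {nullptr, false};
    }

    // head == tail: `head` is the last element. Re-insert the stub to restore the
    // empty state, then try to detach `head`.
    Push(&stub_);
    next = head->next.load(std::memory_order_acquire);
    if (next != nullptr) {
      head_.store(next, std::memory_order_relaxed);
      return {head, false};
    }
    return {nullptr, false};
  }

 private:
  Node stub_;
  std::atomic<Node*> head_;
  std::atomic<Node*> tail_;
};

int main() {
  TinyMpscQueue q;
  Node a, b;
  auto r = q.PopWeak();
  assert(r.first == nullptr && r.second);  // truly empty
  q.Push(&a);
  q.Push(&b);
  assert(q.PopWeak().first == &a);
  assert(q.PopWeak().first == &b);
  assert(q.PopWeak().second);              // empty again
  return 0;
}

The key line is the tail_ load in the stub branch: before this patch, reaching the stub
with a null next was taken as proof of emptiness, whereas now emptiness is claimed only
while the tail still points at the stub as well.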