fix: provide a more precise detection of empty state in MPSCIntrusiveQueue

The scenario involves remote fiber activation and goes like this:
1. A fiber is blocked on EventCount::wait_until (i.e. a timed wait).
2. The fiber times out, but in parallel another thread sends it a notification
   via the remote queue.
3. EventCount::wait_until tries to pull the fiber from the remote queue but does not find
   it there, even though the notification was pushed STRICTLY BEFORE the
   `active->PullMyselfFromRemoteReadyQueue()` call. We know this because the notify
   call is made under the EventCount spinlock, and PullMyselfFromRemoteReadyQueue is called
   after the spinlock was grabbed and released.

The scenario happens when the remote queue is empty and something else
tries to notify another, unrelated fiber in the same thread, but does not finish
crossing the blocking point (*). Then the notification for our fiber is pushed into the queue.
Then our fiber is woken by the timeout and tries to pull itself from the queue, only to learn
that the queue is empty. This contradicts the BEFORE relationship we observed between notify and
the PullMyselfFromRemoteReadyQueue call due to the spinlock in EventCount.

This fix recognizes a push that is stuck in the middle by loading the tail and comparing it with the head.

Signed-off-by: Roman Gershman <[email protected]>
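
The scenario described in the commit message can be reproduced deterministically with a simplified, single-threaded model of the queue. The sketch below is an illustration under stated assumptions, not the code from this commit: `Node`, `SketchQueue`, `PushExchange`, `PushLink`, `StuckPushDemo` and the `check_tail` flag are hypothetical names, Push is split into its two steps so that a producer stalled at the blocking point (*) can be simulated without threads, and the final `return {nullptr, false}` in `PopWeak` is assumed because that part of the function is not visible in the diff.

```cpp
#include <atomic>
#include <cassert>
#include <utility>

// Hypothetical node type, for illustration only.
struct Node {
  std::atomic<Node*> next{nullptr};
};

// Simplified model of the queue. Push is split into two steps so that a
// producer stalled at the blocking point (*) can be simulated without threads.
class SketchQueue {
 public:
  SketchQueue() : head_(&stub_), tail_(&stub_) {}

  // Step 1: make item the new tail and return the previous tail.
  Node* PushExchange(Node* item) {
    item->next.store(nullptr, std::memory_order_relaxed);
    return tail_.exchange(item, std::memory_order_acq_rel);
  }

  // Step 2, the blocking point (*): link the previous tail to item.
  static void PushLink(Node* prev, Node* item) {
    prev->next.store(item, std::memory_order_release);
  }

  void Push(Node* item) { PushLink(PushExchange(item), item); }

  // Returns {item, is_empty}. check_tail=false models the pre-fix behaviour.
  std::pair<Node*, bool> PopWeak(bool check_tail) {
    Node* head = head_;
    Node* next = head->next.load(std::memory_order_acquire);
    if (head == &stub_) {
      if (next == nullptr) {
        if (!check_tail) return {nullptr, true};  // before the fix: always "empty"
        Node* tail = tail_.load(std::memory_order_relaxed);
        return {nullptr, tail == head};  // empty only if no push has started
      }
      head_ = next;  // skip the stub
      head = next;
      next = next->next.load(std::memory_order_acquire);
    }
    if (next != nullptr) {
      head_ = next;
      return {head, false};
    }
    if (tail_.load(std::memory_order_relaxed) != head) return {nullptr, false};
    Push(&stub_);  // re-link the stub behind the last item
    next = head->next.load(std::memory_order_acquire);
    if (next != nullptr) {
      head_ = next;
      return {head, false};
    }
    return {nullptr, false};  // assumed: this tail of the function is not in the diff
  }

 private:
  Node stub_;
  Node* head_;               // touched by the consumer only
  std::atomic<Node*> tail_;  // shared with producers
};

// Returns {empty as reported before the fix, empty as reported after the fix}.
std::pair<bool, bool> StuckPushDemo() {
  SketchQueue q;
  Node other, ours;
  q.PushExchange(&other);  // unrelated push stalls before (*): stub.next is still null
  q.Push(&ours);           // our notification is pushed and completes fully
  bool old_empty = q.PopWeak(false).second;
  bool new_empty = q.PopWeak(true).second;
  return {old_empty, new_empty};
}
```

In this model the pre-fix check reports an empty queue although `ours` was fully pushed, while the tail comparison reports non-empty. Neither variant can pop `ours` until the stalled push links the stub; the fix only changes what the consumer is told about emptiness.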
romange committed Nov 3, 2024
1 parent 2e9cb6f commit 6eedf8e
Showing 3 changed files with 43 additions and 7 deletions.
46 changes: 41 additions & 5 deletions base/mpsc_intrusive_queue.h
@@ -52,12 +52,18 @@ template <typename T> class MPSCIntrusiveQueue {
   void Push(T* item) noexcept {
     // item becomes a new tail.
     MPSC_intrusive_store_next(item, nullptr);
+
+    // we need it to write with at least `release` MO, so that this write won't be reordered
+    // before resetting item.next above. Otherwise, another producer could push its own item
+    // after this CAS, and its item.next will be overridden.
     T* prev = tail_.exchange(item, std::memory_order_acq_rel);

     // link the previous tail to the new tail.
     // (*) Also a potential blocking point!
     // For more details see the linked article above!
-    MPSC_intrusive_store_next(prev, item);
+    // Until (*) completes, the chain is cut at `prev` and Pop cannot reach the item
+    // and its subsequent items.
+    MPSC_intrusive_store_next(prev, item);  // release (*)
   }

   // Pops the first item at the head or returns nullptr if the queue is empty.
@@ -91,9 +97,35 @@ template <typename T> std::pair<T*, bool> MPSCIntrusiveQueue<T>::PopWeak() noexc
   T* next = MPSC_intrusive_load_next(*head);  // load(std::memory_order_acquire)
   if (stub() == head) {
     if (nullptr == next) {
-      // empty
-      return {nullptr, true};
+      // Empty state.
+      // Caveat: if Push() called on an empty queue has not crossed the blocking point yet,
+      // we may reach this condition because head_ is a stub and stub.next is nullptr.
+      // Unfortunately it may lead to a bizarre scenario where an arbitrary number of
+      // subsequent pushes will fully complete, but the queue will still be observed
+      // as empty by the consumer because the chain will be cut by the Push that is stuck updating
+      // the stub.
+      //
+      // More comments: if we had a single Push that is not completed yet, then returning
+      // an empty state is fine. The problem arises when we have multiple pushes in parallel
+      // and the first one has not completed yet, but others did.
+      // Moreover, one of the pushes could even be linearized with this PopWeak call
+      // using locks/barriers, and still would not be observed.
+      //
+      // To disambiguate, we load the tail_ and check if it is the same as the head.
+      // MO is not important because for operations that are externally linearized,
+      // the order is already guaranteed.
+      // To sum up:
+      // 1. If tail is not head, it is guaranteed that the queue is not empty.
+      // 2. If Push(Item) has finished, and was linearized with PopWeak somehow,
+      //    it is guaranteed that we will observe the updated tail_.
+      // 3. Otherwise, it's just an optimization that may or may not return the updated tail.
+      //    Remember that the standard guarantees that stores will eventually be visible with any MO,
+      //    therefore we may see either a stale value (head) or the updated tail here, even if Push has finished.
+      T* tail = tail_.load(std::memory_order_relaxed);
+      return {nullptr, tail == head};
     }
+
+    // skip the stub if needed and continue.
     head_ = next;
     head = next;
     next = MPSC_intrusive_load_next(*next);
@@ -105,16 +137,20 @@ template <typename T> std::pair<T*, bool> MPSCIntrusiveQueue<T>::PopWeak() noexc
     return {head, false};
   }

-  T* tail = tail_.load(std::memory_order_acquire);
+  T* tail = tail_.load(std::memory_order_relaxed);
   if (tail != head) {
     // non-empty, we are in the middle of push - see a blocking point above.
     return {nullptr, false};
   }

   // tail and head are the same, pointing to the last element in the queue.
-  // Link stub to the tail to introduce an empty state.
+  // Link stub to the tail to introduce an empty state. Before the tail_.load above:
+  // head -> item, tail -> item
   Push(stub());

+  // Unless we had concurrent pushes, now:
+  // head->item, item.next=stub
+  // tail->stub, stub.next = nullptr
   next = MPSC_intrusive_load_next(*head);
   if (nullptr != next) {
     head_ = next;
2 changes: 1 addition & 1 deletion base/mpsc_intrusive_queue_test.cc
@@ -17,7 +17,7 @@ struct TestNode {
 };

 void MPSC_intrusive_store_next(TestNode* dest, TestNode* next_node) {
-  dest->next.store(next_node, std::memory_order_relaxed);
+  dest->next.store(next_node, std::memory_order_release);
 }

 TestNode* MPSC_intrusive_load_next(const TestNode& src) {
2 changes: 1 addition & 1 deletion util/fibers/detail/fiber_interface.h
@@ -175,7 +175,7 @@ class FiberInterface {
 #endif
   friend void
   MPSC_intrusive_store_next(FiberInterface* dest, FiberInterface* next_node) {
-    dest->remote_next_.store(next_node, std::memory_order_relaxed);
+    dest->remote_next_.store(next_node, std::memory_order_release);
   }

   friend FiberInterface* MPSC_intrusive_load_next(const FiberInterface& src) {
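
The commit does not spell out why both `store_next` helpers move from `relaxed` to `release`. One plausible reading is that the release store pairs with the acquire load in `MPSC_intrusive_load_next`, so whatever the producer wrote before linking a node is visible to the consumer that follows the link. The sketch below shows that pairing in isolation. It is a generic message-passing example, not code from this repository: `PayloadNode`, `StoreNext`, `LoadNext` and `PublishAndRead` are hypothetical names.

```cpp
#include <atomic>
#include <thread>

// Hypothetical node carrying plain (non-atomic) data next to the link.
struct PayloadNode {
  int payload = 0;
  std::atomic<PayloadNode*> next{nullptr};
};

// Mirrors the shape of MPSC_intrusive_store_next after the change.
void StoreNext(PayloadNode* dest, PayloadNode* next_node) {
  dest->next.store(next_node, std::memory_order_release);
}

// Mirrors the shape of MPSC_intrusive_load_next.
PayloadNode* LoadNext(const PayloadNode& src) {
  return src.next.load(std::memory_order_acquire);
}

// The producer fills the node and then links it; the consumer follows the
// link and reads the payload. The release/acquire pair orders the plain
// write to payload before the consumer's read of it.
int PublishAndRead() {
  PayloadNode stub, item;
  std::thread producer([&] {
    item.payload = 42;        // plain write, made before the release store
    StoreNext(&stub, &item);  // publish the link
  });

  PayloadNode* seen = nullptr;
  while ((seen = LoadNext(stub)) == nullptr) {
    std::this_thread::yield();  // wait until the link becomes visible
  }
  int value = seen->payload;  // ordered after the acquire load of the link
  producer.join();
  return value;
}
```

With a `relaxed` store in `StoreNext`, the C++ memory model would not order the write to `payload` before the consumer's read, and the read would be a data race.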