fix(tree_index, #156): data race between clear and insert
Fix a data race between `clear` and `insert` that occurs when the root node is being
split while the tree is being cleared at the same time.
wvwwvwwv committed Sep 7, 2024
1 parent 92d8fb4 commit 124cb66
Showing 4 changed files with 99 additions and 73 deletions.
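The core of the fix: `clear` can swap the root out while `insert` is in the middle of splitting a full root. The old `split_root` published the new root with an unconditional `swap`, which could resurrect a subtree that `clear` had just removed. The new code passes the root pointer observed by `insert` into `split_root` and publishes the new root with `compare_exchange`, rolling the split back whenever the root changed in the meantime. The swap-versus-CAS distinction can be reproduced in isolation with `std::sync::atomic::AtomicPtr`; the following is only a sketch of the idea, not the crate's code (the real implementation uses `sdd`'s `AtomicShared`, `Ptr`, and epoch guards, and keeps the old root alive as a child of the new one):

```rust
use std::sync::atomic::{AtomicPtr, Ordering};

// Publish `new_root` only if the root is still the pointer observed while
// splitting; a plain `swap` would publish it even after a concurrent `clear`.
fn install_new_root(root: &AtomicPtr<u32>, observed: *mut u32, new_root: Box<u32>) -> bool {
    let new_ptr = Box::into_raw(new_root);
    match root.compare_exchange(observed, new_ptr, Ordering::Release, Ordering::Relaxed) {
        // Published; the displaced pointer would be retired through an epoch
        // guard in the real code rather than freed immediately.
        Ok(_old) => true,
        Err(_) => {
            // The root changed concurrently (e.g. `clear` ran): undo the split.
            drop(unsafe { Box::from_raw(new_ptr) });
            false
        }
    }
}

fn main() {
    let first = Box::into_raw(Box::new(1_u32));
    let root = AtomicPtr::new(first);
    let observed = root.load(Ordering::Acquire);

    // Succeeds: the root is still the one that was observed.
    assert!(install_new_root(&root, observed, Box::new(2)));
    // Fails: `observed` is now stale, just like a root cleared mid-split.
    assert!(!install_new_root(&root, observed, Box::new(3)));

    // Tidy up the toy allocations.
    unsafe {
        drop(Box::from_raw(root.load(Ordering::Acquire)));
        drop(Box::from_raw(first));
    }
}
```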
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -2,7 +2,7 @@
name = "scc"
description = "High performance containers and utilities for concurrent and asynchronous programming"
documentation = "https://docs.rs/scc"
version = "2.1.16"
version = "2.1.17"
authors = ["wvwwvwwv <[email protected]>"]
edition = "2021"
rust-version = "1.65.0"
81 changes: 45 additions & 36 deletions src/tests/correctness.rs
@@ -1804,6 +1804,7 @@ mod treeindex_test {
use proptest::prelude::*;
use proptest::strategy::ValueTree;
use proptest::test_runner::TestRunner;
use sdd::suspend;
use std::collections::BTreeSet;
use std::ops::RangeInclusive;
use std::panic::UnwindSafe;
@@ -1901,26 +1902,57 @@
}
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn insert_remove_async() {
#[test]
fn insert_remove_clear() {
static INST_CNT: AtomicUsize = AtomicUsize::new(0);
let tree: TreeIndex<usize, R> = TreeIndex::default();

let workload_size = 1024;
for k in 0..workload_size {
assert!(tree.insert_async(k, R::new(&INST_CNT)).await.is_ok());
let num_threads = 3;
let num_iter = if cfg!(miri) { 1 } else { 8 };
let workload_size = if cfg!(miri) { 32 } else { 1024 };
let tree: Arc<TreeIndex<usize, R>> = Arc::new(TreeIndex::default());
let mut thread_handles = Vec::with_capacity(num_threads);
let barrier = Arc::new(Barrier::new(num_threads));
for task_id in 0..num_threads {
let barrier_clone = barrier.clone();
let tree_clone = tree.clone();
thread_handles.push(thread::spawn(move || {
for _ in 0..num_iter {
barrier_clone.wait();
match task_id {
0 => {
for k in 0..workload_size {
assert!(tree_clone.insert(k, R::new(&INST_CNT)).is_ok());
}
}
1 => {
for k in 0..workload_size / 8 {
tree_clone.remove(&(k * 4));
}
}
_ => {
for _ in 0..workload_size / 64 {
if tree_clone.len() >= workload_size / 4 {
tree_clone.clear();
}
}
}
}
tree_clone.clear();
assert!(suspend());
}
drop(tree_clone);
}))
}
assert!(INST_CNT.load(Relaxed) >= workload_size);
assert_eq!(tree.len(), workload_size);
for k in 0..workload_size {
assert!(tree.remove_async(&k).await);

for handle in thread_handles {
handle.join().unwrap();
}
assert_eq!(tree.len(), 0);

drop(tree);

while INST_CNT.load(Relaxed) != 0 {
Guard::new().accelerate();
tokio::task::yield_now().await;
thread::yield_now();
}
}
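
The replacement test drives `insert`, `remove`, and `clear` from three threads and uses a `Barrier` so the phases of each iteration genuinely overlap, then relies on `sdd::suspend()` and the `INST_CNT` drain loop to confirm that every retired value is eventually dropped. The barrier shape on its own looks like the following sketch (the workloads and the `TreeIndex` itself are omitted; names are illustrative):

```rust
use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let num_threads = 3;
    let num_iter = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    let handles: Vec<_> = (0..num_threads)
        .map(|task_id| {
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                for iter in 0..num_iter {
                    // No task starts an iteration until all of them arrive,
                    // so the per-task workloads actually run concurrently.
                    barrier.wait();
                    // The per-task workload (insert / remove / clear) goes here.
                    let _ = (task_id, iter);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```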

@@ -1968,29 +2000,6 @@
}
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn clone_async() {
static INST_CNT: AtomicUsize = AtomicUsize::new(0);
let tree: TreeIndex<usize, R> = TreeIndex::default();

let workload_size = 1024;
for k in 0..workload_size {
assert!(tree.insert_async(k, R::new(&INST_CNT)).await.is_ok());
}
let tree_clone = tree.clone();
tree.clear();
for k in 0..workload_size {
assert!(tree_clone.peek_with(&k, |_, _| ()).is_some());
}
tree_clone.clear();

while INST_CNT.load(Relaxed) != 0 {
Guard::new().accelerate();
tokio::task::yield_now().await;
}
}

#[cfg_attr(miri, ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn integer_key() {
10 changes: 6 additions & 4 deletions src/tree_index.rs
@@ -174,7 +174,8 @@
let mut new_root = None;
loop {
let guard = Guard::new();
if let Some(root_ref) = self.root.load(Acquire, &guard).as_ref() {
let root_ptr = self.root.load(Acquire, &guard);
if let Some(root_ref) = root_ptr.as_ref() {
match root_ref.insert(key, val, &mut (), &guard) {
Ok(r) => match r {
InsertResult::Success => return Ok(()),
@@ -185,7 +186,7 @@
}
InsertResult::Duplicate(k, v) => return Err((k, v)),
InsertResult::Full(k, v) => {
let (k, v) = Node::split_root(k, v, &self.root, &guard);
let (k, v) = Node::split_root(root_ptr, &self.root, k, v, &guard);
key = k;
val = v;
continue;
@@ -245,7 +246,8 @@

let need_await = {
let guard = Guard::new();
if let Some(root_ref) = self.root.load(Acquire, &guard).as_ref() {
let root_ptr = self.root.load(Acquire, &guard);
if let Some(root_ref) = root_ptr.as_ref() {
match root_ref.insert(key, val, &mut async_wait_pinned, &guard) {
Ok(r) => match r {
InsertResult::Success => return Ok(()),
@@ -257,7 +259,7 @@
}
InsertResult::Duplicate(k, v) => return Err((k, v)),
InsertResult::Full(k, v) => {
let (k, v) = Node::split_root(k, v, &self.root, &guard);
let (k, v) = Node::split_root(root_ptr, &self.root, k, v, &guard);
key = k;
val = v;
continue;
79 changes: 47 additions & 32 deletions src/tree_index/node.rs
@@ -178,16 +178,19 @@
/// Splits the current root node.
#[inline]
pub(super) fn split_root(
root_ptr: Ptr<Node<K, V>>,
root: &AtomicShared<Node<K, V>>,
key: K,
val: V,
root: &AtomicShared<Node<K, V>>,
guard: &Guard,
) -> (K, V) {
// The fact that the `TreeIndex` calls this function means that the root is full and
// locked.
// The fact that the `TreeIndex` calls this function means the root is full and locked.
let mut new_root = Shared::new(Node::new_internal_node());
if let Some(Self::Internal(internal_node)) = unsafe { new_root.get_mut() } {
internal_node.unbounded_child = root.clone(Relaxed, guard);
if let (Some(Self::Internal(internal_node)), Some(old_root)) = (
unsafe { new_root.get_mut() },
root.get_shared(Relaxed, guard),
) {
internal_node.unbounded_child = AtomicShared::from(old_root);
let result = internal_node.split_node(
key,
val,
@@ -203,17 +206,37 @@
};

// Updates the pointer before unlocking the root.
let new_root_ref = new_root.get_guarded_ptr(guard).as_ref();
if let Some(old_root) = root.swap((Some(new_root), Tag::None), Release).0 {
if let Some(Self::Internal(internal_node)) = new_root_ref.as_ref() {
internal_node.finish_split();
old_root.commit(guard);
match root.compare_exchange(
root_ptr,
(Some(new_root), Tag::None),
Release,
Relaxed,
guard,
) {
Ok((old_root, new_root_ptr)) => {
if let Some(Self::Internal(internal_node)) = new_root_ptr.as_ref() {
internal_node.finish_split();
}
if let Some(old_root) = old_root {
old_root.commit(guard);
};
}
let _: bool = old_root.release();
};

Err((new_root, old_root_ptr)) => {
// The root has been cleared.
if let Some(Self::Internal(internal_node)) = new_root.as_deref() {
internal_node.finish_split();
}
if let Some(old_root) = old_root_ptr.as_ref() {
old_root.rollback(guard);
}
}
}
(key, val)
} else {
// The root has been cleared.
if let Some(old_root) = root_ptr.as_ref() {
old_root.rollback(guard);
}
(key, val)
}
}
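
The rewritten `split_root` also changes how the old root is retained: the new internal node is built only if a strong reference to the old root can still be obtained via `get_shared`, and that reference becomes the unbounded child, so the old root stays alive even if the tree-level pointer is cleared concurrently; otherwise the split is rolled back. The ownership shape is roughly analogous to holding an extra `Arc` (toy types below, not the crate's real ones):

```rust
use std::sync::Arc;

// Toy stand-ins for the tree nodes; the crate uses `Shared`/`AtomicShared`
// from `sdd`, not `Arc`, but the reference-keeping idea is the same.
struct ToyNode {
    unbounded_child: Option<Arc<ToyNode>>,
}

fn build_new_root(old_root: &Arc<ToyNode>) -> Arc<ToyNode> {
    Arc::new(ToyNode {
        // Take a strong reference up front, mirroring `root.get_shared(...)`.
        unbounded_child: Some(Arc::clone(old_root)),
    })
}

fn main() {
    let old_root = Arc::new(ToyNode { unbounded_child: None });
    let new_root = build_new_root(&old_root);

    // Even if the original handle disappears (a stand-in for `clear`),
    // the old node is kept alive through the new root's reference.
    drop(old_root);
    assert_eq!(
        Arc::strong_count(new_root.unbounded_child.as_ref().unwrap()),
        1
    );
}
```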
@@ -262,13 +285,6 @@
return false;
}

let new_root_ptr = root.load(Acquire, guard);
if root_ptr != new_root_ptr {
// The root node has been changed.
root_ptr = new_root_ptr;
continue;
}

let new_root = match root_ref {
Node::Internal(internal_node) => {
if internal_node.retired() {
@@ -293,18 +309,17 @@
}
};

if let Some(new_root_ptr) = new_root.as_ref().map(|n| n.get_guarded_ptr(guard)) {
root_ptr = new_root_ptr;
} else {
root_ptr = Ptr::null();
}

if let (Some(old_root), _) = root.swap((new_root, Tag::None), Release) {
let _: bool = old_root.release();
}

if let Some(internal_node_locker) = internal_node_locker {
internal_node_locker.unlock_retire();
match root.compare_exchange(root_ptr, (new_root, Tag::None), Release, Relaxed, guard) {
Ok((_, new_root_ptr)) => {
root_ptr = new_root_ptr;
if let Some(internal_node_locker) = internal_node_locker {
internal_node_locker.unlock_retire();
}
}
Err((_, new_root_ptr)) => {
// The root node has been changed.
root_ptr = new_root_ptr;
}
}
}
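
In the root-cleanup loop the explicit reload-and-retry step disappears as well: when the `compare_exchange` fails, the latest root pointer comes back with the error, so the next iteration retries against what was actually observed instead of issuing a separate load. The same retry shape in miniature, with `AtomicUsize` standing in for the root pointer (illustrative only, not the crate's code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Retry a CAS using the value returned on failure rather than reloading.
fn bump_by_two(counter: &AtomicUsize) -> usize {
    let mut observed = counter.load(Ordering::Acquire);
    loop {
        let next = observed + 2;
        match counter.compare_exchange(observed, next, Ordering::Release, Ordering::Acquire) {
            Ok(_) => return next,
            // The failure case hands back the current value, so the loop
            // continues with it directly.
            Err(latest) => observed = latest,
        }
    }
}

fn main() {
    let counter = AtomicUsize::new(0);
    assert_eq!(bump_by_two(&counter), 2);
    assert_eq!(bump_by_two(&counter), 4);
}
```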
