From 124cb66fb8156dfaa112c032d456318f15a3ffec Mon Sep 17 00:00:00 2001 From: Changgyoo Park Date: Sat, 7 Sep 2024 21:47:32 +0200 Subject: [PATCH] fix(tree_index, #156): data race between clear and insert fix a data race issue between clear and insert when the node is being split while being cleared at the same time. --- Cargo.toml | 2 +- src/tests/correctness.rs | 81 ++++++++++++++++++++++------------------ src/tree_index.rs | 10 +++-- src/tree_index/node.rs | 79 +++++++++++++++++++++++---------------- 4 files changed, 99 insertions(+), 73 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4ed7916..8fc3e75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "scc" description = "High performance containers and utilities for concurrent and asynchronous programming" documentation = "https://docs.rs/scc" -version = "2.1.16" +version = "2.1.17" authors = ["wvwwvwwv "] edition = "2021" rust-version = "1.65.0" diff --git a/src/tests/correctness.rs b/src/tests/correctness.rs index 090bfb8..d2defcf 100644 --- a/src/tests/correctness.rs +++ b/src/tests/correctness.rs @@ -1804,6 +1804,7 @@ mod treeindex_test { use proptest::prelude::*; use proptest::strategy::ValueTree; use proptest::test_runner::TestRunner; + use sdd::suspend; use std::collections::BTreeSet; use std::ops::RangeInclusive; use std::panic::UnwindSafe; @@ -1901,26 +1902,57 @@ mod treeindex_test { } } - #[cfg_attr(miri, ignore)] - #[tokio::test] - async fn insert_remove_async() { + #[test] + fn insert_remove_clear() { static INST_CNT: AtomicUsize = AtomicUsize::new(0); - let tree: TreeIndex = TreeIndex::default(); - let workload_size = 1024; - for k in 0..workload_size { - assert!(tree.insert_async(k, R::new(&INST_CNT)).await.is_ok()); + let num_threads = 3; + let num_iter = if cfg!(miri) { 1 } else { 8 }; + let workload_size = if cfg!(miri) { 32 } else { 1024 }; + let tree: Arc> = Arc::new(TreeIndex::default()); + let mut thread_handles = Vec::with_capacity(num_threads); + let barrier = Arc::new(Barrier::new(num_threads)); + for task_id in 0..num_threads { + let barrier_clone = barrier.clone(); + let tree_clone = tree.clone(); + thread_handles.push(thread::spawn(move || { + for _ in 0..num_iter { + barrier_clone.wait(); + match task_id { + 0 => { + for k in 0..workload_size { + assert!(tree_clone.insert(k, R::new(&INST_CNT)).is_ok()); + } + } + 1 => { + for k in 0..workload_size / 8 { + tree_clone.remove(&(k * 4)); + } + } + _ => { + for _ in 0..workload_size / 64 { + if tree_clone.len() >= workload_size / 4 { + tree_clone.clear(); + } + } + } + } + tree_clone.clear(); + assert!(suspend()); + } + drop(tree_clone); + })) } - assert!(INST_CNT.load(Relaxed) >= workload_size); - assert_eq!(tree.len(), workload_size); - for k in 0..workload_size { - assert!(tree.remove_async(&k).await); + + for handle in thread_handles { + handle.join().unwrap(); } - assert_eq!(tree.len(), 0); + + drop(tree); while INST_CNT.load(Relaxed) != 0 { Guard::new().accelerate(); - tokio::task::yield_now().await; + thread::yield_now(); } } @@ -1968,29 +2000,6 @@ mod treeindex_test { } } - #[cfg_attr(miri, ignore)] - #[tokio::test] - async fn clone_async() { - static INST_CNT: AtomicUsize = AtomicUsize::new(0); - let tree: TreeIndex = TreeIndex::default(); - - let workload_size = 1024; - for k in 0..workload_size { - assert!(tree.insert_async(k, R::new(&INST_CNT)).await.is_ok()); - } - let tree_clone = tree.clone(); - tree.clear(); - for k in 0..workload_size { - assert!(tree_clone.peek_with(&k, |_, _| ()).is_some()); - } - tree_clone.clear(); - - while INST_CNT.load(Relaxed) != 0 { - Guard::new().accelerate(); - tokio::task::yield_now().await; - } - } - #[cfg_attr(miri, ignore)] #[tokio::test(flavor = "multi_thread", worker_threads = 16)] async fn integer_key() { diff --git a/src/tree_index.rs b/src/tree_index.rs index d2d18c1..68c920b 100644 --- a/src/tree_index.rs +++ b/src/tree_index.rs @@ -174,7 +174,8 @@ where let mut new_root = None; loop { let guard = Guard::new(); - if let Some(root_ref) = self.root.load(Acquire, &guard).as_ref() { + let root_ptr = self.root.load(Acquire, &guard); + if let Some(root_ref) = root_ptr.as_ref() { match root_ref.insert(key, val, &mut (), &guard) { Ok(r) => match r { InsertResult::Success => return Ok(()), @@ -185,7 +186,7 @@ where } InsertResult::Duplicate(k, v) => return Err((k, v)), InsertResult::Full(k, v) => { - let (k, v) = Node::split_root(k, v, &self.root, &guard); + let (k, v) = Node::split_root(root_ptr, &self.root, k, v, &guard); key = k; val = v; continue; @@ -245,7 +246,8 @@ where let need_await = { let guard = Guard::new(); - if let Some(root_ref) = self.root.load(Acquire, &guard).as_ref() { + let root_ptr = self.root.load(Acquire, &guard); + if let Some(root_ref) = root_ptr.as_ref() { match root_ref.insert(key, val, &mut async_wait_pinned, &guard) { Ok(r) => match r { InsertResult::Success => return Ok(()), @@ -257,7 +259,7 @@ where } InsertResult::Duplicate(k, v) => return Err((k, v)), InsertResult::Full(k, v) => { - let (k, v) = Node::split_root(k, v, &self.root, &guard); + let (k, v) = Node::split_root(root_ptr, &self.root, k, v, &guard); key = k; val = v; continue; diff --git a/src/tree_index/node.rs b/src/tree_index/node.rs index b05f590..8bc9d7f 100644 --- a/src/tree_index/node.rs +++ b/src/tree_index/node.rs @@ -178,16 +178,19 @@ where /// Splits the current root node. #[inline] pub(super) fn split_root( + root_ptr: Ptr>, + root: &AtomicShared>, key: K, val: V, - root: &AtomicShared>, guard: &Guard, ) -> (K, V) { - // The fact that the `TreeIndex` calls this function means that the root is full and - // locked. + // The fact that the `TreeIndex` calls this function means the root is full and locked. let mut new_root = Shared::new(Node::new_internal_node()); - if let Some(Self::Internal(internal_node)) = unsafe { new_root.get_mut() } { - internal_node.unbounded_child = root.clone(Relaxed, guard); + if let (Some(Self::Internal(internal_node)), Some(old_root)) = ( + unsafe { new_root.get_mut() }, + root.get_shared(Relaxed, guard), + ) { + internal_node.unbounded_child = AtomicShared::from(old_root); let result = internal_node.split_node( key, val, @@ -203,17 +206,37 @@ where }; // Updates the pointer before unlocking the root. - let new_root_ref = new_root.get_guarded_ptr(guard).as_ref(); - if let Some(old_root) = root.swap((Some(new_root), Tag::None), Release).0 { - if let Some(Self::Internal(internal_node)) = new_root_ref.as_ref() { - internal_node.finish_split(); - old_root.commit(guard); + match root.compare_exchange( + root_ptr, + (Some(new_root), Tag::None), + Release, + Relaxed, + guard, + ) { + Ok((old_root, new_root_ptr)) => { + if let Some(Self::Internal(internal_node)) = new_root_ptr.as_ref() { + internal_node.finish_split(); + } + if let Some(old_root) = old_root { + old_root.commit(guard); + }; } - let _: bool = old_root.release(); - }; - + Err((new_root, old_root_ptr)) => { + // The root has been cleared. + if let Some(Self::Internal(internal_node)) = new_root.as_deref() { + internal_node.finish_split(); + } + if let Some(old_root) = old_root_ptr.as_ref() { + old_root.rollback(guard); + } + } + } (key, val) } else { + // The root has been cleared. + if let Some(old_root) = root_ptr.as_ref() { + old_root.rollback(guard); + } (key, val) } } @@ -262,13 +285,6 @@ where return false; } - let new_root_ptr = root.load(Acquire, guard); - if root_ptr != new_root_ptr { - // The root node has been changed. - root_ptr = new_root_ptr; - continue; - } - let new_root = match root_ref { Node::Internal(internal_node) => { if internal_node.retired() { @@ -293,18 +309,17 @@ where } }; - if let Some(new_root_ptr) = new_root.as_ref().map(|n| n.get_guarded_ptr(guard)) { - root_ptr = new_root_ptr; - } else { - root_ptr = Ptr::null(); - } - - if let (Some(old_root), _) = root.swap((new_root, Tag::None), Release) { - let _: bool = old_root.release(); - } - - if let Some(internal_node_locker) = internal_node_locker { - internal_node_locker.unlock_retire(); + match root.compare_exchange(root_ptr, (new_root, Tag::None), Release, Relaxed, guard) { + Ok((_, new_root_ptr)) => { + root_ptr = new_root_ptr; + if let Some(internal_node_locker) = internal_node_locker { + internal_node_locker.unlock_retire(); + } + } + Err((_, new_root_ptr)) => { + // The root node has been changed. + root_ptr = new_root_ptr; + } } }