Skip to content

Commit

Permalink
fix(#118): the drop of an incomplete AsyncWait leads to a crash
Browse files Browse the repository at this point in the history
Fix intermittent test failures.
  • Loading branch information
wvwwvwwv committed Dec 14, 2023
1 parent 30cdc3e commit 99ba342
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Version 2

2.0.6

* Fix [#118](https://github.com/wvwwvwwv/scalable-concurrent-containers/issues/118).

2.0.5

* Add support for 32-bit binaries.
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "scc"
description = "High performance containers and utilities for concurrent and asynchronous programming"
documentation = "https://docs.rs/scc"
version = "2.0.5"
version = "2.0.6"
authors = ["wvwwvwwv <[email protected]>"]
edition = "2021"
readme = "README.md"
Expand All @@ -25,7 +25,7 @@ proptest = "1.4"
rand = "0.8"
serde_test = "1.0"
static_assertions = "1.1"
tokio = { version = "1.34", features = ["full"] }
tokio = { version = "1.35", features = ["full"] }

[[bench]]
name = "bag"
Expand Down
56 changes: 29 additions & 27 deletions src/wait_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,29 +366,29 @@ mod test {
let data_clone = data.clone();
task_handles.push(tokio::spawn(async move {
barrier_clone.wait().await;
loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
if wait_queue_clone
.push_async_entry(&mut async_wait_pinned, || {
if data_clone
.compare_exchange(task_id, task_id + 1, Relaxed, Relaxed)
.is_ok()
{
Ok(())
} else {
Err(())
}
})
.is_ok()
{
wait_queue_clone.signal();
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
while wait_queue_clone
.push_async_entry(&mut async_wait_pinned, || {
if data_clone
.compare_exchange(task_id, task_id + 1, Relaxed, Relaxed)
.is_ok()
{
Ok(())
} else {
Err(())
}
})
.is_err()
{
async_wait_pinned.as_mut().await;
if data_clone.load(Relaxed) > task_id {
// The operation was successful, but was signalled by another thread.
break;
}
wait_queue_clone.signal();
async_wait_pinned.await;
tokio::task::yield_now().await;
async_wait_pinned.mutex.take();
}
wait_queue_clone.signal();
}));
}

Expand Down Expand Up @@ -416,16 +416,18 @@ mod test {
for _ in 0..num_tasks {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
assert_eq!(
wait_queue_clone
.push_async_entry(&mut async_wait_pinned, || if task_id % 2 == 0 {
if wait_queue_clone
.push_async_entry(&mut async_wait_pinned, || {
if task_id % 2 == 0 {
Ok(())
} else {
Err(())
})
.is_ok(),
task_id % 2 == 0
);
}
})
.is_ok()
{
assert_eq!(task_id % 2, 0);
}
}
wait_queue_clone.signal();
}));
Expand Down

0 comments on commit 99ba342

Please sign in to comment.