Skip to content

Commit

Permalink
feat(bag, queue, stack, #125): add len and iter
Browse files Browse the repository at this point in the history
Iterators are implemented for queue and stack, and the len method is
implemented for bag, queue, and stack.
  • Loading branch information
wvwwvwwv committed Jan 26, 2024
1 parent 63b3414 commit c9d183d
Show file tree
Hide file tree
Showing 8 changed files with 332 additions and 17 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

2.0.13

* Fix [#120](https://github.com/wvwwvwwv/scalable-concurrent-containers/issues/120).
* Add `iter` to `Queue` and `Stack`.
* Add `len` to `Bag`, `Queue`, and `Stack`.

2.0.12

Expand Down
24 changes: 20 additions & 4 deletions examples/src/ebr.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#[cfg(test)]
mod examples {
use scc::ebr::{Guard, Owned, Shared};
use scc::ebr::{AtomicShared, Guard, Owned, Shared, Tag};
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::Ordering::{Acquire, Relaxed};
use std::thread;

struct R(&'static AtomicIsize);
Expand Down Expand Up @@ -34,14 +34,30 @@ mod examples {
static DROP_CNT: AtomicIsize = AtomicIsize::new(0);

let r1 = Owned::new(R(&DROP_CNT));
let r2 = AtomicShared::new(R(&DROP_CNT));

thread::scope(|s| {
s.spawn(|| {
let guard = Guard::new();
let ptr = r1.get_guarded_ptr(&guard);
drop(r1);

// `ptr` can outlive `r1`.
assert_eq!(ptr.as_ref().unwrap().0.load(Relaxed), 0);
});
s.spawn(|| {
let r2 = Owned::new(R(&DROP_CNT));
drop(r2);
let guard = Guard::new();
let ptr = r2.load(Acquire, &guard);
assert_eq!(ptr.as_ref().unwrap().0.load(Relaxed), 0);

let r3 = r2.get_shared(Acquire, &guard).unwrap();
drop(guard);

// `r3` can outlive `guard`.
assert_eq!(r3.0.load(Relaxed), 0);

let r4 = r2.swap((None, Tag::None), Acquire).0.unwrap();
assert_eq!(r4.0.load(Relaxed), 0);
});
});

Expand Down
48 changes: 41 additions & 7 deletions src/bag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,37 @@ impl<T, const ARRAY_LEN: usize> Bag<T, ARRAY_LEN> {
self.primary_storage.pop_all(acc, &mut fold, true)
}

/// Returns the number of entries in the [`Bag`].
///
/// This method iterates over all the entry arrays in the [`Bag`] to count the number of
/// entries, therefore its time complexity is `O(N)`.
///
/// # Examples
///
/// ```
/// use scc::Bag;
///
/// let bag: Bag<usize> = Bag::default();
/// assert_eq!(bag.len(), 0);
///
/// bag.push(7);
/// assert_eq!(bag.len(), 1);
///
/// for v in 0..64 {
/// bag.push(v);
/// }
/// bag.pop();
/// assert_eq!(bag.len(), 64);
/// ```
#[inline]
pub fn len(&self) -> usize {
self.stack
.iter(&Guard::new())
.fold(self.primary_storage.len(), |acc, storage| {
acc + storage.len()
})
}

/// Returns `true` if the [`Bag`] is empty.
///
/// # Examples
Expand All @@ -197,7 +228,7 @@ impl<T, const ARRAY_LEN: usize> Bag<T, ARRAY_LEN> {
/// ```
#[inline]
pub fn is_empty(&self) -> bool {
if self.primary_storage.is_empty() {
if self.primary_storage.len() == 0 {
self.stack.is_empty()
} else {
false
Expand Down Expand Up @@ -361,6 +392,15 @@ impl<T, const ARRAY_LEN: usize> Storage<T, ARRAY_LEN> {
storage
}

/// Returns the number of entries.
fn len(&self) -> usize {
let metadata = self.metadata.load(Relaxed);
let instance_bitmap = Self::instance_bitmap(metadata);
let owned_bitmap = Self::owned_bitmap(metadata);
let valid_entries_bitmap = instance_bitmap & (!owned_bitmap);
valid_entries_bitmap.count_ones() as usize
}

/// Pushes a new value.
fn push(&self, val: T, allow_empty: bool) -> Option<T> {
let mut metadata = self.metadata.load(Relaxed);
Expand Down Expand Up @@ -541,12 +581,6 @@ impl<T, const ARRAY_LEN: usize> Storage<T, ARRAY_LEN> {
}
}

/// Returns `true` if empty.
fn is_empty(&self) -> bool {
let metadata = self.metadata.load(Acquire);
Self::instance_bitmap(metadata) == 0
}

#[allow(clippy::cast_possible_truncation)]
fn instance_bitmap(metadata: usize) -> u32 {
metadata.wrapping_shr(ARRAY_LEN as u32) as u32
Expand Down
81 changes: 81 additions & 0 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use super::ebr::{AtomicShared, Guard, Ptr, Shared, Tag};
use super::linked_list::{Entry, LinkedList};
use std::fmt::{self, Debug};
use std::iter::FusedIterator;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// [`Queue`] is a lock-free concurrent first-in-first-out container.
Expand All @@ -14,6 +15,14 @@ pub struct Queue<T> {
newest: AtomicShared<Entry<T>>,
}

/// An iterator over the entries of a [`Queue`].
///
/// [`Iter`] reads the oldest entry first.
pub struct Iter<'g, T> {
current: Ptr<'g, Entry<T>>,
guard: &'g Guard,
}

impl<T: 'static> Queue<T> {
/// Pushes an instance of `T`.
///
Expand Down Expand Up @@ -262,6 +271,32 @@ impl<T> Queue<T> {
reader(None)
}

/// Returns the number of entries in the [`Queue`].
///
/// This method iterates over all the entries in the [`Queue`] to count them, therefore its
/// time complexity is `O(N)`.
///
/// # Examples
///
/// ```
/// use scc::Queue;
///
/// let queue: Queue<usize> = Queue::default();
/// assert_eq!(queue.len(), 0);
///
/// queue.push(7);
/// queue.push(11);
/// assert_eq!(queue.len(), 2);
///
/// queue.pop();
/// queue.pop();
/// assert_eq!(queue.len(), 0);
/// ```
#[inline]
pub fn len(&self) -> usize {
self.iter(&Guard::new()).count()
}

/// Returns `true` if the [`Queue`] is empty.
///
/// # Examples
Expand All @@ -280,6 +315,36 @@ impl<T> Queue<T> {
self.newest.is_null(Acquire)
}

/// Returns an [`Iter`].
///
/// # Examples
///
/// ```
/// use scc::ebr::Guard;
/// use scc::Queue;
///
/// let queue: Queue<usize> = Queue::default();
/// assert_eq!(queue.iter(&Guard::new()).count(), 0);
///
/// queue.push(7);
/// queue.push(11);
/// queue.push(17);
///
/// let guard = Guard::new();
/// let mut iter = queue.iter(&guard);
/// assert_eq!(*iter.next().unwrap(), 7);
/// assert_eq!(*iter.next().unwrap(), 11);
/// assert_eq!(*iter.next().unwrap(), 17);
/// assert!(iter.next().is_none());
/// ```
#[inline]
pub fn iter<'g>(&self, guard: &'g Guard) -> Iter<'g, T> {
Iter {
current: self.cleanup_oldest(guard),
guard,
}
}

/// Pushes an entry into the [`Queue`].
fn push_if_internal<F: FnMut(Option<&Entry<T>>) -> bool>(
&self,
Expand Down Expand Up @@ -434,3 +499,19 @@ impl<T> Default for Queue<T> {
}
}
}

impl<'g, T> FusedIterator for Iter<'g, T> {}

impl<'g, T> Iterator for Iter<'g, T> {
type Item = &'g T;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some(current) = self.current.as_ref() {
self.current = current.next_ptr(Acquire, self.guard);
Some(current)
} else {
None
}
}
}
81 changes: 81 additions & 0 deletions src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use super::ebr::{AtomicShared, Guard, Ptr, Shared, Tag};
use super::linked_list::{Entry, LinkedList};
use std::fmt::{self, Debug};
use std::iter::FusedIterator;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};

/// [`Stack`] is a lock-free concurrent last-in-first-out container.
Expand All @@ -11,6 +12,14 @@ pub struct Stack<T> {
newest: AtomicShared<Entry<T>>,
}

/// An iterator over the entries of a [`Stack`].
///
/// [`Iter`] reads the newest entry first.
pub struct Iter<'g, T> {
current: Ptr<'g, Entry<T>>,
guard: &'g Guard,
}

impl<T: 'static> Stack<T> {
/// Pushes an instance of `T`.
///
Expand Down Expand Up @@ -284,6 +293,32 @@ impl<T> Stack<T> {
)
}

/// Returns the number of entries in the [`Stack`].
///
/// This method iterates over all the entries in the [`Stack`] to count them, therefore its
/// time complexity is `O(N)`.
///
/// # Examples
///
/// ```
/// use scc::Stack;
///
/// let stack: Stack<usize> = Stack::default();
/// assert_eq!(stack.len(), 0);
///
/// stack.push(7);
/// stack.push(11);
/// assert_eq!(stack.len(), 2);
///
/// stack.pop();
/// stack.pop();
/// assert_eq!(stack.len(), 0);
/// ```
#[inline]
pub fn len(&self) -> usize {
self.iter(&Guard::new()).count()
}

/// Returns `true` if the [`Stack`] is empty.
///
/// # Examples
Expand All @@ -304,6 +339,36 @@ impl<T> Stack<T> {
.is_null()
}

/// Returns an [`Iter`].
///
/// # Examples
///
/// ```
/// use scc::ebr::Guard;
/// use scc::Stack;
///
/// let stack: Stack<usize> = Stack::default();
/// assert_eq!(stack.iter(&Guard::new()).count(), 0);
///
/// stack.push(7);
/// stack.push(11);
/// stack.push(17);
///
/// let guard = Guard::new();
/// let mut iter = stack.iter(&guard);
/// assert_eq!(*iter.next().unwrap(), 17);
/// assert_eq!(*iter.next().unwrap(), 11);
/// assert_eq!(*iter.next().unwrap(), 7);
/// assert!(iter.next().is_none());
/// ```
#[inline]
pub fn iter<'g>(&self, guard: &'g Guard) -> Iter<'g, T> {
Iter {
current: self.cleanup_newest(self.newest.load(Acquire, guard), guard),
guard,
}
}

/// Pushes an entry into the [`Stack`].
fn push_if_internal<F: FnMut(Option<&Entry<T>>) -> bool>(
&self,
Expand Down Expand Up @@ -421,3 +486,19 @@ impl<T> Default for Stack<T> {
}
}
}

impl<'g, T> FusedIterator for Iter<'g, T> {}

impl<'g, T> Iterator for Iter<'g, T> {
type Item = &'g T;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some(current) = self.current.as_ref() {
self.current = current.next_ptr(Acquire, self.guard);
Some(current)
} else {
None
}
}
}
Loading

0 comments on commit c9d183d

Please sign in to comment.