Skip to content

Commit

Permalink
feat(tree_index, #120): optimize leaf-node range removal
Browse files Browse the repository at this point in the history
  • Loading branch information
wvwwvwwv committed Jan 30, 2024
1 parent ebe2e31 commit ac205ce
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 73 deletions.
140 changes: 84 additions & 56 deletions src/tree_index/internal_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,15 +401,15 @@ where
range: &R,
start_unbounded: bool,
end_unbounded: bool,
last_left_valid_leaf: Option<&'g Leaf<K, V>>,
first_right_valid_node: Option<&'g Node<K, V>>,
valid_lower_max_leaf: Option<&'g Leaf<K, V>>,
valid_upper_min_node: Option<&'g Node<K, V>>,
async_wait: &mut D,
guard: &'g Guard,
) -> Result<usize, ()> {
// If `last_left_valid_leaf` is supplied, this method is removing entries near the upper
// bound of the range.
debug_assert!(last_left_valid_leaf.is_none() || start_unbounded);
debug_assert!(last_left_valid_leaf.is_none() || first_right_valid_node.is_none());
debug_assert!(valid_lower_max_leaf.is_none() || start_unbounded);
debug_assert!(valid_lower_max_leaf.is_none() || valid_upper_min_node.is_none());

let Some(_lock) = Locker::try_lock(self) else {
self.wait(async_wait);
Expand All @@ -418,17 +418,16 @@ where

let mut current_state = RemoveRangeState::Below;
let mut num_children = 1;
let mut last_left_valid_child = None;
let mut first_right_valid_child = None;
let scanner = Scanner::new(&self.children);
for (key, child) in scanner {
let mut lower_border = None;
let mut upper_border = None;
let mut cleanup_target = None;

for (key, child) in Scanner::new(&self.children) {
let contains = range.contains(key);
current_state = if contains {
match current_state {
RemoveRangeState::Below => {
if start_unbounded {
// If the start of the range is unbounded, then any child with the max
// key contained in the range can be removed.
RemoveRangeState::FullyContained
} else {
RemoveRangeState::MaybeBelow
Expand All @@ -437,9 +436,7 @@ where
RemoveRangeState::MaybeBelow | RemoveRangeState::FullyContained => {
RemoveRangeState::FullyContained
}
RemoveRangeState::MaybeAbove | RemoveRangeState::Above => {
unreachable!()
}
RemoveRangeState::MaybeAbove | RemoveRangeState::Above => unreachable!(),
}
} else {
match current_state {
Expand All @@ -454,9 +451,14 @@ where
};

match current_state {
RemoveRangeState::Below | RemoveRangeState::MaybeBelow => {
RemoveRangeState::Below => {
debug_assert!(!start_unbounded);
num_children += 1;
}
RemoveRangeState::MaybeBelow => {
debug_assert!(!start_unbounded);
num_children += 1;
last_left_valid_child.replace((key, child));
lower_border.replace((key, child));
}
RemoveRangeState::FullyContained => {
// Get rid of the child.
Expand All @@ -467,34 +469,71 @@ where
child.swap((None, Tag::None), Relaxed);
}
RemoveRangeState::MaybeAbove => {
// TODO: #120 need to recursively call this method for the child.
//
// `start_unbounded = true` for recursive calls.
debug_assert!(!end_unbounded);
num_children += 1;
first_right_valid_child.replace((Some(key), child));
upper_border.replace((Some(key), child));
break;
}
RemoveRangeState::Above => return Ok(num_children + 1),
}
}

if !end_unbounded && first_right_valid_child.is_none() {
// If the end of the range is bounded, and no valid right child is found, use
// the unbounded child as the first right valid child.
first_right_valid_child.replace((None, &self.unbounded_child));
// Now, examine the unbounded child.
if let Some(unbounded) = self.unbounded_child.load(Acquire, guard).as_ref() {
match current_state {
RemoveRangeState::Below => {
// No children possibly in the range, or the unbounded child is an only child.
debug_assert!(lower_border.is_none() && upper_border.is_none());
let empty = unbounded.remove_range(
range,
start_unbounded,
end_unbounded,
valid_lower_max_leaf,
valid_upper_min_node,
async_wait,
guard,
)? == 0;
if empty {
debug_assert!(unbounded.retired(Relaxed));
cleanup_target.replace((None, &self.unbounded_child));
}
}
RemoveRangeState::MaybeBelow => {
debug_assert!(!start_unbounded);
debug_assert!(lower_border.is_some() && upper_border.is_none());
if end_unbounded {
// The unbounded child can be removed.
cleanup_target.replace((None, &self.unbounded_child));
} else {
// The unbounded child needs to be checked.
upper_border.replace((None, &self.unbounded_child));
}
}
RemoveRangeState::FullyContained => {
debug_assert!(upper_border.is_none());
if end_unbounded {
// The unbounded child can be removed.
cleanup_target.replace((None, &self.unbounded_child));
} else {
// The unbounded child needs to be checked.
upper_border.replace((None, &self.unbounded_child));
}
}
RemoveRangeState::MaybeAbove | RemoveRangeState::Above => {
debug_assert!(!end_unbounded);
}
}
}

let mut cleanup_right_node = None;
if let Some((key, child)) = last_left_valid_child {
debug_assert!(!start_unbounded && last_left_valid_leaf.is_none());
debug_assert!(first_right_valid_child.is_none() || first_right_valid_node.is_none());
if let Some((key, child)) = lower_border {
debug_assert!(!start_unbounded && valid_lower_max_leaf.is_none());
debug_assert!(upper_border.is_none() || valid_upper_min_node.is_none());
if let Some(child_node) = child.load(Acquire, guard).as_ref() {
// Make a recursive call.
let right_child = if first_right_valid_node.is_some() {
first_right_valid_node
let right_child = if valid_upper_min_node.is_some() {
valid_upper_min_node
} else {
first_right_valid_child
upper_border
.as_ref()
.and_then(|(_, c)| c.load(Acquire, guard).as_ref())
};
Expand All @@ -515,43 +554,32 @@ where
child.swap((None, Tag::None), Relaxed);
num_children -= 1;
}

if let Some((key, child)) = first_right_valid_child {
if let Some(child_node) = child.load(Acquire, guard).as_ref() {
if child_node.retired(Relaxed) {
cleanup_right_node.replace((key, child));
}
}
}
}
} else if let Some((key, child)) = first_right_valid_child {
}

if let Some((key, child)) = upper_border {
debug_assert!(!end_unbounded);
if let Some(child_node) = child.load(Acquire, guard).as_ref() {
// Make a recursive call: note that this can be already a part of a recursive call.
let empty = child_node.remove_range(
range,
true,
false,
last_left_valid_leaf,
None,
async_wait,
guard,
)? == 0;
let empty = child_node.retired(Relaxed)
|| child_node.remove_range(
range,
true,
false,
valid_lower_max_leaf,
None,
async_wait,
guard,
)? == 0;

if empty {
debug_assert!(child_node.retired(Relaxed));
cleanup_right_node.replace((key, child));
cleanup_target.replace((key, child));
}
}
} else {
// This amounts to clearing the entire sub-tree.
debug_assert_eq!(num_children, 1);
self.unbounded_child.swap((None, RETIRED), Relaxed);
debug_assert!(self.retired(Relaxed));
return Ok(0);
}

if let Some((key, child)) = cleanup_right_node {
if let Some((key, child)) = cleanup_target {
if let Some(key) = key {
self.children.remove_if(key, &mut |_| true);
child.swap((None, Tag::None), Relaxed);
Expand Down
143 changes: 132 additions & 11 deletions src/tree_index/leaf_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,21 +403,142 @@ where
/// Removes a range of entries.
///
/// Returns the number of remaining children.
#[allow(clippy::too_many_arguments)]
#[allow(clippy::unused_self, clippy::unnecessary_wraps)]
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
#[inline]
pub(super) fn remove_range<'g, R: RangeBounds<K>, D: DeriveAsyncWait>(
&self,
_range: &R,
_start_unbounded: bool,
_end_unbounded: bool,
_last_left_valid_leaf: Option<&'g Leaf<K, V>>,
_first_right_valid_node: Option<&'g Node<K, V>>,
_async_wait: &mut D,
_guard: &'g Guard,
range: &R,
start_unbounded: bool,
end_unbounded: bool,
valid_lower_max_leaf: Option<&'g Leaf<K, V>>,
valid_upper_min_node: Option<&'g Node<K, V>>,
async_wait: &mut D,
guard: &'g Guard,
) -> Result<usize, ()> {
// TODO: #120 - implement O(1) bulk removal without using `Range`.
Ok(2)
// If `last_left_valid_leaf` is supplied, this method is removing entries near the upper
// bound of the range.
debug_assert!(valid_lower_max_leaf.is_none() || start_unbounded);
debug_assert!(valid_lower_max_leaf.is_none() || valid_upper_min_node.is_none());

let Some(_lock) = Locker::try_lock(self) else {
self.wait(async_wait);
return Err(());
};

let mut current_state = RemoveRangeState::Below;
let mut num_leaves = 1;
let mut lower_border = None;
let mut upper_border = None;

for (key, leaf) in Scanner::new(&self.children) {
let contains = range.contains(key);
current_state = if contains {
match current_state {
RemoveRangeState::Below => {
if start_unbounded {
RemoveRangeState::FullyContained
} else {
RemoveRangeState::MaybeBelow
}
}
RemoveRangeState::MaybeBelow | RemoveRangeState::FullyContained => {
RemoveRangeState::FullyContained
}
RemoveRangeState::MaybeAbove | RemoveRangeState::Above => unreachable!(),
}
} else {
match current_state {
RemoveRangeState::Below => RemoveRangeState::Below,
RemoveRangeState::MaybeBelow | RemoveRangeState::FullyContained => {
RemoveRangeState::MaybeAbove
}
RemoveRangeState::MaybeAbove | RemoveRangeState::Above => {
RemoveRangeState::Above
}
}
};

match current_state {
RemoveRangeState::Below => {
debug_assert!(!start_unbounded);
num_leaves += 1;
}
RemoveRangeState::MaybeBelow => {
debug_assert!(!start_unbounded);
num_leaves += 1;
lower_border.replace(leaf);
}
RemoveRangeState::FullyContained => {
// Get rid of the child.
//
// There can be another thread inserting keys into the leaf, and this may
// render those operations completely ineffective.
self.children.remove_if(key, &mut |_| true);
leaf.swap((None, Tag::None), Relaxed);
}
RemoveRangeState::MaybeAbove | RemoveRangeState::Above => {
debug_assert!(!end_unbounded);
num_leaves += 1;
upper_border.replace(leaf);
break;
}
}
}

// Now, examine the unbounded child.
match current_state {
RemoveRangeState::Below => {
// No children possibly in the range, or the unbounded child is an only child.
debug_assert!(lower_border.is_none() && upper_border.is_none());
upper_border.replace(&self.unbounded_child);
}
RemoveRangeState::MaybeBelow => {
debug_assert!(!start_unbounded);
debug_assert!(lower_border.is_some() && upper_border.is_none());
upper_border.replace(&self.unbounded_child);
}
RemoveRangeState::FullyContained => {
debug_assert!(upper_border.is_none());
upper_border.replace(&self.unbounded_child);
}
RemoveRangeState::MaybeAbove | RemoveRangeState::Above => {
debug_assert!(!end_unbounded && upper_border.is_some());
}
}

if let Some(lower_leaf) = lower_border.and_then(|l| l.load(Acquire, guard).as_ref()) {
debug_assert!(!start_unbounded && valid_lower_max_leaf.is_none());
debug_assert!(upper_border.is_none() || valid_upper_min_node.is_none());
if let Some(valid_upper_min_node) = valid_upper_min_node {
debug_assert!(end_unbounded && upper_border.is_none());
valid_upper_min_node.remove_range(
range,
true,
false,
Some(lower_leaf),
None,
async_wait,
guard,
)?;
} else if let Some(upper_leaf) = upper_border {
debug_assert!(!end_unbounded);
lower_leaf
.link_ref()
.swap((upper_leaf.get_shared(Acquire, guard), Tag::None), Release);
} else {
lower_leaf.link_ref().swap((None, Tag::None), Relaxed);
}
} else if let Some(upper_leaf) = upper_border {
debug_assert!(!end_unbounded && valid_upper_min_node.is_none());
if let Some(valid_lower_max_leaf) = valid_lower_max_leaf {
debug_assert!(start_unbounded && lower_border.is_none());
valid_lower_max_leaf
.link_ref()
.swap((upper_leaf.get_shared(Acquire, guard), Tag::None), Release);
}
}

Ok(num_leaves)
}

/// Splits itself into the given leaf nodes, and returns the middle key value.
Expand Down
Loading

0 comments on commit ac205ce

Please sign in to comment.