Skip to content

Commit

Permalink
fix(kad): improve memory allocation when iterating over kbuckets
Browse files Browse the repository at this point in the history
Proposal to fix libp2p#5712.

I have changed to `ClosestIter` structure to only allocate when `kbucket_size` is higher than `K_VALUE` and only once along the life of `ClosestIter`. I think I did not break anything but I would really like some experienced people with Kademlia to take a look (@guillaumemichel 😉).

Pull-Request: libp2p#5715.
  • Loading branch information
stormshield-frb authored Dec 17, 2024
1 parent 54d7f21 commit 3be7104
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 37 deletions.
2 changes: 2 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

- Expose Distance private field U256 to public.
See [PR 5705](https://github.com/libp2p/rust-libp2p/pull/5705).
- Fix systematic memory allocation when iterating over `KBuckets`.
See [PR 5715](https://github.com/libp2p/rust-libp2p/pull/5715).

## 0.47.0

Expand Down
2 changes: 2 additions & 0 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ impl Config {
/// Sets the configuration for the k-buckets.
///
/// * Default to K_VALUE.
///
/// **WARNING**: setting a `size` higher that `K_VALUE` may imply additional memory allocations.
pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
self.kbucket_config.set_bucket_size(size);
self
Expand Down
109 changes: 72 additions & 37 deletions protocols/kad/src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use std::{collections::VecDeque, num::NonZeroUsize, time::Duration};
use bucket::KBucket;
pub use bucket::NodeStatus;
pub use entry::*;
use smallvec::SmallVec;
use web_time::Instant;

/// Maximum number of k-buckets.
Expand Down Expand Up @@ -282,11 +283,8 @@ where
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: move |b: &KBucket<TKey, _>| -> Vec<_> {
let mut vec = Vec::with_capacity(bucket_size);
vec.extend(b.iter().map(|(n, _)| n.key.clone()));
vec
},
fmap: |(n, _status): (&Node<TKey, TVal>, NodeStatus)| n.key.clone(),
bucket_size,
}
}

Expand All @@ -307,15 +305,11 @@ where
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: move |b: &KBucket<_, TVal>| -> Vec<_> {
b.iter()
.take(bucket_size)
.map(|(n, status)| EntryView {
node: n.clone(),
status,
})
.collect()
fmap: |(n, status): (&Node<TKey, TVal>, NodeStatus)| EntryView {
node: n.clone(),
status,
},
bucket_size,
}
}

Expand Down Expand Up @@ -358,10 +352,12 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
/// distance of the local key to the target.
buckets_iter: ClosestBucketsIter,
/// The iterator over the entries in the currently traversed bucket.
iter: Option<std::vec::IntoIter<TOut>>,
iter: Option<ClosestIterBuffer<TOut>>,
/// The projection function / mapping applied on each bucket as
/// it is encountered, producing the next `iter`ator.
fmap: TMap,
/// The maximal number of nodes that a bucket can contain.
bucket_size: usize,
}

/// An iterator over the bucket indices, in the order determined by the `Distance` of
Expand Down Expand Up @@ -463,41 +459,80 @@ where
TTarget: AsRef<KeyBytes>,
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone,
TMap: Fn(&KBucket<TKey, TVal>) -> Vec<TOut>,
TMap: Fn((&Node<TKey, TVal>, NodeStatus)) -> TOut,
TOut: AsRef<KeyBytes>,
{
type Item = TOut;

fn next(&mut self) -> Option<Self::Item> {
loop {
match &mut self.iter {
Some(iter) => match iter.next() {
Some(k) => return Some(k),
None => self.iter = None,
},
None => {
if let Some(i) = self.buckets_iter.next() {
let bucket = &mut self.table.buckets[i.get()];
if let Some(applied) = bucket.apply_pending() {
self.table.applied_pending.push_back(applied)
}
let mut v = (self.fmap)(bucket);
v.sort_by(|a, b| {
self.target
.as_ref()
.distance(a.as_ref())
.cmp(&self.target.as_ref().distance(b.as_ref()))
});
self.iter = Some(v.into_iter());
} else {
return None;
}
let (mut buffer, bucket_index) = if let Some(mut iter) = self.iter.take() {
if let Some(next) = iter.next() {
self.iter = Some(iter);
return Some(next);
}

let bucket_index = self.buckets_iter.next()?;

// Reusing the same buffer so if there were any allocation, it only happen once over
// a `ClosestIter` life.
iter.buffer.clear();

(iter.buffer, bucket_index)
} else {
let bucket_index = self.buckets_iter.next()?;

// Allocation only occurs if `kbucket_size` is greater than `K_VALUE`.
(SmallVec::with_capacity(self.bucket_size), bucket_index)
};

let bucket = &mut self.table.buckets[bucket_index.get()];
if let Some(applied) = bucket.apply_pending() {
self.table.applied_pending.push_back(applied)
}

buffer.extend(
bucket
.iter()
.take(self.bucket_size)
.map(|e| (self.fmap)(e))
.map(Some),
);
buffer.sort_by(|a, b| {
let a = a.as_ref().expect("just initialized");
let b = b.as_ref().expect("just initialized");
self.target
.as_ref()
.distance(a.as_ref())
.cmp(&self.target.as_ref().distance(b.as_ref()))
});

self.iter = Some(ClosestIterBuffer::new(buffer));
}
}
}

struct ClosestIterBuffer<TOut> {
buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>,
index: usize,
}

impl<TOut> ClosestIterBuffer<TOut> {
fn new(buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>) -> Self {
Self { buffer, index: 0 }
}
}

impl<TOut> Iterator for ClosestIterBuffer<TOut> {
type Item = TOut;

fn next(&mut self) -> Option<Self::Item> {
let entry = self.buffer.get_mut(self.index)?;
self.index += 1;
entry.take()
}
}

/// A reference to a bucket.
pub struct KBucketRef<'a, TKey, TVal> {
index: BucketIndex,
Expand Down

0 comments on commit 3be7104

Please sign in to comment.