From 4dd2e8f15b96876ae8a1262855b0fff0cb140cc3 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Thu, 7 Nov 2024 18:15:16 +0200 Subject: [PATCH 1/5] WIP Signed-off-by: adi_holden --- src/core/dash.h | 40 ++++++++++++++++++++++++++++++++++++++++ src/server/snapshot.cc | 7 +++---- src/server/snapshot.h | 2 +- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/core/dash.h b/src/core/dash.h index a7fd01fc257b..9f39d2222747 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -233,6 +233,8 @@ class DashTable : public detail::DashTableBase { // Returns: cursor that is guaranteed to be less than 2^40. template Cursor Traverse(Cursor curs, Cb&& cb); + template Cursor TraverseBuckets(Cursor curs, Cb&& cb); + // Takes an iterator pointing to an entry in a dash bucket and traverses all bucket's entries by // calling cb(iterator) for every non-empty slot. The iteration goes over a physical bucket. template void TraverseBucket(const_iterator it, Cb&& cb); @@ -253,6 +255,10 @@ class DashTable : public detail::DashTableBase { return const_bucket_iterator{this, segment_id, uint8_t(bucket_id)}; } + bucket_iterator BucketIt(unsigned segment_id, unsigned bucket_id) { + return bucket_iterator{this, segment_id, uint8_t(bucket_id)}; + } + iterator GetIterator(unsigned segment_id, unsigned bucket_id, unsigned slot_id) { return iterator{this, segment_id, uint8_t(bucket_id), uint8_t(slot_id)}; } @@ -958,6 +964,40 @@ auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor { return Cursor{global_depth_, sid, bid}; } +template +template +auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor curs, Cb&& cb) -> Cursor { + if (curs.bucket_id() >= SegmentType::kTotalBuckets) // sanity. + return 0; + + uint32_t sid = curs.segment_id(global_depth_); + uint8_t bid = curs.bucket_id(); + + bool fetched = false; + + // We fix bid and go over all segments. Once we reach the end we increase bid and repeat. + do { + SegmentType* s = segment_[sid]; + assert(s); + + const auto& bucket = s->GetBucket(bid); + if (bucket.GetBusy()) { // call cb on bucket only if it has elements. + cb(BucketIt(sid, bid)); + fetched = true; + } + sid = NextSeg(sid); + if (sid >= segment_.size()) { + sid = 0; + ++bid; + + if (bid >= SegmentType::kTotalBuckets) + return 0; // "End of traversal" cursor. + } + } while (!fetched); + + return Cursor{global_depth_, sid, bid}; +} + template template void DashTable<_Key, _Value, Policy>::TraverseBucket(const_iterator it, Cb&& cb) { diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 745233552526..77cec6d9ef7e 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -168,7 +168,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn return; PrimeTable::Cursor next = - db_slice_->Traverse(pt, cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); + pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); cursor = next; PushSerialized(false); @@ -242,14 +242,13 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { } } -bool SliceSnapshot::BucketSaveCb(PrimeIterator it) { +bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { ++stats_.savecb_calls; auto check = [&](auto v) { if (v >= snapshot_version_) { // either has been already serialized or added after snapshotting started. - DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id() - << " at " << v; + DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << v; ++stats_.skipped; return false; } diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 38ad86c889ad..e3839fb9bd88 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -89,7 +89,7 @@ class SliceSnapshot { void SwitchIncrementalFb(Context* cntx, LSN lsn); // Called on traversing cursor by IterateBucketsFb. - bool BucketSaveCb(PrimeIterator it); + bool BucketSaveCb(PrimeTable::bucket_iterator it); // Serialize single bucket. // Returns number of serialized entries, updates bucket version to snapshot version. From 9f47ccc2b1afdc080e922eb35d9b021d7075689c Mon Sep 17 00:00:00 2001 From: adi_holden Date: Sun, 10 Nov 2024 14:09:53 +0200 Subject: [PATCH 2/5] add comments Signed-off-by: adi_holden --- src/core/dash.h | 33 ++++++++++++--------------------- src/core/dash_test.cc | 20 -------------------- 2 files changed, 12 insertions(+), 41 deletions(-) diff --git a/src/core/dash.h b/src/core/dash.h index 9f39d2222747..89d2a35baa1e 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -222,23 +222,24 @@ class DashTable : public detail::DashTableBase { // Returns: cursor with a random position Cursor GetRandomCursor(absl::BitGen* bitgen); - // Traverses over a single bucket in table and calls cb(iterator) 0 or more + // Traverses over a single logical bucket in table and calls cb(iterator) 0 or more // times. if cursor=0 starts traversing from the beginning, otherwise continues from where it // stopped. returns 0 if the supplied cursor reached end of traversal. Traverse iterates at bucket - // granularity, which means for each non-empty bucket it calls cb per each entry in the bucket - // before returning. Unlike begin/end interface, traverse is stable during table mutations. - // It guarantees that if key exists (1)at the beginning of traversal, (2) stays in the table - // during the traversal, then Traverse() will eventually reach it even when the - // table shrinks or grows. - // Returns: cursor that is guaranteed to be less than 2^40. + // logical granularity, which means for each non-empty bucket it calls cb per each entry in the + // logical bucket before returning. Unlike begin/end interface, traverse is stable during table + // mutations. It guarantees that if key exists (1)at the beginning of traversal, (2) stays in the + // table during the traversal, then Traverse() will eventually reach it even when the table + // shrinks or grows. Returns: cursor that is guaranteed to be less than 2^40. template Cursor Traverse(Cursor curs, Cb&& cb); + // Traverses over a single physical bucket in table call cb once on bucket iterator. + // if cursor=0 starts traversing from the beginning, otherwise continues from where + // it stopped. returns 0 if the supplied cursor reached end of traversal. + // Unlike Traverse, TraverseBuckets calls cb once on bucket iterator and not on each entry in + // bucket. TraverseBuckets is stable during table mutations. It guarantees traversing all buckets + // that existed at the beginning of traversal. template Cursor TraverseBuckets(Cursor curs, Cb&& cb); - // Takes an iterator pointing to an entry in a dash bucket and traverses all bucket's entries by - // calling cb(iterator) for every non-empty slot. The iteration goes over a physical bucket. - template void TraverseBucket(const_iterator it, Cb&& cb); - // Traverses over a single bucket in table and calls cb(iterator). The traverse order will be // segment by segment over physical backets. // traverse by segment order does not guarantees coverage if the table grows/shrinks, it is useful @@ -998,14 +999,4 @@ auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor curs, Cb&& cb) -> C return Cursor{global_depth_, sid, bid}; } -template -template -void DashTable<_Key, _Value, Policy>::TraverseBucket(const_iterator it, Cb&& cb) { - SegmentType& s = *segment_[it.seg_id_]; - const auto& b = s.GetBucket(it.bucket_id_); - b.ForEachSlot([it, cb = std::move(cb), table = this](auto* bucket, uint8_t slot, bool probe) { - cb(iterator{table, it.seg_id_, it.bucket_id_, slot}); - }); -} - } // namespace dfly diff --git a/src/core/dash_test.cc b/src/core/dash_test.cc index 24577fb45e97..68fe68347382 100644 --- a/src/core/dash_test.cc +++ b/src/core/dash_test.cc @@ -566,26 +566,6 @@ TEST_F(DashTest, Traverse) { EXPECT_EQ(kNumItems - 1, nums.back()); } -TEST_F(DashTest, Bucket) { - constexpr auto kNumItems = 250; - for (size_t i = 0; i < kNumItems; ++i) { - dt_.Insert(i, 0); - } - std::vector s; - auto it = dt_.begin(); - auto bucket_it = Dash64::BucketIt(it); - - dt_.TraverseBucket(it, [&](auto i) { s.push_back(i->first); }); - - unsigned num_items = 0; - while (!bucket_it.is_done()) { - ASSERT_TRUE(find(s.begin(), s.end(), bucket_it->first) != s.end()); - ++bucket_it; - ++num_items; - } - EXPECT_EQ(s.size(), num_items); -} - TEST_F(DashTest, TraverseSegmentOrder) { constexpr auto kNumItems = 50; for (size_t i = 0; i < kNumItems; ++i) { From ee346c7ace95db0263cc9f7f4988d6689174877d Mon Sep 17 00:00:00 2001 From: adi_holden Date: Mon, 11 Nov 2024 11:06:20 +0200 Subject: [PATCH 3/5] fix pr Signed-off-by: adi_holden --- src/core/dash.h | 70 +++++++++++++++++++++++-------------------- src/core/dash_test.cc | 35 ++++++++++++++++++++++ 2 files changed, 73 insertions(+), 32 deletions(-) diff --git a/src/core/dash.h b/src/core/dash.h index 89d2a35baa1e..5d6bd02c42f5 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -240,6 +240,8 @@ class DashTable : public detail::DashTableBase { // that existed at the beginning of traversal. template Cursor TraverseBuckets(Cursor curs, Cb&& cb); + Cursor AdvanceCursorBucketOrder(Cursor cursor); + // Traverses over a single bucket in table and calls cb(iterator). The traverse order will be // segment by segment over physical backets. // traverse by segment order does not guarantees coverage if the table grows/shrinks, it is useful @@ -937,66 +939,70 @@ auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor { if (curs.bucket_id() >= Policy::kBucketNum) // sanity. return 0; - uint32_t sid = curs.segment_id(global_depth_); - uint8_t bid = curs.bucket_id(); - auto hash_fun = [this](const auto& k) { return policy_.HashFn(k); }; bool fetched = false; // We fix bid and go over all segments. Once we reach the end we increase bid and repeat. - do { + while (!fetched) { + uint32_t sid = curs.segment_id(global_depth_); + uint8_t bid = curs.bucket_id(); + SegmentType* s = segment_[sid]; assert(s); auto dt_cb = [&](const SegmentIterator& it) { cb(iterator{this, sid, it.index, it.slot}); }; fetched = s->TraverseLogicalBucket(bid, hash_fun, std::move(dt_cb)); - sid = NextSeg(sid); - if (sid >= segment_.size()) { - sid = 0; - ++bid; + curs = AdvanceCursorBucketOrder(curs); + if (!curs) // Check for end of traversal + return curs; + } - if (bid >= Policy::kBucketNum) - return 0; // "End of traversal" cursor. - } - } while (!fetched); + return curs; +} +template +auto DashTable<_Key, _Value, Policy>::AdvanceCursorBucketOrder(Cursor cursor) -> Cursor { + uint32_t sid = cursor.segment_id(global_depth_); + uint8_t bid = cursor.bucket_id(); + sid = NextSeg(sid); + if (sid >= segment_.size()) { + sid = 0; + ++bid; + + if (bid >= SegmentType::kTotalBuckets) + return 0; // "End of traversal" cursor. + } return Cursor{global_depth_, sid, bid}; } template template -auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor curs, Cb&& cb) -> Cursor { - if (curs.bucket_id() >= SegmentType::kTotalBuckets) // sanity. +auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb) -> Cursor { + if (cursor.bucket_id() >= SegmentType::kTotalBuckets) // sanity. return 0; - uint32_t sid = curs.segment_id(global_depth_); - uint8_t bid = curs.bucket_id(); - - bool fetched = false; + constexpr uint32_t kMaxIterations = 8; + bool invoked = false; - // We fix bid and go over all segments. Once we reach the end we increase bid and repeat. - do { + for (uint32_t i = 0; i < kMaxIterations; ++i) { + uint32_t sid = cursor.segment_id(global_depth_); + uint8_t bid = cursor.bucket_id(); SegmentType* s = segment_[sid]; assert(s); const auto& bucket = s->GetBucket(bid); - if (bucket.GetBusy()) { // call cb on bucket only if it has elements. + if (bucket.GetBusy()) { // Invoke callback only if bucket has elements. cb(BucketIt(sid, bid)); - fetched = true; + invoked = true; } - sid = NextSeg(sid); - if (sid >= segment_.size()) { - sid = 0; - ++bid; - - if (bid >= SegmentType::kTotalBuckets) - return 0; // "End of traversal" cursor. - } - } while (!fetched); - return Cursor{global_depth_, sid, bid}; + cursor = AdvanceCursorBucketOrder(cursor); + if (invoked || !cursor) // Break end of traversal or callback invoked. + return cursor; + } + return cursor; } } // namespace dfly diff --git a/src/core/dash_test.cc b/src/core/dash_test.cc index 68fe68347382..cb8ddbc9a13d 100644 --- a/src/core/dash_test.cc +++ b/src/core/dash_test.cc @@ -590,6 +590,41 @@ TEST_F(DashTest, TraverseSegmentOrder) { EXPECT_EQ(kNumItems - 1, nums.back()); } +TEST_F(DashTest, TraverseBucketOrder) { + constexpr auto kNumItems = 18000; + for (size_t i = 0; i < kNumItems; ++i) { + dt_.Insert(i, i); + } + for (size_t i = 0; i < kNumItems; ++i) { + dt_.Erase(i); + } + constexpr auto kSparseItems = kNumItems / 50; + for (size_t i = 0; i < kSparseItems; ++i) { // create sparse table + dt_.Insert(i, i); + } + + vector nums; + auto tr_cb = [&](Dash64::bucket_iterator it) { + VLOG(1) << "call cb"; + while (!it.is_done()) { + nums.push_back(it->first); + VLOG(1) << it.bucket_id() << " " << it.slot_id() << " " << it->first; + ++it; + } + }; + + Dash64::Cursor cursor; + do { + cursor = dt_.TraverseBuckets(cursor, tr_cb); + } while (cursor); + + sort(nums.begin(), nums.end()); + nums.resize(unique(nums.begin(), nums.end()) - nums.begin()); + ASSERT_EQ(kSparseItems, nums.size()); + EXPECT_EQ(0, nums[0]); + EXPECT_EQ(kSparseItems - 1, nums.back()); +} + struct TestEvictionPolicy { static constexpr bool can_evict = true; static constexpr bool can_gc = false; From daee83f66d17526ce1238b8076c3834549f0f020 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Mon, 11 Nov 2024 11:08:42 +0200 Subject: [PATCH 4/5] fix pr Signed-off-by: adi_holden --- src/core/dash.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/dash.h b/src/core/dash.h index 5d6bd02c42f5..07737a60abb3 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -232,7 +232,7 @@ class DashTable : public detail::DashTableBase { // shrinks or grows. Returns: cursor that is guaranteed to be less than 2^40. template Cursor Traverse(Cursor curs, Cb&& cb); - // Traverses over a single physical bucket in table call cb once on bucket iterator. + // Traverses over physical buckets. It calls cb once for each bucket by passing a bucket iterator. // if cursor=0 starts traversing from the beginning, otherwise continues from where // it stopped. returns 0 if the supplied cursor reached end of traversal. // Unlike Traverse, TraverseBuckets calls cb once on bucket iterator and not on each entry in @@ -943,7 +943,6 @@ auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor { bool fetched = false; - // We fix bid and go over all segments. Once we reach the end we increase bid and repeat. while (!fetched) { uint32_t sid = curs.segment_id(global_depth_); uint8_t bid = curs.bucket_id(); @@ -964,6 +963,7 @@ auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor { template auto DashTable<_Key, _Value, Policy>::AdvanceCursorBucketOrder(Cursor cursor) -> Cursor { + // We fix bid and go over all segments. Once we reach the end we increase bid and repeat. uint32_t sid = cursor.segment_id(global_depth_); uint8_t bid = cursor.bucket_id(); sid = NextSeg(sid); From 2a6baa43847b9c38689922b5bc884ffbfa05106d Mon Sep 17 00:00:00 2001 From: adi_holden Date: Mon, 11 Nov 2024 13:17:22 +0200 Subject: [PATCH 5/5] fix pr Signed-off-by: adi_holden --- src/core/dash.h | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/core/dash.h b/src/core/dash.h index 07737a60abb3..0c67fd6b2c41 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -939,26 +939,32 @@ auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor { if (curs.bucket_id() >= Policy::kBucketNum) // sanity. return 0; + uint32_t sid = curs.segment_id(global_depth_); + uint8_t bid = curs.bucket_id(); + auto hash_fun = [this](const auto& k) { return policy_.HashFn(k); }; bool fetched = false; - while (!fetched) { - uint32_t sid = curs.segment_id(global_depth_); - uint8_t bid = curs.bucket_id(); - + // We fix bid and go over all segments. Once we reach the end we increase bid and repeat. + do { SegmentType* s = segment_[sid]; assert(s); auto dt_cb = [&](const SegmentIterator& it) { cb(iterator{this, sid, it.index, it.slot}); }; fetched = s->TraverseLogicalBucket(bid, hash_fun, std::move(dt_cb)); - curs = AdvanceCursorBucketOrder(curs); - if (!curs) // Check for end of traversal - return curs; - } + sid = NextSeg(sid); + if (sid >= segment_.size()) { + sid = 0; + ++bid; - return curs; + if (bid >= Policy::kBucketNum) + return 0; // "End of traversal" cursor. + } + } while (!fetched); + + return Cursor{global_depth_, sid, bid}; } template