diff --git a/src/core/dash.h b/src/core/dash.h index a7fd01fc257b..0c67fd6b2c41 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -222,20 +222,25 @@ class DashTable : public detail::DashTableBase { // Returns: cursor with a random position Cursor GetRandomCursor(absl::BitGen* bitgen); - // Traverses over a single bucket in table and calls cb(iterator) 0 or more + // Traverses over a single logical bucket in table and calls cb(iterator) 0 or more // times. if cursor=0 starts traversing from the beginning, otherwise continues from where it // stopped. returns 0 if the supplied cursor reached end of traversal. Traverse iterates at bucket - // granularity, which means for each non-empty bucket it calls cb per each entry in the bucket - // before returning. Unlike begin/end interface, traverse is stable during table mutations. - // It guarantees that if key exists (1)at the beginning of traversal, (2) stays in the table - // during the traversal, then Traverse() will eventually reach it even when the - // table shrinks or grows. - // Returns: cursor that is guaranteed to be less than 2^40. + // logical granularity, which means for each non-empty bucket it calls cb per each entry in the + // logical bucket before returning. Unlike begin/end interface, traverse is stable during table + // mutations. It guarantees that if key exists (1)at the beginning of traversal, (2) stays in the + // table during the traversal, then Traverse() will eventually reach it even when the table + // shrinks or grows. Returns: cursor that is guaranteed to be less than 2^40. template Cursor Traverse(Cursor curs, Cb&& cb); - // Takes an iterator pointing to an entry in a dash bucket and traverses all bucket's entries by - // calling cb(iterator) for every non-empty slot. The iteration goes over a physical bucket. - template void TraverseBucket(const_iterator it, Cb&& cb); + // Traverses over physical buckets. It calls cb once for each bucket by passing a bucket iterator. + // if cursor=0 starts traversing from the beginning, otherwise continues from where + // it stopped. returns 0 if the supplied cursor reached end of traversal. + // Unlike Traverse, TraverseBuckets calls cb once on bucket iterator and not on each entry in + // bucket. TraverseBuckets is stable during table mutations. It guarantees traversing all buckets + // that existed at the beginning of traversal. + template Cursor TraverseBuckets(Cursor curs, Cb&& cb); + + Cursor AdvanceCursorBucketOrder(Cursor cursor); // Traverses over a single bucket in table and calls cb(iterator). The traverse order will be // segment by segment over physical backets. @@ -253,6 +258,10 @@ class DashTable : public detail::DashTableBase { return const_bucket_iterator{this, segment_id, uint8_t(bucket_id)}; } + bucket_iterator BucketIt(unsigned segment_id, unsigned bucket_id) { + return bucket_iterator{this, segment_id, uint8_t(bucket_id)}; + } + iterator GetIterator(unsigned segment_id, unsigned bucket_id, unsigned slot_id) { return iterator{this, segment_id, uint8_t(bucket_id), uint8_t(slot_id)}; } @@ -958,14 +967,48 @@ auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor { return Cursor{global_depth_, sid, bid}; } +template +auto DashTable<_Key, _Value, Policy>::AdvanceCursorBucketOrder(Cursor cursor) -> Cursor { + // We fix bid and go over all segments. Once we reach the end we increase bid and repeat. + uint32_t sid = cursor.segment_id(global_depth_); + uint8_t bid = cursor.bucket_id(); + sid = NextSeg(sid); + if (sid >= segment_.size()) { + sid = 0; + ++bid; + + if (bid >= SegmentType::kTotalBuckets) + return 0; // "End of traversal" cursor. + } + return Cursor{global_depth_, sid, bid}; +} + template template -void DashTable<_Key, _Value, Policy>::TraverseBucket(const_iterator it, Cb&& cb) { - SegmentType& s = *segment_[it.seg_id_]; - const auto& b = s.GetBucket(it.bucket_id_); - b.ForEachSlot([it, cb = std::move(cb), table = this](auto* bucket, uint8_t slot, bool probe) { - cb(iterator{table, it.seg_id_, it.bucket_id_, slot}); - }); +auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb) -> Cursor { + if (cursor.bucket_id() >= SegmentType::kTotalBuckets) // sanity. + return 0; + + constexpr uint32_t kMaxIterations = 8; + bool invoked = false; + + for (uint32_t i = 0; i < kMaxIterations; ++i) { + uint32_t sid = cursor.segment_id(global_depth_); + uint8_t bid = cursor.bucket_id(); + SegmentType* s = segment_[sid]; + assert(s); + + const auto& bucket = s->GetBucket(bid); + if (bucket.GetBusy()) { // Invoke callback only if bucket has elements. + cb(BucketIt(sid, bid)); + invoked = true; + } + + cursor = AdvanceCursorBucketOrder(cursor); + if (invoked || !cursor) // Break end of traversal or callback invoked. + return cursor; + } + return cursor; } } // namespace dfly diff --git a/src/core/dash_test.cc b/src/core/dash_test.cc index 24577fb45e97..cb8ddbc9a13d 100644 --- a/src/core/dash_test.cc +++ b/src/core/dash_test.cc @@ -566,26 +566,6 @@ TEST_F(DashTest, Traverse) { EXPECT_EQ(kNumItems - 1, nums.back()); } -TEST_F(DashTest, Bucket) { - constexpr auto kNumItems = 250; - for (size_t i = 0; i < kNumItems; ++i) { - dt_.Insert(i, 0); - } - std::vector s; - auto it = dt_.begin(); - auto bucket_it = Dash64::BucketIt(it); - - dt_.TraverseBucket(it, [&](auto i) { s.push_back(i->first); }); - - unsigned num_items = 0; - while (!bucket_it.is_done()) { - ASSERT_TRUE(find(s.begin(), s.end(), bucket_it->first) != s.end()); - ++bucket_it; - ++num_items; - } - EXPECT_EQ(s.size(), num_items); -} - TEST_F(DashTest, TraverseSegmentOrder) { constexpr auto kNumItems = 50; for (size_t i = 0; i < kNumItems; ++i) { @@ -610,6 +590,41 @@ TEST_F(DashTest, TraverseSegmentOrder) { EXPECT_EQ(kNumItems - 1, nums.back()); } +TEST_F(DashTest, TraverseBucketOrder) { + constexpr auto kNumItems = 18000; + for (size_t i = 0; i < kNumItems; ++i) { + dt_.Insert(i, i); + } + for (size_t i = 0; i < kNumItems; ++i) { + dt_.Erase(i); + } + constexpr auto kSparseItems = kNumItems / 50; + for (size_t i = 0; i < kSparseItems; ++i) { // create sparse table + dt_.Insert(i, i); + } + + vector nums; + auto tr_cb = [&](Dash64::bucket_iterator it) { + VLOG(1) << "call cb"; + while (!it.is_done()) { + nums.push_back(it->first); + VLOG(1) << it.bucket_id() << " " << it.slot_id() << " " << it->first; + ++it; + } + }; + + Dash64::Cursor cursor; + do { + cursor = dt_.TraverseBuckets(cursor, tr_cb); + } while (cursor); + + sort(nums.begin(), nums.end()); + nums.resize(unique(nums.begin(), nums.end()) - nums.begin()); + ASSERT_EQ(kSparseItems, nums.size()); + EXPECT_EQ(0, nums[0]); + EXPECT_EQ(kSparseItems - 1, nums.back()); +} + struct TestEvictionPolicy { static constexpr bool can_evict = true; static constexpr bool can_gc = false; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 745233552526..77cec6d9ef7e 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -168,7 +168,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn return; PrimeTable::Cursor next = - db_slice_->Traverse(pt, cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); + pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); cursor = next; PushSerialized(false); @@ -242,14 +242,13 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { } } -bool SliceSnapshot::BucketSaveCb(PrimeIterator it) { +bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { ++stats_.savecb_calls; auto check = [&](auto v) { if (v >= snapshot_version_) { // either has been already serialized or added after snapshotting started. - DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id() - << " at " << v; + DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << v; ++stats_.skipped; return false; } diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 38ad86c889ad..e3839fb9bd88 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -89,7 +89,7 @@ class SliceSnapshot { void SwitchIncrementalFb(Context* cntx, LSN lsn); // Called on traversing cursor by IterateBucketsFb. - bool BucketSaveCb(PrimeIterator it); + bool BucketSaveCb(PrimeTable::bucket_iterator it); // Serialize single bucket. // Returns number of serialized entries, updates bucket version to snapshot version.