feat(server): snapshot traverse physical buckets #4084

Merged: 5 commits, Nov 11, 2024
75 changes: 59 additions & 16 deletions src/core/dash.h
@@ -222,20 +222,25 @@ class DashTable : public detail::DashTableBase {
// Returns: cursor with a random position
Cursor GetRandomCursor(absl::BitGen* bitgen);

- // Traverses over a single bucket in table and calls cb(iterator) 0 or more
+ // Traverses over a single logical bucket in table and calls cb(iterator) 0 or more
// times. if cursor=0 starts traversing from the beginning, otherwise continues from where it
// stopped. returns 0 if the supplied cursor reached end of traversal. Traverse iterates at bucket
- // granularity, which means for each non-empty bucket it calls cb per each entry in the bucket
- // before returning. Unlike begin/end interface, traverse is stable during table mutations.
- // It guarantees that if key exists (1)at the beginning of traversal, (2) stays in the table
- // during the traversal, then Traverse() will eventually reach it even when the
- // table shrinks or grows.
- // Returns: cursor that is guaranteed to be less than 2^40.
+ // logical granularity, which means for each non-empty bucket it calls cb per each entry in the
+ // logical bucket before returning. Unlike begin/end interface, traverse is stable during table
+ // mutations. It guarantees that if key exists (1)at the beginning of traversal, (2) stays in the
+ // table during the traversal, then Traverse() will eventually reach it even when the table
+ // shrinks or grows. Returns: cursor that is guaranteed to be less than 2^40.
template <typename Cb> Cursor Traverse(Cursor curs, Cb&& cb);

- // Takes an iterator pointing to an entry in a dash bucket and traverses all bucket's entries by
- // calling cb(iterator) for every non-empty slot. The iteration goes over a physical bucket.
- template <typename Cb> void TraverseBucket(const_iterator it, Cb&& cb);
+ // Traverses over physical buckets. It calls cb once for each bucket by passing a bucket iterator.
+ // if cursor=0 starts traversing from the beginning, otherwise continues from where
+ // it stopped. returns 0 if the supplied cursor reached end of traversal.
+ // Unlike Traverse, TraverseBuckets calls cb once on bucket iterator and not on each entry in
+ // bucket. TraverseBuckets is stable during table mutations. It guarantees traversing all buckets
+ // that existed at the beginning of traversal.
+ template <typename Cb> Cursor TraverseBuckets(Cursor curs, Cb&& cb);
+
+ Cursor AdvanceCursorBucketOrder(Cursor cursor);

// Traverses over a single bucket in table and calls cb(iterator). The traverse order will be
// segment by segment over physical buckets.
@@ -253,6 +258,10 @@ class DashTable : public detail::DashTableBase {
return const_bucket_iterator{this, segment_id, uint8_t(bucket_id)};
}

+ bucket_iterator BucketIt(unsigned segment_id, unsigned bucket_id) {
+   return bucket_iterator{this, segment_id, uint8_t(bucket_id)};
+ }
+
iterator GetIterator(unsigned segment_id, unsigned bucket_id, unsigned slot_id) {
return iterator{this, segment_id, uint8_t(bucket_id), uint8_t(slot_id)};
}
@@ -958,14 +967,48 @@ auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor {
return Cursor{global_depth_, sid, bid};
}

+ template <typename _Key, typename _Value, typename Policy>
+ auto DashTable<_Key, _Value, Policy>::AdvanceCursorBucketOrder(Cursor cursor) -> Cursor {
+   // We fix bid and go over all segments. Once we reach the end we increase bid and repeat.
+   uint32_t sid = cursor.segment_id(global_depth_);
+   uint8_t bid = cursor.bucket_id();
+   sid = NextSeg(sid);
+   if (sid >= segment_.size()) {
+     sid = 0;
+     ++bid;
+
+     if (bid >= SegmentType::kTotalBuckets)
+       return 0;  // "End of traversal" cursor.
+   }
+   return Cursor{global_depth_, sid, bid};
+ }
+
template <typename _Key, typename _Value, typename Policy>
template <typename Cb>
- void DashTable<_Key, _Value, Policy>::TraverseBucket(const_iterator it, Cb&& cb) {
-   SegmentType& s = *segment_[it.seg_id_];
-   const auto& b = s.GetBucket(it.bucket_id_);
-   b.ForEachSlot([it, cb = std::move(cb), table = this](auto* bucket, uint8_t slot, bool probe) {
-     cb(iterator{table, it.seg_id_, it.bucket_id_, slot});
-   });
+ auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb) -> Cursor {
+   if (cursor.bucket_id() >= SegmentType::kTotalBuckets)  // sanity.
+     return 0;
+
+   constexpr uint32_t kMaxIterations = 8;
+   bool invoked = false;
+
+   for (uint32_t i = 0; i < kMaxIterations; ++i) {
+     uint32_t sid = cursor.segment_id(global_depth_);
+     uint8_t bid = cursor.bucket_id();
+     SegmentType* s = segment_[sid];
+     assert(s);
+
+     const auto& bucket = s->GetBucket(bid);
+     if (bucket.GetBusy()) {  // Invoke callback only if bucket has elements.
+       cb(BucketIt(sid, bid));
+       invoked = true;
+     }
+
+     cursor = AdvanceCursorBucketOrder(cursor);
+     if (invoked || !cursor)  // Break end of traversal or callback invoked.
+       return cursor;
+   }
+   return cursor;
}

} // namespace dfly
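
Taken together, the additions give DashTable a second, coarser traversal mode: AdvanceCursorBucketOrder fixes the bucket id and sweeps every segment before moving on to the next bucket id, and TraverseBuckets probes at most kMaxIterations buckets per call, invoking the callback on the first non-empty one it finds. A minimal usage sketch against the Dash64 alias used in dash_test.cc (Visit is an illustrative placeholder, not part of this PR):

  Dash64 table;
  // ... populate table ...
  Dash64::Cursor cursor;
  do {
    cursor = table.TraverseBuckets(cursor, [](Dash64::bucket_iterator it) {
      // Unlike Traverse, the callback receives a whole physical bucket and walks its slots itself.
      for (; !it.is_done(); ++it)
        Visit(it->first, it->second);  // illustrative placeholder
    });
  } while (cursor);  // a zero cursor marks the end of the traversal
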
55 changes: 35 additions & 20 deletions src/core/dash_test.cc
@@ -566,26 +566,6 @@ TEST_F(DashTest, Traverse) {
EXPECT_EQ(kNumItems - 1, nums.back());
}

- TEST_F(DashTest, Bucket) {
[Review comment from a collaborator: "don't you want to test the change?"]
-   constexpr auto kNumItems = 250;
-   for (size_t i = 0; i < kNumItems; ++i) {
-     dt_.Insert(i, 0);
-   }
-   std::vector<uint64_t> s;
-   auto it = dt_.begin();
-   auto bucket_it = Dash64::BucketIt(it);
-
-   dt_.TraverseBucket(it, [&](auto i) { s.push_back(i->first); });
-
-   unsigned num_items = 0;
-   while (!bucket_it.is_done()) {
-     ASSERT_TRUE(find(s.begin(), s.end(), bucket_it->first) != s.end());
-     ++bucket_it;
-     ++num_items;
-   }
-   EXPECT_EQ(s.size(), num_items);
- }

TEST_F(DashTest, TraverseSegmentOrder) {
constexpr auto kNumItems = 50;
for (size_t i = 0; i < kNumItems; ++i) {
@@ -610,6 +590,41 @@ TEST_F(DashTest, TraverseSegmentOrder) {
EXPECT_EQ(kNumItems - 1, nums.back());
}

+ TEST_F(DashTest, TraverseBucketOrder) {
+   constexpr auto kNumItems = 18000;
+   for (size_t i = 0; i < kNumItems; ++i) {
+     dt_.Insert(i, i);
+   }
+   for (size_t i = 0; i < kNumItems; ++i) {
+     dt_.Erase(i);
+   }
+   constexpr auto kSparseItems = kNumItems / 50;
+   for (size_t i = 0; i < kSparseItems; ++i) {  // create sparse table
+     dt_.Insert(i, i);
+   }
+
+   vector<unsigned> nums;
+   auto tr_cb = [&](Dash64::bucket_iterator it) {
+     VLOG(1) << "call cb";
+     while (!it.is_done()) {
+       nums.push_back(it->first);
+       VLOG(1) << it.bucket_id() << " " << it.slot_id() << " " << it->first;
+       ++it;
+     }
+   };
+
+   Dash64::Cursor cursor;
+   do {
+     cursor = dt_.TraverseBuckets(cursor, tr_cb);
+   } while (cursor);
+
+   sort(nums.begin(), nums.end());
+   nums.resize(unique(nums.begin(), nums.end()) - nums.begin());
+   ASSERT_EQ(kSparseItems, nums.size());
+   EXPECT_EQ(0, nums[0]);
+   EXPECT_EQ(kSparseItems - 1, nums.back());
+ }
+
struct TestEvictionPolicy {
static constexpr bool can_evict = true;
static constexpr bool can_gc = false;
7 changes: 3 additions & 4 deletions src/server/snapshot.cc
@@ -168,7 +168,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
return;

PrimeTable::Cursor next =
-     db_slice_->Traverse(pt, cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
+     pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
cursor = next;
PushSerialized(false);

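The one-line change above switches the snapshot fiber from per-entry traversal (db_slice_->Traverse) to per-bucket traversal. Only a fragment of the surrounding loop is visible in this hunk; a simplified sketch of the pattern it follows (cancellation check written here as cll->IsCancelled() for illustration; yield and throttling logic omitted):

  PrimeTable::Cursor cursor;
  do {
    PrimeTable::Cursor next =
        pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
    cursor = next;
    PushSerialized(false);  // flush whatever BucketSaveCb serialized for this chunk
  } while (cursor && !cll->IsCancelled());
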
@@ -242,14 +242,13 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
}
}

- bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
+ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
++stats_.savecb_calls;

auto check = [&](auto v) {
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << v;
++stats_.skipped;
return false;
}
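Because BucketSaveCb now receives a bucket_iterator rather than a single-entry iterator, one invocation decides the fate of an entire physical bucket: the version check above either skips it wholesale or lets every occupied slot be serialized before the callback returns. A condensed sketch of that shape, assuming version get/set accessors on the iterator and a hypothetical SerializeEntry helper (the PR's actual serialization path is not shown in this diff):

  bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
    ++stats_.savecb_calls;
    if (it.GetVersion() >= snapshot_version_) {  // assumed accessor; see the check lambda above
      ++stats_.skipped;
      return false;
    }
    it.SetVersion(snapshot_version_);  // mark the bucket as covered by this snapshot (assumed accessor)
    for (; !it.is_done(); ++it)        // serialize every occupied slot of the bucket
      SerializeEntry(it->first, it->second);  // hypothetical helper
    return false;
  }
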
2 changes: 1 addition & 1 deletion src/server/snapshot.h
@@ -89,7 +89,7 @@ class SliceSnapshot {
void SwitchIncrementalFb(Context* cntx, LSN lsn);

// Called on traversing cursor by IterateBucketsFb.
- bool BucketSaveCb(PrimeIterator it);
+ bool BucketSaveCb(PrimeTable::bucket_iterator it);

// Serialize single bucket.
// Returns number of serialized entries, updates bucket version to snapshot version.