Skip to content

Commit

Permalink
chore: tiered fixes (#3393)
Browse files Browse the repository at this point in the history
1. Use introsive::list for CoolQueue.
2. Make sure that we ignore cool memory usage when computing average object size to
   prevent evictions during dashtable growth attempts.
3. Remove items from the cool storage before evicting them from the dash table.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jul 25, 2024
1 parent 2867d54 commit 0a26a06
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 194 deletions.
2 changes: 1 addition & 1 deletion src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ add_subdirectory(json)

set(SEARCH_LIB query_parser)

add_library(dfly_core bloom.cc compact_object.cc cool_queue.cc dragonfly_core.cc extent_tree.cc
add_library(dfly_core bloom.cc compact_object.cc dragonfly_core.cc extent_tree.cc
interpreter.cc mi_memory_resource.cc sds_utils.cc
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc
tx_queue.cc dense_set.cc allocation_tracker.cc
Expand Down
6 changes: 3 additions & 3 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <absl/base/internal/endian.h>

#include <boost/intrusive/list_hook.hpp>
#include <optional>
#include <type_traits>

Expand Down Expand Up @@ -537,9 +538,8 @@ class CompactObjectView {

namespace detail {

struct TieredColdRecord {
TieredColdRecord* next = nullptr;
TieredColdRecord* prev = nullptr;
struct TieredColdRecord : public ::boost::intrusive::list_base_hook<
boost::intrusive::link_mode<boost::intrusive::normal_link>> {
uint64_t key_hash; // Allows searching the entry in the dbslice.
CompactObj value;
uint16_t db_index;
Expand Down
75 changes: 0 additions & 75 deletions src/core/cool_queue.cc

This file was deleted.

40 changes: 0 additions & 40 deletions src/core/cool_queue.h

This file was deleted.

12 changes: 0 additions & 12 deletions src/core/dfly_core_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//

#include "base/gtest.h"
#include "core/cool_queue.h"
#include "core/intent_lock.h"
#include "core/tx_queue.h"

Expand Down Expand Up @@ -66,15 +65,4 @@ TEST_F(IntentLockTest, Basic) {
ASSERT_TRUE(lk_.Check(IntentLock::EXCLUSIVE));
}

TEST_F(TxQueueTest, CoolQueue) {
CoolQueue queue;

ASSERT_TRUE(queue.Empty());
auto* record = queue.PushFront(0, 1, 2, CompactObj{});
EXPECT_EQ(record->key_hash, 1);
ASSERT_FALSE(queue.Empty());
queue.PopBack();
ASSERT_TRUE(queue.Empty());
}

} // namespace dfly
29 changes: 21 additions & 8 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ DbStats& DbStats::operator+=(const DbStats& o) {
}

SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
static_assert(sizeof(SliceEvents) == 112, "You should update this function with new fields");
static_assert(sizeof(SliceEvents) == 120, "You should update this function with new fields");

ADD(evicted_keys);
ADD(hard_evictions);
Expand All @@ -270,6 +270,7 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
ADD(insertion_rejections);
ADD(update);
ADD(ram_hits);
ADD(ram_cool_hits);
ADD(ram_misses);

return *this;
Expand Down Expand Up @@ -518,7 +519,10 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
db.slots_stats[cluster::KeySlot(key)].total_reads++;
}
if (res.it->second.IsExternal()) {
events_.ram_misses++;
if (res.it->second.IsCool())
events_.ram_cool_hits++;
else
events_.ram_misses++;
} else {
events_.ram_hits++;
}
Expand Down Expand Up @@ -612,7 +616,9 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
// Instead, we create a positive, converging force that should help with freeing enough memory.
// Free at least 256 bytes or 3% of the total debt.
size_t evict_goal = std::max<size_t>(256, (-evp.mem_budget()) / 32);
evicted_obj_bytes = FreeMemWithEvictionStep(cntx.db_index, it.segment_id(), evict_goal);
auto [items, bytes] = FreeMemWithEvictionStep(cntx.db_index, it.segment_id(), evict_goal);
evicted_obj_bytes = bytes;
events_.hard_evictions += items;
}

table_memory_ += (db.table_memory() - table_before);
Expand Down Expand Up @@ -1213,11 +1219,19 @@ int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) c
db_arr_[db_ind]->prime.GetSegmentCount();
}

size_t DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_id,
size_t increase_goal_bytes) {
pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_id,
size_t increase_goal_bytes) {
DCHECK(!owner_->IsReplica());
if ((!caching_mode_) || !expire_allowed_)
return 0;
return {0, 0};

size_t evicted_items = 0, evicted_bytes = 0;

if (owner_->tiered_storage()) {
evicted_bytes = owner_->tiered_storage()->ReclaimMemory(increase_goal_bytes);
if (evicted_bytes >= increase_goal_bytes)
return {0, evicted_bytes};
}

auto max_eviction_per_hb = GetFlag(FLAGS_max_eviction_per_heartbeat);
auto max_segment_to_consider = GetFlag(FLAGS_max_segment_to_consider);
Expand All @@ -1227,7 +1241,6 @@ size_t DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_
constexpr int32_t num_buckets = PrimeTable::Segment_t::kTotalBuckets;
constexpr int32_t num_slots = PrimeTable::Segment_t::kSlotNum;

size_t evicted_items = 0, evicted_bytes = 0;
string tmp;

bool record_keys = owner_->journal() != nullptr || expired_keys_events_recording_;
Expand Down Expand Up @@ -1291,7 +1304,7 @@ size_t DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_
DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted_items << "/"
<< max_eviction_per_hb;
DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000;
return evicted_bytes;
return {evicted_items, evicted_bytes};
}

void DbSlice::CreateDb(DbIndex db_ind) {
Expand Down
7 changes: 4 additions & 3 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct SliceEvents {

// ram hit/miss when tiering is enabled
size_t ram_hits = 0;
size_t ram_cool_hits = 0;
size_t ram_misses = 0;

// how many insertions were rejected due to OOM.
Expand Down Expand Up @@ -444,9 +445,9 @@ class DbSlice {

// Evicts items with dynamically allocated data from the primary table.
// Does not shrink tables.
// Returnes number of bytes freed due to evictions.
size_t FreeMemWithEvictionStep(DbIndex db_indx, size_t starting_segment_id,
size_t increase_goal_bytes);
// Returnes number of (elements,bytes) freed due to evictions.
std::pair<uint64_t, size_t> FreeMemWithEvictionStep(DbIndex db_indx, size_t starting_segment_id,
size_t increase_goal_bytes);
void ScheduleForOffloadStep(DbIndex db_indx, size_t increase_goal_bytes);

int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const;
Expand Down
3 changes: 3 additions & 0 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,9 @@ void EngineShard::CacheStats() {
}
}
DCHECK_EQ(table_memory, db_slice.table_memory());
if (tiered_storage_) {
table_memory += tiered_storage_->CoolMemoryUsage();
}
size_t obj_memory = table_memory <= used_mem ? used_mem - table_memory : 0;

size_t bytes_per_obj = entries > 0 ? obj_memory / entries : 0;
Expand Down
6 changes: 3 additions & 3 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2161,9 +2161,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("reply_count", reply_stats.send_stats.count);
append("reply_latency_usec", reply_stats.send_stats.total_duration);
append("blocked_on_interpreter", m.coordinator_stats.blocked_on_interpreter);
append("ram_hits", m.events.ram_hits);
append("ram_misses", m.events.ram_misses);

append("lua_interpreter_cnt", m.lua_stats.interpreter_cnt);
append("lua_blocked", m.lua_stats.blocked_cnt);
}
Expand Down Expand Up @@ -2191,6 +2188,9 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("tiered_small_bins_entries_cnt", m.tiered_stats.small_bins_entries_cnt);
append("tiered_small_bins_filling_bytes", m.tiered_stats.small_bins_filling_bytes);
append("tiered_cold_storage_bytes", m.tiered_stats.cold_storage_bytes);
append("tiered_ram_hits", m.events.ram_hits);
append("tiered_ram_cool_hits", m.events.ram_cool_hits);
append("tiered_ram_misses", m.events.ram_misses);
}

if (should_enter("PERSISTENCE", true)) {
Expand Down
Loading

0 comments on commit 0a26a06

Please sign in to comment.