From 0a26a06065baee8e3cba7763c042efd472d0ddb2 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 25 Jul 2024 23:38:44 +0300 Subject: [PATCH] chore: tiered fixes (#3393) 1. Use introsive::list for CoolQueue. 2. Make sure that we ignore cool memory usage when computing average object size to prevent evictions during dashtable growth attempts. 3. Remove items from the cool storage before evicting them from the dash table. Signed-off-by: Roman Gershman --- src/core/CMakeLists.txt | 2 +- src/core/compact_object.h | 6 +- src/core/cool_queue.cc | 75 -------------------- src/core/cool_queue.h | 40 ----------- src/core/dfly_core_test.cc | 12 ---- src/server/db_slice.cc | 29 +++++--- src/server/db_slice.h | 7 +- src/server/engine_shard_set.cc | 3 + src/server/server_family.cc | 6 +- src/server/tiered_storage.cc | 126 +++++++++++++++++++++------------ src/server/tiered_storage.h | 19 ++++- 11 files changed, 131 insertions(+), 194 deletions(-) delete mode 100644 src/core/cool_queue.cc delete mode 100644 src/core/cool_queue.h diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 4dd2d1bc895f..684b2a4cc5c8 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -3,7 +3,7 @@ add_subdirectory(json) set(SEARCH_LIB query_parser) -add_library(dfly_core bloom.cc compact_object.cc cool_queue.cc dragonfly_core.cc extent_tree.cc +add_library(dfly_core bloom.cc compact_object.cc dragonfly_core.cc extent_tree.cc interpreter.cc mi_memory_resource.cc sds_utils.cc segment_allocator.cc score_map.cc small_string.cc sorted_map.cc tx_queue.cc dense_set.cc allocation_tracker.cc diff --git a/src/core/compact_object.h b/src/core/compact_object.h index e48c8f8699ea..a59f7a7362c4 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -6,6 +6,7 @@ #include +#include #include #include @@ -537,9 +538,8 @@ class CompactObjectView { namespace detail { -struct TieredColdRecord { - TieredColdRecord* next = nullptr; - TieredColdRecord* prev = nullptr; +struct TieredColdRecord : public ::boost::intrusive::list_base_hook< + boost::intrusive::link_mode> { uint64_t key_hash; // Allows searching the entry in the dbslice. CompactObj value; uint16_t db_index; diff --git a/src/core/cool_queue.cc b/src/core/cool_queue.cc deleted file mode 100644 index eefb11ff2398..000000000000 --- a/src/core/cool_queue.cc +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2024, DragonflyDB authors. All rights reserved. -// See LICENSE for licensing terms. 
-// - -#include "src/core/cool_queue.h" - -#include "base/logging.h" - -namespace dfly { - -CoolQueue::~CoolQueue() { - while (!Empty()) { - auto* record = PopBack(); - CompactObj::DeleteMR(record); - } -} - -detail::TieredColdRecord* CoolQueue::PushFront(uint16_t db_index, uint64_t key_hash, - uint32_t page_index, CompactObj obj) { - detail::TieredColdRecord* record = CompactObj::AllocateMR(); - record->key_hash = key_hash; - record->db_index = db_index; - record->page_index = page_index; - record->value = std::move(obj); - - record->next = head_; - if (head_) { - head_->prev = record; - } else { - DCHECK(tail_ == nullptr); - tail_ = record; - } - head_ = record; - used_memory_ += (sizeof(detail::TieredColdRecord) + record->value.MallocUsed()); - return record; -} - -detail::TieredColdRecord* CoolQueue::PopBack() { - auto* res = tail_; - if (tail_) { - auto* prev = tail_->prev; - tail_->prev = nullptr; - if (prev) { - prev->next = nullptr; - } else { - DCHECK(tail_ == head_); - head_ = nullptr; - } - tail_ = prev; - used_memory_ -= (sizeof(detail::TieredColdRecord) + res->value.MallocUsed()); - } - return res; -} - -CompactObj CoolQueue::Erase(detail::TieredColdRecord* record) { - DCHECK(record); - - if (record == tail_) { - PopBack(); - } else { - used_memory_ -= (sizeof(detail::TieredColdRecord) + record->value.MallocUsed()); - record->next->prev = record->prev; - if (record->prev) { - record->prev->next = record->next; - } else { - DCHECK(record == head_); - head_ = record->next; - } - } - - CompactObj res = std::move(record->value); - CompactObj::DeleteMR(record); - return res; -} -} // namespace dfly diff --git a/src/core/cool_queue.h b/src/core/cool_queue.h deleted file mode 100644 index 611736f5a449..000000000000 --- a/src/core/cool_queue.h +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2024, DragonflyDB authors. All rights reserved. -// See LICENSE for licensing terms. -// - -#pragma once - -#include "core/compact_object.h" - -namespace dfly { - -// Used to store "cold" items before erasing them from RAM. -// -class CoolQueue { - public: - ~CoolQueue(); - - bool Empty() const { - return head_ == nullptr; - } - - detail::TieredColdRecord* PushFront(uint16_t db_index, uint64_t key_hash, uint32_t page_index, - CompactObj obj); - - // The ownership is passed to the caller. The record must be deleted with - // CompactObj::DeleteMR. 
- detail::TieredColdRecord* PopBack(); - - CompactObj Erase(detail::TieredColdRecord* record); - - size_t UsedMemory() const { - return used_memory_; - } - - private: - detail::TieredColdRecord* head_ = nullptr; - detail::TieredColdRecord* tail_ = nullptr; - size_t used_memory_ = 0; -}; - -} // namespace dfly diff --git a/src/core/dfly_core_test.cc b/src/core/dfly_core_test.cc index aff8bfe5ece6..cfeee87efc8c 100644 --- a/src/core/dfly_core_test.cc +++ b/src/core/dfly_core_test.cc @@ -3,7 +3,6 @@ // #include "base/gtest.h" -#include "core/cool_queue.h" #include "core/intent_lock.h" #include "core/tx_queue.h" @@ -66,15 +65,4 @@ TEST_F(IntentLockTest, Basic) { ASSERT_TRUE(lk_.Check(IntentLock::EXCLUSIVE)); } -TEST_F(TxQueueTest, CoolQueue) { - CoolQueue queue; - - ASSERT_TRUE(queue.Empty()); - auto* record = queue.PushFront(0, 1, 2, CompactObj{}); - EXPECT_EQ(record->key_hash, 1); - ASSERT_FALSE(queue.Empty()); - queue.PopBack(); - ASSERT_TRUE(queue.Empty()); -} - } // namespace dfly diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 9cdaf49b7421..e76435a91ff0 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -255,7 +255,7 @@ DbStats& DbStats::operator+=(const DbStats& o) { } SliceEvents& SliceEvents::operator+=(const SliceEvents& o) { - static_assert(sizeof(SliceEvents) == 112, "You should update this function with new fields"); + static_assert(sizeof(SliceEvents) == 120, "You should update this function with new fields"); ADD(evicted_keys); ADD(hard_evictions); @@ -270,6 +270,7 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) { ADD(insertion_rejections); ADD(update); ADD(ram_hits); + ADD(ram_cool_hits); ADD(ram_misses); return *this; @@ -518,7 +519,10 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: db.slots_stats[cluster::KeySlot(key)].total_reads++; } if (res.it->second.IsExternal()) { - events_.ram_misses++; + if (res.it->second.IsCool()) + events_.ram_cool_hits++; + else + events_.ram_misses++; } else { events_.ram_hits++; } @@ -612,7 +616,9 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt // Instead, we create a positive, converging force that should help with freeing enough memory. // Free at least 256 bytes or 3% of the total debt. 
size_t evict_goal = std::max(256, (-evp.mem_budget()) / 32); - evicted_obj_bytes = FreeMemWithEvictionStep(cntx.db_index, it.segment_id(), evict_goal); + auto [items, bytes] = FreeMemWithEvictionStep(cntx.db_index, it.segment_id(), evict_goal); + evicted_obj_bytes = bytes; + events_.hard_evictions += items; } table_memory_ += (db.table_memory() - table_before); @@ -1213,11 +1219,19 @@ int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) c db_arr_[db_ind]->prime.GetSegmentCount(); } -size_t DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_id, - size_t increase_goal_bytes) { +pair DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_id, + size_t increase_goal_bytes) { DCHECK(!owner_->IsReplica()); if ((!caching_mode_) || !expire_allowed_) - return 0; + return {0, 0}; + + size_t evicted_items = 0, evicted_bytes = 0; + + if (owner_->tiered_storage()) { + evicted_bytes = owner_->tiered_storage()->ReclaimMemory(increase_goal_bytes); + if (evicted_bytes >= increase_goal_bytes) + return {0, evicted_bytes}; + } auto max_eviction_per_hb = GetFlag(FLAGS_max_eviction_per_heartbeat); auto max_segment_to_consider = GetFlag(FLAGS_max_segment_to_consider); @@ -1227,7 +1241,6 @@ size_t DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_ constexpr int32_t num_buckets = PrimeTable::Segment_t::kTotalBuckets; constexpr int32_t num_slots = PrimeTable::Segment_t::kSlotNum; - size_t evicted_items = 0, evicted_bytes = 0; string tmp; bool record_keys = owner_->journal() != nullptr || expired_keys_events_recording_; @@ -1291,7 +1304,7 @@ size_t DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_ DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted_items << "/" << max_eviction_per_hb; DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000; - return evicted_bytes; + return {evicted_items, evicted_bytes}; } void DbSlice::CreateDb(DbIndex db_ind) { diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 7a986a97e3d5..9a0e11444b13 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -57,6 +57,7 @@ struct SliceEvents { // ram hit/miss when tiering is enabled size_t ram_hits = 0; + size_t ram_cool_hits = 0; size_t ram_misses = 0; // how many insertions were rejected due to OOM. @@ -444,9 +445,9 @@ class DbSlice { // Evicts items with dynamically allocated data from the primary table. // Does not shrink tables. - // Returnes number of bytes freed due to evictions. - size_t FreeMemWithEvictionStep(DbIndex db_indx, size_t starting_segment_id, - size_t increase_goal_bytes); + // Returnes number of (elements,bytes) freed due to evictions. + std::pair FreeMemWithEvictionStep(DbIndex db_indx, size_t starting_segment_id, + size_t increase_goal_bytes); void ScheduleForOffloadStep(DbIndex db_indx, size_t increase_goal_bytes); int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index fe4649d94096..32e67e550059 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -738,6 +738,9 @@ void EngineShard::CacheStats() { } } DCHECK_EQ(table_memory, db_slice.table_memory()); + if (tiered_storage_) { + table_memory += tiered_storage_->CoolMemoryUsage(); + } size_t obj_memory = table_memory <= used_mem ? used_mem - table_memory : 0; size_t bytes_per_obj = entries > 0 ? 
obj_memory / entries : 0; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 645cb6f602c6..cef8211c1367 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2161,9 +2161,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("reply_count", reply_stats.send_stats.count); append("reply_latency_usec", reply_stats.send_stats.total_duration); append("blocked_on_interpreter", m.coordinator_stats.blocked_on_interpreter); - append("ram_hits", m.events.ram_hits); - append("ram_misses", m.events.ram_misses); - append("lua_interpreter_cnt", m.lua_stats.interpreter_cnt); append("lua_blocked", m.lua_stats.blocked_cnt); } @@ -2191,6 +2188,9 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("tiered_small_bins_entries_cnt", m.tiered_stats.small_bins_entries_cnt); append("tiered_small_bins_filling_bytes", m.tiered_stats.small_bins_filling_bytes); append("tiered_cold_storage_bytes", m.tiered_stats.cold_storage_bytes); + append("tiered_ram_hits", m.events.ram_hits); + append("tiered_ram_cool_hits", m.events.ram_cool_hits); + append("tiered_ram_misses", m.events.ram_misses); } if (should_enter("PERSISTENCE", true)) { diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index e754b8b346d9..d920f126b1cb 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -164,12 +164,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { if (absl::GetFlag(FLAGS_tiered_experimental_cooling)) { RetireColdEntries(pv->MallocUsed()); - detail::TieredColdRecord* record = ts_->cool_queue_.PushFront( - key.first, CompactObj::HashCode(key.second), segment.offset / 4096, std::move(*pv)); - DCHECK(record); - - pv->SetCool(segment.offset, segment.length, record); - DCHECK_EQ(pv->Size(), record->value.Size()); + ts_->CoolDown(key.first, key.second, segment, pv); } else { stats->AddTypeMemoryUsage(pv->ObjType(), -pv->MallocUsed()); pv->SetExternal(segment.offset, segment.length); @@ -221,8 +216,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str tiering::DiskSegment segment = FromCoolItem(item); // We remove it from both cool storage and the offline storage. - PrimeValue hot = ts_->cool_queue_.Erase(item.record); - pv = std::move(hot); + pv = ts_->DeleteCool(item.record); auto* stats = GetDbTableStats(dbid); stats->tiered_entries--; stats->tiered_used_bytes -= segment.length; @@ -303,34 +297,7 @@ void TieredStorage::ShardOpManager::RetireColdEntries(size_t additional_memory) const double kHighFactor = 1.0; size_t needed_to_free = (threshold - db_slice_.memory_budget()) + memory_low_limit_ * kHighFactor; - size_t gained = 0; - - do { - size_t memory_before = ts_->cool_queue_.UsedMemory(); - detail::TieredColdRecord* record = ts_->cool_queue_.PopBack(); - if (record == nullptr) // nothing to pull anymore - break; - - gained += memory_before - ts_->cool_queue_.UsedMemory(); - - // Find the entry that points to the cool item and externalize it. - auto predicate = [record](const PrimeKey& key, const PrimeValue& probe) { - return probe.IsExternal() && probe.IsCool() && probe.GetCool().record == record; - }; - - PrimeIterator it = - db_slice_.GetDBTable(record->db_index)->prime.FindFirst(record->key_hash, predicate); - CHECK(IsValid(it)); - PrimeValue& pv = it->second; - tiering::DiskSegment segment = FromCoolItem(pv.GetCool()); - - // Now the item is only in storage. 
- pv.SetExternal(segment.offset, segment.length); - - auto* stats = GetDbTableStats(record->db_index); - stats->AddTypeMemoryUsage(record->value.ObjType(), -record->value.MallocUsed()); - CompactObj::DeleteMR(record); - } while (gained < needed_to_free); + size_t gained = ts_->ReclaimMemory(needed_to_free); VLOG(1) << "Memory budget: " << db_slice_.memory_budget() << ", gained " << gained; @@ -359,8 +326,11 @@ TieredStorage::TieredStorage(size_t max_size, DbSlice* db_slice) TieredStorage::~TieredStorage() { } -error_code TieredStorage::Open(string_view path) { - return op_manager_->Open(absl::StrCat(path, ProactorBase::me()->GetPoolIndex())); +error_code TieredStorage::Open(string_view base_path) { + // dts - dragonfly tiered storage. + string path = absl::StrCat( + base_path, "-", absl::Dec(ProactorBase::me()->GetPoolIndex(), absl::kZeroPad4), ".dts"); + return op_manager_->Open(path); } void TieredStorage::Close() { @@ -492,18 +462,14 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) { DCHECK(value->IsExternal()); ++stats_.total_deletes; - tiering::DiskSegment segment; + tiering::DiskSegment segment = value->GetExternalSlice(); if (value->IsCool()) { // Delete the cool item. PrimeValue::CoolItem item = value->GetCool(); - segment.length = item.serialized_size; - segment.offset = item.page_offset + item.record->page_index * 4096; - - PrimeValue hot = cool_queue_.Erase(item.record); + PrimeValue hot = DeleteCool(item.record); DCHECK_EQ(OBJ_STRING, hot.ObjType()); - } else { - segment = value->GetExternalSlice(); } + // In any case we delete the offloaded segment and reset the value. value->Reset(); stats_.total_deletes++; @@ -555,7 +521,7 @@ TieredStats TieredStorage::GetStats() const { { // Own stats stats.total_stash_overflows = stats_.stash_overflow_cnt; - stats.cold_storage_bytes = cool_queue_.UsedMemory(); + stats.cold_storage_bytes = cool_memory_used_; } return stats; } @@ -596,6 +562,38 @@ void TieredStorage::RunOffloading(DbIndex dbid) { } while (offloading_cursor_ != start_cursor && iterations++ < kMaxIterations); } +size_t TieredStorage::ReclaimMemory(size_t goal) { + size_t gained = 0; + do { + size_t memory_before = cool_memory_used_; + detail::TieredColdRecord* record = PopCool(); + if (record == nullptr) // nothing to pull anymore + break; + + gained += memory_before - cool_memory_used_; + + // Find the entry that points to the cool item and externalize it. + auto predicate = [record](const PrimeKey& key, const PrimeValue& probe) { + return probe.IsExternal() && probe.IsCool() && probe.GetCool().record == record; + }; + + PrimeIterator it = op_manager_->db_slice_.GetDBTable(record->db_index) + ->prime.FindFirst(record->key_hash, predicate); + CHECK(IsValid(it)); + PrimeValue& pv = it->second; + tiering::DiskSegment segment = FromCoolItem(pv.GetCool()); + + // Now the item is only in storage. 
+ pv.SetExternal(segment.offset, segment.length); + + auto* stats = op_manager_->GetDbTableStats(record->db_index); + stats->AddTypeMemoryUsage(record->value.ObjType(), -record->value.MallocUsed()); + CompactObj::DeleteMR(record); + } while (gained < goal); + + return gained; +} + bool TieredStorage::ShouldStash(const PrimeValue& pv) const { auto disk_stats = op_manager_->GetStats().disk_stats; return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING && @@ -603,11 +601,25 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const { disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size; } +void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str, + const tiering::DiskSegment& segment, PrimeValue* pv) { + cool_memory_used_ += (sizeof(detail::TieredColdRecord) + pv->MallocUsed()); + detail::TieredColdRecord* record = CompactObj::AllocateMR(); + record->key_hash = CompactObj::HashCode(str); + record->db_index = db_ind; + record->page_index = segment.offset / 4096; + record->value = std::move(*pv); + DCHECK(record); + cool_queue_.push_front(*record); + pv->SetCool(segment.offset, segment.length, record); + DCHECK_EQ(pv->Size(), record->value.Size()); +} + PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) { tiering::DiskSegment segment = FromCoolItem(item); // We remove it from both cool storage and the offline storage. - PrimeValue hot = cool_queue_.Erase(item.record); + PrimeValue hot = DeleteCool(item.record); op_manager_->DeleteOffloaded(dbid, segment); // Bring it back to the PrimeTable. @@ -617,4 +629,24 @@ PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) { return hot; } +PrimeValue TieredStorage::DeleteCool(detail::TieredColdRecord* record) { + auto it = CoolQueue::s_iterator_to(*record); + cool_queue_.erase(it); + + PrimeValue hot = std::move(record->value); + cool_memory_used_ -= (sizeof(detail::TieredColdRecord) + hot.MallocUsed()); + CompactObj::DeleteMR(record); + return hot; +} + +detail::TieredColdRecord* TieredStorage::PopCool() { + if (cool_queue_.empty()) + return nullptr; + + detail::TieredColdRecord& res = cool_queue_.back(); + cool_queue_.pop_back(); + cool_memory_used_ -= (sizeof(detail::TieredColdRecord) + res.value.MallocUsed()); + return &res; +} + } // namespace dfly diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 9a453b14406b..8a33846fccff 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -3,6 +3,7 @@ // #pragma once +#include #include #include @@ -13,7 +14,6 @@ #include -#include "core/cool_queue.h" #include "server/common.h" #include "server/table.h" @@ -78,19 +78,34 @@ class TieredStorage { // Run offloading loop until i/o device is loaded or all entries were traversed void RunOffloading(DbIndex dbid); + size_t ReclaimMemory(size_t goal); + + size_t CoolMemoryUsage() const { + return cool_memory_used_; + } + private: // Returns if a value should be stashed bool ShouldStash(const PrimeValue& pv) const; + // Moves pv contents to the cool storage and updates pv to point to it. + void CoolDown(DbIndex db_ind, std::string_view str, const tiering::DiskSegment& segment, + PrimeValue* pv); + // Returns the primary value, and deletes the cool item as well as its offloaded storage. 
PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item); + PrimeValue DeleteCool(detail::TieredColdRecord* record); + detail::TieredColdRecord* PopCool(); + PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off std::unique_ptr op_manager_; std::unique_ptr bins_; - CoolQueue cool_queue_; + typedef ::boost::intrusive::list CoolQueue; + CoolQueue cool_queue_; + size_t cool_memory_used_ = 0; unsigned write_depth_limit_ = 10; struct { uint64_t stash_overflow_cnt = 0;
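
Below is a minimal, self-contained sketch of the boost::intrusive::list pattern this patch adopts for CoolQueue: the record type embeds a list_base_hook with normal_link, push_front adds at the warm end, pop_back evicts from the cold end, and s_iterator_to() turns a raw record pointer into an iterator for O(1) unlinking. This is why the hand-rolled prev/next bookkeeping of the old cool_queue.cc could be deleted. DemoRecord and DemoQueue are illustrative names only, not part of the Dragonfly sources.

#include <cassert>
#include <boost/intrusive/list.hpp>

// Stand-in for detail::TieredColdRecord: it embeds the list hook, so the
// container never allocates nodes and never owns the element.
struct DemoRecord : public boost::intrusive::list_base_hook<
                        boost::intrusive::link_mode<boost::intrusive::normal_link>> {
  explicit DemoRecord(int v) : value(v) {
  }
  int value;
};

using DemoQueue = boost::intrusive::list<DemoRecord>;

int main() {
  DemoRecord a{1}, b{2};  // lifetime is managed by the caller, not by the list
  DemoQueue q;

  q.push_front(a);  // warmest entries live at the front
  q.push_front(b);

  // Unlink a specific record in O(1), given only a reference to it
  // (DeleteCool() does the same via CoolQueue::s_iterator_to).
  q.erase(DemoQueue::s_iterator_to(a));
  assert(&q.back() == &b);

  q.pop_back();  // evict the coldest record from the tail, as PopCool() does
  assert(q.empty());
  return 0;
}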
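
The cool path splits a value's disk location between two places: CoolDown() stores the page index (segment.offset / 4096) in the TieredColdRecord, while the within-page remainder and the serialized length are assumed to be kept by SetCool() inside the PrimeValue's CoolItem (page_offset, serialized_size), since FromCoolItem() and the old Delete() code rebuild the segment as page_offset + page_index * 4096. A small sketch of that round trip, using simplified stand-in structs rather than the real, more tightly packed types:

#include <cassert>
#include <cstdint>

constexpr uint64_t kPageSize = 4096;

struct ColdRecordSketch {
  uint32_t page_index;  // segment.offset / kPageSize, set by CoolDown()
};

struct CoolItemSketch {
  uint16_t page_offset;      // assumed: segment.offset % kPageSize, kept by SetCool()
  uint32_t serialized_size;  // segment.length
  ColdRecordSketch* record;
};

int main() {
  uint64_t offset = 5 * kPageSize + 123;  // hypothetical disk offset of the blob

  ColdRecordSketch rec{static_cast<uint32_t>(offset / kPageSize)};
  CoolItemSketch item{static_cast<uint16_t>(offset % kPageSize), 200, &rec};

  // Mirrors FromCoolItem(): rebuild the full disk segment from the two halves.
  uint64_t restored = item.page_offset + uint64_t{item.record->page_index} * kPageSize;
  assert(restored == offset);
  return 0;
}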
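
The reworked Open() derives one backing file per proactor thread from the base path, appending a zero-padded shard index and a ".dts" suffix ("dragonfly tiered storage"). A quick sketch of the resulting name, with a made-up base path and pool index:

#include <cassert>
#include <string>

#include "absl/strings/str_cat.h"

int main() {
  std::string base_path = "/tmp/dfly-spill";  // hypothetical value passed to Open()
  int pool_index = 3;                         // ProactorBase::me()->GetPoolIndex() on this shard

  // Same formatting as the patch: "-" + zero-padded index + ".dts".
  std::string path =
      absl::StrCat(base_path, "-", absl::Dec(pool_index, absl::kZeroPad4), ".dts");
  assert(path == "/tmp/dfly-spill-0003.dts");
  return 0;
}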