diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 8d1e98d35906..7fd18a0b1c02 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -897,7 +897,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   ClearPipelinedMessages();
   DCHECK(dispatch_q_.empty());

-  service_->OnClose(cc_.get());
+  service_->OnConnectionClose(cc_.get());
   DecreaseStatsOnClose();

   // We wait for dispatch_fb to finish writing the previous replies before replying to the last
diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h
index 901b06af573b..f2968470c251 100644
--- a/src/facade/service_interface.h
+++ b/src/facade/service_interface.h
@@ -39,7 +39,7 @@ class ServiceInterface {
   virtual void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
   }

-  virtual void OnClose(ConnectionContext* cntx) {
+  virtual void OnConnectionClose(ConnectionContext* cntx) {
   }

   struct ContextInfo {
diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc
index d1204095e6c7..43988b40b5aa 100644
--- a/src/server/blocking_controller_test.cc
+++ b/src/server/blocking_controller_test.cc
@@ -52,7 +52,7 @@ void BlockingControllerTest::SetUp() {
   });

   shard_set = new EngineShardSet(pp_.get());
-  shard_set->Init(kNumThreads, false);
+  shard_set->Init(kNumThreads, nullptr);

   trans_.reset(new Transaction{&cid_});
diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc
index 8848dae82f46..aea658012f8c 100644
--- a/src/server/container_utils.cc
+++ b/src/server/container_utils.cc
@@ -116,7 +116,7 @@ OpResult FindFirstNonEmpty(Transaction* trans, int req_obj_type)
     return OpStatus::WRONG_TYPE;

   // Order result by their keys position in the command arguments, push errors to back
-  auto comp = [trans](const OpResult& lhs, const OpResult& rhs) {
+  auto comp = [](const OpResult& lhs, const OpResult& rhs) {
     if (!lhs || !rhs)
       return lhs.ok();
     size_t i1 = std::get<1>(*lhs);
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 8bdf23da548a..2f66b9a2cc1b 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -928,6 +928,7 @@ void DebugCmd::Stacktrace() {
     std::unique_lock lk(m);
     fb2::detail::FiberInterface::PrintAllFiberStackTraces();
   });
+  base::FlushLogs();
   cntx_->SendOk();
 }
diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc
index 36b667b726b3..015b9814be50 100644
--- a/src/server/dfly_main.cc
+++ b/src/server/dfly_main.cc
@@ -186,8 +186,6 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
     listeners.push_back(listener.release());
   }

-  Service::InitOpts opts;
-  opts.disable_time_update = false;
   const auto& bind = GetFlag(FLAGS_bind);
   const char* bind_addr = bind.empty() ? nullptr : bind.c_str();
@@ -292,7 +290,7 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
     listeners.push_back(listener.release());
   }

-  service.Init(acceptor, listeners, opts);
+  service.Init(acceptor, listeners);

   VersionMonitor version_monitor;
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index f937b54e7017..968901b25294 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -660,7 +660,7 @@ std::vector DflyCmd::GetReplicasRoleInfo() const {
   lock_guard lk(mu_);

   vec.reserve(replica_infos_.size());
-  auto replication_lags = ReplicationLags();
+  auto replication_lags = ReplicationLagsLocked();

   for (const auto& [id, info] : replica_infos_) {
     LSN lag = replication_lags[id];
@@ -712,14 +712,13 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {

 pair> DflyCmd::GetReplicaInfoOrReply(
     std::string_view id_str, RedisReplyBuilder* rb) {
-  unique_lock lk(mu_);
-
   uint32_t sync_id;
   if (!ToSyncId(id_str, &sync_id)) {
     rb->SendError(kInvalidSyncId);
     return {0, nullptr};
   }

+  lock_guard lk(mu_);
   auto sync_it = replica_infos_.find(sync_id);
   if (sync_it == replica_infos_.end()) {
     rb->SendError(kIdNotFound);
@@ -729,7 +728,7 @@ pair> DflyCmd::GetReplicaInfoOrReply(
   return {sync_id, sync_it->second};
 }

-std::map DflyCmd::ReplicationLags() const {
+std::map DflyCmd::ReplicationLagsLocked() const {
   DCHECK(!mu_.try_lock());  // expects to be under global lock
   if (replica_infos_.empty())
     return {};
@@ -785,9 +784,6 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& repl_info, SyncState e
   return true;
 }

-void DflyCmd::BreakOnShutdown() {
-}
-
 void DflyCmd::Shutdown() {
   ReplicaInfoMap pending;
   {
diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h
index 62f3f7bbf3a2..f7ecfa90b001 100644
--- a/src/server/dflycmd.h
+++ b/src/server/dflycmd.h
@@ -132,8 +132,6 @@ class DflyCmd {

   void OnClose(ConnectionContext* cntx);

-  void BreakOnShutdown();
-
   // Stop all background processes so we can exit in orderly manner.
   void Shutdown();
@@ -214,17 +212,17 @@ class DflyCmd {
   bool CheckReplicaStateOrReply(const ReplicaInfo& ri, SyncState expected,
                                 facade::RedisReplyBuilder* rb);

+ private:
   // Return a map between replication ID to lag. lag is defined as the maximum of difference
   // between the master's LSN and the last acknowledged LSN in over all shards.
-  std::map ReplicationLags() const;
+  std::map ReplicationLagsLocked() const;

- private:
   ServerFamily* sf_;  // Not owned

   uint32_t next_sync_id_ = 1;

   using ReplicaInfoMap = absl::btree_map>;
-  ReplicaInfoMap replica_infos_;
+  ReplicaInfoMap replica_infos_ ABSL_GUARDED_BY(mu_);

   mutable util::fb2::Mutex mu_;  // Guard global operations. See header top for locking levels.
 };
diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc
index 550777f5efe7..2e229ee1f405 100644
--- a/src/server/dragonfly_test.cc
+++ b/src/server/dragonfly_test.cc
@@ -402,7 +402,6 @@ TEST_F(DflyEngineTest, FlushAll) {
 }

 TEST_F(DflyEngineTest, OOM) {
-  shard_set->TEST_EnableHeartBeat();
   max_memory_limit = 300000;
   size_t i = 0;
   RespExpr resp;
@@ -444,7 +443,6 @@ TEST_F(DflyEngineTest, OOM) {
 /// and then written with the same key.
 TEST_F(DflyEngineTest, Bug207) {
   max_memory_limit = 300000;
-  shard_set->TEST_EnableHeartBeat();
   shard_set->TEST_EnableCacheMode();
   absl::FlagSaver fs;
   absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
@@ -474,7 +472,6 @@ TEST_F(DflyEngineTest, Bug207) {
 }

 TEST_F(DflyEngineTest, StickyEviction) {
-  shard_set->TEST_EnableHeartBeat();
   shard_set->TEST_EnableCacheMode();
   absl::FlagSaver fs;
   absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
@@ -583,11 +580,7 @@ TEST_F(DflyEngineTest, Bug468) {
 }

 TEST_F(DflyEngineTest, Bug496) {
-  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
-    EngineShard* shard = EngineShard::tlocal();
-    if (shard == nullptr)
-      return;
-
+  shard_set->RunBlockingInParallel([](EngineShard* shard) {
     auto& db = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id());

     int cb_hits = 0;
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index e06afa0375ab..09d19d0a9105 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -406,14 +406,15 @@ void EngineShard::StopPeriodicFiber() {
   }
 }

-void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
+void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler) {
   uint32_t clock_cycle_ms = 1000 / std::max(1, GetFlag(FLAGS_hz));
   if (clock_cycle_ms == 0)
     clock_cycle_ms = 1;

-  fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms] {
+  fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
+                               handler = std::move(global_handler)] {
     ThisFiber::SetName(absl::StrCat("shard_periodic", index));
-    RunPeriodic(std::chrono::milliseconds(period_ms));
+    RunPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
   });
 }
@@ -671,7 +672,8 @@ void EngineShard::Heartbeat() {
   }
 }

-void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
+void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
+                              std::function<void()> global_handler) {
   VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";

   bool runs_global_periodic = (shard_id() == 0);  // Only shard 0 runs global periodic.
@@ -716,6 +718,10 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
           rss_mem_peak.store(total_rss, memory_order_relaxed);
         }
       }
+
+      if (global_handler) {
+        global_handler();
+      }
     }
   }
 }
@@ -762,12 +768,6 @@ size_t EngineShard::UsedMemory() const {
          search_indices()->GetUsedMemory();
 }

-void EngineShard::TEST_EnableHeartbeat() {
-  fiber_periodic_ = fb2::Fiber("shard_periodic_TEST", [this, period_ms = 1] {
-    RunPeriodic(std::chrono::milliseconds(period_ms));
-  });
-}
-
 bool EngineShard::ShouldThrottleForTiering() const {  // see header for formula justification
   if (!tiered_storage_)
     return false;
@@ -902,7 +902,7 @@ size_t GetTieredFileLimit(size_t threads) {
   return max_shard_file_size;
 }

-void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
+void EngineShardSet::Init(uint32_t sz, std::function<void()> global_handler) {
   CHECK_EQ(0u, size());
   shard_queue_.resize(sz);
@@ -920,10 +920,8 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
       auto* shard = EngineShard::tlocal();
       shard->InitTieredStorage(pb, max_shard_file_size);

-      if (update_db_time) {
-        // Must be last, as it accesses objects initialized above.
-        shard->StartPeriodicFiber(pb);
-      }
+      // Must be last, as it accesses objects initialized above.
+      shard->StartPeriodicFiber(pb, global_handler);
     }
   });
 }
@@ -949,10 +947,6 @@ void EngineShardSet::InitThreadLocal(ProactorBase* pb) {
   shard_queue_[es->shard_id()] = es->GetFiberQueue();
 }

-void EngineShardSet::TEST_EnableHeartBeat() {
-  RunBriefInParallel([](EngineShard* shard) { shard->TEST_EnableHeartbeat(); });
-}
-
 void EngineShardSet::TEST_EnableCacheMode() {
   RunBlockingInParallel([](EngineShard* shard) {
     namespaces.GetDefaultNamespace().GetCurrentDbSlice().TEST_EnableCacheMode();
diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h
index fbb9652a4edd..66e03b4503a6 100644
--- a/src/server/engine_shard_set.h
+++ b/src/server/engine_shard_set.h
@@ -150,8 +150,6 @@ class EngineShard {
     return continuation_trans_;
   }

-  void TEST_EnableHeartbeat();
-
   void StopPeriodicFiber();

   struct TxQueueInfo {
@@ -205,10 +203,10 @@ class EngineShard {
   // blocks the calling fiber.
   void Shutdown();  // called before destructing EngineShard.

-  void StartPeriodicFiber(util::ProactorBase* pb);
+  void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler);

   void Heartbeat();
-  void RunPeriodic(std::chrono::milliseconds period_ms);
+  void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> global_handler);

   void CacheStats();
@@ -288,7 +286,7 @@ class EngineShardSet {
     return pp_;
   }

-  void Init(uint32_t size, bool update_db_time);
+  void Init(uint32_t size, std::function<void()> global_handler);

   // Shutdown sequence:
   // - EngineShardSet.PreShutDown()
@@ -342,7 +340,6 @@ class EngineShardSet {
   }

   // Used in tests
-  void TEST_EnableHeartBeat();
   void TEST_EnableCacheMode();

  private:
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index f76dbbca2d9e..72e59fa5c977 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -842,8 +842,7 @@ Service::~Service() {
   shard_set = nullptr;
 }

-void Service::Init(util::AcceptServer* acceptor, std::vector listeners,
-                   const InitOpts& opts) {
+void Service::Init(util::AcceptServer* acceptor, std::vector listeners) {
   InitRedisTables();

   config_registry.RegisterMutable("maxmemory", [](const absl::CommandLineFlag& flag) {
@@ -881,7 +880,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector
     ServerState::Init(index, shard_num, &user_registry_);
   });

-  shard_set->Init(shard_num, !opts.disable_time_update);
+  shard_set->Init(shard_num, nullptr);
   const auto tcp_disabled = GetFlag(FLAGS_port) == 0u;
   // We assume that listeners.front() is the main_listener
   // see dfly_main RunEngine
@@ -1600,10 +1599,9 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer,

   // a bit of a hack. I set up breaker callback here for the owner.
   // Should work though it's confusing to have it here.
-  owner->RegisterBreakHook([res, this](uint32_t) {
+  owner->RegisterBreakHook([res](uint32_t) {
     if (res->transaction)
       res->transaction->CancelBlocking(nullptr);
-    this->server_family().BreakOnShutdown();
   });

   return res;
@@ -2529,7 +2527,7 @@ void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privil
   }
 }

-void Service::OnClose(facade::ConnectionContext* cntx) {
+void Service::OnConnectionClose(facade::ConnectionContext* cntx) {
   ConnectionContext* server_cntx = static_cast<ConnectionContext*>(cntx);
   ConnectionState& conn_state = server_cntx->conn_state;
diff --git a/src/server/main_service.h b/src/server/main_service.h
index 21ebbbda3351..0a8219cb8274 100644
--- a/src/server/main_service.h
+++ b/src/server/main_service.h
@@ -29,18 +29,10 @@ using facade::MemcacheParser;

 class Service : public facade::ServiceInterface {
  public:
-  struct InitOpts {
-    bool disable_time_update;
-
-    InitOpts() : disable_time_update{false} {
-    }
-  };
-
   explicit Service(util::ProactorPool* pp);
   ~Service();

-  void Init(util::AcceptServer* acceptor, std::vector listeners,
-            const InitOpts& opts = InitOpts{});
+  void Init(util::AcceptServer* acceptor, std::vector listeners);

   void Shutdown();
@@ -93,7 +85,7 @@ class Service : public facade::ServiceInterface {
   GlobalState GetGlobalState() const;

   void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;
-  void OnClose(facade::ConnectionContext* cntx) final;
+  void OnConnectionClose(facade::ConnectionContext* cntx) final;

   Service::ContextInfo GetContextInfo(facade::ConnectionContext* cntx) const final;
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index c695359f20e7..34741b3883a2 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -835,7 +835,7 @@ error_code SerializerBase::WriteRaw(const io::Bytes& buf) {
   return error_code{};
 }

-error_code SerializerBase::FlushToSink(io::Sink* s, SerializerBase::FlushState flush_state) {
+error_code SerializerBase::FlushToSink(io::Sink* sink, SerializerBase::FlushState flush_state) {
   auto bytes = PrepareFlush(flush_state);
   if (bytes.empty())
     return error_code{};
@@ -843,7 +843,7 @@ error_code SerializerBase::FlushToSink(io::Sink* s, SerializerBase::FlushState f
   DVLOG(2) << "FlushToSink " << bytes.size() << " bytes";

   // interrupt point.
-  RETURN_ON_ERR(s->Write(bytes));
+  RETURN_ON_ERR(sink->Write(bytes));
   mem_buf_.ConsumeInput(bytes.size());

   return error_code{};
@@ -1121,7 +1121,9 @@ class RdbSaver::Impl {

   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

-  error_code Flush() {
+  error_code FlushSerializer();
+
+  error_code FlushSink() {
     return aligned_buf_ ? aligned_buf_->Flush() : error_code{};
   }
@@ -1133,10 +1135,6 @@ class RdbSaver::Impl {
     return &meta_serializer_;
   }

-  io::Sink* sink() {
-    return sink_;
-  }
-
  private:
   unique_ptr& GetSnapshot(EngineShard* shard);
@@ -1252,6 +1250,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
   std::optional record;

   RecordsPopper records_popper(push_to_sink_with_order_, &channel_);
+  auto& stats = ServerState::tlocal()->stats;

   // we can not exit on io-error since we spawn fibers that push data.
   // TODO: we may signal them to stop processing and exit asap in case of the error.
@@ -1266,10 +1265,10 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
       DVLOG(2) << "Pulled " << record->id;
       auto before = absl::GetCurrentTimeNanos();
       io_error = sink_->Write(io::Buffer(record->value));
-      auto& stats = ServerState::tlocal()->stats;
       stats.rdb_save_usec += (absl::GetCurrentTimeNanos() - before) / 1'000;
       stats.rdb_save_count++;
       if (io_error) {
+        VLOG(1) << "Error writing to sink " << io_error.message();
         break;
       }
     } while ((record = records_popper.TryPop()));
@@ -1369,6 +1368,10 @@ RdbSaver::SnapshotStats RdbSaver::Impl::GetCurrentSnapshotProgress() const {
   });
 }

+error_code RdbSaver::Impl::FlushSerializer() {
+  return serializer()->FlushToSink(sink_, SerializerBase::FlushState::kFlushMidEntry);
+}
+
 RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
   StringVec script_bodies, search_indices;
@@ -1471,8 +1474,7 @@ error_code RdbSaver::SaveHeader(const GlobalData& glob_state) {
 }

 error_code RdbSaver::SaveBody(Context* cntx, RdbTypeFreqMap* freq_map) {
-  RETURN_ON_ERR(
-      impl_->serializer()->FlushToSink(impl_->sink(), SerializerBase::FlushState::kFlushMidEntry));
+  RETURN_ON_ERR(impl_->FlushSerializer());

   if (save_mode_ == SaveMode::SUMMARY) {
     impl_->serializer()->SendFullSyncCut();
@@ -1547,9 +1549,9 @@ error_code RdbSaver::SaveEpilog() {
   absl::little_endian::Store64(buf, chksum);

   RETURN_ON_ERR(ser.WriteRaw(buf));
-  RETURN_ON_ERR(ser.FlushToSink(impl_->sink(), SerializerBase::FlushState::kFlushMidEntry));
+  RETURN_ON_ERR(impl_->FlushSerializer());

-  return impl_->Flush();
+  return impl_->FlushSink();
 }

 error_code RdbSaver::SaveAuxFieldStrInt(string_view key, int64_t val) {
diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h
index df5547c6dd1e..25b49dfced33 100644
--- a/src/server/rdb_save.h
+++ b/src/server/rdb_save.h
@@ -86,7 +86,7 @@ class RdbSaver {
   ~RdbSaver();

   // Initiates the serialization in the shard's thread.
-  // TODO: to implement break functionality to allow stopping early.
+  // cll allows breaking in the middle.
   void StartSnapshotInShard(bool stream_journal, const Cancellation* cll, EngineShard* shard);

   // Send only the incremental snapshot since start_lsn.
diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc
index 8a828c96774f..b56a6162d9f9 100644
--- a/src/server/rdb_test.cc
+++ b/src/server/rdb_test.cc
@@ -45,6 +45,7 @@ class RdbTest : public BaseFamilyTest {

 void RdbTest::SetUp() {
   InitWithDbFilename();
+  max_memory_limit = 40000000;
 }

 inline const uint8_t* to_byte(const void* s) {
diff --git a/src/server/search/search_family_test.cc b/src/server/search/search_family_test.cc
index 7d86bf84f7d0..ff8067127184 100644
--- a/src/server/search/search_family_test.cc
+++ b/src/server/search/search_family_test.cc
@@ -628,8 +628,6 @@ TEST_F(SearchFamilyTest, SimpleExpiry) {

   EXPECT_THAT(Run({"ft.search", "i1", "*"}), AreDocIds("d:1", "d:2", "d:3"));

-  shard_set->TEST_EnableHeartBeat();
-
   AdvanceTime(60);
   ThisFiber::SleepFor(5ms);  // Give heartbeat time to delete expired doc
   EXPECT_THAT(Run({"ft.search", "i1", "*"}), AreDocIds("d:1", "d:3"));
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index b8232f44fe99..97d17ec8746d 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -683,7 +683,7 @@ std::optional Pause(std::vector listeners, Namesp
   // command that did not pause on the new state yet we will pause after waking up.
   DispatchTracker tracker{std::move(listeners), conn, true /* ignore paused commands */,
                           true /*ignore blocking*/};
-  shard_set->pool()->AwaitBrief([&tracker, pause_state, ns](unsigned, util::ProactorBase*) {
+  shard_set->pool()->AwaitBrief([&tracker, pause_state](unsigned, util::ProactorBase*) {
     // Commands don't suspend before checking the pause state, so
     // it's impossible to deadlock on waiting for a command that will be paused.
     tracker.TrackOnThread();
@@ -1562,10 +1562,6 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
   return cntx->SendLong(num_keys.load(memory_order_relaxed));
 }

-void ServerFamily::BreakOnShutdown() {
-  dfly_cmd_->BreakOnShutdown();
-}
-
 void ServerFamily::CancelBlockingOnThread(std::function status_cb) {
   auto cb = [status_cb](unsigned thread_index, util::Connection* conn) {
     if (auto fcntx = static_cast(conn)->cntx(); fcntx) {
diff --git a/src/server/server_family.h b/src/server/server_family.h
index f32ad8737e6c..abc3a971bc26 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -227,8 +227,6 @@ class ServerFamily {

   void OnClose(ConnectionContext* cntx);

-  void BreakOnShutdown();
-
   void CancelBlockingOnThread(std::function = {});

   // Sets the server to replicate another instance. Does not flush the database beforehand!
diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc
index 40036487bf43..8659ba409352 100644
--- a/src/server/test_utils.cc
+++ b/src/server/test_utils.cc
@@ -167,10 +167,10 @@ void BaseFamilyTest::SetUpTestSuite() {
       SetTestFlag(flag, value);
     }
   }
-  max_memory_limit = INT_MAX;
 }

 void BaseFamilyTest::SetUp() {
+  max_memory_limit = INT_MAX;
   ResetService();
 }
@@ -207,9 +207,7 @@ void BaseFamilyTest::ResetService() {
     pp_->Run();
   service_ = std::make_unique<Service>(pp_.get());

-  Service::InitOpts opts;
-  opts.disable_time_update = true;
-  service_->Init(nullptr, {}, opts);
+  service_->Init(nullptr, {});

   used_mem_current = 0;
   TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index d1dae3c081de..1ebb99ad8529 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -192,7 +192,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
   for (auto [dbid, hash, item_segment] : ts_->bins_->DeleteBin(segment, page)) {
     // Search for key with the same hash and value pointing to the same segment.
     // If it still exists, it must correspond to the value stored in this bin
-    auto predicate = [item_segment](const PrimeKey& key, const PrimeValue& probe) {
+    auto predicate = [item_segment = item_segment](const PrimeKey& key, const PrimeValue& probe) {
       return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == item_segment;
     };
     auto it = db_slice_.GetDBTable(dbid)->prime.FindFirst(hash, predicate);
diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc
index 896ab8587de3..7b7a9ca0e092 100644
--- a/src/server/tiered_storage_test.cc
+++ b/src/server/tiered_storage_test.cc
@@ -209,7 +209,6 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
   const int kNum = 500;
   max_memory_limit = kNum * 4096;

-  pp_->at(0)->AwaitBrief([] { EngineShard::tlocal()->TEST_EnableHeartbeat(); });

   // Stash all values
   string value = BuildString(3000);
@@ -302,10 +301,7 @@ TEST_F(TieredStorageTest, FlushPending) {

 TEST_F(TieredStorageTest, MemoryPressure) {
   max_memory_limit = 20_MB;
-  pp_->at(0)->AwaitBrief([] {
-    EngineShard::tlocal()->TEST_EnableHeartbeat();
-    EngineShard::tlocal()->tiered_storage()->SetMemoryLowLimit(2_MB);
-  });
+  pp_->at(0)->AwaitBrief([] { EngineShard::tlocal()->tiered_storage()->SetMemoryLowLimit(2_MB); });

   constexpr size_t kNum = 10000;
   for (size_t i = 0; i < kNum; i++) {