chore: retire TEST_EnableHeartBeat (#3435)
Unit tests now run the same Heartbeat fiber as in production. The whole feature was redundant: with just a few explicit settings of max_memory_limit,
I succeeded in making all unit tests pass.

In addition, this change allows passing a global handler that the heartbeat calls from a single thread.
It is not used yet; it is preparation for the next PR, which will break hung replication connections on a master.
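
For context, the wiring introduced here is small: EngineShardSet::Init now accepts a std::function<void()> which is forwarded to the shards' periodic fibers and invoked periodically from shard 0's heartbeat (see the engine_shard_set.cc hunks below). The following is a minimal standalone sketch of that idea only, using plain std::thread instead of fibers and hypothetical names rather than Dragonfly's actual API:

#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

// Sketch only: each "shard" runs a periodic loop; exactly one of them
// (index 0) also invokes the optional global handler on each tick.
void RunPeriodic(unsigned index, std::chrono::milliseconds period,
                 std::function<void()> global_handler, std::atomic<bool>& stop) {
  while (!stop.load(std::memory_order_relaxed)) {
    std::this_thread::sleep_for(period);
    // ... per-shard heartbeat work (eviction, stats) would go here ...
    if (index == 0 && global_handler)
      global_handler();  // global work runs from a single thread only
  }
}

int main() {
  std::atomic<bool> stop{false};
  std::thread shard0(RunPeriodic, 0u, std::chrono::milliseconds(100),
                     std::function<void()>([] { std::cout << "global tick\n"; }),
                     std::ref(stop));
  std::thread shard1(RunPeriodic, 1u, std::chrono::milliseconds(100),
                     std::function<void()>{}, std::ref(stop));
  std::this_thread::sleep_for(std::chrono::milliseconds(350));
  stop.store(true, std::memory_order_relaxed);
  shard0.join();
  shard1.join();
}

In this commit Service::Init still passes nullptr for the handler, so behaviour is unchanged until the follow-up PR.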

Finally, this change includes some non-functional clean-ups and warning fixes to improve code quality.

Signed-off-by: Roman Gershman <[email protected]>
romange authored Aug 3, 2024
1 parent 82298b8 commit c9ed3f7
Showing 22 changed files with 56 additions and 100 deletions.
src/facade/dragonfly_connection.cc (2 changes: 1 addition & 1 deletion)
@@ -897,7 +897,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   ClearPipelinedMessages();
   DCHECK(dispatch_q_.empty());
 
-  service_->OnClose(cc_.get());
+  service_->OnConnectionClose(cc_.get());
   DecreaseStatsOnClose();
 
   // We wait for dispatch_fb to finish writing the previous replies before replying to the last

src/facade/service_interface.h (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ class ServiceInterface {
   virtual void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
   }
 
-  virtual void OnClose(ConnectionContext* cntx) {
+  virtual void OnConnectionClose(ConnectionContext* cntx) {
   }
 
   struct ContextInfo {

src/server/blocking_controller_test.cc (2 changes: 1 addition & 1 deletion)
@@ -52,7 +52,7 @@ void BlockingControllerTest::SetUp() {
   });
 
   shard_set = new EngineShardSet(pp_.get());
-  shard_set->Init(kNumThreads, false);
+  shard_set->Init(kNumThreads, nullptr);
 
   trans_.reset(new Transaction{&cid_});

src/server/container_utils.cc (2 changes: 1 addition & 1 deletion)
@@ -116,7 +116,7 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
     return OpStatus::WRONG_TYPE;
 
   // Order result by their keys position in the command arguments, push errors to back
-  auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
+  auto comp = [](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
     if (!lhs || !rhs)
       return lhs.ok();
     size_t i1 = std::get<1>(*lhs);

src/server/debugcmd.cc (1 change: 1 addition & 0 deletions)
@@ -928,6 +928,7 @@ void DebugCmd::Stacktrace() {
     std::unique_lock lk(m);
     fb2::detail::FiberInterface::PrintAllFiberStackTraces();
   });
+  base::FlushLogs();
   cntx_->SendOk();
 }

src/server/dfly_main.cc (4 changes: 1 addition & 3 deletions)
@@ -186,8 +186,6 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
     listeners.push_back(listener.release());
   }
 
-  Service::InitOpts opts;
-  opts.disable_time_update = false;
   const auto& bind = GetFlag(FLAGS_bind);
   const char* bind_addr = bind.empty() ? nullptr : bind.c_str();
 
@@ -292,7 +290,7 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
     listeners.push_back(listener.release());
   }
 
-  service.Init(acceptor, listeners, opts);
+  service.Init(acceptor, listeners);
 
   VersionMonitor version_monitor;

src/server/dflycmd.cc (10 changes: 3 additions & 7 deletions)
@@ -660,7 +660,7 @@ std::vector<ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() const {
   lock_guard lk(mu_);
 
   vec.reserve(replica_infos_.size());
-  auto replication_lags = ReplicationLags();
+  auto replication_lags = ReplicationLagsLocked();
 
   for (const auto& [id, info] : replica_infos_) {
     LSN lag = replication_lags[id];
@@ -712,14 +712,13 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {
 
 pair<uint32_t, shared_ptr<DflyCmd::ReplicaInfo>> DflyCmd::GetReplicaInfoOrReply(
     std::string_view id_str, RedisReplyBuilder* rb) {
-  unique_lock lk(mu_);
-
   uint32_t sync_id;
   if (!ToSyncId(id_str, &sync_id)) {
     rb->SendError(kInvalidSyncId);
     return {0, nullptr};
   }
 
+  lock_guard lk(mu_);
   auto sync_it = replica_infos_.find(sync_id);
   if (sync_it == replica_infos_.end()) {
     rb->SendError(kIdNotFound);
@@ -729,7 +728,7 @@ pair<uint32_t, shared_ptr<DflyCmd::ReplicaInfo>> DflyCmd::GetReplicaInfoOrReply(
   return {sync_id, sync_it->second};
 }
 
-std::map<uint32_t, LSN> DflyCmd::ReplicationLags() const {
+std::map<uint32_t, LSN> DflyCmd::ReplicationLagsLocked() const {
   DCHECK(!mu_.try_lock());  // expects to be under global lock
   if (replica_infos_.empty())
     return {};
@@ -785,9 +784,6 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& repl_info, SyncState e
   return true;
 }
 
-void DflyCmd::BreakOnShutdown() {
-}
-
 void DflyCmd::Shutdown() {
   ReplicaInfoMap pending;
   {

src/server/dflycmd.h (8 changes: 3 additions & 5 deletions)
@@ -132,8 +132,6 @@ class DflyCmd {
 
   void OnClose(ConnectionContext* cntx);
 
-  void BreakOnShutdown();
-
   // Stop all background processes so we can exit in orderly manner.
   void Shutdown();
 
@@ -214,17 +212,17 @@ class DflyCmd {
   bool CheckReplicaStateOrReply(const ReplicaInfo& ri, SyncState expected,
                                 facade::RedisReplyBuilder* rb);
 
- private:
   // Return a map between replication ID to lag. lag is defined as the maximum of difference
   // between the master's LSN and the last acknowledged LSN in over all shards.
-  std::map<uint32_t, LSN> ReplicationLags() const;
+  std::map<uint32_t, LSN> ReplicationLagsLocked() const;
 
+ private:
   ServerFamily* sf_;  // Not owned
 
   uint32_t next_sync_id_ = 1;
 
   using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;
-  ReplicaInfoMap replica_infos_;
+  ReplicaInfoMap replica_infos_ ABSL_GUARDED_BY(mu_);
 
   mutable util::fb2::Mutex mu_;  // Guard global operations. See header top for locking levels.
 };

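A note on the locking changes in dflycmd above: the Locked suffix marks helpers that assume mu_ is already held by the caller, the public entry points take the lock themselves, and ABSL_GUARDED_BY documents (and, where clang's thread-safety analysis is enabled, lets the compiler check) that replica_infos_ is only touched under mu_. Below is a small self-contained sketch of the same convention with absl::Mutex and made-up names, assuming Abseil is available; Dragonfly itself uses util::fb2::Mutex plus the DCHECK(!mu_.try_lock()) assertion visible in the dflycmd.cc hunk.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

// Illustration of the *Locked() convention: public methods acquire the mutex,
// while the Locked helper documents (and statically enforces) that the caller
// already holds it.
class ReplicaRegistry {
 public:
  void Report(uint32_t id, uint64_t lag) {
    absl::MutexLock lk(&mu_);
    lags_[id] = lag;
  }

  uint64_t MaxLag() const {
    absl::MutexLock lk(&mu_);
    return MaxLagLocked();
  }

 private:
  uint64_t MaxLagLocked() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    mu_.AssertHeld();  // runtime counterpart of the static annotation
    uint64_t max_lag = 0;
    for (const auto& [id, lag] : lags_) max_lag = std::max(max_lag, lag);
    return max_lag;
  }

  mutable absl::Mutex mu_;
  std::map<uint32_t, uint64_t> lags_ ABSL_GUARDED_BY(mu_);
};

int main() {
  ReplicaRegistry registry;
  registry.Report(1, 42);
  registry.Report(2, 7);
  std::cout << registry.MaxLag() << "\n";  // prints 42
}
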
src/server/dragonfly_test.cc (9 changes: 1 addition & 8 deletions)
@@ -402,7 +402,6 @@ TEST_F(DflyEngineTest, FlushAll) {
 }
 
 TEST_F(DflyEngineTest, OOM) {
-  shard_set->TEST_EnableHeartBeat();
   max_memory_limit = 300000;
   size_t i = 0;
   RespExpr resp;
@@ -444,7 +443,6 @@ TEST_F(DflyEngineTest, OOM) {
 /// and then written with the same key.
 TEST_F(DflyEngineTest, Bug207) {
   max_memory_limit = 300000;
-  shard_set->TEST_EnableHeartBeat();
   shard_set->TEST_EnableCacheMode();
   absl::FlagSaver fs;
   absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
@@ -474,7 +472,6 @@ TEST_F(DflyEngineTest, Bug207) {
 }
 
 TEST_F(DflyEngineTest, StickyEviction) {
-  shard_set->TEST_EnableHeartBeat();
   shard_set->TEST_EnableCacheMode();
   absl::FlagSaver fs;
   absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
@@ -583,11 +580,7 @@ TEST_F(DflyEngineTest, Bug468) {
 }
 
 TEST_F(DflyEngineTest, Bug496) {
-  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
-    EngineShard* shard = EngineShard::tlocal();
-    if (shard == nullptr)
-      return;
-
+  shard_set->RunBlockingInParallel([](EngineShard* shard) {
     auto& db = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id());
 
     int cb_hits = 0;

src/server/engine_shard_set.cc (32 changes: 13 additions & 19 deletions)
@@ -406,14 +406,15 @@ void EngineShard::StopPeriodicFiber() {
   }
 }
 
-void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
+void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler) {
   uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
   if (clock_cycle_ms == 0)
     clock_cycle_ms = 1;
 
-  fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms] {
+  fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
+                               handler = std::move(global_handler)] {
     ThisFiber::SetName(absl::StrCat("shard_periodic", index));
-    RunPeriodic(std::chrono::milliseconds(period_ms));
+    RunPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
   });
 }
 
@@ -671,7 +672,8 @@ void EngineShard::Heartbeat() {
   }
 }
 
-void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
+void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
+                              std::function<void()> global_handler) {
   VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";
 
   bool runs_global_periodic = (shard_id() == 0);  // Only shard 0 runs global periodic.
@@ -716,6 +718,10 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
           rss_mem_peak.store(total_rss, memory_order_relaxed);
         }
       }
+
+      if (global_handler) {
+        global_handler();
+      }
     }
   }
 }
@@ -762,12 +768,6 @@ size_t EngineShard::UsedMemory() const {
          search_indices()->GetUsedMemory();
 }
 
-void EngineShard::TEST_EnableHeartbeat() {
-  fiber_periodic_ = fb2::Fiber("shard_periodic_TEST", [this, period_ms = 1] {
-    RunPeriodic(std::chrono::milliseconds(period_ms));
-  });
-}
-
 bool EngineShard::ShouldThrottleForTiering() const {  // see header for formula justification
   if (!tiered_storage_)
     return false;
@@ -902,7 +902,7 @@ size_t GetTieredFileLimit(size_t threads) {
   return max_shard_file_size;
 }
 
-void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
+void EngineShardSet::Init(uint32_t sz, std::function<void()> global_handler) {
   CHECK_EQ(0u, size());
   shard_queue_.resize(sz);
 
@@ -920,10 +920,8 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
       auto* shard = EngineShard::tlocal();
       shard->InitTieredStorage(pb, max_shard_file_size);
 
-      if (update_db_time) {
-        // Must be last, as it accesses objects initialized above.
-        shard->StartPeriodicFiber(pb);
-      }
+      // Must be last, as it accesses objects initialized above.
+      shard->StartPeriodicFiber(pb, global_handler);
     }
   });
 }
@@ -949,10 +947,6 @@ void EngineShardSet::InitThreadLocal(ProactorBase* pb) {
   shard_queue_[es->shard_id()] = es->GetFiberQueue();
 }
 
-void EngineShardSet::TEST_EnableHeartBeat() {
-  RunBriefInParallel([](EngineShard* shard) { shard->TEST_EnableHeartbeat(); });
-}
-
 void EngineShardSet::TEST_EnableCacheMode() {
   RunBlockingInParallel([](EngineShard* shard) {
     namespaces.GetDefaultNamespace().GetCurrentDbSlice().TEST_EnableCacheMode();

src/server/engine_shard_set.h (9 changes: 3 additions & 6 deletions)
@@ -150,8 +150,6 @@ class EngineShard {
     return continuation_trans_;
   }
 
-  void TEST_EnableHeartbeat();
-
   void StopPeriodicFiber();
 
   struct TxQueueInfo {
@@ -205,10 +203,10 @@ class EngineShard {
   // blocks the calling fiber.
   void Shutdown();  // called before destructing EngineShard.
 
-  void StartPeriodicFiber(util::ProactorBase* pb);
+  void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler);
 
   void Heartbeat();
-  void RunPeriodic(std::chrono::milliseconds period_ms);
+  void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> global_handler);
 
   void CacheStats();
 
@@ -288,7 +286,7 @@ class EngineShardSet {
     return pp_;
   }
 
-  void Init(uint32_t size, bool update_db_time);
+  void Init(uint32_t size, std::function<void()> global_handler);
 
   // Shutdown sequence:
   // - EngineShardSet.PreShutDown()
@@ -342,7 +340,6 @@ class EngineShardSet {
   }
 
   // Used in tests
-  void TEST_EnableHeartBeat();
   void TEST_EnableCacheMode();
 
  private:

src/server/main_service.cc (10 changes: 4 additions & 6 deletions)
@@ -842,8 +842,7 @@ Service::~Service() {
   shard_set = nullptr;
 }
 
-void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners,
-                   const InitOpts& opts) {
+void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners) {
   InitRedisTables();
 
   config_registry.RegisterMutable("maxmemory", [](const absl::CommandLineFlag& flag) {
@@ -881,7 +880,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
     ServerState::Init(index, shard_num, &user_registry_);
   });
 
-  shard_set->Init(shard_num, !opts.disable_time_update);
+  shard_set->Init(shard_num, nullptr);
   const auto tcp_disabled = GetFlag(FLAGS_port) == 0u;
   // We assume that listeners.front() is the main_listener
   // see dfly_main RunEngine
@@ -1600,10 +1599,9 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer,
 
   // a bit of a hack. I set up breaker callback here for the owner.
   // Should work though it's confusing to have it here.
-  owner->RegisterBreakHook([res, this](uint32_t) {
+  owner->RegisterBreakHook([res](uint32_t) {
     if (res->transaction)
       res->transaction->CancelBlocking(nullptr);
-    this->server_family().BreakOnShutdown();
   });
 
   return res;
@@ -2529,7 +2527,7 @@ void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privil
   }
 }
 
-void Service::OnClose(facade::ConnectionContext* cntx) {
+void Service::OnConnectionClose(facade::ConnectionContext* cntx) {
   ConnectionContext* server_cntx = static_cast<ConnectionContext*>(cntx);
   ConnectionState& conn_state = server_cntx->conn_state;

src/server/main_service.h (12 changes: 2 additions & 10 deletions)
@@ -29,18 +29,10 @@ using facade::MemcacheParser;
 
 class Service : public facade::ServiceInterface {
  public:
-  struct InitOpts {
-    bool disable_time_update;
-
-    InitOpts() : disable_time_update{false} {
-    }
-  };
-
   explicit Service(util::ProactorPool* pp);
   ~Service();
 
-  void Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners,
-            const InitOpts& opts = InitOpts{});
+  void Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners);
 
   void Shutdown();
 
@@ -93,7 +85,7 @@ class Service : public facade::ServiceInterface {
   GlobalState GetGlobalState() const;
 
   void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;
-  void OnClose(facade::ConnectionContext* cntx) final;
+  void OnConnectionClose(facade::ConnectionContext* cntx) final;
 
   Service::ContextInfo GetContextInfo(facade::ConnectionContext* cntx) const final;
