Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
buzhimingyonghu committed Nov 13, 2024
1 parent 40792d1 commit 714b857
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

#include <glog/logging.h>

#include "include/pika_cmd_table_manager.h"
#include "include/pika_conf.h"
#include "include/pika_repl_bgworker.h"
#include "include/pika_cmd_table_manager.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "pstd/include/pstd_defer.h"
#include "src/pstd/include/scope_record_lock.h"
#include "include/pika_conf.h"

extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
Expand Down Expand Up @@ -97,13 +97,15 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
// because DispatchBinlogRes() have been order them.
worker->db_name_ = db_name;

std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
std::shared_ptr<SyncMasterDB> db =
g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
if (!db) {
LOG(WARNING) << "DB " << db_name << " Not Found";
return;
}

std::shared_ptr<SyncSlaveDB> slave_db = g_pika_rm->GetSyncSlaveDBByName(DBInfo(db_name));
std::shared_ptr<SyncSlaveDB> slave_db =
g_pika_rm->GetSyncSlaveDBByName(DBInfo(db_name));
if (!slave_db) {
LOG(WARNING) << "Slave DB " << db_name << " Not Found";
return;
Expand All @@ -119,8 +121,9 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
}

if (slave_db->MasterSessionId() != binlog_res.session_id()) {
LOG(WARNING) << "Check SessionId Mismatch: " << slave_db->MasterIp() << ":" << slave_db->MasterPort() << ", "
<< slave_db->SyncDBInfo().ToString() << " expected_session: " << binlog_res.session_id()
LOG(WARNING) << "Check SessionId Mismatch: " << slave_db->MasterIp() << ":"
<< slave_db->MasterPort() << ", " << slave_db->SyncDBInfo().ToString()
<< " expected_session: " << binlog_res.session_id()
<< ", actual_session:" << slave_db->MasterSessionId();
LOG(WARNING) << "Check Session failed " << binlog_res.slot().db_name();
slave_db->SetReplState(ReplState::kTryConnect);
Expand Down Expand Up @@ -171,8 +174,8 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red
std::string monitor_message;
if (g_pika_server->HasMonitorClients()) {
std::string db_name = worker->db_name_.substr(2);
std::string monitor_message = std::to_string(static_cast<double>(pstd::NowMicros()) / 1000000) + " [" + db_name +
" " + worker->ip_port_ + "]";
std::string monitor_message =
std::to_string(static_cast<double>(pstd::NowMicros()) / 1000000) + " [" + db_name + " " + worker->ip_port_ + "]";
for (const auto& item : argv) {
monitor_message += " " + pstd::ToRead(item);
}
Expand All @@ -193,7 +196,8 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red

g_pika_server->UpdateQueryNumAndExecCountDB(worker->db_name_, opt, c_ptr->is_write());

std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
std::shared_ptr<SyncMasterDB> db =
g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if (!db) {
LOG(WARNING) << worker->db_name_ << "Not found.";
}
Expand Down Expand Up @@ -221,8 +225,9 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DBLockShared();
}
if (c_ptr->IsNeedCacheDo() && PIKA_CACHE_NONE != g_pika_conf->cache_mode() &&
c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (c_ptr->IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
&& c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (c_ptr->is_write()) {
c_ptr->DoThroughDB();
if (c_ptr->IsNeedUpdateCache()) {
Expand All @@ -238,8 +243,11 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
c_ptr->GetDB()->DBUnlockShared();
}

if (c_ptr->res().ok() && c_ptr->is_write() && c_ptr->name() != kCmdNameFlushdb && c_ptr->name() != kCmdNameFlushall &&
c_ptr->name() != kCmdNameExec) {
if (c_ptr->res().ok()
&& c_ptr->is_write()
&& c_ptr->name() != kCmdNameFlushdb
&& c_ptr->name() != kCmdNameFlushall
&& c_ptr->name() != kCmdNameExec) {
auto table_keys = c_ptr->current_key();
for (auto& key : table_keys) {
key = c_ptr->db_name().append(key);
Expand Down

0 comments on commit 714b857

Please sign in to comment.