diff --git a/conf/pika.conf b/conf/pika.conf index eef70e97a2..3a991619de 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -289,6 +289,16 @@ sync-window-size : 9000 # Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB]. max-conn-rbuf-size : 268435456 +################### +## Migrate Settings +################### + +target-redis-host : 127.0.0.1 +target-redis-port : 6379 +target-redis-pwd : + +sync-batch-num : 100 +redis-sender-num : 10 #######################################################################E####### #! Critical Settings !# diff --git a/include/migrator_thread.h b/include/migrator_thread.h new file mode 100644 index 0000000000..949ca34d4f --- /dev/null +++ b/include/migrator_thread.h @@ -0,0 +1,66 @@ +#ifndef MIGRATOR_THREAD_H_ +#define MIGRATOR_THREAD_H_ + +#include +#include + +#include "storage/storage.h" +#include "net/include/redis_cli.h" + +#include "include/pika_sender.h" + +class MigratorThread : public net::Thread { + public: + MigratorThread(std::shared_ptr storage_, std::vector *senders, int type, int thread_num) : + storage_(storage_), + should_exit_(false), + senders_(senders), + type_(type), + thread_num_(thread_num), + thread_index_(0), + num_(0) { + } + + virtual ~ MigratorThread(); + + int64_t num() { + std::lock_guard l(num_mutex_); + return num_; + } + + void Stop() { + should_exit_ = true; + } + + private: + void PlusNum() { + std::lock_guard l(num_mutex_); + ++num_; + } + + void DispatchKey(const std::string &command, const std::string& key = ""); + + void MigrateDB(); + void MigrateStringsDB(); + void MigrateListsDB(); + void MigrateHashesDB(); + void MigrateSetsDB(); + void MigrateZsetsDB(); + + virtual void *ThreadMain(); + + private: + std::shared_ptr storage_; + bool should_exit_; + + std::vector *senders_; + int type_; + int thread_num_; + int thread_index_; + + int64_t num_; + std::mutex num_mutex_; +}; + +#endif + diff --git a/include/pika_conf.h b/include/pika_conf.h index 536da5d54d..cccc2b8959 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -346,6 +346,14 @@ class PikaConf : public pstd::BaseConf { int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); } int consensus_level() { return consensus_level_.load(); } int replication_num() { return replication_num_.load(); } + + std::string target_redis_host() { return target_redis_host_; } + int target_redis_port() { return target_redis_port_; } + std::string target_redis_pwd() { return target_redis_pwd_; } + int sync_batch_num() { return sync_batch_num_; } + int redis_sender_num() { return redis_sender_num_; } + + int rate_limiter_mode() { std::shared_lock l(rwlock_); return rate_limiter_mode_; @@ -925,6 +933,13 @@ class PikaConf : public pstd::BaseConf { std::map diff_commands_; void TryPushDiffCommands(const std::string& command, const std::string& value); + // migrate configure items + std::string target_redis_host_; + int target_redis_port_; + std::string target_redis_pwd_; + int sync_batch_num_; + int redis_sender_num_; + // // Critical configure items // diff --git a/include/pika_sender.h b/include/pika_sender.h new file mode 100644 index 0000000000..ba92f37a9b --- /dev/null +++ b/include/pika_sender.h @@ -0,0 +1,43 @@ +#ifndef PIKA_SENDER_H_ +#define PIKA_SENDER_H_ + +#include +#include +#include +#include +#include + +#include "net/include/bg_thread.h" +#include "net/include/net_cli.h" +#include "net/include/redis_cli.h" + +class PikaSender : public net::Thread { +public: + PikaSender(std::string ip, int64_t port, std::string password); + virtual ~PikaSender(); + void LoadKey(const std::string &cmd); + void Set_should_stop(); + bool Should_stop() { return should_exit_.load(); } + + int64_t elements() { return elements_; } + + void SendCommand(std::string &command, const std::string &key); + int QueueSize(); + void ConnectRedis(); + +private: + net::NetCli *cli_; + pstd::CondVar wsignal_; + pstd::CondVar rsignal_; + std::mutex keys_mutex_; + std::queue keys_queue_; + std::string ip_; + int port_; + std::string password_; + std::atomic should_exit_; + int64_t elements_; + + virtual void *ThreadMain(); +}; + +#endif diff --git a/include/pika_server.h b/include/pika_server.h index 2802eb157d..6d195ff4b2 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -46,6 +46,7 @@ #include "include/pika_statistic.h" #include "include/pika_transaction.h" #include "include/rsync_server.h" +#include "include/redis_sender.h" extern std::unique_ptr g_pika_conf; @@ -309,6 +310,12 @@ class PikaServer : public pstd::noncopyable { pstd::Status GetCmdRouting(std::vector& redis_cmds, std::vector* dst, bool* all_local); + /* + * migrate used + */ + int SendRedisCommand(const std::string& command, const std::string& key); + void RetransmitData(const std::string& path); + // info debug use void ServerStatus(std::string* info); @@ -617,6 +624,11 @@ class PikaServer : public pstd::noncopyable { */ std::unique_ptr pika_auxiliary_thread_; + /* + * migrate to redis used + */ + std::vector> redis_senders_; + /* * Async slotsMgrt use */ diff --git a/include/redis_sender.h b/include/redis_sender.h new file mode 100644 index 0000000000..a3e3a8a050 --- /dev/null +++ b/include/redis_sender.h @@ -0,0 +1,47 @@ +#ifndef REDIS_SENDER_H_ +#define REDIS_SENDER_H_ + +#include +#include +#include +#include +#include + +#include "pika_repl_bgworker.h" +#include "net/include/net_cli.h" +#include "net/include/redis_cli.h" + +class RedisSender : public net::Thread { + public: + RedisSender(int id, std::string ip, int64_t port, std::string password); + virtual ~RedisSender(); + void Stop(void); + int64_t elements() { + return elements_; + } + + void SendRedisCommand(const std::string &command); + + private: + int SendCommand(std::string &command); + void ConnectRedis(); + + private: + int id_; + net::NetCli *cli_; + pstd::CondVar rsignal_; + pstd::CondVar wsignal_; + pstd::Mutex commands_mutex_; + std::queue commands_queue_; + std::string ip_; + int port_; + std::string password_; + bool should_exit_; + int32_t cnt_; + int64_t elements_; + std::atomic last_write_time_; + + virtual void *ThreadMain(); +}; + +#endif diff --git a/src/migrator_thread.cc b/src/migrator_thread.cc new file mode 100644 index 0000000000..5c3a223bd1 --- /dev/null +++ b/src/migrator_thread.cc @@ -0,0 +1,466 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/migrator_thread.h" + +#include + +#include +#include +#define GLOG_USE_GLOG_EXPORT +#include + +#include "storage/storage.h" +#include "src/redis_strings.h" +#include "src/redis_lists.h" +#include "src/redis_hashes.h" +#include "src/redis_sets.h" +#include "src/redis_zsets.h" +#include "src/scope_snapshot.h" +#include "src/strings_value_format.h" + +#include "include/pika_conf.h" + +const int64_t MAX_BATCH_NUM = 30000; + +extern PikaConf* g_pika_conf; + +MigratorThread::~MigratorThread() { +} + +void MigratorThread::MigrateStringsDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::string value; + std::vector keys; + std::map type_timestamp; + std::map type_status; + while (true) { + cursor = storage_->Scan(storage::DataType::kStrings, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + s = storage_->Get(key, &value); + if (!s.ok()) { + LOG(WARNING) << "get " << key << " error: " << s.ToString(); + continue; + } + + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("SET"); + argv.push_back(key); + argv.push_back(value); + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kStrings] != -2) { + ttl = type_timestamp[storage::kStrings]; + } + + if (ttl > 0) { + argv.push_back("EX"); + argv.push_back(std::to_string(ttl)); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateListsDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::vector keys; + std::map type_timestamp; + std::map type_status; + + while (true) { + cursor = storage_->Scan(storage::DataType::kLists, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + int64_t pos = 0; + std::vector nodes; + storage::Status s = storage_->LRange(key, pos, pos + g_pika_conf->sync_batch_num() - 1, &nodes); + if (!s.ok()) { + LOG(WARNING) << "db->LRange(key:" << key << ", pos:" << pos + << ", batch size: " << g_pika_conf->sync_batch_num() << ") = " << s.ToString(); + continue; + } + + while (s.ok() && !should_exit_ && !nodes.empty()) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("RPUSH"); + argv.push_back(key); + for (const auto& node : nodes) { + argv.push_back(node); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + + pos += g_pika_conf->sync_batch_num(); + nodes.clear(); + s = storage_->LRange(key, pos, pos + g_pika_conf->sync_batch_num() - 1, &nodes); + if (!s.ok()) { + LOG(WARNING) << "db->LRange(key:" << key << ", pos:" << pos + << ", batch size:" << g_pika_conf->sync_batch_num() << ") = " << s.ToString(); + } + } + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kLists] != -2) { + ttl = type_timestamp[storage::kLists]; + } + + if (s.ok() && ttl > 0) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("EXPIRE"); + argv.push_back(key); + argv.push_back(std::to_string(ttl)); + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateHashesDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::vector keys; + std::map type_timestamp; + std::map type_status; + + while (true) { + cursor = storage_->Scan(storage::DataType::kHashes, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + std::vector fvs; + storage::Status s = storage_->HGetall(key, &fvs); + if (!s.ok()) { + LOG(WARNING) << "db->HGetall(key:" << key << ") = " << s.ToString(); + continue; + } + + auto it = fvs.begin(); + while (!should_exit_ && it != fvs.end()) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("HMSET"); + argv.push_back(key); + for (int idx = 0; + idx < g_pika_conf->sync_batch_num() && !should_exit_ && it != fvs.end(); + idx++, it++) { + argv.push_back(it->field); + argv.push_back(it->value); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kHashes] != -2) { + ttl = type_timestamp[storage::kHashes]; + } + + if (s.ok() && ttl > 0) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("EXPIRE"); + argv.push_back(key); + argv.push_back(std::to_string(ttl)); + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateSetsDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::vector keys; + std::map type_timestamp; + std::map type_status; + + while (true) { + cursor = storage_->Scan(storage::DataType::kSets, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + std::vector members; + storage::Status s = storage_->SMembers(key, &members); + if (!s.ok()) { + LOG(WARNING) << "db->SMembers(key:" << key << ") = " << s.ToString(); + continue; + } + auto it = members.begin(); + while (!should_exit_ && it != members.end()) { + std::string cmd; + net::RedisCmdArgsType argv; + + argv.push_back("SADD"); + argv.push_back(key); + for (int idx = 0; + idx < g_pika_conf->sync_batch_num() && !should_exit_ && it != members.end(); + idx++, it++) { + argv.push_back(*it); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kSets] != -2) { + ttl = type_timestamp[storage::kSets]; + } + + if (s.ok() && ttl > 0) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("EXPIRE"); + argv.push_back(key); + argv.push_back(std::to_string(ttl)); + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateZsetsDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::vector keys; + std::map type_timestamp; + std::map type_status; + + while (true) { + cursor = storage_->Scan(storage::DataType::kZSets, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + std::vector score_members; + storage::Status s = storage_->ZRange(key, 0, -1, &score_members); + if (!s.ok()) { + LOG(WARNING) << "db->ZRange(key:" << key << ") = " << s.ToString(); + continue; + } + auto it = score_members.begin(); + while (!should_exit_ && it != score_members.end()) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("ZADD"); + argv.push_back(key); + for (int idx = 0; + idx < g_pika_conf->sync_batch_num() && !should_exit_ && it != score_members.end(); + idx++, it++) { + argv.push_back(std::to_string(it->score)); + argv.push_back(it->member); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kZSets] != -2) { + ttl = type_timestamp[storage::kZSets]; + } + + if (s.ok() && ttl > 0) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("EXPIRE"); + argv.push_back(key); + argv.push_back(std::to_string(ttl)); + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateDB() { + switch (int(type_)) { + case int(storage::kStrings) : { + MigrateStringsDB(); + break; + } + + case int(storage::kLists) : { + MigrateListsDB(); + break; + } + + case int(storage::kHashes) : { + MigrateHashesDB(); + break; + } + + case int(storage::kSets) : { + MigrateSetsDB(); + break; + } + + case int(storage::kZSets) : { + MigrateZsetsDB(); + break; + } + + default: { + LOG(WARNING) << "illegal db type " << type_; + break; + } + } +} + +void MigratorThread::DispatchKey(const std::string &command, const std::string& key) { + thread_index_ = (thread_index_ + 1) % thread_num_; + size_t idx = thread_index_; + if (key.size()) { // no empty + idx = std::hash()(key) % thread_num_; + } + (*senders_)[idx]->LoadKey(command); +} + +const char* GetDBTypeString(int type) { + switch (type) { + case int(storage::kStrings) : { + return "storage::kStrings"; + } + + case int(storage::kLists) : { + return "storage::kLists"; + } + + case int(storage::kHashes) : { + return "storage::kHashes"; + } + + case int(storage::kSets) : { + return "storage::kSets"; + } + + case int(storage::kZSets) : { + return "storage::kZSets"; + } + + default: { + return "storage::Unknown"; + } + } +} + +void *MigratorThread::ThreadMain() { + MigrateDB(); + should_exit_ = true; + LOG(INFO) << GetDBTypeString(type_) << " keys have been dispatched completly"; + return NULL; +} + diff --git a/src/pika_conf.cc b/src/pika_conf.cc index baf1e54375..4d33f387b6 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -627,6 +627,22 @@ int PikaConf::Load() { sync_window_size_.store(tmp_sync_window_size); } + // redis-migrate conifg args + target_redis_host_ = "127.0.0.1"; + GetConfStr("target-redis-host", &target_redis_host_); + + target_redis_port_ = 6379; + GetConfInt("target-redis-port", &target_redis_port_); + + target_redis_pwd_ = ""; + GetConfStr("target-redis-pwd" , &target_redis_pwd_); + + sync_batch_num_ = 100; + GetConfInt("sync-batch-num", &sync_batch_num_); + + redis_sender_num_ = 8; + GetConfInt("redis-sender-num", &redis_sender_num_); + // max conn rbuf size int tmp_max_conn_rbuf_size = PIKA_MAX_CONN_RBUF; GetConfIntHuman("max-conn-rbuf-size", &tmp_max_conn_rbuf_size); diff --git a/src/pika_db.cc b/src/pika_db.cc index 58c4f3bf77..1ecfec0568 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -473,6 +473,9 @@ bool DB::TryUpdateMasterOffset() { << ", master_ip: " << master_ip << ", master_port: " << master_port << ", filenum: " << filenum << ", offset: " << offset << ", term: " << term << ", index: " << index; + // Retransmit Data to target redis + g_pika_server->RetransmitData(dbsync_path_); + pstd::DeleteFile(info_path); if (!ChangeDb(dbsync_path_)) { LOG(WARNING) << "DB: " << db_name_ << ", Failed to change db"; diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 52afad4ba1..3312d697d0 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -231,6 +231,10 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr& c_ptr) { && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { if (c_ptr->is_write()) { + std::string command; + net::SerializeRedisCommand(argv, &command); + std::string dispatchKey = argv.size() >= 2 ? argv[1] : argv[0]; + g_pika_server->SendRedisCommand(command, dispatchKey); c_ptr->DoThroughDB(); if (c_ptr->IsNeedUpdateCache()) { c_ptr->DoUpdateCache(); @@ -239,6 +243,10 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr& c_ptr) { LOG(WARNING) << "It is impossbile to reach here"; } } else { + std::string command; + net::SerializeRedisCommand(argv, &command); + std::string dispatchKey = argv.size() >= 2 ? argv[1] : argv[0]; + g_pika_server->SendRedisCommand(command, dispatchKey); c_ptr->Do(); } if (!c_ptr->IsSuspend()) { diff --git a/src/pika_sender.cc b/src/pika_sender.cc new file mode 100644 index 0000000000..7d77212abc --- /dev/null +++ b/src/pika_sender.cc @@ -0,0 +1,173 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/pika_sender.h" + +#include + +PikaSender::PikaSender(std::string ip, int64_t port, std::string password): + cli_(NULL), + ip_(ip), + port_(port), + password_(password), + should_exit_(false), + elements_(0) + { + } + +PikaSender::~PikaSender() { +} + +int PikaSender::QueueSize() { + // The locking operation is guaranteed by the external caller + return keys_queue_.size(); +} + +void PikaSender::Set_should_stop() { + should_exit_.store(true); + wsignal_.notify_all(); + rsignal_.notify_all(); +} + +void PikaSender::ConnectRedis() { + while (cli_ == NULL) { + // Connect to redis + cli_ = net::NewRedisCli(); + cli_->set_connect_timeout(1000); + pstd::Status s = cli_->Connect(ip_, port_); + if (!s.ok()) { + delete cli_; + cli_ = NULL; + LOG(WARNING) << "Can not connect to " << ip_ << ":" << port_ << ", status: " << s.ToString(); + continue; + } else { + // Connect success + + // Authentication + if (!password_.empty()) { + net::RedisCmdArgsType argv, resp; + std::string cmd; + + argv.push_back("AUTH"); + argv.push_back(password_); + net::SerializeRedisCommand(argv, &cmd); + pstd::Status s = cli_->Send(&cmd); + + if (s.ok()) { + s = cli_->Recv(&resp); + if (resp[0] == "OK") { + } else { + LOG(FATAL) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password"; + cli_->Close(); + delete cli_; + cli_ = NULL; + should_exit_ = true; + return; + } + } else { + LOG(WARNING) << "send auth failed: " << s.ToString(); + cli_->Close(); + delete cli_; + cli_ = NULL; + continue; + } + } else { + // If forget to input password + net::RedisCmdArgsType argv, resp; + std::string cmd; + + argv.push_back("PING"); + net::SerializeRedisCommand(argv, &cmd); + pstd::Status s = cli_->Send(&cmd); + + if (s.ok()) { + s = cli_->Recv(&resp); + if (s.ok()) { + if (resp[0] == "NOAUTH Authentication required.") { + LOG(FATAL) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required"; + cli_->Close(); + delete cli_; + cli_ = NULL; + should_exit_ = true; + return; + } + } else { + LOG(WARNING) << "Recv failed: " << s.ToString(); + cli_->Close(); + delete cli_; + cli_ = NULL; + } + } + } + } + } +} + +void PikaSender::LoadKey(const std::string &key) { + std::unique_lock lock(keys_mutex_); + wsignal_.wait(lock, [this]() { return keys_queue_.size() < 100000 || should_stop(); }); + if(!should_stop()) { + keys_queue_.push(key); + rsignal_.notify_one(); + } +} + +void PikaSender::SendCommand(std::string &command, const std::string &key) { + // Send command + pstd::Status s = cli_->Send(&command); + if (!s.ok()) { + elements_--; + LoadKey(key); + cli_->Close(); + LOG(INFO) << s.ToString().data(); + delete cli_; + cli_ = NULL; + ConnectRedis(); + } +} + +void *PikaSender::ThreadMain() { + LOG(INFO) << "Start sender thread..."; + int cnt = 0; + + if (cli_ == NULL) { + ConnectRedis(); + } + + while (!should_stop()) { + std::string command; + + std::unique_lock lock(keys_mutex_); + rsignal_.wait(lock, [this]() { return !keys_queue_.empty() > 0 || should_stop(); }); + if (QueueSize() == 0 && should_exit_) { + return NULL; + } + + std::string key = keys_queue_.front(); + elements_++; + keys_queue_.pop(); + + lock.unlock(); + SendCommand(key, key); + cnt++; + if (cnt >= 200) { + for(; cnt > 0; cnt--) { + cli_->Recv(NULL); + } + } + } + for(; cnt > 0; cnt--) { + cli_->Recv(NULL); + } + + if (cli_) { + cli_->Close(); + delete cli_; + cli_ = NULL; + } + LOG(INFO) << "PikaSender thread complete"; + return NULL; +} + diff --git a/src/pika_server.cc b/src/pika_server.cc index b1b065bb91..3a3212042d 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -24,6 +24,8 @@ #include "include/pika_monotonic_time.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_sender.h" +#include "include/migrator_thread.h" using pstd::Status; extern PikaServer* g_pika_server; @@ -101,6 +103,15 @@ PikaServer::PikaServer() } } + // Create redis sender + for (int i = 0; i < g_pika_conf->redis_sender_num(); ++i) { + redis_senders_.emplace_back(std::make_unique(int(i), + g_pika_conf->target_redis_host(), + g_pika_conf->target_redis_port(), + g_pika_conf->target_redis_pwd())); + } + + acl_ = std::make_unique<::Acl>(); SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool()); bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_"); @@ -1542,6 +1553,78 @@ Status PikaServer::GetCmdRouting(std::vector& redis_cmds, return Status::OK(); } +int PikaServer::SendRedisCommand(const std::string& command, const std::string& key) { + // Send command + size_t idx = std::hash()(key) % redis_senders_.size(); + redis_senders_[idx]->SendRedisCommand(command); + return 0; +} + +void PikaServer::RetransmitData(const std::string& path) { + std::shared_ptr storage_ = std::make_shared(); + rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), path); + + if (!s.ok()) { + LOG(FATAL) << "open received database error: " << s.ToString(); + return; + } + + // Init SenderThread + int thread_num = g_pika_conf->redis_sender_num(); + std::string target_host = g_pika_conf->target_redis_host(); + int target_port = g_pika_conf->target_redis_port(); + std::string target_pwd = g_pika_conf->target_redis_pwd(); + + LOG(INFO) << "open received database success, start retransmit data to redis(" + << target_host << ":" << target_port << ")"; + + std::vector pika_senders; + std::vector migrators; + + for (int i = 0; i < thread_num; i++) { + pika_senders.emplace_back(new PikaSender(target_host, target_port, target_pwd)); + } + migrators.emplace_back(new MigratorThread(storage_, &pika_senders, storage::kStrings, thread_num)); + migrators.emplace_back(new MigratorThread(storage_, &pika_senders, storage::kLists, thread_num)); + migrators.emplace_back(new MigratorThread(storage_, &pika_senders, storage::kHashes, thread_num)); + migrators.emplace_back(new MigratorThread(storage_, &pika_senders, storage::kSets, thread_num)); + migrators.emplace_back(new MigratorThread(storage_, &pika_senders, storage::kZSets, thread_num)); + + for (size_t i = 0; i < pika_senders.size(); i++) { + pika_senders[i]->StartThread(); + } + for (size_t i = 0; i < migrators.size(); i++) { + migrators[i]->StartThread(); + } + + for (size_t i = 0; i < migrators.size(); i++) { + migrators[i]->JoinThread(); + } + for (size_t i = 0; i < pika_senders.size(); i++) { + pika_senders[i]->Set_should_stop(); + } + for (size_t i = 0; i < pika_senders.size(); i++) { + pika_senders[i]->JoinThread(); + } + + int64_t replies = 0, records = 0; + for (size_t i = 0; i < migrators.size(); i++) { + records += migrators[i]->num(); + delete migrators[i]; + } + migrators.clear(); + for (size_t i = 0; i < pika_senders.size(); i++) { + replies += pika_senders[i]->elements(); + delete pika_senders[i]; + } + pika_senders.clear(); + + LOG(INFO) << "=============== Retransmit Finish ====================="; + LOG(INFO) << "Total records : " << records << " have been Scaned"; + LOG(INFO) << "Total replies : " << replies << " received from redis server"; + LOG(INFO) << "======================================================="; +} + void PikaServer::ServerStatus(std::string* info) { std::stringstream tmp_stream; size_t q_size = ClientProcessorThreadPoolCurQueueSize(); diff --git a/src/redis_sender.cc b/src/redis_sender.cc new file mode 100644 index 0000000000..d8d8e5b15e --- /dev/null +++ b/src/redis_sender.cc @@ -0,0 +1,207 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + + +#include "include/redis_sender.h" + +#include +#include + +#include + +static time_t kCheckDiff = 1; + +RedisSender::RedisSender(int id, std::string ip, int64_t port, std::string password): + id_(id), + cli_(NULL), + ip_(ip), + port_(port), + password_(password), + should_exit_(false), + cnt_(0), + elements_(0) { + + last_write_time_ = ::time(NULL); +} + +RedisSender::~RedisSender() { + LOG(INFO) << "RedisSender thread " << id_ << " exit!!!"; +} + +void RedisSender::ConnectRedis() { + while (cli_ == NULL) { + // Connect to redis + cli_ = net::NewRedisCli(); + cli_->set_connect_timeout(1000); + cli_->set_recv_timeout(10000); + cli_->set_send_timeout(10000); + pstd::Status s = cli_->Connect(ip_, port_); + if (!s.ok()) { + LOG(WARNING) << "Can not connect to " << ip_ << ":" << port_ << ", status: " << s.ToString(); + delete cli_; + cli_ = NULL; + sleep(3); + continue; + } else { + // Connect success + + // Authentication + if (!password_.empty()) { + net::RedisCmdArgsType argv, resp; + std::string cmd; + + argv.push_back("AUTH"); + argv.push_back(password_); + net::SerializeRedisCommand(argv, &cmd); + pstd::Status s = cli_->Send(&cmd); + + if (s.ok()) { + s = cli_->Recv(&resp); + if (resp[0] == "OK") { + } else { + LOG(FATAL) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password"; + cli_->Close(); + delete cli_; + cli_ = NULL; + should_exit_ = true; + return; + } + } else { + LOG(WARNING) << "send auth failed: " << s.ToString(); + cli_->Close(); + delete cli_; + cli_ = NULL; + continue; + } + } else { + // If forget to input password + net::RedisCmdArgsType argv, resp; + std::string cmd; + + argv.push_back("PING"); + net::SerializeRedisCommand(argv, &cmd); + pstd::Status s = cli_->Send(&cmd); + + if (s.ok()) { + s = cli_->Recv(&resp); + if (s.ok()) { + if (resp[0] == "NOAUTH Authentication required.") { + LOG(FATAL) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required"; + cli_->Close(); + delete cli_; + cli_ = NULL; + should_exit_ = true; + return; + } + } else { + LOG(WARNING) << s.ToString(); + cli_->Close(); + delete cli_; + cli_ = NULL; + } + } + } + } + } +} + +void RedisSender::Stop() { + set_should_stop(); + should_exit_ = true; + rsignal_.notify_all(); + wsignal_.notify_all(); +} + +void RedisSender::SendRedisCommand(const std::string &command) { + std::unique_lock lock(commands_mutex_); + wsignal_.wait(lock, [this]() { return commands_queue_.size() < 100000; }); + if (!should_exit_) { + commands_queue_.push(command); + rsignal_.notify_one(); + } +} + +int RedisSender::SendCommand(std::string &command) { + time_t now = ::time(NULL); + if (kCheckDiff < now - last_write_time_) { + int ret = cli_->CheckAliveness(); + if (ret < 0) { + ConnectRedis(); + } + last_write_time_ = now; + } + + // Send command + int idx = 0; + do { + pstd::Status s = cli_->Send(&command); + if (s.ok()) { + return 0; + } + + LOG(WARNING) << "RedisSender " << id_ << "fails to send redis command " << command << ", times: " << idx + 1 << ", error: " << s.ToString(); + + cli_->Close(); + delete cli_; + cli_ = NULL; + ConnectRedis(); + } while(++idx < 3); + + return -1; +} + +void *RedisSender::ThreadMain() { + LOG(INFO) << "Start redis sender " << id_ << " thread..."; + // sleep(15); + int ret = 0; + + ConnectRedis(); + + while (!should_exit_) { + std::unique_lock lock(commands_mutex_); + while (commands_queue_.size() == 0 && !should_exit_) { + rsignal_.wait_for(lock, std::chrono::milliseconds(100)); + // rsignal_.Wait(); + } + // if (commands_queue_.size() == 0 && should_exit_) { + if (should_exit_) { + lock.unlock(); + break; + } + + if (commands_queue_.size() == 0) { + lock.unlock(); + continue; + } + + // get redis command + std::string command; + command = commands_queue_.front(); + // printf("%d, command %s\n", id_, command.c_str()); + elements_++; + commands_queue_.pop(); + wsignal_.notify_one(); + lock.unlock(); + ret = SendCommand(command); + if (ret == 0) { + cnt_++; + } + + if (cnt_ >= 200) { + for(; cnt_ > 0; cnt_--) { + cli_->Recv(NULL); + } + } + } + for(; cnt_ > 0; cnt_--) { + cli_->Recv(NULL); + } + + LOG(INFO) << "RedisSender thread " << id_ << " complete"; + delete cli_; + cli_ = NULL; + return NULL; +} +