diff --git a/conf/pika.conf b/conf/pika.conf index eef70e97a2..3a991619de 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -289,6 +289,16 @@ sync-window-size : 9000 # Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB]. max-conn-rbuf-size : 268435456 +################### +## Migrate Settings +################### + +target-redis-host : 127.0.0.1 +target-redis-port : 6379 +target-redis-pwd : + +sync-batch-num : 100 +redis-sender-num : 10 #######################################################################E####### #! Critical Settings !# diff --git a/include/migrator_thread.h b/include/migrator_thread.h new file mode 100644 index 0000000000..42676d3442 --- /dev/null +++ b/include/migrator_thread.h @@ -0,0 +1,66 @@ +#ifndef MIGRATOR_THREAD_H_ +#define MIGRATOR_THREAD_H_ + +#include +#include + +#include "storage/storage.h" +#include "net/include/redis_cli.h" + +#include "include/pika_sender.h" + +class MigratorThread : public net::Thread { + public: + MigratorThread(std::shared_ptr storage_, std::vector> *senders, int type, int thread_num) : + storage_(storage_), + should_exit_(false), + senders_(senders), + type_(type), + thread_num_(thread_num), + thread_index_(0), + num_(0) { + } + + virtual ~ MigratorThread(); + + int64_t num() { + std::lock_guard l(num_mutex_); + return num_; + } + + void Stop() { + should_exit_ = true; + } + + private: + void PlusNum() { + std::lock_guard l(num_mutex_); + ++num_; + } + + void DispatchKey(const std::string &command, const std::string& key = ""); + + void MigrateDB(); + void MigrateStringsDB(); + void MigrateListsDB(); + void MigrateHashesDB(); + void MigrateSetsDB(); + void MigrateZsetsDB(); + + virtual void *ThreadMain(); + + private: + std::shared_ptr storage_; + bool should_exit_; + + std::vector> *senders_; + int type_; + int thread_num_; + int thread_index_; + + int64_t num_; + std::mutex num_mutex_; +}; + +#endif + diff --git a/include/pika_conf.h b/include/pika_conf.h index 536da5d54d..cccc2b8959 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -346,6 +346,14 @@ class PikaConf : public pstd::BaseConf { int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); } int consensus_level() { return consensus_level_.load(); } int replication_num() { return replication_num_.load(); } + + std::string target_redis_host() { return target_redis_host_; } + int target_redis_port() { return target_redis_port_; } + std::string target_redis_pwd() { return target_redis_pwd_; } + int sync_batch_num() { return sync_batch_num_; } + int redis_sender_num() { return redis_sender_num_; } + + int rate_limiter_mode() { std::shared_lock l(rwlock_); return rate_limiter_mode_; @@ -925,6 +933,13 @@ class PikaConf : public pstd::BaseConf { std::map diff_commands_; void TryPushDiffCommands(const std::string& command, const std::string& value); + // migrate configure items + std::string target_redis_host_; + int target_redis_port_; + std::string target_redis_pwd_; + int sync_batch_num_; + int redis_sender_num_; + // // Critical configure items // diff --git a/include/pika_repl_bgworker.h b/include/pika_repl_bgworker.h index dd62622fb9..e548ab551d 100644 --- a/include/pika_repl_bgworker.h +++ b/include/pika_repl_bgworker.h @@ -47,6 +47,7 @@ class PikaReplBgWorker { net::BGThread bg_thread_; static int HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv); static void ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset); + static void ParseAndSendPikaCommand(const std::shared_ptr& c_ptr); }; #endif // PIKA_REPL_BGWROKER_H_ diff --git a/include/pika_sender.h b/include/pika_sender.h new file mode 100644 index 0000000000..172c65b24c --- /dev/null +++ b/include/pika_sender.h @@ -0,0 +1,43 @@ +#ifndef PIKA_SENDER_H_ +#define PIKA_SENDER_H_ + +#include +#include +#include +#include +#include + +#include "net/include/bg_thread.h" +#include "net/include/net_cli.h" +#include "net/include/redis_cli.h" + +class PikaSender : public net::Thread { +public: + PikaSender(std::string ip, int64_t port, std::string password); + virtual ~PikaSender(); + void LoadKey(const std::string &cmd); + void Stop(); + + int64_t elements() { return elements_; } + + void SendCommand(std::string &command, const std::string &key); + int QueueSize(); + void ConnectRedis(); + +private: + net::NetCli *cli_; + pstd::CondVar wsignal_; + pstd::CondVar rsignal_; + std::mutex signal_mutex; + std::mutex keys_queue_mutex_; + std::queue keys_queue_; + std::string ip_; + int port_; + std::string password_; + std::atomic should_exit_; + int64_t elements_; + + virtual void *ThreadMain(); +}; + +#endif diff --git a/include/pika_server.h b/include/pika_server.h index 2802eb157d..6d195ff4b2 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -46,6 +46,7 @@ #include "include/pika_statistic.h" #include "include/pika_transaction.h" #include "include/rsync_server.h" +#include "include/redis_sender.h" extern std::unique_ptr g_pika_conf; @@ -309,6 +310,12 @@ class PikaServer : public pstd::noncopyable { pstd::Status GetCmdRouting(std::vector& redis_cmds, std::vector* dst, bool* all_local); + /* + * migrate used + */ + int SendRedisCommand(const std::string& command, const std::string& key); + void RetransmitData(const std::string& path); + // info debug use void ServerStatus(std::string* info); @@ -617,6 +624,11 @@ class PikaServer : public pstd::noncopyable { */ std::unique_ptr pika_auxiliary_thread_; + /* + * migrate to redis used + */ + std::vector> redis_senders_; + /* * Async slotsMgrt use */ diff --git a/include/redis_sender.h b/include/redis_sender.h new file mode 100644 index 0000000000..0b84c5f0f1 --- /dev/null +++ b/include/redis_sender.h @@ -0,0 +1,52 @@ +#ifndef REDIS_SENDER_H_ +#define REDIS_SENDER_H_ + +#include +#include +#include +#include +#include + +#include "pika_repl_bgworker.h" +#include "net/include/net_cli.h" +#include "net/include/redis_cli.h" + +class RedisSender : public net::Thread { + public: + RedisSender(int id, std::string ip, int64_t port, std::string password); + virtual ~RedisSender(); + void Stop(void); + int64_t elements() { + return elements_; + } + + void SendRedisCommand(const std::string &command); + + private: + int SendCommand(std::string &command); + void ConnectRedis(); + size_t commandQueueSize() { + std::lock_guard l(keys_mutex_); + return commands_queue_.size(); + } + + private: + int id_; + std::shared_ptr cli_; + pstd::CondVar rsignal_; + pstd::CondVar wsignal_; + pstd::Mutex signal_mutex_; + pstd::Mutex keys_mutex_; + std::queue commands_queue_; + std::string ip_; + int port_; + std::string password_; + bool should_exit_; + int32_t cnt_; + int64_t elements_; + std::atomic last_write_time_; + + virtual void *ThreadMain(); +}; + +#endif diff --git a/pika-migrate.md b/pika-migrate.md new file mode 100644 index 0000000000..320aa709dc --- /dev/null +++ b/pika-migrate.md @@ -0,0 +1,43 @@ +## Pika3.5到Redis迁移工具 + +### 适用版本: +Pika 3.5, 单机模式且只支持单db + +### 功能 +将Pika中的数据在线迁移到Pika、Redis(支持全量、增量同步) + +### 开发背景: +之前Pika项目官方提供的pika\_to\_redis工具仅支持离线将Pika的DB中的数据迁移到Pika、Redis, 且无法增量同步, 该工具实际上就是一个特殊的Pika, 只不过成为从库之后, 内部会将从主库获取到的数据转发给Redis,同时并支持增量同步, 实现热迁功能. + +### 热迁原理 +1. pika-port通过dbsync请求获取主库当前全量db数据, 以及当前db数据所对应的binlog点位 +2. 获取到主库当前全量db数据之后, 扫描db, 将db中的数据转发给Redis +3. 通过之前获取的binlog的点位向主库进行增量同步, 在增量同步的过程中, 将从主库获取到的binlog重组成Redis命令, 转发给Redis + +### 新增配置项 +```cpp +################### +## Migrate Settings +################### + +target-redis-host : 127.0.0.1 +target-redis-port : 6379 +target-redis-pwd : abc + +sync-batch-num : 100 +redis-sender-num : 10 +``` + +### 步骤 +1. 考虑到在pika-port在将全量数据写入到Redis这段时间可能耗时很长, 导致主库原先binlog点位已经被清理, 我们首先在主库上执行`config set expire-logs-nums 10000`, 让主库保留10000个Binlog文件(Binlog文件占用磁盘空间, 可以根据实际情况确定保留binlog的数量), 确保后续该工具请求增量同步的时候, 对应的Binlog文件还存在. +2. 修改该工具配置文件的`target-redis-host, target-redis-port, target-redis-pwd, sync-batch-num, redis-sender-num`配置项(`sync-batch-num`是该工具接收到主库的全量数据之后, 为了提升转发效率, 将`sync-batch-num`个数据一起打包发送给Redis, 此外该工具内部可以指定`redis-sender-num`个线程用于转发命令, 命令通过Key的哈希值被分配到不同的线程中, 所以无需担心多线程发送导致的数据错乱的问题) +3. 使用`pika -c pika.conf`命令启动该工具, 查看日志是否有报错信息 +4. 向该工具执行`slaveof ip port force`向主库请求同步, 观察是否有报错信息 +5. 在确认主从关系建立成功之后(此时pika-port同时也在向目标Redis转发数据了)通过向主库执行`info Replication`查看主从同步延迟(可在主库写入一个特殊的Key, 然后看在Redis测是否可以立马获取到, 来判断是否数据已经基本同步完毕) + +### 注意事项 +1. Pika支持不同数据结构采用同名Key, 但是Redis不支持, 所以在有同Key数据的场景下, 以第一个迁移到Redis数据结构为准, 其他同Key数据结构会丢失 +2. 该工具只支持热迁移单机模式下, 并且只采用单DB版本的Pika, 如果是集群模式, 或者是多DB场景, 工具会报错并且退出. +3. 为了避免由于主库Binlog被清理导致该工具触发多次全量同步向Redis写入脏数据, 工具自身做了保护, 在第二次触发全量同步时会报错退出. + + diff --git a/src/migrator_thread.cc b/src/migrator_thread.cc new file mode 100644 index 0000000000..5c3a223bd1 --- /dev/null +++ b/src/migrator_thread.cc @@ -0,0 +1,466 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/migrator_thread.h" + +#include + +#include +#include +#define GLOG_USE_GLOG_EXPORT +#include + +#include "storage/storage.h" +#include "src/redis_strings.h" +#include "src/redis_lists.h" +#include "src/redis_hashes.h" +#include "src/redis_sets.h" +#include "src/redis_zsets.h" +#include "src/scope_snapshot.h" +#include "src/strings_value_format.h" + +#include "include/pika_conf.h" + +const int64_t MAX_BATCH_NUM = 30000; + +extern PikaConf* g_pika_conf; + +MigratorThread::~MigratorThread() { +} + +void MigratorThread::MigrateStringsDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::string value; + std::vector keys; + std::map type_timestamp; + std::map type_status; + while (true) { + cursor = storage_->Scan(storage::DataType::kStrings, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + s = storage_->Get(key, &value); + if (!s.ok()) { + LOG(WARNING) << "get " << key << " error: " << s.ToString(); + continue; + } + + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("SET"); + argv.push_back(key); + argv.push_back(value); + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kStrings] != -2) { + ttl = type_timestamp[storage::kStrings]; + } + + if (ttl > 0) { + argv.push_back("EX"); + argv.push_back(std::to_string(ttl)); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateListsDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::vector keys; + std::map type_timestamp; + std::map type_status; + + while (true) { + cursor = storage_->Scan(storage::DataType::kLists, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + int64_t pos = 0; + std::vector nodes; + storage::Status s = storage_->LRange(key, pos, pos + g_pika_conf->sync_batch_num() - 1, &nodes); + if (!s.ok()) { + LOG(WARNING) << "db->LRange(key:" << key << ", pos:" << pos + << ", batch size: " << g_pika_conf->sync_batch_num() << ") = " << s.ToString(); + continue; + } + + while (s.ok() && !should_exit_ && !nodes.empty()) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("RPUSH"); + argv.push_back(key); + for (const auto& node : nodes) { + argv.push_back(node); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + + pos += g_pika_conf->sync_batch_num(); + nodes.clear(); + s = storage_->LRange(key, pos, pos + g_pika_conf->sync_batch_num() - 1, &nodes); + if (!s.ok()) { + LOG(WARNING) << "db->LRange(key:" << key << ", pos:" << pos + << ", batch size:" << g_pika_conf->sync_batch_num() << ") = " << s.ToString(); + } + } + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kLists] != -2) { + ttl = type_timestamp[storage::kLists]; + } + + if (s.ok() && ttl > 0) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("EXPIRE"); + argv.push_back(key); + argv.push_back(std::to_string(ttl)); + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateHashesDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::vector keys; + std::map type_timestamp; + std::map type_status; + + while (true) { + cursor = storage_->Scan(storage::DataType::kHashes, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + std::vector fvs; + storage::Status s = storage_->HGetall(key, &fvs); + if (!s.ok()) { + LOG(WARNING) << "db->HGetall(key:" << key << ") = " << s.ToString(); + continue; + } + + auto it = fvs.begin(); + while (!should_exit_ && it != fvs.end()) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("HMSET"); + argv.push_back(key); + for (int idx = 0; + idx < g_pika_conf->sync_batch_num() && !should_exit_ && it != fvs.end(); + idx++, it++) { + argv.push_back(it->field); + argv.push_back(it->value); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kHashes] != -2) { + ttl = type_timestamp[storage::kHashes]; + } + + if (s.ok() && ttl > 0) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("EXPIRE"); + argv.push_back(key); + argv.push_back(std::to_string(ttl)); + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateSetsDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::vector keys; + std::map type_timestamp; + std::map type_status; + + while (true) { + cursor = storage_->Scan(storage::DataType::kSets, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + std::vector members; + storage::Status s = storage_->SMembers(key, &members); + if (!s.ok()) { + LOG(WARNING) << "db->SMembers(key:" << key << ") = " << s.ToString(); + continue; + } + auto it = members.begin(); + while (!should_exit_ && it != members.end()) { + std::string cmd; + net::RedisCmdArgsType argv; + + argv.push_back("SADD"); + argv.push_back(key); + for (int idx = 0; + idx < g_pika_conf->sync_batch_num() && !should_exit_ && it != members.end(); + idx++, it++) { + argv.push_back(*it); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kSets] != -2) { + ttl = type_timestamp[storage::kSets]; + } + + if (s.ok() && ttl > 0) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("EXPIRE"); + argv.push_back(key); + argv.push_back(std::to_string(ttl)); + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateZsetsDB() { + int64_t scan_batch_num = g_pika_conf->sync_batch_num() * 10; + if (MAX_BATCH_NUM < scan_batch_num) { + if (g_pika_conf->sync_batch_num() < MAX_BATCH_NUM) { + scan_batch_num = MAX_BATCH_NUM; + } else { + scan_batch_num = g_pika_conf->sync_batch_num() * 2; + } + } + + int64_t ttl = -1; + int64_t cursor = 0; + storage::Status s; + std::vector keys; + std::map type_timestamp; + std::map type_status; + + while (true) { + cursor = storage_->Scan(storage::DataType::kZSets, cursor, "*", scan_batch_num, &keys); + + for (const auto& key : keys) { + std::vector score_members; + storage::Status s = storage_->ZRange(key, 0, -1, &score_members); + if (!s.ok()) { + LOG(WARNING) << "db->ZRange(key:" << key << ") = " << s.ToString(); + continue; + } + auto it = score_members.begin(); + while (!should_exit_ && it != score_members.end()) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("ZADD"); + argv.push_back(key); + for (int idx = 0; + idx < g_pika_conf->sync_batch_num() && !should_exit_ && it != score_members.end(); + idx++, it++) { + argv.push_back(std::to_string(it->score)); + argv.push_back(it->member); + } + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + + ttl = -1; + type_status.clear(); + type_timestamp = storage_->TTL(key, &type_status); + if (type_timestamp[storage::kZSets] != -2) { + ttl = type_timestamp[storage::kZSets]; + } + + if (s.ok() && ttl > 0) { + net::RedisCmdArgsType argv; + std::string cmd; + + argv.push_back("EXPIRE"); + argv.push_back(key); + argv.push_back(std::to_string(ttl)); + + net::SerializeRedisCommand(argv, &cmd); + PlusNum(); + DispatchKey(cmd, key); + } + } + + if (!cursor) { + break; + } + } +} + +void MigratorThread::MigrateDB() { + switch (int(type_)) { + case int(storage::kStrings) : { + MigrateStringsDB(); + break; + } + + case int(storage::kLists) : { + MigrateListsDB(); + break; + } + + case int(storage::kHashes) : { + MigrateHashesDB(); + break; + } + + case int(storage::kSets) : { + MigrateSetsDB(); + break; + } + + case int(storage::kZSets) : { + MigrateZsetsDB(); + break; + } + + default: { + LOG(WARNING) << "illegal db type " << type_; + break; + } + } +} + +void MigratorThread::DispatchKey(const std::string &command, const std::string& key) { + thread_index_ = (thread_index_ + 1) % thread_num_; + size_t idx = thread_index_; + if (key.size()) { // no empty + idx = std::hash()(key) % thread_num_; + } + (*senders_)[idx]->LoadKey(command); +} + +const char* GetDBTypeString(int type) { + switch (type) { + case int(storage::kStrings) : { + return "storage::kStrings"; + } + + case int(storage::kLists) : { + return "storage::kLists"; + } + + case int(storage::kHashes) : { + return "storage::kHashes"; + } + + case int(storage::kSets) : { + return "storage::kSets"; + } + + case int(storage::kZSets) : { + return "storage::kZSets"; + } + + default: { + return "storage::Unknown"; + } + } +} + +void *MigratorThread::ThreadMain() { + MigrateDB(); + should_exit_ = true; + LOG(INFO) << GetDBTypeString(type_) << " keys have been dispatched completly"; + return NULL; +} + diff --git a/src/pika_conf.cc b/src/pika_conf.cc index baf1e54375..bed905f7ad 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -207,6 +207,12 @@ int PikaConf::Load() { if (classic_mode_.load()) { GetConfInt("databases", &databases_); + + // pika-migrate-tool only support 1 db + if (databases_ != 1) { + LOG(FATAL) << "pika-migrate-tool only support 1 db"; + } + if (databases_ < 1 || databases_ > MAX_DB_NUM) { LOG(FATAL) << "config databases error, limit [1 ~ 8], the actual is: " << databases_; } @@ -627,6 +633,22 @@ int PikaConf::Load() { sync_window_size_.store(tmp_sync_window_size); } + // redis-migrate conifg args + target_redis_host_ = "127.0.0.1"; + GetConfStr("target-redis-host", &target_redis_host_); + + target_redis_port_ = 6379; + GetConfInt("target-redis-port", &target_redis_port_); + + target_redis_pwd_ = ""; + GetConfStr("target-redis-pwd" , &target_redis_pwd_); + + sync_batch_num_ = 100; + GetConfInt("sync-batch-num", &sync_batch_num_); + + redis_sender_num_ = 8; + GetConfInt("redis-sender-num", &redis_sender_num_); + // max conn rbuf size int tmp_max_conn_rbuf_size = PIKA_MAX_CONN_RBUF; GetConfIntHuman("max-conn-rbuf-size", &tmp_max_conn_rbuf_size); diff --git a/src/pika_db.cc b/src/pika_db.cc index 58c4f3bf77..1ecfec0568 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -473,6 +473,9 @@ bool DB::TryUpdateMasterOffset() { << ", master_ip: " << master_ip << ", master_port: " << master_port << ", filenum: " << filenum << ", offset: " << offset << ", term: " << term << ", index: " << index; + // Retransmit Data to target redis + g_pika_server->RetransmitData(dbsync_path_); + pstd::DeleteFile(info_path); if (!ChangeDb(dbsync_path_)) { LOG(WARNING) << "DB: " << db_name_ << ", Failed to change db"; diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 52afad4ba1..bfa897020d 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -231,6 +231,7 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr& c_ptr) { && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { if (c_ptr->is_write()) { + ParseAndSendPikaCommand(c_ptr); c_ptr->DoThroughDB(); if (c_ptr->IsNeedUpdateCache()) { c_ptr->DoUpdateCache(); @@ -239,6 +240,7 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr& c_ptr) { LOG(WARNING) << "It is impossbile to reach here"; } } else { + ParseAndSendPikaCommand(c_ptr); c_ptr->Do(); } if (!c_ptr->IsSuspend()) { @@ -274,3 +276,33 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr& c_ptr) { } } } + +void PikaReplBgWorker::ParseAndSendPikaCommand(const std::shared_ptr& c_ptr) { + const PikaCmdArgsType& argv = c_ptr->argv(); + if (!strcasecmp(argv[0].data(), "pksetexat")) { + if (argv.size() != 4) { + LOG(WARNING) << "find invaild command, command size: " << argv.size(); + return; + } else { + std::string key = argv[1]; + int timestamp = std::atoi(argv[2].data()); + std::string value = argv[3]; + + int seconds = timestamp - time(NULL); + PikaCmdArgsType tmp_argv; + tmp_argv.push_back("setex"); + tmp_argv.push_back(key); + tmp_argv.push_back(std::to_string(seconds)); + tmp_argv.push_back(value); + + std::string command; + net::SerializeRedisCommand(tmp_argv, &command); + g_pika_server->SendRedisCommand(command, key); + } + } else { + std::string key = argv.size() >= 2 ? argv[1] : argv[0]; + std::string command; + net::SerializeRedisCommand(argv, &command); + g_pika_server->SendRedisCommand(command, key); + } +} \ No newline at end of file diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index 3a94b22b3b..c2f3cb9814 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -154,6 +154,7 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { LOG(INFO) << "Finish to handle meta sync response"; } +static bool alerady_full_sync = false; void PikaReplClientConn::HandleDBSyncResponse(void* arg) { std::unique_ptr task_arg(static_cast(arg)); std::shared_ptr conn = task_arg->conn; @@ -178,12 +179,16 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) { return; } - slave_db->SetMasterSessionId(session_id); + if (!alerady_full_sync) { + alerady_full_sync = true; + } else { + LOG(FATAL) << "DBSyncResponse should only be called once"; + } + slave_db->SetMasterSessionId(session_id); slave_db->StopRsync(); slave_db->SetReplState(ReplState::kWaitDBSync); LOG(INFO) << "DB: " << db_name << " Need Wait To Sync"; - //now full sync is starting, add an unfinished full sync count g_pika_conf->AddInternalUsedUnfinishedFullSync(slave_db->DBName()); } diff --git a/src/pika_sender.cc b/src/pika_sender.cc new file mode 100644 index 0000000000..1c6b5b8cf4 --- /dev/null +++ b/src/pika_sender.cc @@ -0,0 +1,173 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/pika_sender.h" + +#include + +PikaSender::PikaSender(std::string ip, int64_t port, std::string password): + cli_(NULL), + ip_(ip), + port_(port), + password_(password), + should_exit_(false), + elements_(0) + { + } + +PikaSender::~PikaSender() { +} + +int PikaSender::QueueSize() { + std::lock_guard lock(keys_queue_mutex_); + return keys_queue_.size(); +} + +void PikaSender::Stop() { + should_exit_.store(true); + wsignal_.notify_all(); + rsignal_.notify_all(); + LOG(INFO) << "PikaSender received stop signal"; +} + +void PikaSender::ConnectRedis() { + while (cli_ == NULL) { + // Connect to redis + cli_ = net::NewRedisCli(); + cli_->set_connect_timeout(1000); + pstd::Status s = cli_->Connect(ip_, port_); + if (!s.ok()) { + delete cli_; + cli_ = NULL; + LOG(WARNING) << "Can not connect to " << ip_ << ":" << port_ << ", status: " << s.ToString(); + continue; + } else { + // Connect success + + // Authentication + if (!password_.empty()) { + net::RedisCmdArgsType argv, resp; + std::string cmd; + + argv.push_back("AUTH"); + argv.push_back(password_); + net::SerializeRedisCommand(argv, &cmd); + pstd::Status s = cli_->Send(&cmd); + + if (s.ok()) { + s = cli_->Recv(&resp); + if (resp[0] == "OK") { + } else { + LOG(FATAL) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password"; + cli_->Close(); + delete cli_; + cli_ = NULL; + should_exit_ = true; + return; + } + } else { + LOG(WARNING) << "send auth failed: " << s.ToString(); + cli_->Close(); + delete cli_; + cli_ = NULL; + continue; + } + } else { + // If forget to input password + net::RedisCmdArgsType argv, resp; + std::string cmd; + + argv.push_back("PING"); + net::SerializeRedisCommand(argv, &cmd); + pstd::Status s = cli_->Send(&cmd); + + if (s.ok()) { + s = cli_->Recv(&resp); + if (s.ok()) { + if (resp[0] == "NOAUTH Authentication required.") { + LOG(FATAL) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required"; + cli_->Close(); + delete cli_; + cli_ = NULL; + should_exit_ = true; + return; + } + } else { + LOG(WARNING) << "Recv failed: " << s.ToString(); + cli_->Close(); + delete cli_; + cli_ = NULL; + } + } + } + } + } +} + +void PikaSender::LoadKey(const std::string &key) { + std::unique_lock lock(signal_mutex); + wsignal_.wait(lock, [this]() { return keys_queue_.size() < 100000 || should_exit_; }); + if(!should_exit_) { + std::lock_guard lock(keys_queue_mutex_); + keys_queue_.push(key); + rsignal_.notify_one(); + } +} + +void PikaSender::SendCommand(std::string &command, const std::string &key) { + // Send command + pstd::Status s = cli_->Send(&command); + if (!s.ok()) { + elements_--; + LoadKey(key); + cli_->Close(); + LOG(INFO) << s.ToString().data(); + delete cli_; + cli_ = NULL; + ConnectRedis(); + }else{ + cli_->Recv(NULL); + } +} + +void *PikaSender::ThreadMain() { + LOG(INFO) << "Start sender thread..."; + + if (cli_ == NULL) { + ConnectRedis(); + } + + while (!should_exit_ || QueueSize() != 0) { + std::string command; + + std::unique_lock lock(signal_mutex); + rsignal_.wait(lock, [this]() { return !QueueSize() == 0 || should_exit_; }); + if (QueueSize() == 0 && should_exit_) { + return NULL; + } + lock.unlock(); + + std::string key; + { + std::lock_guard lock(keys_queue_mutex_); + key = keys_queue_.front(); + elements_++; + keys_queue_.pop(); + } + wsignal_.notify_one(); + SendCommand(key, key); + + } + + + if (cli_) { + cli_->Close(); + delete cli_; + cli_ = NULL; + } + LOG(INFO) << "PikaSender thread complete"; + return NULL; +} + diff --git a/src/pika_server.cc b/src/pika_server.cc index b1b065bb91..681d67f369 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -24,6 +24,8 @@ #include "include/pika_monotonic_time.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_sender.h" +#include "include/migrator_thread.h" using pstd::Status; extern PikaServer* g_pika_server; @@ -101,6 +103,15 @@ PikaServer::PikaServer() } } + // Create redis sender + for (int i = 0; i < g_pika_conf->redis_sender_num(); ++i) { + redis_senders_.emplace_back(std::make_unique(int(i), + g_pika_conf->target_redis_host(), + g_pika_conf->target_redis_port(), + g_pika_conf->target_redis_pwd())); + } + + acl_ = std::make_unique<::Acl>(); SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool()); bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_"); @@ -129,7 +140,10 @@ PikaServer::~PikaServer() { bgsave_thread_.StopThread(); key_scan_thread_.StopThread(); pika_migrate_thread_->StopThread(); - + for (size_t i = 0; i < redis_senders_.size(); ++i) { + redis_senders_[i]->Stop(); + } + redis_senders_.clear(); dbs_.clear(); LOG(INFO) << "PikaServer " << pthread_self() << " exit!!!"; @@ -209,6 +223,15 @@ void PikaServer::Start() { << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); } + for (size_t i = 0; i < redis_senders_.size(); ++i) { + ret = redis_senders_[i]->StartThread(); + if (ret != net::kSuccess) { + dbs_.clear(); + LOG(FATAL) << "Start RedisSender Error: " << ret + << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); + } + } + time(&start_time_s_); LOG(INFO) << "Pika Server going to start"; rsync_server_->Start(); @@ -1542,6 +1565,77 @@ Status PikaServer::GetCmdRouting(std::vector& redis_cmds, return Status::OK(); } +int PikaServer::SendRedisCommand(const std::string& command, const std::string& key) { + // Send command + size_t idx = std::hash()(key) % redis_senders_.size(); + redis_senders_[idx]->SendRedisCommand(command); + return 0; +} + +void PikaServer::RetransmitData(const std::string& path) { + std::shared_ptr storage_ = std::make_shared(); + rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), path); + + if (!s.ok()) { + LOG(FATAL) << "open received database error: " << s.ToString(); + return; + } + + // Init SenderThread + int thread_num = g_pika_conf->redis_sender_num(); + std::string target_host = g_pika_conf->target_redis_host(); + int target_port = g_pika_conf->target_redis_port(); + std::string target_pwd = g_pika_conf->target_redis_pwd(); + + LOG(INFO) << "open received database success, start retransmit data to redis(" + << target_host << ":" << target_port << ")"; + + + std::vector> pika_senders; + std::vector> migrators; + + for (int i = 0; i < thread_num; i++) { + pika_senders.emplace_back(std::make_shared(target_host, target_port, target_pwd)); + } + migrators.emplace_back(std::make_shared(storage_, &pika_senders, storage::kStrings, thread_num)); + migrators.emplace_back(std::make_shared(storage_, &pika_senders, storage::kLists, thread_num)); + migrators.emplace_back(std::make_shared(storage_, &pika_senders, storage::kHashes, thread_num)); + migrators.emplace_back(std::make_shared(storage_, &pika_senders, storage::kSets, thread_num)); + migrators.emplace_back(std::make_shared(storage_, &pika_senders, storage::kZSets, thread_num)); + + for (size_t i = 0; i < pika_senders.size(); i++) { + pika_senders[i]->StartThread(); + } + for (size_t i = 0; i < migrators.size(); i++) { + migrators[i]->StartThread(); + } + + for (size_t i = 0; i < migrators.size(); i++) { + migrators[i]->JoinThread(); + } + for (size_t i = 0; i < pika_senders.size(); i++) { + pika_senders[i]->Stop(); + } + for (size_t i = 0; i < pika_senders.size(); i++) { + pika_senders[i]->JoinThread(); + } + + int64_t replies = 0, records = 0; + for (size_t i = 0; i < migrators.size(); i++) { + records += migrators[i]->num(); + } + migrators.clear(); + for (size_t i = 0; i < pika_senders.size(); i++) { + replies += pika_senders[i]->elements(); + } + pika_senders.clear(); + + LOG(INFO) << "=============== Retransmit Finish ====================="; + LOG(INFO) << "Total records : " << records << " have been Scaned"; + LOG(INFO) << "Total replies : " << replies << " received from redis server"; + LOG(INFO) << "======================================================="; +} + void PikaServer::ServerStatus(std::string* info) { std::stringstream tmp_stream; size_t q_size = ClientProcessorThreadPoolCurQueueSize(); diff --git a/src/redis_sender.cc b/src/redis_sender.cc new file mode 100644 index 0000000000..b23f4586c9 --- /dev/null +++ b/src/redis_sender.cc @@ -0,0 +1,188 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + + +#include "include/redis_sender.h" + +#include +#include + +#include + +static time_t kCheckDiff = 1; + +RedisSender::RedisSender(int id, std::string ip, int64_t port, std::string password): + id_(id), + cli_(NULL), + ip_(ip), + port_(port), + password_(password), + should_exit_(false), + cnt_(0), + elements_(0) { + + last_write_time_ = ::time(NULL); +} + +RedisSender::~RedisSender() { + LOG(INFO) << "RedisSender thread " << id_ << " exit!!!"; +} + +void RedisSender::ConnectRedis() { + while (cli_ == NULL) { + // Connect to redis + cli_ = std::shared_ptr(net::NewRedisCli()); + cli_->set_connect_timeout(1000); + cli_->set_recv_timeout(10000); + cli_->set_send_timeout(10000); + pstd::Status s = cli_->Connect(ip_, port_); + if (!s.ok()) { + LOG(WARNING) << "Can not connect to " << ip_ << ":" << port_ << ", status: " << s.ToString(); + cli_ = NULL; + sleep(3); + continue; + } else { + // Connect success + LOG(INFO) << "RedisSender thread " << id_ << "Connect to redis(" << ip_ << ":" << port_ << ") success"; + // Authentication + if (!password_.empty()) { + net::RedisCmdArgsType argv, resp; + std::string cmd; + + argv.push_back("AUTH"); + argv.push_back(password_); + net::SerializeRedisCommand(argv, &cmd); + pstd::Status s = cli_->Send(&cmd); + + if (s.ok()) { + s = cli_->Recv(&resp); + if (resp[0] == "OK") { + } else { + LOG(FATAL) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password"; + cli_->Close(); + cli_ = NULL; + should_exit_ = true; + return; + } + } else { + LOG(WARNING) << "send auth failed: " << s.ToString(); + cli_->Close(); + cli_ = NULL; + continue; + } + } else { + // If forget to input password + net::RedisCmdArgsType argv, resp; + std::string cmd; + + argv.push_back("PING"); + net::SerializeRedisCommand(argv, &cmd); + pstd::Status s = cli_->Send(&cmd); + + if (s.ok()) { + s = cli_->Recv(&resp); + if (s.ok()) { + if (resp[0] == "NOAUTH Authentication required.") { + LOG(FATAL) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required"; + cli_->Close(); + cli_ = NULL; + should_exit_ = true; + return; + } + } else { + LOG(WARNING) << s.ToString(); + cli_->Close(); + cli_ = NULL; + } + } + } + } + } +} + +void RedisSender::Stop() { + set_should_stop(); + should_exit_ = true; + rsignal_.notify_all(); + wsignal_.notify_all(); +} + +void RedisSender::SendRedisCommand(const std::string &command) { + std::unique_lock lock(signal_mutex_); + wsignal_.wait(lock, [this]() { return commandQueueSize() < 100000; }); + if (!should_exit_) { + std::lock_guard l(keys_mutex_); + commands_queue_.push(command); + rsignal_.notify_one(); + } +} + +int RedisSender::SendCommand(std::string &command) { + time_t now = ::time(NULL); + if (kCheckDiff < now - last_write_time_) { + int ret = cli_->CheckAliveness(); + if (ret < 0) { + cli_ = NULL; + ConnectRedis(); + } + last_write_time_ = now; + } + + // Send command + int idx = 0; + do { + pstd::Status s = cli_->Send(&command); + if (s.ok()) { + cli_->Recv(NULL); + return 0; + } + + cli_->Close(); + cli_ = NULL; + ConnectRedis(); + } while(++idx < 3); + LOG(WARNING) << "RedisSender " << id_ << " fails to send redis command " << command << ", times: " << idx << ", error: " << "send command failed"; + return -1; +} + +void *RedisSender::ThreadMain() { + LOG(INFO) << "Start redis sender " << id_ << " thread..."; + // sleep(15); + + ConnectRedis(); + + while (!should_exit_) { + std::unique_lock lock(signal_mutex_); + while (commandQueueSize() == 0 && !should_exit_) { + rsignal_.wait_for(lock, std::chrono::milliseconds(100)); + } + + if (should_exit_) { + break; + } + + if (commandQueueSize() == 0) { + continue; + } + + // get redis command + std::string command; + { + std::lock_guard l(keys_mutex_); + command = commands_queue_.front(); + elements_++; + commands_queue_.pop(); + } + + wsignal_.notify_one(); + SendCommand(command); + + } + + LOG(INFO) << "RedisSender thread " << id_ << " complete"; + cli_ = NULL; + return NULL; +} +