Skip to content

Commit

Permalink
feature: migrate tools support pika v3.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-spild committed Jan 6, 2025
1 parent cd92a46 commit e744786
Show file tree
Hide file tree
Showing 16 changed files with 1,228 additions and 3 deletions.
10 changes: 10 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,16 @@ sync-window-size : 9000
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
max-conn-rbuf-size : 268435456

###################
## Migrate Settings
###################

target-redis-host : 127.0.0.1
target-redis-port : 6379
target-redis-pwd :

sync-batch-num : 100
redis-sender-num : 10

###############################################################################
#! Critical Settings !#
Expand Down
66 changes: 66 additions & 0 deletions include/migrator_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef MIGRATOR_THREAD_H_
#define MIGRATOR_THREAD_H_

#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "storage/storage.h"
#include "net/include/redis_cli.h"

#include "include/pika_sender.h"

// Thread that reads from a storage::Storage instance and hands the resulting
// commands to a pool of PikaSender workers.
// NOTE(review): the scan/dispatch logic is defined out of line; this summary is
// inferred from the declarations below -- confirm against the implementation.
class MigratorThread : public net::Thread {
 public:
  // storage    : storage engine handle to read from (shared ownership).
  // senders    : sender pool; only the pointer is stored, so the caller keeps
  //              ownership of the vector and must keep it alive.
  // type       : selects which data set this thread migrates -- presumably one
  //              of the per-type Migrate*DB() routines; TODO confirm.
  // thread_num : presumably the number of senders keys are spread over; TODO confirm.
  MigratorThread(std::shared_ptr<storage::Storage> storage,
                 std::vector<std::shared_ptr<PikaSender>> *senders,
                 int type,
                 int thread_num)
      : storage_(std::move(storage)),
        should_exit_(false),
        senders_(senders),
        type_(type),
        thread_num_(thread_num),
        thread_index_(0),
        num_(0) {
  }

  virtual ~MigratorThread();

  // Returns the current value of the progress counter (guarded by num_mutex_).
  int64_t num() {
    std::lock_guard<std::mutex> l(num_mutex_);
    return num_;
  }

  // Requests the thread to exit. May be called from another thread, which is
  // why should_exit_ is atomic.
  void Stop() {
    should_exit_ = true;
  }

 private:
  // Increments the progress counter under num_mutex_.
  void PlusNum() {
    std::lock_guard<std::mutex> l(num_mutex_);
    ++num_;
  }

  void DispatchKey(const std::string &command, const std::string& key = "");

  void MigrateDB();
  void MigrateStringsDB();
  void MigrateListsDB();
  void MigrateHashesDB();
  void MigrateSetsDB();
  void MigrateZsetsDB();

  virtual void *ThreadMain();

 private:
  std::shared_ptr<storage::Storage> storage_;
  // Written by Stop() and read by the worker: a plain bool here would be a
  // data race, so the flag is atomic (same as PikaSender::should_exit_).
  std::atomic<bool> should_exit_;

  std::vector<std::shared_ptr<PikaSender>> *senders_;  // not owned
  int type_;
  int thread_num_;
  int thread_index_;

  int64_t num_;           // guarded by num_mutex_
  std::mutex num_mutex_;
};

#endif

15 changes: 15 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,14 @@ class PikaConf : public pstd::BaseConf {
// Each accessor below returns a snapshot of its backing field obtained via .load().
int max_conn_rbuf_size() {
  return max_conn_rbuf_size_.load();
}

int consensus_level() {
  return consensus_level_.load();
}

int replication_num() {
  return replication_num_.load();
}

// ---- Migrate settings (see the "Migrate Settings" section of pika.conf) ----

// Value of `target-redis-host`: host of the Redis instance data is forwarded to.
std::string target_redis_host() {
  return target_redis_host_;
}

// Value of `target-redis-port`.
int target_redis_port() {
  return target_redis_port_;
}

// Value of `target-redis-pwd`; may be empty.
std::string target_redis_pwd() {
  return target_redis_pwd_;
}

// Value of `sync-batch-num`: how many entries are packed together per send
// during the full-data phase.
int sync_batch_num() {
  return sync_batch_num_;
}

// Value of `redis-sender-num`: number of threads used to forward commands.
int redis_sender_num() {
  return redis_sender_num_;
}


int rate_limiter_mode() {
std::shared_lock l(rwlock_);
return rate_limiter_mode_;
Expand Down Expand Up @@ -925,6 +933,13 @@ class PikaConf : public pstd::BaseConf {
std::map<std::string, std::string> diff_commands_;
void TryPushDiffCommands(const std::string& command, const std::string& value);

// Migrate configuration items.
// The integer members carry in-class defaults so the getters never return an
// indeterminate value when a key is missing from an older pika.conf; the
// defaults mirror the values shipped in conf/pika.conf.
// The host and password default to empty strings.
std::string target_redis_host_;
int target_redis_port_ = 6379;
std::string target_redis_pwd_;
int sync_batch_num_ = 100;
int redis_sender_num_ = 10;

//
// Critical configure items
//
Expand Down
1 change: 1 addition & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PikaReplBgWorker {
net::BGThread bg_thread_;
// Static helpers (callable without a PikaReplBgWorker instance).
// Takes the parser and the parsed argument vector; returns an int status.
// NOTE(review): presumably registered as a net::RedisParser callback -- confirm.
static int HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv);
// Fills *offset from the protobuf BinlogOffset message.
static void ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset);
// Added for the migrate tool. NOTE(review): judging by the name, this turns an
// applied Pika command into a Redis command and forwards it -- confirm in the .cc.
static void ParseAndSendPikaCommand(const std::shared_ptr<Cmd>& c_ptr);
};

#endif // PIKA_REPL_BGWROKER_H_
43 changes: 43 additions & 0 deletions include/pika_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef PIKA_SENDER_H_
#define PIKA_SENDER_H_

#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

#include "net/include/bg_thread.h"
#include "net/include/net_cli.h"
#include "net/include/redis_cli.h"

// Sender thread that owns a queue of commands and a client connection
// (net::NetCli) to the target at ip:port.
// NOTE(review): queueing/sending logic is defined out of line; the description
// is inferred from the members declared here -- confirm against the .cc file.
class PikaSender : public net::Thread {
 public:
  // NOTE(review): `port` is int64_t here but stored in an int (port_); callers
  // are expected to pass a valid TCP port.
  PikaSender(std::string ip, int64_t port, std::string password);
  virtual ~PikaSender();
  // Hands one command string to this sender.
  void LoadKey(const std::string &cmd);
  void Stop();

  // Number of elements processed so far. Atomic load, so other threads can
  // read it while the sender thread is running.
  int64_t elements() { return elements_.load(); }

  void SendCommand(std::string &command, const std::string &key);
  int QueueSize();
  void ConnectRedis();

 private:
  net::NetCli *cli_ = nullptr;  // null until a connection object is assigned
  pstd::CondVar wsignal_;
  pstd::CondVar rsignal_;
  std::mutex signal_mutex;
  std::mutex keys_queue_mutex_;
  std::queue<std::string> keys_queue_;
  std::string ip_;
  int port_ = 0;
  std::string password_;
  std::atomic<bool> should_exit_{false};
  // Atomic so elements() can be read while the sender thread updates it;
  // a plain int64_t here would be a data race.
  std::atomic<int64_t> elements_{0};

  virtual void *ThreadMain();
};

#endif
12 changes: 12 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "include/pika_statistic.h"
#include "include/pika_transaction.h"
#include "include/rsync_server.h"
#include "include/redis_sender.h"

extern std::unique_ptr<PikaConf> g_pika_conf;

Expand Down Expand Up @@ -309,6 +310,12 @@ class PikaServer : public pstd::noncopyable {

pstd::Status GetCmdRouting(std::vector<net::RedisCmdArgsType>& redis_cmds, std::vector<Node>* dst, bool* all_local);

/*
* Used by the migrate-to-Redis tooling.
*
* SendRedisCommand: takes a serialized command plus the key it operates on and
* returns an int status. NOTE(review): per pika-migrate.md, commands are spread
* over the sender threads by key hash, so `key` presumably selects the sender
* -- confirm in the implementation.
* RetransmitData: takes a filesystem path. NOTE(review): presumably the DB
* directory whose contents are re-sent to the target -- confirm.
*/
int SendRedisCommand(const std::string& command, const std::string& key);
void RetransmitData(const std::string& path);

// info debug use
void ServerStatus(std::string* info);

Expand Down Expand Up @@ -617,6 +624,11 @@ class PikaServer : public pstd::noncopyable {
*/
std::unique_ptr<PikaAuxiliaryThread> pika_auxiliary_thread_;

/*
* Used when migrating to Redis: the RedisSender threads owned by this server.
* NOTE(review): the pool size presumably comes from `redis-sender-num` -- confirm.
*/
std::vector<std::unique_ptr<RedisSender>> redis_senders_;

/*
* Async slotsMgrt use
*/
Expand Down
52 changes: 52 additions & 0 deletions include/redis_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#ifndef REDIS_SENDER_H_
#define REDIS_SENDER_H_

#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

#include "pika_repl_bgworker.h"
#include "net/include/net_cli.h"
#include "net/include/redis_cli.h"

// Sender thread that holds a queue of commands and a client connection to the
// target Redis at ip:port.
// NOTE(review): the send loop is defined out of line; this description is
// inferred from the members declared here -- confirm against the .cc file.
class RedisSender : public net::Thread {
 public:
  // NOTE(review): `port` is int64_t here but stored in an int (port_).
  RedisSender(int id, std::string ip, int64_t port, std::string password);
  virtual ~RedisSender();
  void Stop();
  // Number of elements processed so far. Atomic load, so other threads can
  // read it while the sender thread is running.
  int64_t elements() {
    return elements_.load();
  }

  // Public entry point: hands one command to this sender.
  void SendRedisCommand(const std::string &command);

 private:
  int SendCommand(std::string &command);
  void ConnectRedis();
  // Current queue length, read under keys_mutex_.
  size_t commandQueueSize() {
    std::lock_guard l(keys_mutex_);
    return commands_queue_.size();
  }

 private:
  int id_;
  std::shared_ptr<net::NetCli> cli_;
  pstd::CondVar rsignal_;
  pstd::CondVar wsignal_;
  pstd::Mutex signal_mutex_;
  pstd::Mutex keys_mutex_;
  std::queue<std::string> commands_queue_;
  std::string ip_;
  int port_;
  std::string password_;
  // Set by Stop() and polled by the worker thread: must be atomic to avoid a
  // data race (PikaSender::should_exit_ is atomic as well).
  std::atomic<bool> should_exit_{false};
  int32_t cnt_ = 0;
  // Atomic so elements() can be read while the worker updates the counter.
  std::atomic<int64_t> elements_{0};
  std::atomic<time_t> last_write_time_{0};

  virtual void *ThreadMain();
};

#endif
43 changes: 43 additions & 0 deletions pika-migrate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
## Pika3.5到Redis迁移工具

### 适用版本:
Pika 3.5, 单机模式且只支持单db

### 功能
将Pika中的数据在线迁移到Pika、Redis(支持全量、增量同步)

### 开发背景:
之前Pika项目官方提供的pika\_to\_redis工具仅支持离线将Pika的DB中的数据迁移到Pika、Redis, 且无法增量同步. 该工具实际上就是一个特殊的Pika, 只不过成为从库之后, 内部会将从主库获取到的数据转发给Redis, 同时支持增量同步, 实现热迁功能.

### 热迁原理
1. pika-port通过dbsync请求获取主库当前全量db数据, 以及当前db数据所对应的binlog点位
2. 获取到主库当前全量db数据之后, 扫描db, 将db中的数据转发给Redis
3. 通过之前获取的binlog的点位向主库进行增量同步, 在增量同步的过程中, 将从主库获取到的binlog重组成Redis命令, 转发给Redis

### 新增配置项
```conf
###################
## Migrate Settings
###################

target-redis-host : 127.0.0.1
target-redis-port : 6379
target-redis-pwd : abc

sync-batch-num : 100
redis-sender-num : 10
```

### 步骤
1. 考虑到pika-port将全量数据写入到Redis这段时间可能耗时很长, 导致主库原先binlog点位已经被清理, 我们首先在主库上执行`config set expire-logs-nums 10000`, 让主库保留10000个Binlog文件(Binlog文件占用磁盘空间, 可以根据实际情况确定保留binlog的数量), 确保后续该工具请求增量同步的时候, 对应的Binlog文件还存在.
2. 修改该工具配置文件的`target-redis-host, target-redis-port, target-redis-pwd, sync-batch-num, redis-sender-num`配置项(`sync-batch-num`是该工具接收到主库的全量数据之后, 为了提升转发效率, 将`sync-batch-num`个数据一起打包发送给Redis, 此外该工具内部可以指定`redis-sender-num`个线程用于转发命令, 命令通过Key的哈希值被分配到不同的线程中, 所以无需担心多线程发送导致的数据错乱的问题)
3. 使用`pika -c pika.conf`命令启动该工具, 查看日志是否有报错信息
4. 向该工具执行`slaveof ip port force`向主库请求同步, 观察是否有报错信息
5. 在确认主从关系建立成功之后(此时pika-port同时也在向目标Redis转发数据了), 通过向主库执行`info Replication`查看主从同步延迟(可在主库写入一个特殊的Key, 然后看在Redis侧是否可以立马获取到, 来判断数据是否已经基本同步完毕)

### 注意事项
1. Pika支持不同数据结构采用同名Key, 但是Redis不支持, 所以在有同Key数据的场景下, 以第一个迁移到Redis数据结构为准, 其他同Key数据结构会丢失
2. 该工具只支持对单机模式且只使用单DB的Pika进行热迁移, 如果是集群模式, 或者是多DB场景, 工具会报错并且退出.
3. 为了避免由于主库Binlog被清理导致该工具触发多次全量同步向Redis写入脏数据, 工具自身做了保护, 在第二次触发全量同步时会报错退出.


Loading

0 comments on commit e744786

Please sign in to comment.