Skip to content

Commit

Permalink
Feature: migrate-tools support pila3.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-spild committed Dec 18, 2024
1 parent cd92a46 commit 1df434c
Show file tree
Hide file tree
Showing 13 changed files with 1,149 additions and 0 deletions.
10 changes: 10 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,16 @@ sync-window-size : 9000
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
max-conn-rbuf-size : 268435456

###################
## Migrate Settings
###################

target-redis-host : 127.0.0.1
target-redis-port : 6379
target-redis-pwd :

sync-batch-num : 100
redis-sender-num : 10

#######################################################################E#######
#! Critical Settings !#
Expand Down
66 changes: 66 additions & 0 deletions include/migrator_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef MIGRATOR_THREAD_H_
#define MIGRATOR_THREAD_H_

#include <iostream>
#include <mutex>

#include "storage/storage.h"
#include "net/include/redis_cli.h"

#include "include/pika_sender.h"

class MigratorThread : public net::Thread {
public:
MigratorThread(std::shared_ptr<storage::Storage> storage_, std::vector<PikaSender *> *senders, int type, int thread_num) :
storage_(storage_),
should_exit_(false),
senders_(senders),
type_(type),
thread_num_(thread_num),
thread_index_(0),
num_(0) {
}

virtual ~ MigratorThread();

int64_t num() {
std::lock_guard<std::mutex> l(num_mutex_);
return num_;
}

void Stop() {
should_exit_ = true;
}

private:
void PlusNum() {
std::lock_guard<std::mutex> l(num_mutex_);
++num_;
}

void DispatchKey(const std::string &command, const std::string& key = "");

void MigrateDB();
void MigrateStringsDB();
void MigrateListsDB();
void MigrateHashesDB();
void MigrateSetsDB();
void MigrateZsetsDB();

virtual void *ThreadMain();

private:
std::shared_ptr<storage::Storage> storage_;
bool should_exit_;

std::vector<PikaSender *> *senders_;
int type_;
int thread_num_;
int thread_index_;

int64_t num_;
std::mutex num_mutex_;
};

#endif

15 changes: 15 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,14 @@ class PikaConf : public pstd::BaseConf {
int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); }
int consensus_level() { return consensus_level_.load(); }
int replication_num() { return replication_num_.load(); }

std::string target_redis_host() { return target_redis_host_; }
int target_redis_port() { return target_redis_port_; }
std::string target_redis_pwd() { return target_redis_pwd_; }
int sync_batch_num() { return sync_batch_num_; }
int redis_sender_num() { return redis_sender_num_; }


int rate_limiter_mode() {
std::shared_lock l(rwlock_);
return rate_limiter_mode_;
Expand Down Expand Up @@ -925,6 +933,13 @@ class PikaConf : public pstd::BaseConf {
std::map<std::string, std::string> diff_commands_;
void TryPushDiffCommands(const std::string& command, const std::string& value);

// migrate configure items
std::string target_redis_host_;
int target_redis_port_;
std::string target_redis_pwd_;
int sync_batch_num_;
int redis_sender_num_;

//
// Critical configure items
//
Expand Down
43 changes: 43 additions & 0 deletions include/pika_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef PIKA_SENDER_H_
#define PIKA_SENDER_H_

#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

#include "net/include/bg_thread.h"
#include "net/include/net_cli.h"
#include "net/include/redis_cli.h"

class PikaSender : public net::Thread {
public:
PikaSender(std::string ip, int64_t port, std::string password);
virtual ~PikaSender();
void LoadKey(const std::string &cmd);
void Set_should_stop();
bool Should_stop() { return should_exit_.load(); }

int64_t elements() { return elements_; }

void SendCommand(std::string &command, const std::string &key);
int QueueSize();
void ConnectRedis();

private:
net::NetCli *cli_;
pstd::CondVar wsignal_;
pstd::CondVar rsignal_;
std::mutex keys_mutex_;
std::queue<std::string> keys_queue_;
std::string ip_;
int port_;
std::string password_;
std::atomic<bool> should_exit_;
int64_t elements_;

virtual void *ThreadMain();
};

#endif
12 changes: 12 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "include/pika_statistic.h"
#include "include/pika_transaction.h"
#include "include/rsync_server.h"
#include "include/redis_sender.h"

extern std::unique_ptr<PikaConf> g_pika_conf;

Expand Down Expand Up @@ -309,6 +310,12 @@ class PikaServer : public pstd::noncopyable {

pstd::Status GetCmdRouting(std::vector<net::RedisCmdArgsType>& redis_cmds, std::vector<Node>* dst, bool* all_local);

/*
* migrate used
*/
int SendRedisCommand(const std::string& command, const std::string& key);
void RetransmitData(const std::string& path);

// info debug use
void ServerStatus(std::string* info);

Expand Down Expand Up @@ -617,6 +624,11 @@ class PikaServer : public pstd::noncopyable {
*/
std::unique_ptr<PikaAuxiliaryThread> pika_auxiliary_thread_;

/*
* migrate to redis used
*/
std::vector<std::unique_ptr<RedisSender>> redis_senders_;

/*
* Async slotsMgrt use
*/
Expand Down
47 changes: 47 additions & 0 deletions include/redis_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef REDIS_SENDER_H_
#define REDIS_SENDER_H_

#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

#include "pika_repl_bgworker.h"
#include "net/include/net_cli.h"
#include "net/include/redis_cli.h"

class RedisSender : public net::Thread {
public:
RedisSender(int id, std::string ip, int64_t port, std::string password);
virtual ~RedisSender();
void Stop(void);
int64_t elements() {
return elements_;
}

void SendRedisCommand(const std::string &command);

private:
int SendCommand(std::string &command);
void ConnectRedis();

private:
int id_;
net::NetCli *cli_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;
pstd::Mutex commands_mutex_;
std::queue<std::string> commands_queue_;
std::string ip_;
int port_;
std::string password_;
bool should_exit_;
int32_t cnt_;
int64_t elements_;
std::atomic<time_t> last_write_time_;

virtual void *ThreadMain();
};

#endif
Loading

0 comments on commit 1df434c

Please sign in to comment.