Skip to content

Commit

Permalink
Merge branch 'unstable' into slave
Browse files Browse the repository at this point in the history
  • Loading branch information
buzhimingyonghu committed Dec 15, 2024
2 parents 917050d + 0906644 commit 2d3f35a
Show file tree
Hide file tree
Showing 30 changed files with 570 additions and 39 deletions.
1 change: 0 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ timeout : 60

# The [password of administrator], which is empty by default.
# [NOTICE] If this admin password is the same as user password (including both being empty),
# the value of userpass will be ignored and all users are considered as administrators,
# in this scenario, users are not subject to the restrictions imposed by the userblacklist.
# PS: "user password" refers to value of the parameter below: userpass.
requirepass :
Expand Down
9 changes: 7 additions & 2 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,17 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRemrangebylex(std::string& key, std::string& min, std::string& max, const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);

// Bit Commands
rocksdb::Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
6 changes: 6 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("write-buffer-size", std::to_string(value));
write_buffer_size_ = value;
}
void SetLogRetentionTime(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("log-retention-time", std::to_string(value));
log_retention_time_ = value;
}
void SetMinWriteBufferNumberToMerge(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("min-write-buffer-number-to-merge", std::to_string(value));
Expand Down Expand Up @@ -933,6 +938,7 @@ class PikaConf : public pstd::BaseConf {
int cache_lfu_decay_time() { return cache_lfu_decay_time_; }
int Load();
int ConfigRewrite();
int ConfigRewriteSlaveOf();
int ConfigRewriteReplicationID();

private:
Expand Down
4 changes: 4 additions & 0 deletions include/pika_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ class ZPopmaxCmd : public Cmd {
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB() override;
void DoUpdateCache() override;
Cmd* Clone() override { return new ZPopmaxCmd(*this); }

private:
Expand All @@ -623,6 +625,8 @@ class ZPopminCmd : public Cmd {
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB() override;
void DoUpdateCache() override;
Cmd* Clone() override { return new ZPopminCmd(*this); }

private:
Expand Down
6 changes: 1 addition & 5 deletions src/acl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,7 @@ void Acl::UpdateDefaultUserPassword(const std::string& pass) {
if (pass.empty()) {
u->SetUser("nopass");
} else {
if (g_pika_conf->userpass().empty()) {
u->SetUser("nopass");
} else {
u->SetUser(">" + pass);
}
u->SetUser(">" + pass);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/cache/include/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class RedisCache {
std::vector<std::string> *members);
Status ZLexcount(std::string& key, std::string &min, std::string &max, uint64_t *len);
Status ZRemrangebylex(std::string& key, std::string &min, std::string &max);
Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);
Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);

// Bit Commands
Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
78 changes: 78 additions & 0 deletions src/cache/src/zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,5 +405,83 @@ Status RedisCache::ZRemrangebylex(std::string& key, std::string &min, std::strin
return Status::OK();
}


Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcZrange failed");
}

unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size);
for (unsigned long i = 0; i < to_return; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
}

robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size);
for (unsigned long i = 0; i < items_size; ++i) {
members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}
DEFER {
FreeObjectList(members_obj, items_size);
};

RcZRem(cache_, kobj, members_obj, to_return);

FreeZitemList(items, items_size);
return Status::OK();
}

Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcZrange failed");
}

unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size);
for (unsigned long i = items_size - to_return; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
}

robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size);
for (unsigned long i = items_size - 1; i >= 0; --i) {
members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}

DEFER {
FreeObjectList(members_obj, items_size);
};

RcZRem(cache_, kobj, members_obj, to_return);

FreeZitemList(items, items_size);
return Status::OK();
}

} // namespace cache
/* EOF */
33 changes: 33 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ void SlaveofCmd::Do() {
if (is_none_) {
res_.SetRes(CmdRes::kOk);
g_pika_conf->SetSlaveof(std::string());
g_pika_conf->ConfigRewriteSlaveOf();
return;
}

Expand Down Expand Up @@ -1590,6 +1591,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->port());
}

if (pstd::stringmatch(pattern.data(), "log-retention-time", 1) != 0) {
elements += 2;
EncodeString(&config_body, "log-retention-time");
EncodeNumber(&config_body, g_pika_conf->log_retention_time());
}

if (pstd::stringmatch(pattern.data(), "thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "thread-num");
Expand Down Expand Up @@ -1809,6 +1816,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->max_background_compactions());
}

if (pstd::stringmatch(pattern.data(), "max-subcompactions", 1) != 0) {
elements += 2;
EncodeString(&config_body, "max-subcompactions");
EncodeNumber(&config_body, g_pika_conf->max_subcompactions());
}

if (pstd::stringmatch(pattern.data(), "max-background-jobs", 1) != 0) {
elements += 2;
EncodeString(&config_body, "max-background-jobs");
Expand Down Expand Up @@ -2345,6 +2358,13 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetTimeout(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "log-retention-time") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'log-retention-time'\r\n");
return;
}
g_pika_conf->SetLogRetentionTime(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "requirepass") {
g_pika_conf->SetRequirePass(value);
g_pika_server->Acl()->UpdateDefaultUserPassword(value);
Expand Down Expand Up @@ -2677,6 +2697,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetMaxBackgroudCompactions(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "max-subcompactions") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-subcompactions'\r\n");
return;
}
std::unordered_map<std::string, std::string> options_map{{"max_subcompactions", value}};
storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kDB, options_map);
if (!s.ok()) {
res_.AppendStringRaw("-ERR Set max_subcompactions wrong: " + s.ToString() + "\r\n");
return;
}
g_pika_conf->SetMaxSubcompactions(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "rocksdb-periodic-second") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-periodic-second'\r\n");
Expand Down
31 changes: 31 additions & 0 deletions src/pika_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,37 @@ Status PikaCache::ZRemrangebylex(std::string& key, std::string &min, std::string
}
}

Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMin(key, count, score_members);
} else {
return Status::NotFound("key not in cache");
}
}

Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMax(key, count, score_members);
} else {
return Status::NotFound("key not in cache");
}
}


/*-----------------------------------------------------------------------------
* Bit Commands
*----------------------------------------------------------------------------*/
Expand Down
4 changes: 2 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,11 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRemrangebylex, std::move(zremrangebylexptr)));
////ZPopmax
std::unique_ptr<Cmd> zpopmaxptr = std::make_unique<ZPopmaxCmd>(
kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast);
kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZPopmax, std::move(zpopmaxptr)));
////ZPopmin
std::unique_ptr<Cmd> zpopminptr = std::make_unique<ZPopminCmd>(
kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast);
kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZPopmin, std::move(zpopminptr)));

// Set
Expand Down
24 changes: 24 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,30 @@ int PikaConf::ConfigRewrite() {
return static_cast<int>(WriteBack());
}

int PikaConf::ConfigRewriteSlaveOf() {
std::lock_guard l(rwlock_);
SetConfStr("slaveof", slaveof_);
if (!diff_commands_.empty()) {
std::vector<pstd::BaseConf::Rep::ConfItem> filtered_items;
for (const auto& diff_command : diff_commands_) {
if (!diff_command.second.empty()) {
pstd::BaseConf::Rep::ConfItem item(pstd::BaseConf::Rep::kConf, diff_command.first, diff_command.second);
filtered_items.push_back(item);
}
}
if (!filtered_items.empty()) {
pstd::BaseConf::Rep::ConfItem comment_item(pstd::BaseConf::Rep::kComment,
"# Generated by ReplicationID CONFIG REWRITE\n");
PushConfItem(comment_item);
for (const auto& item : filtered_items) {
PushConfItem(item);
}
}
diff_commands_.clear();
}
return static_cast<int>(WriteBack());
}

int PikaConf::ConfigRewriteReplicationID() {
std::lock_guard l(rwlock_);
SetConfStr("replication-id", replication_id_);
Expand Down
22 changes: 22 additions & 0 deletions src/pika_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,17 @@ void ZPopmaxCmd::Do() {
}
}

void ZPopmaxCmd::DoThroughDB(){
Do();
}

void ZPopmaxCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMax(key_, count_, &score_members, db_);
}
}

void ZPopminCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameZPopmin);
Expand All @@ -1523,6 +1534,17 @@ void ZPopminCmd::DoInitial() {
}
}

void ZPopminCmd::DoThroughDB(){
Do();
}

void ZPopminCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMin(key_, count_, &score_members, db_);
}
}

void ZPopminCmd::Do() {
std::vector<storage::ScoreMember> score_members;
rocksdb::Status s = db_->storage()->ZPopMin(key_, count_, &score_members);
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/options_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ inline int offset_of(T1 T2::*member) {
static std::unordered_map<std::string, MemberTypeInfo> mutable_db_options_member_type_info = {
{"max_background_jobs", {offsetof(struct rocksdb::DBOptions, max_background_jobs), MemberType::kInt}},
{"max_background_compactions", {offsetof(struct rocksdb::DBOptions, max_background_compactions), MemberType::kInt}},
{"max_subcompactions", {offsetof(struct rocksdb::DBOptions, max_subcompactions), MemberType::kInt}},
// {"base_background_compactions", {offsetof(struct rocksdb::DBOptions, base_background_compactions),
// MemberType::kInt}},
{"max_open_files", {offsetof(struct rocksdb::DBOptions, max_open_files), MemberType::kInt}},
Expand Down
Loading

0 comments on commit 2d3f35a

Please sign in to comment.