Skip to content

Commit

Permalink
Merge branch 'unstable' into feature/config_log_print
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/pika_admin.cc
  • Loading branch information
cheniujh committed Dec 12, 2024
2 parents ec74fe3 + 7fcb916 commit 2f40971
Show file tree
Hide file tree
Showing 27 changed files with 543 additions and 33 deletions.
9 changes: 7 additions & 2 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,17 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRemrangebylex(std::string& key, std::string& min, std::string& max, const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);

// Bit Commands
rocksdb::Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
5 changes: 5 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("write-buffer-size", std::to_string(value));
write_buffer_size_ = value;
}
void SetLogRetentionTime(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("log-retention-time", std::to_string(value));
log_retention_time_ = value;
}
void SetMinWriteBufferNumberToMerge(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("min-write-buffer-number-to-merge", std::to_string(value));
Expand Down
4 changes: 4 additions & 0 deletions include/pika_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ class ZPopmaxCmd : public Cmd {
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB() override;
void DoUpdateCache() override;
Cmd* Clone() override { return new ZPopmaxCmd(*this); }

private:
Expand All @@ -623,6 +625,8 @@ class ZPopminCmd : public Cmd {
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB() override;
void DoUpdateCache() override;
Cmd* Clone() override { return new ZPopminCmd(*this); }

private:
Expand Down
2 changes: 2 additions & 0 deletions src/cache/include/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class RedisCache {
std::vector<std::string> *members);
Status ZLexcount(std::string& key, std::string &min, std::string &max, uint64_t *len);
Status ZRemrangebylex(std::string& key, std::string &min, std::string &max);
Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);
Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);

// Bit Commands
Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
78 changes: 78 additions & 0 deletions src/cache/src/zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,5 +405,83 @@ Status RedisCache::ZRemrangebylex(std::string& key, std::string &min, std::strin
return Status::OK();
}


Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcZrange failed");
}

unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size);
for (unsigned long i = 0; i < to_return; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
}

robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size);
for (unsigned long i = 0; i < items_size; ++i) {
members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}
DEFER {
FreeObjectList(members_obj, items_size);
};

RcZRem(cache_, kobj, members_obj, to_return);

FreeZitemList(items, items_size);
return Status::OK();
}

Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcZrange failed");
}

unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size);
for (unsigned long i = items_size - to_return; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
}

robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size);
for (unsigned long i = items_size - 1; i >= 0; --i) {
members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}

DEFER {
FreeObjectList(members_obj, items_size);
};

RcZRem(cache_, kobj, members_obj, to_return);

FreeZitemList(items, items_size);
return Status::OK();
}

} // namespace cache
/* EOF */
32 changes: 32 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1590,6 +1590,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->port());
}

if (pstd::stringmatch(pattern.data(), "log-retention-time", 1) != 0) {
elements += 2;
EncodeString(&config_body, "log-retention-time");
EncodeNumber(&config_body, g_pika_conf->log_retention_time());
}

if (pstd::stringmatch(pattern.data(), "logging-mode", 1) != 0) {
elements += 2;
EncodeString(&config_body, "logging-mode");
Expand Down Expand Up @@ -1816,6 +1822,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->max_background_compactions());
}

if (pstd::stringmatch(pattern.data(), "max-subcompactions", 1) != 0) {
elements += 2;
EncodeString(&config_body, "max-subcompactions");
EncodeNumber(&config_body, g_pika_conf->max_subcompactions());
}

if (pstd::stringmatch(pattern.data(), "max-background-jobs", 1) != 0) {
elements += 2;
EncodeString(&config_body, "max-background-jobs");
Expand Down Expand Up @@ -2346,6 +2358,13 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetTimeout(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "log-retention-time") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'log-retention-time'\r\n");
return;
}
g_pika_conf->SetLogRetentionTime(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "requirepass") {
g_pika_conf->SetRequirePass(value);
g_pika_server->Acl()->UpdateDefaultUserPassword(value);
Expand Down Expand Up @@ -2687,6 +2706,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetMaxBackgroudCompactions(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "max-subcompactions") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-subcompactions'\r\n");
return;
}
std::unordered_map<std::string, std::string> options_map{{"max_subcompactions", value}};
storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kDB, options_map);
if (!s.ok()) {
res_.AppendStringRaw("-ERR Set max_subcompactions wrong: " + s.ToString() + "\r\n");
return;
}
g_pika_conf->SetMaxSubcompactions(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "rocksdb-periodic-second") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rocksdb-periodic-second'\r\n");
Expand Down
31 changes: 31 additions & 0 deletions src/pika_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,37 @@ Status PikaCache::ZRemrangebylex(std::string& key, std::string &min, std::string
}
}

Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMin(key, count, score_members);
} else {
return Status::NotFound("key not in cache");
}
}

Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMax(key, count, score_members);
} else {
return Status::NotFound("key not in cache");
}
}


/*-----------------------------------------------------------------------------
* Bit Commands
*----------------------------------------------------------------------------*/
Expand Down
4 changes: 2 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,11 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRemrangebylex, std::move(zremrangebylexptr)));
////ZPopmax
std::unique_ptr<Cmd> zpopmaxptr = std::make_unique<ZPopmaxCmd>(
kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast);
kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZPopmax, std::move(zpopmaxptr)));
////ZPopmin
std::unique_ptr<Cmd> zpopminptr = std::make_unique<ZPopminCmd>(
kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast);
kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZPopmin, std::move(zpopminptr)));

// Set
Expand Down
22 changes: 22 additions & 0 deletions src/pika_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,17 @@ void ZPopmaxCmd::Do() {
}
}

void ZPopmaxCmd::DoThroughDB(){
Do();
}

void ZPopmaxCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMax(key_, count_, &score_members, db_);
}
}

void ZPopminCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameZPopmin);
Expand All @@ -1523,6 +1534,17 @@ void ZPopminCmd::DoInitial() {
}
}

void ZPopminCmd::DoThroughDB(){
Do();
}

void ZPopminCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMin(key_, count_, &score_members, db_);
}
}

void ZPopminCmd::Do() {
std::vector<storage::ScoreMember> score_members;
rocksdb::Status s = db_->storage()->ZPopMin(key_, count_, &score_members);
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/options_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ inline int offset_of(T1 T2::*member) {
static std::unordered_map<std::string, MemberTypeInfo> mutable_db_options_member_type_info = {
{"max_background_jobs", {offsetof(struct rocksdb::DBOptions, max_background_jobs), MemberType::kInt}},
{"max_background_compactions", {offsetof(struct rocksdb::DBOptions, max_background_compactions), MemberType::kInt}},
{"max_subcompactions", {offsetof(struct rocksdb::DBOptions, max_subcompactions), MemberType::kInt}},
// {"base_background_compactions", {offsetof(struct rocksdb::DBOptions, base_background_compactions),
// MemberType::kInt}},
{"max_open_files", {offsetof(struct rocksdb::DBOptions, max_open_files), MemberType::kInt}},
Expand Down
32 changes: 32 additions & 0 deletions tests/integration/zset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,38 @@ var _ = Describe("Zset Commands", func() {
}}))
})

It("should Zpopmin test", func() {
err := client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 1,
Member: "m1",
}).Err()
Expect(err).NotTo(HaveOccurred())

err = client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 3,
Member: "m3",
}).Err()
Expect(err).NotTo(HaveOccurred())

err = client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 4,
Member: "m4",
}).Err()
Expect(err).NotTo(HaveOccurred())

max, err := client.ZPopMax(ctx, "zpopzset1", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(max).To(Equal([]redis.Z{{Score: 4, Member: "m4"}}))

min, err := client.ZPopMin(ctx, "zpopzset1", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(min).To(Equal([]redis.Z{{Score: 1, Member: "m1"}}))

rangeResult, err := client.ZRange(ctx, "zpopzset1", 0, -1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(rangeResult).To(Equal([]string{"m3"}))
})

It("should ZRemRangeByRank", func() {
err := client.ZAdd(ctx, "zset", redis.Z{Score: 1, Member: "one"}).Err()
Expect(err).NotTo(HaveOccurred())
Expand Down
9 changes: 9 additions & 0 deletions tools/kubeblocks_helm/BackupRepo_config
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
kbcli backuprepo create my-repo \
--provider s3 \
--region ${S3_REGION} \
--bucket ${S3_BUCKET} \
--endpoint ${S3_ENDPOINT} \
--access-key-id ${S3_ACCESS_KEY} \
--secret-access-key ${S3_SECRET_KEY} \
--access-method ${S3_ACCESS_METHOD} \
--default
15 changes: 15 additions & 0 deletions tools/kubeblocks_helm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,19 @@ redis-cli -p 9221
```bash
helm uninstall pika-master-slave-cluster
helm uninstall pika-master-slave
```

### Back up and restore a cluster
Ensure that the default BackupRepo is defined
Fellow the kubeblock docs [kubeblocks](https://www.kubeblocks.io/docs/preview/user_docs/maintenance/backup-and-restore/backup/backup-repo)

create backup
```bash
kbcli cluster backup pika-master-slave-cluster --method datafile
```

Select a backup and create a cluster.

```bash
kbcli cluster restore <clusterName> --backup <backup-name>
```
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
helm uninstall pika && helm uninstall pika-cluster
sleep 5
helm install pika ./pika && helm install pika-cluster ./pika-cluster
1 change: 1 addition & 0 deletions tools/kubeblocks_helm/install_ms.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
helm install pika-master-slave ./pika-master-slave && helm install pika-master-slave-cluster ./pika-master-slave-cluster
4 changes: 0 additions & 4 deletions tools/kubeblocks_helm/installdebug.sh

This file was deleted.

Loading

0 comments on commit 2f40971

Please sign in to comment.