Skip to content

Commit

Permalink
chore: code cutting string family
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav <[email protected]>
  • Loading branch information
dranikpg committed Sep 19, 2024
1 parent ed21867 commit f7f06af
Showing 1 changed file with 24 additions and 50 deletions.
74 changes: 24 additions & 50 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,13 @@ size_t ExtendExisting(DbSlice::Iterator it, string_view key, string_view val, bo
return new_val.size();
}

OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) {
bool ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) {
auto& db_slice = op_args.GetDbSlice();
auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STRING);
if (!it_res) {
return false;
}
if (it_res)
ExtendExisting(it_res->it, key, val, prepend);

return ExtendExisting(it_res->it, key, val, prepend);
return it_res;
}

OpResult<double> OpIncrFloat(const OpArgs& op_args, string_view key, double val) {
Expand Down Expand Up @@ -589,8 +588,7 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value

if (params.IsConditionalSet()) {
auto find_res = db_slice.FindMutable(op_args_.db_cntx, key);
if (auto status = CachePrevIfNeeded(params, find_res.it); status != OpStatus::OK)
return status;
RETURN_ON_BAD_STATUS(CachePrevIfNeeded(params, find_res.it));

if (params.flags & SET_IF_EXISTS) {
if (IsValid(find_res.it)) {
Expand All @@ -610,9 +608,7 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
RETURN_ON_BAD_STATUS(op_res);

if (!op_res->is_new) {
if (auto status = CachePrevIfNeeded(params, op_res->it); status != OpStatus::OK)
return status;

RETURN_ON_BAD_STATUS(CachePrevIfNeeded(params, op_res->it));
return SetExisting(params, op_res->it, op_res->exp_it, key, value);
} else {
AddNew(params, op_res->it, op_res->exp_it, key, value);
Expand Down Expand Up @@ -886,8 +882,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult<StringValue> {
auto it_res = tx->GetDbSlice(es->shard_id()).FindReadOnly(tx->GetDbContext(), key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
RETURN_ON_BAD_STATUS(it_res);

return StringValue::Read(tx->GetDbIndex(), key, (*it_res)->second, es);
};
Expand All @@ -899,8 +894,7 @@ void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) {
auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult<StringValue> {
auto& db_slice = tx->GetDbSlice(es->shard_id());
auto it_res = db_slice.FindMutable(tx->GetDbContext(), key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
RETURN_ON_BAD_STATUS(it_res);

auto value = StringValue::Read(tx->GetDbIndex(), key, it_res->it->second, es);
it_res->post_updater.Run(); // Run manually before delete
Expand Down Expand Up @@ -956,12 +950,11 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex
return ExtendOrSkip(t->GetOpArgs(shard), key, value, prepend);
};

OpResult<bool> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
bool result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
SinkReplyBuilder* builder = cntx->reply_builder();

if (result.value_or(false)) {
if (result)
return builder->SendStored();
}

builder->SendSetSkipped();
}
Expand Down Expand Up @@ -1004,10 +997,8 @@ void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {

auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringValue> {
auto op_args = t->GetOpArgs(shard);

auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_STRING);
if (!it_res)
return it_res.status();
RETURN_ON_BAD_STATUS(it_res);

StringValue value = StringValue::Read(t->GetDbIndex(), key, it_res->it->second, shard);

Expand Down Expand Up @@ -1041,37 +1032,29 @@ void StringFamily::Incr(CmdArgList args, ConnectionContext* cntx) {

void StringFamily::IncrBy(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view sval = ArgS(args, 1);
int64_t val;

if (!absl::SimpleAtoi(sval, &val)) {
if (!absl::SimpleAtoi(ArgS(args, 1), &val))
return cntx->SendError(kInvalidIntErr);
}

return IncrByGeneric(key, val, cntx);
}

void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view sval = ArgS(args, 1);
double val;

if (!absl::SimpleAtod(sval, &val)) {
if (!absl::SimpleAtod(ArgS(args, 1), &val))
return cntx->SendError(kInvalidFloatErr);
}

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpIncrFloat(t->GetOpArgs(shard), key, val);
};

OpResult<double> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
auto* builder = (RedisReplyBuilder*)cntx->reply_builder();

DVLOG(2) << "IncrByGeneric " << key << "/" << result.value();
if (!result) {
return cntx->SendError(result.status());
}
OpResult<double> result = cntx->transaction->ScheduleSingleHopT(cb);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (!result)
return rb->SendError(result.status());

builder->SendDouble(result.value());
rb->SendDouble(*result);
}

void StringFamily::Decr(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -1118,10 +1101,10 @@ void StringFamily::IncrByGeneric(string_view key, int64_t val, ConnectionContext
cntx->SendError(kIncrOverflow);
break;
case OpStatus::KEY_NOTFOUND: // Relevant only for MC
reinterpret_cast<MCReplyBuilder*>(builder)->SendNotFound();
static_cast<MCReplyBuilder*>(builder)->SendNotFound();
break;
default:
reinterpret_cast<RedisReplyBuilder*>(builder)->SendError(result.status());
static_cast<RedisReplyBuilder*>(builder)->SendError(result.status());
break;
}
}
Expand Down Expand Up @@ -1218,13 +1201,8 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
Transaction* transaction = cntx->transaction;

if (VLOG_IS_ON(2)) {
string str;
for (size_t i = 1; i < args.size(); ++i) {
absl::StrAppend(&str, " ", ArgS(args, i));
}
LOG(INFO) << "MSET/" << transaction->GetUniqueShardCnt() << str;
}
VLOG(2) << "MSET/" << transaction->GetUniqueShardCnt()
<< " " << absl::StrJoin(facade::ArgRange(args), " ");

AggregateStatus result;
auto cb = [&](Transaction* t, EngineShard* shard) {
Expand All @@ -1237,11 +1215,7 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
if (auto status = transaction->ScheduleSingleHop(std::move(cb)); status != OpStatus::OK)
result = status;

if (*result == OpStatus::OK) {
cntx->SendOk();
} else {
cntx->SendError(*result);
}
cntx->SendError(*result);
}

void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) {
Expand Down

0 comments on commit f7f06af

Please sign in to comment.