Skip to content

Commit

Permalink
chore: hset family replies and trim
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav <[email protected]>
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Sep 22, 2024
1 parent ed21867 commit f628bca
Showing 1 changed file with 29 additions and 47 deletions.
76 changes: 29 additions & 47 deletions src/server/hset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
auto& db_slice = op_args.GetDbSlice();
auto op_res = db_slice.AddOrFind(op_args.db_cntx, key);
RETURN_ON_BAD_STATUS(op_res);
if (!op_res) {
return op_res.status();
}

auto& add_res = *op_res;

DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
Expand Down Expand Up @@ -346,9 +344,7 @@ OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, CmdArgList valu

auto& db_slice = op_args.GetDbSlice();
auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_HASH);

if (!it_res)
return it_res.status();
RETURN_ON_BAD_STATUS(it_res);

PrimeValue& pv = it_res->it->second;
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);
Expand Down Expand Up @@ -411,9 +407,7 @@ OpResult<vector<OptStr>> OpHMGet(const OpArgs& op_args, std::string_view key, Cm

auto& db_slice = op_args.GetDbSlice();
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);

if (!it_res)
return it_res.status();
RETURN_ON_BAD_STATUS(it_res);

const PrimeValue& pv = (*it_res)->second;

Expand Down Expand Up @@ -506,8 +500,7 @@ OpResult<int> OpExist(const OpArgs& op_args, string_view key, string_view field)
OpResult<string> OpGet(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.GetDbSlice();
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);
if (!it_res)
return it_res.status();
RETURN_ON_BAD_STATUS(it_res);

const PrimeValue& pv = (*it_res)->second;
void* ptr = pv.RObjPtr();
Expand Down Expand Up @@ -706,6 +699,23 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
return created;
}

struct HSetReplies {
HSetReplies(ConnectionContext* cntx)
: rb{static_cast<RedisReplyBuilder*>(cntx->reply_builder())} {
DCHECK(dynamic_cast<RedisReplyBuilder*>(cntx->reply_builder()));
}

template <typename T> void Send(OpResult<T> res) {
static_assert(is_integral_v<T>);
if (res)
rb->SendLong(*res);
else
rb->SendError(res.status());
}

RedisReplyBuilder* rb;
};

void HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t getall_mask) {
string_view key = ArgS(args, 0);

Expand Down Expand Up @@ -753,12 +763,7 @@ void HSetEx(CmdArgList args, ConnectionContext* cntx) {
return OpSet(t->GetOpArgs(shard), key, fields, op_sp);
};

OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
cntx->SendLong(*result);
} else {
cntx->SendError(result.status());
}
HSetReplies{cntx}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}

} // namespace
Expand All @@ -772,24 +777,16 @@ void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) {
};

OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
cntx->SendLong(*result);
} else {
cntx->SendError(result.status());
}
if (result.status() == OpStatus::KEY_NOTFOUND)
result = OpResult<uint32_t>(0);
HSetReplies{cntx}.Send(result);
}

void HSetFamily::HLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);

auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); };

OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
cntx->SendLong(*result);
} else {
cntx->SendError(result.status());
}
HSetReplies{cntx}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}

void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) {
Expand All @@ -800,12 +797,7 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) {
return OpExist(t->GetOpArgs(shard), key, field);
};

OpResult<int> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
cntx->SendLong(*result);
} else {
cntx->SendError(result.status());
}
HSetReplies{cntx}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}

void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -1016,12 +1008,7 @@ void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) {
return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{.skip_if_exists = true});
};

OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
cntx->SendLong(*result);
} else {
cntx->SendError(result.status());
}
HSetReplies{cntx}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}

void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
Expand All @@ -1032,12 +1019,7 @@ void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
return OpStrLen(t->GetOpArgs(shard), key, field);
};

OpResult<size_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
cntx->SendLong(*result);
} else {
cntx->SendError(result.status());
}
HSetReplies{cntx}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}

void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) {
Expand Down

0 comments on commit f628bca

Please sign in to comment.