Skip to content

Commit

Permalink
add dense_set.SetExpiryTime in preparation for fieldexpire
Browse files Browse the repository at this point in the history
  • Loading branch information
NegatioN committed Sep 24, 2024
1 parent 5819755 commit 125add6
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 18 deletions.
26 changes: 26 additions & 0 deletions src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ void DenseSet::Grow(size_t prev_size) {
}
}

auto DenseSet::FindDense(void* ptr) -> DensePtr* {
uint64_t hc = Hash(ptr, 0);
uint32_t bucket_id = BucketId(hc);
DensePtr* dptr = Find(ptr, bucket_id, 0).second;
return dptr;
}

auto DenseSet::AddOrFindDense(void* ptr, bool has_ttl) -> DensePtr* {
uint64_t hc = Hash(ptr, 0);

Expand Down Expand Up @@ -544,6 +551,25 @@ void* DenseSet::PopInternal() {
return ret;
}

void* DenseSet::ReplaceObj(void* obj, bool has_ttl) {
DensePtr* ptr = FindDense(obj);
if (!ptr)
return nullptr;

if (ptr->IsLink()) {
ptr = ptr->AsLink();
}

void* res = ptr->Raw();
obj_malloc_used_ -= ObjectAllocSize(res);
obj_malloc_used_ += ObjectAllocSize(obj);

ptr->SetObject(obj);
ptr->SetTtl(has_ttl);

return res;
}

void* DenseSet::AddOrReplaceObj(void* obj, bool has_ttl) {
DensePtr* ptr = AddOrFindDense(obj, has_ttl);
if (!ptr)
Expand Down
15 changes: 15 additions & 0 deletions src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ class DenseSet {
return curr_entry_->HasTtl() ? owner_->ObjExpireTime(curr_entry_->GetObject()) : UINT32_MAX;
}

void SetExpiryTime(uint32_t ttl_sec) {
if (HasExpiry()) {
owner_->ObjUpdateExpireTime(curr_entry_->GetObject(), ttl_sec);
} else {
owner_->ObjReplace(curr_entry_->GetObject(), ttl_sec);
}
}

bool HasExpiry() const {
return curr_entry_->HasTtl();
}
Expand Down Expand Up @@ -263,6 +271,8 @@ class DenseSet {
virtual bool ObjEqual(const void* left, const void* right, uint32_t right_cookie) const = 0;
virtual size_t ObjectAllocSize(const void* obj) const = 0;
virtual uint32_t ObjExpireTime(const void* obj) const = 0;
virtual uint32_t ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) = 0;
virtual uint32_t ObjReplace(const void* obj, uint32_t ttl_sec) = 0;
virtual void ObjDelete(void* obj, bool has_ttl) const = 0;

void CollectExpired();
Expand Down Expand Up @@ -314,6 +324,7 @@ class DenseSet {
// Returns the previous object if it has been replaced.
// nullptr, if obj was added.
void* AddOrReplaceObj(void* obj, bool has_ttl);
void* ReplaceObj(void* obj, bool has_ttl);

// Assumes that the object does not exist in the set.
void AddUnique(void* obj, bool has_ttl, uint64_t hashcode);
Expand Down Expand Up @@ -356,6 +367,10 @@ class DenseSet {
// Returns null if obj was added.
DensePtr* AddOrFindDense(void* obj, bool has_ttl);

// Returns DensePtr if the object with such key exists,
// Returns null if key does not exist.
DensePtr* FindDense(void* obj);

// ============ Pseudo Linked List in DenseSet end ==================

// returns (prev, item) pair. If item is root, then prev is null.
Expand Down
10 changes: 10 additions & 0 deletions src/core/score_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ uint32_t ScoreMap::ObjExpireTime(const void* obj) const {
return UINT32_MAX;
}

uint32_t ScoreMap::ObjReplace(const void* obj, uint32_t ttl_sec) {
// Should not reach.
return UINT32_MAX;
}

uint32_t ScoreMap::ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) {
// Should not reach.
return UINT32_MAX;
}

void ScoreMap::ObjDelete(void* obj, bool has_ttl) const {
sds s1 = (sds)obj;
sdsfree(s1);
Expand Down
2 changes: 2 additions & 0 deletions src/core/score_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class ScoreMap : public DenseSet {
bool ObjEqual(const void* left, const void* right, uint32_t right_cookie) const final;
size_t ObjectAllocSize(const void* obj) const final;
uint32_t ObjExpireTime(const void* obj) const final;
uint32_t ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) override;
uint32_t ObjReplace(const void* obj, uint32_t ttl_sec) override;
void ObjDelete(void* obj, bool has_ttl) const final;
};

Expand Down
29 changes: 29 additions & 0 deletions src/core/string_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,35 @@ uint32_t StringMap::ObjExpireTime(const void* obj) const {
return UINT32_MAX;
}

uint32_t StringMap::ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) {
sds str = (sds)obj;
char* valptr = str + sdslen(str) + 1;

uint32_t at = time_now() + ttl_sec;
uint64_t val = absl::little_endian::Load64(valptr);

absl::little_endian::Store32(valptr + 8, at);
// TODO
return 1;
}

uint32_t StringMap::ObjReplace(const void* obj, uint32_t ttl_sec) {
sds str = (sds)obj;
char* valptr = str + sdslen(str) + 1;

uint32_t at = time_now() + ttl_sec;
uint64_t val = absl::little_endian::Load64(valptr);

auto pair = detail::SdsPair(str, GetValue(str));
auto [newkey, sdsval_tag] = CreateEntry(pair->first, pair->second, time_now(), ttl_sec);

sds prev_entry = (sds)ReplaceObj(newkey, sdsval_tag & kValTtlBit);
if (prev_entry) {
ObjDelete(prev_entry, false);
}
return 1;
}

void StringMap::ObjDelete(void* obj, bool has_ttl) const {
sds s1 = (sds)obj;
sds value = GetValue(s1);
Expand Down
5 changes: 4 additions & 1 deletion src/core/string_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class StringMap : public DenseSet {

using IteratorBase::ExpiryTime;
using IteratorBase::HasExpiry;
using IteratorBase::SetExpiryTime;
};

// Returns true if field was added
Expand All @@ -114,7 +115,7 @@ class StringMap : public DenseSet {

bool Contains(std::string_view s1) const;

/// @brief Returns value of the key or nullptr if key not found.
/// @brief Returns value of the key or an empty iterator if key not found.
/// @param key
/// @return sds
iterator Find(std::string_view member) {
Expand Down Expand Up @@ -157,6 +158,8 @@ class StringMap : public DenseSet {
bool ObjEqual(const void* left, const void* right, uint32_t right_cookie) const final;
size_t ObjectAllocSize(const void* obj) const final;
uint32_t ObjExpireTime(const void* obj) const final;
uint32_t ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) override;
uint32_t ObjReplace(const void* obj, uint32_t ttl_sec) override;
void ObjDelete(void* obj, bool has_ttl) const final;
};

Expand Down
19 changes: 19 additions & 0 deletions src/core/string_map_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,25 @@ TEST_F(StringMapTest, IterateExpired) {
EXPECT_EQ(it, sm_->end());
}

TEST_F(StringMapTest, SetFieldExpireHasExpiry) {
EXPECT_TRUE(sm_->AddOrUpdate("k1", "v1", 5));
auto k = sm_->Find("k1");
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 5);
k.SetExpiryTime(1);
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 1);
}

TEST_F(StringMapTest, SetFieldExpireNoHasExpiry) {
EXPECT_TRUE(sm_->AddOrUpdate("k1", "v1"));
auto k = sm_->Find("k1");
EXPECT_FALSE(k.HasExpiry());
k.SetExpiryTime(1);
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 1);
}

unsigned total_wasted_memory = 0;

TEST_F(StringMapTest, ReallocIfNeeded) {
Expand Down
71 changes: 54 additions & 17 deletions src/core/string_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ namespace dfly {

namespace {

constexpr uint64_t kValTtlBit = 1ULL << 63;

inline bool MayHaveTtl(sds s) {
char* alloc_ptr = (char*)sdsAllocPtr(s);
return sdslen(s) + 1 + 4 <= zmalloc_usable_size(alloc_ptr);
}

sds AllocImmutableWithTtl(uint32_t len, uint32_t at) {
sds res = AllocSdsWithSpace(len, sizeof(at));
absl::little_endian::Store32(res + len + 1, at);
absl::little_endian::Store32(res + len + 1, at); // Save TTL

return res;
}
Expand All @@ -43,22 +45,8 @@ bool StringSet::AddSds(sds s1) {
}

bool StringSet::Add(string_view src, uint32_t ttl_sec) {
DCHECK_GT(ttl_sec, 0u); // ttl_sec == 0 would mean find and delete immediately

sds newsds = nullptr;
bool has_ttl = false;

if (ttl_sec == UINT32_MAX) {
newsds = sdsnewlen(src.data(), src.size());
} else {
uint32_t at = time_now() + ttl_sec;
DCHECK_LT(time_now(), at);

newsds = AllocImmutableWithTtl(src.size(), at);
if (!src.empty())
memcpy(newsds, src.data(), src.size());
has_ttl = true;
}
sds newsds = MakeSdsWithTtl(src, ttl_sec);
bool has_ttl = ttl_sec == UINT32_MAX ? false : true;

if (AddOrFindObj(newsds, has_ttl) != nullptr) {
sdsfree(newsds);
Expand All @@ -68,6 +56,21 @@ bool StringSet::Add(string_view src, uint32_t ttl_sec) {
return true;
}

bool StringSet::Replace(string_view src, uint32_t ttl_sec) {
sds newsds = MakeSdsWithTtl(src, ttl_sec);
bool has_ttl = ttl_sec == UINT32_MAX ? false : true;

sds prev_entry = (sds)ReplaceObj(newsds, has_ttl);
if (prev_entry) {
sdsfree(prev_entry);
return true;
} else {
// No previous entry found, so we need to free the new entry.
sdsfree(newsds);
return false;
}
}

std::optional<std::string> StringSet::Pop() {
sds str = (sds)PopInternal();

Expand Down Expand Up @@ -129,8 +132,42 @@ uint32_t StringSet::ObjExpireTime(const void* str) const {
return absl::little_endian::Load32(ttlptr);
}

uint32_t StringSet::ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) {
DCHECK_GT(ttl_sec, 0u); // ttl_sec == 0 would mean find and delete immediately
sds str = (sds)obj;
char* valptr = str + sdslen(str) + 1;

uint32_t at = time_now() + ttl_sec;
absl::little_endian::Store32(valptr, at);
return 1;
}

uint32_t StringSet::ObjReplace(const void* obj, uint32_t ttl_sec) {
sds str = (sds)obj;
Replace(str, ttl_sec);
return 1;
}

void StringSet::ObjDelete(void* obj, bool has_ttl) const {
sdsfree((sds)obj);
}

sds StringSet::MakeSdsWithTtl(string_view src, uint32_t ttl_sec) {
DCHECK_GT(ttl_sec, 0u); // ttl_sec == 0 would mean find and delete immediately
sds newsds = nullptr;

if (ttl_sec == UINT32_MAX) {
newsds = sdsnewlen(src.data(), src.size());
} else {
uint32_t at = time_now() + ttl_sec;
DCHECK_LT(time_now(), at);

newsds = AllocImmutableWithTtl(src.size(), at);
if (!src.empty())
memcpy(newsds, src.data(), src.size());
}

return newsds;
}

} // namespace dfly
5 changes: 5 additions & 0 deletions src/core/string_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class StringSet : public DenseSet {

// Returns true if elem was added.
bool Add(std::string_view s1, uint32_t ttl_sec = UINT32_MAX);
bool Replace(std::string_view s1, uint32_t ttl_sec = UINT32_MAX);

// Used currently by rdb_load. Returns true if elem was added.
bool AddSds(sds elem);
Expand Down Expand Up @@ -88,6 +89,7 @@ class StringSet : public DenseSet {

using IteratorBase::ExpiryTime;
using IteratorBase::HasExpiry;
using IteratorBase::SetExpiryTime;
};

iterator begin() {
Expand All @@ -111,7 +113,10 @@ class StringSet : public DenseSet {

size_t ObjectAllocSize(const void* s1) const override;
uint32_t ObjExpireTime(const void* obj) const override;
uint32_t ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) override;
uint32_t ObjReplace(const void* obj, uint32_t ttl_sec) override;
void ObjDelete(void* obj, bool has_ttl) const override;
sds MakeSdsWithTtl(std::string_view src, uint32_t ttl_sec);
};

} // end namespace dfly
19 changes: 19 additions & 0 deletions src/core/string_set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,25 @@ TEST_F(StringSetTest, Iteration) {
EXPECT_EQ(to_insert.size(), 0);
}

TEST_F(StringSetTest, SetFieldExpireHasExpiry) {
EXPECT_TRUE(ss_->Add("k1", 100));
auto k = ss_->Find("k1");
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 100);
k.SetExpiryTime(1);
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 1);
}

TEST_F(StringSetTest, SetFieldExpireNoHasExpiry) {
EXPECT_TRUE(ss_->Add("k1"));
auto k = ss_->Find("k1");
EXPECT_FALSE(k.HasExpiry());
k.SetExpiryTime(10);
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 10);
}

TEST_F(StringSetTest, Ttl) {
EXPECT_TRUE(ss_->Add("bla"sv, 1));
EXPECT_FALSE(ss_->Add("bla"sv, 1));
Expand Down

0 comments on commit 125add6

Please sign in to comment.