Skip to content

Commit

Permalink
Add to recover SSD cache space on server restart
Browse files Browse the repository at this point in the history
Summary:
The current SSD checkpoint recovery doesn't restore the cached data size of each region. This PR
restores it during recovery and adds a unit test.

Differential Revision: D62482872
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 11, 2024
1 parent 7e4d626 commit da8d598
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 25 deletions.
29 changes: 19 additions & 10 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ std::optional<std::pair<uint64_t, int32_t>> SsdFile::getSpace(
return std::nullopt;
}
}
assert(!writableRegions_.empty());
VELOX_CHECK(!writableRegions_.empty());
const auto region = writableRegions_[0];
const auto offset = regionSizes_[region];
auto available = kRegionSize - offset;
Expand Down Expand Up @@ -651,8 +651,8 @@ bool SsdFile::removeFileEntries(
VELOX_SSD_CACHE_LOG(INFO)
<< "Removed " << entriesAgedOut << " entries from " << fileName_
<< ". And erased " << toFree.size() << " regions with "
<< kMaxErasedSizePct << "% entries removed.";

<< kMaxErasedSizePct << "% entries removed, and " << entries_.size()
<< " left.";
return true;
}

Expand Down Expand Up @@ -1021,14 +1021,17 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
checksum = readNumber<uint32_t>(state);
}
const auto run = SsdRun(fileBits, checksum);
const auto region = regionIndex(run.offset());
// Check that the recovered entry does not fall in an evicted region.
if (evictedMap.find(regionIndex(run.offset())) == evictedMap.end()) {
// The file may have a different id on restore.
auto it = idMap.find(fileNum);
VELOX_CHECK(it != idMap.end());
FileCacheKey key{it->second, offset};
entries_[std::move(key)] = run;
if (evictedMap.find(region) != evictedMap.end()) {
continue;
}
// The file may have a different id on restore.
auto it = idMap.find(fileNum);
VELOX_CHECK(it != idMap.end());
FileCacheKey key{it->second, offset};
entries_[std::move(key)] = run;
regionSizes_[region] += run.size();
}
++stats_.checkpointsRead;
stats_.entriesRecovered += entries_.size();
Expand All @@ -1042,10 +1045,16 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
writableRegions_.push_back(region);
}
tracker_.setRegionScores(scores);

uint64_t cachedBytes{0};
for (const auto regionSize : regionSizes_) {
cachedBytes += regionSize;
}
VELOX_SSD_CACHE_LOG(INFO) << fmt::format(
"Starting shard {} from checkpoint with {} entries, {} regions with {} free, with checksum write {}, read verification {}, checkpoint file {}",
"Starting shard {} from checkpoint with {} entries, {} cached data, {} regions with {} free, with checksum write {}, read verification {}, checkpoint file {}",
shardId_,
entries_.size(),
succinctBytes(cachedBytes),
numRegions_,
writableRegions_.size(),
checksumEnabled_ ? "enabled" : "disabled",
Expand Down
8 changes: 4 additions & 4 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ struct SsdCacheStats {
return result;
}

/// Resets this stats object by assigning a freshly value-initialized
/// SsdCacheStats to it.
void clear() {
  *this = SsdCacheStats{};
}

/// Snapshot stats
tsan_atomic<uint64_t> entriesCached{0};
tsan_atomic<uint64_t> regionsCached{0};
Expand Down Expand Up @@ -399,10 +403,6 @@ class SsdFile {
return entries_;
}

/// Test-only accessor: returns 'stats_' by value.
SsdCacheStats testingStats() const {
return stats_;
}

/// Test-only accessor: returns the current value of
/// 'checksumReadVerificationEnabled_'.
bool testingChecksumReadVerificationEnabled() const {
return checksumReadVerificationEnabled_;
}
Expand Down
144 changes: 133 additions & 11 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,8 @@ TEST_F(SsdFileTest, checkpoint) {
// Test removeFileEntries.
folly::F14FastSet<uint64_t> filesToRemove{fileName_.id()};
folly::F14FastSet<uint64_t> filesRetained{};
auto stats = ssdFile_->testingStats();
SsdCacheStats stats;
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.entriesAgedOut, 0);
EXPECT_EQ(stats.regionsAgedOut, 0);
EXPECT_EQ(stats.regionsEvicted, 0);
Expand All @@ -440,7 +441,8 @@ TEST_F(SsdFileTest, checkpoint) {
numEntriesFound = checkEntries(allEntries);
EXPECT_EQ(numEntriesFound, allEntries.size());
auto prevStats = stats;
stats = ssdFile_->testingStats();
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.entriesAgedOut - prevStats.entriesAgedOut, 0);
EXPECT_EQ(stats.regionsAgedOut - prevStats.regionsAgedOut, 0);
EXPECT_EQ(stats.regionsEvicted - prevStats.regionsEvicted, 0);
Expand All @@ -455,7 +457,8 @@ TEST_F(SsdFileTest, checkpoint) {
numEntriesFound = checkEntries(allEntries);
EXPECT_EQ(numEntriesFound, 0);
prevStats = stats;
stats = ssdFile_->testingStats();
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(
stats.entriesAgedOut - prevStats.entriesAgedOut, allEntries.size() - 16);
EXPECT_EQ(stats.regionsAgedOut - prevStats.regionsAgedOut, 16);
Expand Down Expand Up @@ -494,7 +497,9 @@ TEST_F(SsdFileTest, fileCorruption) {
populateCache(allEntries);
// All entries can be found.
EXPECT_EQ(checkEntries(allEntries), allEntries.size());
EXPECT_EQ(ssdFile_->testingStats().readSsdCorruptions, 0);
SsdCacheStats stats;
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readSsdCorruptions, 0);

// Corrupt the SSD file, initialize the cache from checkpoint without read
// verification.
Expand All @@ -512,21 +517,29 @@ TEST_F(SsdFileTest, fileCorruption) {
initializeSsdFile(kSsdSize, checkpointIntervalBytes, true, true);
// Entries at the front are still loadable.
EXPECT_EQ(checkEntries({allEntries.begin(), allEntries.begin() + 100}), 100);
EXPECT_EQ(ssdFile_->testingStats().readSsdCorruptions, 0);
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readSsdCorruptions, 0);
// The last 1/10 entries are corrupted and cannot be loaded.
VELOX_ASSERT_THROW(checkEntries(allEntries), "Corrupt SSD cache entry");
EXPECT_GT(ssdFile_->testingStats().readSsdCorruptions, 0);
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_GT(stats.readSsdCorruptions, 0);
// New entries can be written.
populateCache(allEntries);

// Corrupt the Checkpoint file. Cache cannot be recovered. All entries are
// lost.
ssdFile_->checkpoint(true);
corruptSsdFile(ssdFile_->getCheckpointFilePath());
EXPECT_EQ(ssdFile_->testingStats().readCheckpointErrors, 0);
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readCheckpointErrors, 0);
initializeSsdFile(kSsdSize, checkpointIntervalBytes, true, true);
EXPECT_EQ(checkEntries(allEntries), 0);
EXPECT_EQ(ssdFile_->testingStats().readCheckpointErrors, 1);
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readCheckpointErrors, 1);
// New entries can be written.
populateCache(allEntries);
}
Expand Down Expand Up @@ -603,14 +616,40 @@ TEST_F(SsdFileTest, recoverFromCheckpointWithChecksum) {
// All entries can be found.
EXPECT_EQ(checkEntries(allEntries), allEntries.size());

SsdCacheStats stats;
ssdFile_->updateStats(stats);
VELOX_CHECK_GT(stats.bytesCached, 0);
VELOX_CHECK_GT(stats.regionsCached, 0);
VELOX_CHECK_GT(stats.entriesCached, 0);

// Try reinitializing cache from checkpoint with read verification
// enabled/disabled.
ssdFile_->checkpoint(true);

SsdCacheStats statsAfterCheckpoint;
ssdFile_->updateStats(statsAfterCheckpoint);
ASSERT_EQ(statsAfterCheckpoint.bytesCached, stats.bytesCached);
ASSERT_EQ(statsAfterCheckpoint.regionsCached, stats.regionsCached);
ASSERT_EQ(statsAfterCheckpoint.entriesCached, stats.entriesCached);

initializeSsdFile(
kSsdSize,
checkpointIntervalBytes,
testData.writeEnabledOnRecovery,
testData.readVerificationEnabledOnRecovery);

SsdCacheStats statsAfterRecover;
ssdFile_->updateStats(statsAfterRecover);
if (testData.expectedCheckpointOnRecovery) {
ASSERT_EQ(statsAfterRecover.bytesCached, stats.bytesCached);
ASSERT_EQ(statsAfterRecover.regionsCached, stats.regionsCached);
ASSERT_EQ(statsAfterRecover.entriesCached, stats.entriesCached);
} else {
ASSERT_EQ(statsAfterRecover.bytesCached, 0);
ASSERT_EQ(statsAfterRecover.regionsCached, stats.regionsCached);
ASSERT_EQ(statsAfterRecover.entriesCached, 0);
}

EXPECT_EQ(
ssdFile_->testingChecksumReadVerificationEnabled(),
testData.expectedReadVerificationEnabledOnRecovery);
Expand All @@ -626,6 +665,83 @@ TEST_F(SsdFileTest, recoverFromCheckpointWithChecksum) {
}
}

// Verifies that the cached bytes, regions and entries stats are restored when
// the SSD file is re-initialized from a checkpoint that was taken after the
// entries of one file had been removed. The test writes entries for two files,
// removes the entries of one of them, takes a checkpoint, re-initializes the
// SSD file and compares the stats after recovery with the stats collected
// right before the checkpoint.
TEST_F(SsdFileTest, recoverWithEvictedEntries) {
  constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize;
  const uint64_t checkpointIntervalBytes = 5 * SsdFile::kRegionSize;
  const auto retainFile =
      StringIdLease(fileIds(), "recoverWithEvictedEntries.Retained");
  const auto evictFile =
      StringIdLease(fileIds(), "recoverWithEvictedEntries.Evicted");
  initializeCache(kSsdSize, checkpointIntervalBytes);

  // Write the entries of the file that is kept in the cache and track how many
  // entries and bytes were written for it.
  uint32_t retainedCacheEntries{0};
  uint64_t retainedCacheSize{0};
  // The offset is spelled as int64_t (rather than 'auto' deduced from the
  // literal 0, which is int) so that it has the same width as 'kSsdSize' and
  // as the offset used by the second loop below.
  for (int64_t startOffset = 0;
       startOffset <= kSsdSize / 2 - SsdFile::kRegionSize;
       startOffset += SsdFile::kRegionSize) {
    auto pins = makePins(
        retainFile.id(),
        startOffset,
        4096,
        2048 * 1025,
        SsdFile::kRegionSize / 2);
    for (const auto& pin : pins) {
      ++retainedCacheEntries;
      retainedCacheSize += pin.entry()->size();
    }
    ssdFile_->write(pins);
    readAndCheckPins(pins);
  }

  // Write the entries of the file that is removed later on and track how many
  // entries and bytes were written for it.
  uint32_t evictedCacheEntries{0};
  uint64_t evictedCacheSize{0};
  for (int64_t startOffset = kSsdSize / 2;
       startOffset <= kSsdSize - SsdFile::kRegionSize;
       startOffset += SsdFile::kRegionSize) {
    auto pins = makePins(
        evictFile.id(),
        startOffset + SsdFile::kRegionSize,
        4096,
        2048 * 1025,
        SsdFile::kRegionSize / 2);
    for (const auto& pin : pins) {
      ++evictedCacheEntries;
      evictedCacheSize += pin.entry()->size();
    }
    ssdFile_->write(pins);
    readAndCheckPins(pins);
  }

  // Before the removal the stats must account for the entries of both files.
  SsdCacheStats stats;
  ssdFile_->updateStats(stats);
  ASSERT_EQ(stats.bytesCached, retainedCacheSize + evictedCacheSize);
  ASSERT_EQ(stats.regionsCached, 9);
  ASSERT_EQ(stats.entriesCached, retainedCacheEntries + evictedCacheEntries);

  // Remove one file from the ssd cache.
  folly::F14FastSet<uint64_t> retainedFileIds;
  ssdFile_->removeFileEntries({evictFile.id()}, retainedFileIds);
  ASSERT_TRUE(retainedFileIds.empty());

  stats.clear();
  ssdFile_->updateStats(stats);
  // NOTE: removing file entries might erase a region whose space utilization
  // is below a certain threshold, hence the upper-bound checks instead of
  // equality checks.
  ASSERT_LE(stats.bytesCached, retainedCacheSize);
  ASSERT_LE(stats.regionsCached, 9);
  ASSERT_LE(stats.entriesCached, retainedCacheEntries);

  // Re-initialize SSD file from checkpoint.
  ssdFile_->checkpoint(true);
  initializeSsdFile(kSsdSize, checkpointIntervalBytes);

  // The stats after recovery must match the stats taken before the checkpoint.
  SsdCacheStats statsAfterRecovery;
  ssdFile_->updateStats(statsAfterRecovery);
  ASSERT_EQ(statsAfterRecovery.bytesCached, stats.bytesCached);
  ASSERT_EQ(statsAfterRecovery.regionsCached, stats.regionsCached);
  ASSERT_EQ(statsAfterRecovery.entriesCached, stats.entriesCached);
}

TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) {
constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize;

Expand All @@ -637,7 +753,8 @@ TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) {
ssdFile_->write(pins);
ASSERT_EQ(pins.size(), 1);
pins.back().entry()->setExclusiveToShared();
auto stats = ssdFile_->testingStats();
SsdCacheStats stats;
ssdFile_->updateStats(stats);
ASSERT_EQ(stats.readWithoutChecksumChecks, 0);

std::vector<TestEntry> entries;
Expand All @@ -658,14 +775,19 @@ TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) {
ASSERT_EQ(cache_->refreshStats().numEntries, 0);

ASSERT_EQ(checkEntries(entries), entries.size());
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0);
stats.clear();
ssdFile_->updateStats(stats);
ASSERT_EQ(stats.readWithoutChecksumChecks, 0);

cache_->clear();
ASSERT_EQ(cache_->refreshStats().numEntries, 0);

#ifndef NDEBUG
VELOX_ASSERT_THROW(checkEntries(shortEntries), "");
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0);
stats.clear();

ssdFile_->updateStats(stats);
ASSERT_EQ(stats.readWithoutChecksumChecks, 0);
#else
ASSERT_EQ(checkEntries(shortEntries), shortEntries.size());
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 1);
Expand Down

0 comments on commit da8d598

Please sign in to comment.