Skip to content

Commit

Permalink
Add to recover SSD cache space on server restart (facebookincubator#1…
Browse files Browse the repository at this point in the history
…0967)

Summary:
Pull Request resolved: facebookincubator#10967

Current SSD checkpoint recover doesn't restore the cache size in each region. This PR adds to
recover this with unit test.
This PR also fixes a stats update issue.

Reviewed By: zacw7, oerling

Differential Revision: D62482872
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 12, 2024
1 parent 485329e commit 6c058c5
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 26 deletions.
45 changes: 35 additions & 10 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ std::optional<std::pair<uint64_t, int32_t>> SsdFile::getSpace(
return std::nullopt;
}
}
assert(!writableRegions_.empty());
VELOX_CHECK(!writableRegions_.empty());
const auto region = writableRegions_[0];
const auto offset = regionSizes_[region];
auto available = kRegionSize - offset;
Expand Down Expand Up @@ -554,6 +554,7 @@ void SsdFile::updateStats(SsdCacheStats& stats) const {
stats.readSsdErrors += stats_.readSsdErrors;
stats.readCheckpointErrors += stats_.readCheckpointErrors;
stats.readSsdCorruptions += stats_.readSsdCorruptions;
stats.readWithoutChecksumChecks += stats_.readWithoutChecksumChecks;
}

void SsdFile::clear() {
Expand Down Expand Up @@ -651,8 +652,8 @@ bool SsdFile::removeFileEntries(
VELOX_SSD_CACHE_LOG(INFO)
<< "Removed " << entriesAgedOut << " entries from " << fileName_
<< ". And erased " << toFree.size() << " regions with "
<< kMaxErasedSizePct << "% entries removed.";

<< kMaxErasedSizePct << "% entries removed, and " << entries_.size()
<< " left.";
return true;
}

Expand Down Expand Up @@ -1009,6 +1010,8 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
for (auto region : evicted) {
evictedMap.insert(region);
}

std::vector<uint32_t> regionCacheSizes(numRegions_, 0);
for (;;) {
const auto fileNum = readNumber<uint64_t>(state);
if (fileNum == kCheckpointEndMarker) {
Expand All @@ -1021,15 +1024,31 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
checksum = readNumber<uint32_t>(state);
}
const auto run = SsdRun(fileBits, checksum);
const auto region = regionIndex(run.offset());
// Check that the recovered entry does not fall in an evicted region.
if (evictedMap.find(regionIndex(run.offset())) == evictedMap.end()) {
// The file may have a different id on restore.
auto it = idMap.find(fileNum);
VELOX_CHECK(it != idMap.end());
FileCacheKey key{it->second, offset};
entries_[std::move(key)] = run;
if (evictedMap.find(region) != evictedMap.end()) {
continue;
}
// The file may have a different id on restore.
const auto it = idMap.find(fileNum);
VELOX_CHECK(it != idMap.end());
FileCacheKey key{it->second, offset};
entries_[std::move(key)] = run;
regionCacheSizes[region] += run.size();
regionSizes_[region] =
std::max<uint32_t>(regionSizes_[region], run.offset() + run.size());
}

// NOTE: we might erase entries from a region for TTL eviction, so we need to
// set the region size to the max offset of the recovered cache entry from the
// region. Correspondingly, we substract the cached size from the region size
// to get the erased size.
for (auto region = 0; region < numRegions_; ++region) {
VELOX_CHECK_LE(regionCacheSizes[region], regionSizes_[region]);
erasedRegionSizes_[region] =
regionSizes_[region] - regionCacheSizes[region];
}

++stats_.checkpointsRead;
stats_.entriesRecovered += entries_.size();

Expand All @@ -1042,10 +1061,16 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
writableRegions_.push_back(region);
}
tracker_.setRegionScores(scores);

uint64_t cachedBytes{0};
for (const auto regionSize : regionSizes_) {
cachedBytes += regionSize;
}
VELOX_SSD_CACHE_LOG(INFO) << fmt::format(
"Starting shard {} from checkpoint with {} entries, {} regions with {} free, with checksum write {}, read verification {}, checkpoint file {}",
"Starting shard {} from checkpoint with {} entries, {} cached data, {} regions with {} free, with checksum write {}, read verification {}, checkpoint file {}",
shardId_,
entries_.size(),
succinctBytes(cachedBytes),
numRegions_,
writableRegions_.size(),
checksumEnabled_ ? "enabled" : "disabled",
Expand Down
8 changes: 4 additions & 4 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ struct SsdCacheStats {
return result;
}

void clear() {
*this = SsdCacheStats();
}

/// Snapshot stats
tsan_atomic<uint64_t> entriesCached{0};
tsan_atomic<uint64_t> regionsCached{0};
Expand Down Expand Up @@ -399,10 +403,6 @@ class SsdFile {
return entries_;
}

SsdCacheStats testingStats() const {
return stats_;
}

bool testingChecksumReadVerificationEnabled() const {
return checksumReadVerificationEnabled_;
}
Expand Down
146 changes: 134 additions & 12 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,8 @@ TEST_F(SsdFileTest, checkpoint) {
// Test removeFileEntries.
folly::F14FastSet<uint64_t> filesToRemove{fileName_.id()};
folly::F14FastSet<uint64_t> filesRetained{};
auto stats = ssdFile_->testingStats();
SsdCacheStats stats;
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.entriesAgedOut, 0);
EXPECT_EQ(stats.regionsAgedOut, 0);
EXPECT_EQ(stats.regionsEvicted, 0);
Expand All @@ -440,7 +441,8 @@ TEST_F(SsdFileTest, checkpoint) {
numEntriesFound = checkEntries(allEntries);
EXPECT_EQ(numEntriesFound, allEntries.size());
auto prevStats = stats;
stats = ssdFile_->testingStats();
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.entriesAgedOut - prevStats.entriesAgedOut, 0);
EXPECT_EQ(stats.regionsAgedOut - prevStats.regionsAgedOut, 0);
EXPECT_EQ(stats.regionsEvicted - prevStats.regionsEvicted, 0);
Expand All @@ -455,7 +457,8 @@ TEST_F(SsdFileTest, checkpoint) {
numEntriesFound = checkEntries(allEntries);
EXPECT_EQ(numEntriesFound, 0);
prevStats = stats;
stats = ssdFile_->testingStats();
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(
stats.entriesAgedOut - prevStats.entriesAgedOut, allEntries.size() - 16);
EXPECT_EQ(stats.regionsAgedOut - prevStats.regionsAgedOut, 16);
Expand Down Expand Up @@ -494,7 +497,9 @@ TEST_F(SsdFileTest, fileCorruption) {
populateCache(allEntries);
// All entries can be found.
EXPECT_EQ(checkEntries(allEntries), allEntries.size());
EXPECT_EQ(ssdFile_->testingStats().readSsdCorruptions, 0);
SsdCacheStats stats;
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readSsdCorruptions, 0);

// Corrupt the SSD file, initialize the cache from checkpoint without read
// verification.
Expand All @@ -512,21 +517,29 @@ TEST_F(SsdFileTest, fileCorruption) {
initializeSsdFile(kSsdSize, checkpointIntervalBytes, true, true);
// Entries at the front are still loadable.
EXPECT_EQ(checkEntries({allEntries.begin(), allEntries.begin() + 100}), 100);
EXPECT_EQ(ssdFile_->testingStats().readSsdCorruptions, 0);
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readSsdCorruptions, 0);
// The last 1/10 entries are corrupted and cannot be loaded.
VELOX_ASSERT_THROW(checkEntries(allEntries), "Corrupt SSD cache entry");
EXPECT_GT(ssdFile_->testingStats().readSsdCorruptions, 0);
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_GT(stats.readSsdCorruptions, 0);
// New entries can be written.
populateCache(allEntries);

// Corrupt the Checkpoint file. Cache cannot be recovered. All entries are
// lost.
ssdFile_->checkpoint(true);
corruptSsdFile(ssdFile_->getCheckpointFilePath());
EXPECT_EQ(ssdFile_->testingStats().readCheckpointErrors, 0);
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readCheckpointErrors, 0);
initializeSsdFile(kSsdSize, checkpointIntervalBytes, true, true);
EXPECT_EQ(checkEntries(allEntries), 0);
EXPECT_EQ(ssdFile_->testingStats().readCheckpointErrors, 1);
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readCheckpointErrors, 1);
// New entries can be written.
populateCache(allEntries);
}
Expand Down Expand Up @@ -603,14 +616,40 @@ TEST_F(SsdFileTest, recoverFromCheckpointWithChecksum) {
// All entries can be found.
EXPECT_EQ(checkEntries(allEntries), allEntries.size());

SsdCacheStats stats;
ssdFile_->updateStats(stats);
VELOX_CHECK_GT(stats.bytesCached, 0);
VELOX_CHECK_GT(stats.regionsCached, 0);
VELOX_CHECK_GT(stats.entriesCached, 0);

// Try reinitializing cache from checkpoint with read verification
// enabled/disabled.
ssdFile_->checkpoint(true);

SsdCacheStats statsAfterCheckpoint;
ssdFile_->updateStats(statsAfterCheckpoint);
ASSERT_EQ(statsAfterCheckpoint.bytesCached, stats.bytesCached);
ASSERT_EQ(statsAfterCheckpoint.regionsCached, stats.regionsCached);
ASSERT_EQ(statsAfterCheckpoint.entriesCached, stats.entriesCached);

initializeSsdFile(
kSsdSize,
checkpointIntervalBytes,
testData.writeEnabledOnRecovery,
testData.readVerificationEnabledOnRecovery);

SsdCacheStats statsAfterRecover;
ssdFile_->updateStats(statsAfterRecover);
if (testData.expectedCheckpointOnRecovery) {
ASSERT_EQ(statsAfterRecover.bytesCached, stats.bytesCached);
ASSERT_EQ(statsAfterRecover.regionsCached, stats.regionsCached);
ASSERT_EQ(statsAfterRecover.entriesCached, stats.entriesCached);
} else {
ASSERT_EQ(statsAfterRecover.bytesCached, 0);
ASSERT_EQ(statsAfterRecover.regionsCached, stats.regionsCached);
ASSERT_EQ(statsAfterRecover.entriesCached, 0);
}

EXPECT_EQ(
ssdFile_->testingChecksumReadVerificationEnabled(),
testData.expectedReadVerificationEnabledOnRecovery);
Expand All @@ -626,6 +665,83 @@ TEST_F(SsdFileTest, recoverFromCheckpointWithChecksum) {
}
}

TEST_F(SsdFileTest, recoverWithEvictedEntries) {
constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize;
const uint64_t checkpointIntervalBytes = 5 * SsdFile::kRegionSize;
const auto retainFile =
StringIdLease(fileIds(), "recoverWithEvictedEntries.Retained");
const auto evictFile =
StringIdLease(fileIds(), "recoverWithEvictedEntries.Evicted");
initializeCache(kSsdSize, checkpointIntervalBytes);

std::vector<TestEntry> allEntries;
uint32_t retainedCacheEntries{0};
uint64_t retainedCacheSize{0};
for (auto startOffset = 0; startOffset <= kSsdSize / 2 - SsdFile::kRegionSize;
startOffset += SsdFile::kRegionSize) {
auto pins = makePins(
retainFile.id(),
startOffset,
4096,
2048 * 1025,
SsdFile::kRegionSize / 2);
for (const auto& pin : pins) {
++retainedCacheEntries;
retainedCacheSize += pin.entry()->size();
}
ssdFile_->write(pins);
readAndCheckPins(pins);
}

uint32_t evictedCacheEntries{0};
uint64_t evictedCacheSize{0};
for (auto startOffset = kSsdSize / 2;
startOffset <= kSsdSize - SsdFile::kRegionSize;
startOffset += SsdFile::kRegionSize) {
auto pins = makePins(
evictFile.id(),
startOffset + SsdFile::kRegionSize,
4096,
2048 * 1025,
SsdFile::kRegionSize / 2);
for (const auto& pin : pins) {
++evictedCacheEntries;
evictedCacheSize += pin.entry()->size();
}
ssdFile_->write(pins);
readAndCheckPins(pins);
}

SsdCacheStats stats;
ssdFile_->updateStats(stats);
ASSERT_EQ(stats.bytesCached, retainedCacheSize + evictedCacheSize);
ASSERT_EQ(stats.regionsCached, 9);
ASSERT_EQ(stats.entriesCached, retainedCacheEntries + evictedCacheEntries);

// Remove one file from the ssd cache.
folly::F14FastSet<uint64_t> retainedFileIds;
ssdFile_->removeFileEntries({evictFile.id()}, retainedFileIds);
ASSERT_TRUE(retainedFileIds.empty());

stats.clear();
ssdFile_->updateStats(stats);
// NOTE: remove file entries might erase region which has space utilization
// below certain threshold.
ASSERT_LE(stats.bytesCached, retainedCacheSize);
ASSERT_LE(stats.regionsCached, 9);
ASSERT_LE(stats.entriesCached, retainedCacheEntries);

// Re-initialize SSD file from checkpoint.
ssdFile_->checkpoint(true);
initializeSsdFile(kSsdSize, checkpointIntervalBytes);

SsdCacheStats statsAfterRecovery;
ssdFile_->updateStats(statsAfterRecovery);
ASSERT_EQ(statsAfterRecovery.bytesCached, stats.bytesCached);
ASSERT_EQ(statsAfterRecovery.regionsCached, stats.regionsCached);
ASSERT_EQ(statsAfterRecovery.entriesCached, stats.entriesCached);
}

TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) {
constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize;

Expand All @@ -637,7 +753,8 @@ TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) {
ssdFile_->write(pins);
ASSERT_EQ(pins.size(), 1);
pins.back().entry()->setExclusiveToShared();
auto stats = ssdFile_->testingStats();
SsdCacheStats stats;
ssdFile_->updateStats(stats);
ASSERT_EQ(stats.readWithoutChecksumChecks, 0);

std::vector<TestEntry> entries;
Expand All @@ -658,17 +775,22 @@ TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) {
ASSERT_EQ(cache_->refreshStats().numEntries, 0);

ASSERT_EQ(checkEntries(entries), entries.size());
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0);
stats.clear();
ssdFile_->updateStats(stats);
ASSERT_EQ(stats.readWithoutChecksumChecks, 0);

cache_->clear();
ASSERT_EQ(cache_->refreshStats().numEntries, 0);

stats.clear();
#ifndef NDEBUG
VELOX_ASSERT_THROW(checkEntries(shortEntries), "");
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0);
ssdFile_->updateStats(stats);
ASSERT_EQ(stats.readWithoutChecksumChecks, 0);
#else
ASSERT_EQ(checkEntries(shortEntries), shortEntries.size());
ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 1);
ssdFile_->updateStats(stats);
ASSERT_EQ(stats.readWithoutChecksumChecks, 1);
#endif
}

Expand Down

0 comments on commit 6c058c5

Please sign in to comment.