diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index 131856ca79ea..9ff152271d79 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -294,7 +294,7 @@ std::optional> SsdFile::getSpace( return std::nullopt; } } - assert(!writableRegions_.empty()); + VELOX_CHECK(!writableRegions_.empty()); const auto region = writableRegions_[0]; const auto offset = regionSizes_[region]; auto available = kRegionSize - offset; @@ -554,6 +554,7 @@ void SsdFile::updateStats(SsdCacheStats& stats) const { stats.readSsdErrors += stats_.readSsdErrors; stats.readCheckpointErrors += stats_.readCheckpointErrors; stats.readSsdCorruptions += stats_.readSsdCorruptions; + stats.readWithoutChecksumChecks += stats_.readWithoutChecksumChecks; } void SsdFile::clear() { @@ -651,8 +652,8 @@ bool SsdFile::removeFileEntries( VELOX_SSD_CACHE_LOG(INFO) << "Removed " << entriesAgedOut << " entries from " << fileName_ << ". And erased " << toFree.size() << " regions with " - << kMaxErasedSizePct << "% entries removed."; - + << kMaxErasedSizePct << "% entries removed, and " << entries_.size() + << " left."; return true; } @@ -1009,6 +1010,8 @@ void SsdFile::readCheckpoint(std::ifstream& state) { for (auto region : evicted) { evictedMap.insert(region); } + + std::vector regionCacheSizes(numRegions_, 0); for (;;) { const auto fileNum = readNumber(state); if (fileNum == kCheckpointEndMarker) { @@ -1021,15 +1024,32 @@ void SsdFile::readCheckpoint(std::ifstream& state) { checksum = readNumber(state); } const auto run = SsdRun(fileBits, checksum); + const auto region = regionIndex(run.offset()); // Check that the recovered entry does not fall in an evicted region. - if (evictedMap.find(regionIndex(run.offset())) == evictedMap.end()) { - // The file may have a different id on restore. 
- auto it = idMap.find(fileNum); - VELOX_CHECK(it != idMap.end()); - FileCacheKey key{it->second, offset}; - entries_[std::move(key)] = run; + if (evictedMap.find(region) != evictedMap.end()) { + continue; } + // The file may have a different id on restore. + const auto it = idMap.find(fileNum); + VELOX_CHECK(it != idMap.end()); + FileCacheKey key{it->second, offset}; + entries_[std::move(key)] = run; + regionCacheSizes[region] += run.size(); + regionSizes_[region] = std::max( + regionSizes_[region], regionOffset(run.offset()) + run.size()); } + + // NOTE: we might erase entries from a region for TTL eviction, so we need to + // set the region size to the max end offset of the recovered cache entries in + // the region. Correspondingly, we subtract the cached size from the region size + // to get the erased size. + for (auto region = 0; region < numRegions_; ++region) { + VELOX_CHECK_LE(regionSizes_[region], kRegionSize); + VELOX_CHECK_LE(regionCacheSizes[region], regionSizes_[region]); + erasedRegionSizes_[region] = + regionSizes_[region] - regionCacheSizes[region]; + } + ++stats_.checkpointsRead; stats_.entriesRecovered += entries_.size(); @@ -1042,10 +1062,16 @@ void SsdFile::readCheckpoint(std::ifstream& state) { writableRegions_.push_back(region); } tracker_.setRegionScores(scores); + + uint64_t cachedBytes{0}; + for (const auto regionSize : regionSizes_) { + cachedBytes += regionSize; + } VELOX_SSD_CACHE_LOG(INFO) << fmt::format( - "Starting shard {} from checkpoint with {} entries, {} regions with {} free, with checksum write {}, read verification {}, checkpoint file {}", + "Starting shard {} from checkpoint with {} entries, {} cached data, {} regions with {} free, with checksum write {}, read verification {}, checkpoint file {}", shardId_, entries_.size(), + succinctBytes(cachedBytes), numRegions_, writableRegions_.size(), checksumEnabled_ ? 
"enabled" : "disabled", diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index 0ae44358dbcb..55be2f0f8410 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -200,6 +200,10 @@ struct SsdCacheStats { return result; } + void clear() { + *this = SsdCacheStats(); + } + /// Snapshot stats tsan_atomic entriesCached{0}; tsan_atomic regionsCached{0}; @@ -301,13 +305,27 @@ class SsdFile { /// Erases 'key' bool erase(RawFileCacheKey key); - /// Copies the data in 'ssdPins' into 'pins'. Coalesces IO for nearby /// entries if they are in ascending order and near enough. CoalesceIoStats load( const std::vector& ssdPins, const std::vector& pins); + /// Adds 'stats_' to 'stats'. + void updateStats(SsdCacheStats& stats) const; + + /// Remove cached entries of files in the fileNum set 'filesToRemove'. If + /// successful, return true, and 'filesRetained' contains entries that should + /// not be removed, ex., from pinned regions. Otherwise, return false and + /// 'filesRetained' could be ignored. + bool removeFileEntries( + const folly::F14FastSet& filesToRemove, + folly::F14FastSet& filesRetained); + + int32_t shardId() const { + return shardId_; + } + /// Increments the pin count of the region of 'offset'. void pinRegion(uint64_t offset); @@ -322,35 +340,10 @@ class SsdFile { VELOX_CHECK_GT(regionPins_[regionIndex(offset)], 0); } - /// Returns the region number corresponding to offset. - static int32_t regionIndex(uint64_t offset) { - return offset / kRegionSize; - } - - /// Updates the read count of a region. - void regionRead(int32_t region, int32_t size) { - tracker_.regionRead(region, size); - } - int32_t maxRegions() const { return maxRegions_; } - int32_t shardId() const { - return shardId_; - } - - /// Adds 'stats_' to 'stats'. - void updateStats(SsdCacheStats& stats) const; - - /// Remove cached entries of files in the fileNum set 'filesToRemove'. 
If - /// successful, return true, and 'filesRetained' contains entries that should - /// not be removed, ex., from pinned regions. Otherwise, return false and - /// 'filesRetained' could be ignored. - bool removeFileEntries( - const folly::F14FastSet& filesToRemove, - folly::F14FastSet& filesRetained); - /// Writes a checkpoint state that can be recovered from. The checkpoint is /// serialized on 'mutex_'. If 'force' is false, rechecks that at least /// 'checkpointIntervalBytes_' have been written since last checkpoint and @@ -399,10 +392,6 @@ class SsdFile { return entries_; } - SsdCacheStats testingStats() const { - return stats_; - } - bool testingChecksumReadVerificationEnabled() const { return checksumReadVerificationEnabled_; } @@ -416,6 +405,21 @@ class SsdFile { static constexpr int kMaxErasedSizePct = 50; + // Updates the read count of a region. + void regionRead(int32_t region, int32_t size) { + tracker_.regionRead(region, size); + } + + // Returns the region number corresponding to 'offset'. + static int32_t regionIndex(uint64_t offset) { + return offset / kRegionSize; + } + + // Returns the offset within a region corresponding to 'offset'. + static int32_t regionOffset(uint64_t offset) { + return offset % kRegionSize; + } + // The first 4 bytes of a checkpoint file contains version string to indicate // if checksum write is enabled or not. std::string checkpointVersion() const { diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index 017de42d932c..fd2835596ad3 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -427,7 +427,8 @@ TEST_F(SsdFileTest, checkpoint) { // Test removeFileEntries. 
folly::F14FastSet filesToRemove{fileName_.id()}; folly::F14FastSet filesRetained{}; - auto stats = ssdFile_->testingStats(); + SsdCacheStats stats; + ssdFile_->updateStats(stats); EXPECT_EQ(stats.entriesAgedOut, 0); EXPECT_EQ(stats.regionsAgedOut, 0); EXPECT_EQ(stats.regionsEvicted, 0); @@ -440,7 +441,8 @@ TEST_F(SsdFileTest, checkpoint) { numEntriesFound = checkEntries(allEntries); EXPECT_EQ(numEntriesFound, allEntries.size()); auto prevStats = stats; - stats = ssdFile_->testingStats(); + stats.clear(); + ssdFile_->updateStats(stats); EXPECT_EQ(stats.entriesAgedOut - prevStats.entriesAgedOut, 0); EXPECT_EQ(stats.regionsAgedOut - prevStats.regionsAgedOut, 0); EXPECT_EQ(stats.regionsEvicted - prevStats.regionsEvicted, 0); @@ -455,7 +457,8 @@ TEST_F(SsdFileTest, checkpoint) { numEntriesFound = checkEntries(allEntries); EXPECT_EQ(numEntriesFound, 0); prevStats = stats; - stats = ssdFile_->testingStats(); + stats.clear(); + ssdFile_->updateStats(stats); EXPECT_EQ( stats.entriesAgedOut - prevStats.entriesAgedOut, allEntries.size() - 16); EXPECT_EQ(stats.regionsAgedOut - prevStats.regionsAgedOut, 16); @@ -494,7 +497,9 @@ TEST_F(SsdFileTest, fileCorruption) { populateCache(allEntries); // All entries can be found. EXPECT_EQ(checkEntries(allEntries), allEntries.size()); - EXPECT_EQ(ssdFile_->testingStats().readSsdCorruptions, 0); + SsdCacheStats stats; + ssdFile_->updateStats(stats); + EXPECT_EQ(stats.readSsdCorruptions, 0); // Corrupt the SSD file, initialize the cache from checkpoint without read // verification. @@ -505,17 +510,20 @@ TEST_F(SsdFileTest, fileCorruption) { EXPECT_EQ(checkEntries({allEntries.begin(), allEntries.begin() + 100}), 100); EXPECT_EQ( checkEntries({allEntries.end() - 100, allEntries.end()}, false), 100); - // Corrupt the SSD file, initialize the cache from checkpoint with read // verification enabled. ssdFile_->checkpoint(true); initializeSsdFile(kSsdSize, checkpointIntervalBytes, true, true); // Entries at the front are still loadable. 
EXPECT_EQ(checkEntries({allEntries.begin(), allEntries.begin() + 100}), 100); - EXPECT_EQ(ssdFile_->testingStats().readSsdCorruptions, 0); + stats.clear(); + ssdFile_->updateStats(stats); + EXPECT_EQ(stats.readSsdCorruptions, 0); // The last 1/10 entries are corrupted and cannot be loaded. VELOX_ASSERT_THROW(checkEntries(allEntries), "Corrupt SSD cache entry"); - EXPECT_GT(ssdFile_->testingStats().readSsdCorruptions, 0); + stats.clear(); + ssdFile_->updateStats(stats); + EXPECT_GT(stats.readSsdCorruptions, 0); // New entries can be written. populateCache(allEntries); @@ -523,10 +531,14 @@ TEST_F(SsdFileTest, fileCorruption) { // lost. ssdFile_->checkpoint(true); corruptSsdFile(ssdFile_->getCheckpointFilePath()); - EXPECT_EQ(ssdFile_->testingStats().readCheckpointErrors, 0); + stats.clear(); + ssdFile_->updateStats(stats); + EXPECT_EQ(stats.readCheckpointErrors, 0); initializeSsdFile(kSsdSize, checkpointIntervalBytes, true, true); EXPECT_EQ(checkEntries(allEntries), 0); - EXPECT_EQ(ssdFile_->testingStats().readCheckpointErrors, 1); + stats.clear(); + ssdFile_->updateStats(stats); + EXPECT_EQ(stats.readCheckpointErrors, 1); // New entries can be written. populateCache(allEntries); } @@ -603,14 +615,40 @@ TEST_F(SsdFileTest, recoverFromCheckpointWithChecksum) { // All entries can be found. EXPECT_EQ(checkEntries(allEntries), allEntries.size()); + SsdCacheStats stats; + ssdFile_->updateStats(stats); + VELOX_CHECK_GT(stats.bytesCached, 0); + VELOX_CHECK_GT(stats.regionsCached, 0); + VELOX_CHECK_GT(stats.entriesCached, 0); + // Try reinitializing cache from checkpoint with read verification // enabled/disabled. 
ssdFile_->checkpoint(true); + + SsdCacheStats statsAfterCheckpoint; + ssdFile_->updateStats(statsAfterCheckpoint); + ASSERT_EQ(statsAfterCheckpoint.bytesCached, stats.bytesCached); + ASSERT_EQ(statsAfterCheckpoint.regionsCached, stats.regionsCached); + ASSERT_EQ(statsAfterCheckpoint.entriesCached, stats.entriesCached); + initializeSsdFile( kSsdSize, checkpointIntervalBytes, testData.writeEnabledOnRecovery, testData.readVerificationEnabledOnRecovery); + + SsdCacheStats statsAfterRecover; + ssdFile_->updateStats(statsAfterRecover); + if (testData.expectedCheckpointOnRecovery) { + ASSERT_EQ(statsAfterRecover.bytesCached, stats.bytesCached); + ASSERT_EQ(statsAfterRecover.regionsCached, stats.regionsCached); + ASSERT_EQ(statsAfterRecover.entriesCached, stats.entriesCached); + } else { + ASSERT_EQ(statsAfterRecover.bytesCached, 0); + ASSERT_EQ(statsAfterRecover.regionsCached, stats.regionsCached); + ASSERT_EQ(statsAfterRecover.entriesCached, 0); + } + EXPECT_EQ( ssdFile_->testingChecksumReadVerificationEnabled(), testData.expectedReadVerificationEnabledOnRecovery); @@ -626,6 +664,83 @@ TEST_F(SsdFileTest, recoverFromCheckpointWithChecksum) { } } +TEST_F(SsdFileTest, recoverWithEvictedEntries) { + constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize; + const uint64_t checkpointIntervalBytes = 5 * SsdFile::kRegionSize; + const auto retainFile = + StringIdLease(fileIds(), "recoverWithEvictedEntries.Retained"); + const auto evictFile = + StringIdLease(fileIds(), "recoverWithEvictedEntries.Evicted"); + initializeCache(kSsdSize, checkpointIntervalBytes); + + std::vector allEntries; + uint32_t retainedCacheEntries{0}; + uint64_t retainedCacheSize{0}; + for (auto startOffset = 0; startOffset <= kSsdSize / 2 - SsdFile::kRegionSize; + startOffset += SsdFile::kRegionSize) { + auto pins = makePins( + retainFile.id(), + startOffset, + 4096, + 2048 * 1025, + SsdFile::kRegionSize / 2); + for (const auto& pin : pins) { + ++retainedCacheEntries; + retainedCacheSize += 
pin.entry()->size(); + } + ssdFile_->write(pins); + readAndCheckPins(pins); + } + + uint32_t evictedCacheEntries{0}; + uint64_t evictedCacheSize{0}; + for (auto startOffset = kSsdSize / 2; + startOffset <= kSsdSize - SsdFile::kRegionSize; + startOffset += SsdFile::kRegionSize) { + auto pins = makePins( + evictFile.id(), + startOffset + SsdFile::kRegionSize, + 4096, + 2048 * 1025, + SsdFile::kRegionSize / 2); + for (const auto& pin : pins) { + ++evictedCacheEntries; + evictedCacheSize += pin.entry()->size(); + } + ssdFile_->write(pins); + readAndCheckPins(pins); + } + + SsdCacheStats stats; + ssdFile_->updateStats(stats); + ASSERT_EQ(stats.bytesCached, retainedCacheSize + evictedCacheSize); + ASSERT_EQ(stats.regionsCached, 9); + ASSERT_EQ(stats.entriesCached, retainedCacheEntries + evictedCacheEntries); + + // Remove one file from the SSD cache. + folly::F14FastSet retainedFileIds; + ssdFile_->removeFileEntries({evictFile.id()}, retainedFileIds); + ASSERT_TRUE(retainedFileIds.empty()); + + stats.clear(); + ssdFile_->updateStats(stats); + // NOTE: removing file entries might erase regions whose space utilization is + // below a certain threshold. + ASSERT_LE(stats.bytesCached, retainedCacheSize); + ASSERT_LE(stats.regionsCached, 9); + ASSERT_LE(stats.entriesCached, retainedCacheEntries); + + // Re-initialize SSD file from checkpoint. 
+ ssdFile_->checkpoint(true); + initializeSsdFile(kSsdSize, checkpointIntervalBytes); + + SsdCacheStats statsAfterRecovery; + ssdFile_->updateStats(statsAfterRecovery); + ASSERT_EQ(statsAfterRecovery.bytesCached, stats.bytesCached); + ASSERT_EQ(statsAfterRecovery.regionsCached, stats.regionsCached); + ASSERT_EQ(statsAfterRecovery.entriesCached, stats.entriesCached); +} + TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) { constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize; @@ -637,7 +752,8 @@ TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) { ssdFile_->write(pins); ASSERT_EQ(pins.size(), 1); pins.back().entry()->setExclusiveToShared(); - auto stats = ssdFile_->testingStats(); + SsdCacheStats stats; + ssdFile_->updateStats(stats); ASSERT_EQ(stats.readWithoutChecksumChecks, 0); std::vector entries; @@ -658,17 +774,22 @@ TEST_F(SsdFileTest, ssdReadWithoutChecksumCheck) { ASSERT_EQ(cache_->refreshStats().numEntries, 0); ASSERT_EQ(checkEntries(entries), entries.size()); - ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0); + stats.clear(); + ssdFile_->updateStats(stats); + ASSERT_EQ(stats.readWithoutChecksumChecks, 0); cache_->clear(); ASSERT_EQ(cache_->refreshStats().numEntries, 0); + stats.clear(); #ifndef NDEBUG VELOX_ASSERT_THROW(checkEntries(shortEntries), ""); - ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 0); + ssdFile_->updateStats(stats); + ASSERT_EQ(stats.readWithoutChecksumChecks, 0); #else ASSERT_EQ(checkEntries(shortEntries), shortEntries.size()); - ASSERT_EQ(ssdFile_->testingStats().readWithoutChecksumChecks, 1); + ssdFile_->updateStats(stats); + ASSERT_EQ(stats.readWithoutChecksumChecks, 1); #endif }