From e3024e7b58fad26da5dc271a87121908cc79a8ca Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Tue, 24 Dec 2024 16:27:35 -0800 Subject: [PATCH] Verify flushed data are recovered upon reopen in crash test (#12787) Summary: **Context/Summary:** This is to solve https://github.com/facebook/rocksdb/issues/12152. We persist the largest flushed seqno before crash just like how we persist the ExpectedState. And we verify the db lates seqno after recovery is no smaller than this flushed seqno. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12787 Test Plan: - Manually observe that the persisted sequence after flush completion is used to verify db's latest sequence - python3 tools/db_crashtest.py --simple blackbox --interval=30 - CI Reviewed By: archang19 Differential Revision: D58860150 Pulled By: hx235 fbshipit-source-id: 99cb4403964d0737908855f92af7327867079e3e --- db_stress_tool/db_stress_listener.h | 7 +- db_stress_tool/db_stress_shared_state.h | 25 +++++- db_stress_tool/db_stress_test_base.cc | 14 ++- db_stress_tool/expected_state.cc | 112 +++++++++++++++++------- db_stress_tool/expected_state.h | 53 ++++++++++- 5 files changed, 169 insertions(+), 42 deletions(-) diff --git a/db_stress_tool/db_stress_listener.h b/db_stress_tool/db_stress_listener.h index 52830dc6e00..35c70b5a103 100644 --- a/db_stress_tool/db_stress_listener.h +++ b/db_stress_tool/db_stress_listener.h @@ -55,12 +55,13 @@ class DbStressListener : public EventListener { DbStressListener(const std::string& db_name, const std::vector& db_paths, const std::vector& column_families, - Env* env) + Env* env, SharedState* shared) : db_name_(db_name), db_paths_(db_paths), column_families_(column_families), num_pending_file_creations_(0), - unique_ids_(db_name, env) {} + unique_ids_(db_name, env), + shared_(shared) {} const char* Name() const override { return kClassName(); } static const char* kClassName() { return "DBStressListener"; } @@ -74,6 +75,7 @@ class DbStressListener : public EventListener { if (fault_fs_guard) { fault_fs_guard->DisableAllThreadLocalErrorInjection(); } + shared_->SetPersistedSeqno(info.largest_seqno); } void OnFlushBegin(DB* /*db*/, @@ -358,6 +360,7 @@ class DbStressListener : public EventListener { std::vector column_families_; std::atomic num_pending_file_creations_; UniqueIdVerifier unique_ids_; + SharedState* shared_; }; } // namespace ROCKSDB_NAMESPACE #endif // GFLAGS diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index cdd9f71708b..5d9fb34ac10 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -89,11 +89,17 @@ class SharedState { // expected state. Only then should we permit bypassing the below feature // compatibility checks. if (!FLAGS_expected_values_dir.empty()) { - if (!std::atomic{}.is_lock_free()) { - status = Status::InvalidArgument( - "Cannot use --expected_values_dir on platforms without lock-free " - "std::atomic"); + if (!std::atomic{}.is_lock_free() || + !std::atomic{}.is_lock_free()) { + std::ostringstream status_s; + status_s << "Cannot use --expected_values_dir on platforms without " + "lock-free " + << (!std::atomic{}.is_lock_free() + ? "std::atomic" + : "std::atomic"); + status = Status::InvalidArgument(status_s.str()); } + if (status.ok() && FLAGS_clear_column_family_one_in > 0) { status = Status::InvalidArgument( "Cannot use --expected_values_dir on when " @@ -260,6 +266,16 @@ class SharedState { return expected_state_manager_->ClearColumnFamily(cf); } + void SetPersistedSeqno(SequenceNumber seqno) { + MutexLock l(&persist_seqno_mu_); + return expected_state_manager_->SetPersistedSeqno(seqno); + } + + SequenceNumber GetPersistedSeqno() { + MutexLock l(&persist_seqno_mu_); + return expected_state_manager_->GetPersistedSeqno(); + } + // Prepare a Put that will be started but not finish yet // This is useful for crash-recovery testing when the process may crash // before updating the corresponding expected value @@ -396,6 +412,7 @@ class SharedState { port::Mutex mu_; port::CondVar cv_; + port::Mutex persist_seqno_mu_; const uint32_t seed_; const int64_t max_key_; const uint32_t log2_keys_per_lock_; diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3f7b8e64621..52123749a71 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -3472,8 +3472,9 @@ void StressTest::Open(SharedState* shared, bool reopen) { } options_.listeners.clear(); - options_.listeners.emplace_back(new DbStressListener( - FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env)); + options_.listeners.emplace_back( + new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors, + db_stress_listener_env, shared)); RegisterAdditionalListeners(); // If this is for DB reopen, error injection may have been enabled. @@ -3714,6 +3715,15 @@ void StressTest::Open(SharedState* shared, bool reopen) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); exit(1); } + + if (db_->GetLatestSequenceNumber() < shared->GetPersistedSeqno()) { + fprintf(stderr, + "DB of latest sequence number %" PRIu64 + "did not recover to the persisted " + "sequence number %" PRIu64 " from last DB session\n", + db_->GetLatestSequenceNumber(), shared->GetPersistedSeqno()); + exit(1); + } } void StressTest::Reopen(ThreadState* thread) { diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index 22b31b23d17..80ba18a94c2 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -132,10 +132,13 @@ void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key, } } -FileExpectedState::FileExpectedState(std::string expected_state_file_path, - size_t max_key, size_t num_column_families) +FileExpectedState::FileExpectedState( + const std::string& expected_state_file_path, + const std::string& expected_persisted_seqno_file_path, size_t max_key, + size_t num_column_families) : ExpectedState(max_key, num_column_families), - expected_state_file_path_(expected_state_file_path) {} + expected_state_file_path_(expected_state_file_path), + expected_persisted_seqno_file_path_(expected_persisted_seqno_file_path) {} Status FileExpectedState::Open(bool create) { size_t expected_values_size = GetValuesLen(); @@ -144,30 +147,53 @@ Status FileExpectedState::Open(bool create) { Status status; if (create) { - std::unique_ptr wfile; - const EnvOptions soptions; - status = default_env->NewWritableFile(expected_state_file_path_, &wfile, - soptions); - if (status.ok()) { - std::string buf(expected_values_size, '\0'); - status = wfile->Append(buf); - } - } - if (status.ok()) { - status = default_env->NewMemoryMappedFileBuffer( - expected_state_file_path_, &expected_state_mmap_buffer_); - } - if (status.ok()) { - assert(expected_state_mmap_buffer_->GetLen() == expected_values_size); - values_ = static_cast*>( - expected_state_mmap_buffer_->GetBase()); - assert(values_ != nullptr); - if (create) { - Reset(); - } - } else { + status = CreateFile(default_env, EnvOptions(), expected_state_file_path_, + std::string(expected_values_size, '\0')); + if (!status.ok()) { + return status; + } + + status = CreateFile(default_env, EnvOptions(), + expected_persisted_seqno_file_path_, + std::string(sizeof(std::atomic), '\0')); + + if (!status.ok()) { + return status; + } + } + + status = MemoryMappedFile(default_env, expected_state_file_path_, + expected_state_mmap_buffer_, expected_values_size); + if (!status.ok()) { assert(values_ == nullptr); + return status; + } + + values_ = static_cast*>( + expected_state_mmap_buffer_->GetBase()); + assert(values_ != nullptr); + if (create) { + Reset(); + } + + // TODO(hx235): Find a way to mmap persisted seqno and expected state into the + // same LATEST file so we can obselete the logic to handle this extra file for + // persisted seqno + status = MemoryMappedFile(default_env, expected_persisted_seqno_file_path_, + expected_persisted_seqno_mmap_buffer_, + sizeof(std::atomic)); + if (!status.ok()) { + assert(persisted_seqno_ == nullptr); + return status; + } + + persisted_seqno_ = static_cast*>( + expected_persisted_seqno_mmap_buffer_->GetBase()); + assert(persisted_seqno_ != nullptr); + if (create) { + persisted_seqno_->store(0, std::memory_order_relaxed); } + return status; } @@ -200,6 +226,9 @@ ExpectedStateManager::~ExpectedStateManager() = default; const std::string FileExpectedStateManager::kLatestBasename = "LATEST"; const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state"; const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace"; +const std::string FileExpectedStateManager::kPersistedSeqnoBasename = "PERSIST"; +const std::string FileExpectedStateManager::kPersistedSeqnoFilenameSuffix = + ".seqno"; const std::string FileExpectedStateManager::kTempFilenamePrefix = "."; const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp"; @@ -264,13 +293,17 @@ Status FileExpectedStateManager::Open() { std::string expected_state_file_path = GetPathForFilename(kLatestBasename + kStateFilenameSuffix); + std::string expected_persisted_seqno_file_path = GetPathForFilename( + kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix); bool found = false; if (s.ok()) { Status exists_status = Env::Default()->FileExists(expected_state_file_path); if (exists_status.ok()) { found = true; } else if (exists_status.IsNotFound()) { - found = false; + assert(Env::Default() + ->FileExists(expected_persisted_seqno_file_path) + .IsNotFound()); } else { s = exists_status; } @@ -282,8 +315,12 @@ Status FileExpectedStateManager::Open() { // the incomplete expected values file. std::string temp_expected_state_file_path = GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix); - FileExpectedState temp_expected_state(temp_expected_state_file_path, - max_key_, num_column_families_); + std::string temp_expected_persisted_seqno_file_path = + GetTempPathForFilename(kPersistedSeqnoBasename + + kPersistedSeqnoFilenameSuffix); + FileExpectedState temp_expected_state( + temp_expected_state_file_path, temp_expected_persisted_seqno_file_path, + max_key_, num_column_families_); if (s.ok()) { s = temp_expected_state.Open(true /* create */); } @@ -291,11 +328,17 @@ Status FileExpectedStateManager::Open() { s = Env::Default()->RenameFile(temp_expected_state_file_path, expected_state_file_path); } + if (s.ok()) { + s = Env::Default()->RenameFile(temp_expected_persisted_seqno_file_path, + expected_persisted_seqno_file_path); + } } if (s.ok()) { - latest_.reset(new FileExpectedState(std::move(expected_state_file_path), - max_key_, num_column_families_)); + latest_.reset( + new FileExpectedState(std::move(expected_state_file_path), + std::move(expected_persisted_seqno_file_path), + max_key_, num_column_families_)); s = latest_->Open(false /* create */); } return s; @@ -660,6 +703,9 @@ Status FileExpectedStateManager::Restore(DB* db) { Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path, &trace_reader); + std::string persisted_seqno_file_path = GetPathForFilename( + kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix); + if (s.ok()) { // We are going to replay on top of "`seqno`.state" to create a new // "LATEST.state". Start off by creating a tempfile so we can later make the @@ -674,7 +720,8 @@ Status FileExpectedStateManager::Restore(DB* db) { std::unique_ptr state; std::unique_ptr handler; if (s.ok()) { - state.reset(new FileExpectedState(latest_file_temp_path, max_key_, + state.reset(new FileExpectedState(latest_file_temp_path, + persisted_seqno_file_path, max_key_, num_column_families_)); s = state->Open(false /* create */); } @@ -720,7 +767,8 @@ Status FileExpectedStateManager::Restore(DB* db) { nullptr /* dbg */); } if (s.ok()) { - latest_.reset(new FileExpectedState(latest_file_path, max_key_, + latest_.reset(new FileExpectedState(latest_file_path, + persisted_seqno_file_path, max_key_, num_column_families_)); s = latest_->Open(false /* create */); } diff --git a/db_stress_tool/expected_state.h b/db_stress_tool/expected_state.h index bab546fa426..e72a80adeaa 100644 --- a/db_stress_tool/expected_state.h +++ b/db_stress_tool/expected_state.h @@ -38,6 +38,18 @@ class ExpectedState { // Requires external locking covering all keys in `cf`. void ClearColumnFamily(int cf); + // Requires external locking + void SetPersistedSeqno(SequenceNumber seqno) { + persisted_seqno_->store( + std::max(persisted_seqno_->load(std::memory_order_relaxed), seqno), + std::memory_order_relaxed); + } + + // Requires external locking + SequenceNumber GetPersistedSeqno() { + return persisted_seqno_->load(std::memory_order_relaxed); + } + // Prepare a Put that will be started but not finished yet // This is useful for crash-recovery testing when the process may crash // before updating the corresponding expected value @@ -123,21 +135,50 @@ class ExpectedState { void Reset(); std::atomic* values_; + std::atomic* persisted_seqno_; }; // A `FileExpectedState` implements `ExpectedState` backed by a file. class FileExpectedState : public ExpectedState { public: - explicit FileExpectedState(std::string expected_state_file_path, - size_t max_key, size_t num_column_families); + explicit FileExpectedState( + const std::string& expected_state_file_path, + const std::string& expected_persisted_seqno_file_path, size_t max_key, + size_t num_column_families); // Requires external locking preventing concurrent execution with any other // member function. Status Open(bool create) override; private: + static Status CreateFile(Env* env, const EnvOptions& options, + const std::string& file_path, + const std::string& content) { + std::unique_ptr wfile; + Status status = env->NewWritableFile(file_path, &wfile, options); + if (status.ok()) { + status = wfile->Append(content); + } + return status; + } + + static Status MemoryMappedFile( + Env* env, const std::string& file_path, + std::unique_ptr& memory_mapped_file_buffer, + std::size_t size) { + Status status = + env->NewMemoryMappedFileBuffer(file_path, &memory_mapped_file_buffer); + if (status.ok()) { + assert(memory_mapped_file_buffer->GetLen() == size); + } + (void)size; + return status; + } + const std::string expected_state_file_path_; + const std::string expected_persisted_seqno_file_path_; std::unique_ptr expected_state_mmap_buffer_; + std::unique_ptr expected_persisted_seqno_mmap_buffer_; }; // An `AnonExpectedState` implements `ExpectedState` backed by a memory @@ -195,6 +236,12 @@ class ExpectedStateManager { // Requires external locking covering all keys in `cf`. void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); } + void SetPersistedSeqno(SequenceNumber seqno) { + return latest_->SetPersistedSeqno(seqno); + } + + SequenceNumber GetPersistedSeqno() { return latest_->GetPersistedSeqno(); } + // See ExpectedState::PreparePut() PendingExpectedValue PreparePut(int cf, int64_t key) { return latest_->PreparePut(cf, key); @@ -289,6 +336,8 @@ class FileExpectedStateManager : public ExpectedStateManager { static const std::string kLatestBasename; static const std::string kStateFilenameSuffix; static const std::string kTraceFilenameSuffix; + static const std::string kPersistedSeqnoBasename; + static const std::string kPersistedSeqnoFilenameSuffix; static const std::string kTempFilenamePrefix; static const std::string kTempFilenameSuffix;