diff --git a/db_stress_tool/db_stress_listener.h b/db_stress_tool/db_stress_listener.h index 52830dc6e00..35c70b5a103 100644 --- a/db_stress_tool/db_stress_listener.h +++ b/db_stress_tool/db_stress_listener.h @@ -55,12 +55,13 @@ class DbStressListener : public EventListener { DbStressListener(const std::string& db_name, const std::vector& db_paths, const std::vector& column_families, - Env* env) + Env* env, SharedState* shared) : db_name_(db_name), db_paths_(db_paths), column_families_(column_families), num_pending_file_creations_(0), - unique_ids_(db_name, env) {} + unique_ids_(db_name, env), + shared_(shared) {} const char* Name() const override { return kClassName(); } static const char* kClassName() { return "DBStressListener"; } @@ -74,6 +75,7 @@ class DbStressListener : public EventListener { if (fault_fs_guard) { fault_fs_guard->DisableAllThreadLocalErrorInjection(); } + shared_->SetPersistedSeqno(info.largest_seqno); } void OnFlushBegin(DB* /*db*/, @@ -358,6 +360,7 @@ class DbStressListener : public EventListener { std::vector column_families_; std::atomic num_pending_file_creations_; UniqueIdVerifier unique_ids_; + SharedState* shared_; }; } // namespace ROCKSDB_NAMESPACE #endif // GFLAGS diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index cdd9f71708b..5d9fb34ac10 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -89,11 +89,17 @@ class SharedState { // expected state. Only then should we permit bypassing the below feature // compatibility checks. 
if (!FLAGS_expected_values_dir.empty()) { - if (!std::atomic{}.is_lock_free()) { - status = Status::InvalidArgument( - "Cannot use --expected_values_dir on platforms without lock-free " - "std::atomic"); + if (!std::atomic{}.is_lock_free() || + !std::atomic{}.is_lock_free()) { + std::ostringstream status_s; + status_s << "Cannot use --expected_values_dir on platforms without " + "lock-free " + << (!std::atomic{}.is_lock_free() + ? "std::atomic" + : "std::atomic"); + status = Status::InvalidArgument(status_s.str()); } + if (status.ok() && FLAGS_clear_column_family_one_in > 0) { status = Status::InvalidArgument( "Cannot use --expected_values_dir on when " @@ -260,6 +266,16 @@ class SharedState { return expected_state_manager_->ClearColumnFamily(cf); } + void SetPersistedSeqno(SequenceNumber seqno) { + MutexLock l(&persist_seqno_mu_); + return expected_state_manager_->SetPersistedSeqno(seqno); + } + + SequenceNumber GetPersistedSeqno() { + MutexLock l(&persist_seqno_mu_); + return expected_state_manager_->GetPersistedSeqno(); + } + // Prepare a Put that will be started but not finish yet // This is useful for crash-recovery testing when the process may crash // before updating the corresponding expected value @@ -396,6 +412,7 @@ class SharedState { port::Mutex mu_; port::CondVar cv_; + port::Mutex persist_seqno_mu_; const uint32_t seed_; const int64_t max_key_; const uint32_t log2_keys_per_lock_; diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3f7b8e64621..52123749a71 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -3472,8 +3472,9 @@ void StressTest::Open(SharedState* shared, bool reopen) { } options_.listeners.clear(); - options_.listeners.emplace_back(new DbStressListener( - FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env)); + options_.listeners.emplace_back( + new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors, + 
db_stress_listener_env, shared)); RegisterAdditionalListeners(); // If this is for DB reopen, error injection may have been enabled. @@ -3714,6 +3715,15 @@ void StressTest::Open(SharedState* shared, bool reopen) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); exit(1); } + + if (db_->GetLatestSequenceNumber() < shared->GetPersistedSeqno()) { + fprintf(stderr, + "DB of latest sequence number %" PRIu64 + " did not recover to the persisted " + "sequence number %" PRIu64 " from last DB session\n", + db_->GetLatestSequenceNumber(), shared->GetPersistedSeqno()); + exit(1); + } } void StressTest::Reopen(ThreadState* thread) { diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index 22b31b23d17..80ba18a94c2 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -132,10 +132,13 @@ void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key, } } -FileExpectedState::FileExpectedState(std::string expected_state_file_path, - size_t max_key, size_t num_column_families) +FileExpectedState::FileExpectedState( + const std::string& expected_state_file_path, + const std::string& expected_persisted_seqno_file_path, size_t max_key, + size_t num_column_families) : ExpectedState(max_key, num_column_families), - expected_state_file_path_(expected_state_file_path) {} + expected_state_file_path_(expected_state_file_path), + expected_persisted_seqno_file_path_(expected_persisted_seqno_file_path) {} Status FileExpectedState::Open(bool create) { size_t expected_values_size = GetValuesLen(); @@ -144,30 +147,53 @@ Status FileExpectedState::Open(bool create) { Status status; if (create) { - std::unique_ptr wfile; - const EnvOptions soptions; - status = default_env->NewWritableFile(expected_state_file_path_, &wfile, - soptions); - if (status.ok()) { - std::string buf(expected_values_size, '\0'); - status = wfile->Append(buf); - } - } - if (status.ok()) { - status = default_env->NewMemoryMappedFileBuffer( - 
expected_state_file_path_, &expected_state_mmap_buffer_); - } - if (status.ok()) { - assert(expected_state_mmap_buffer_->GetLen() == expected_values_size); - values_ = static_cast*>( - expected_state_mmap_buffer_->GetBase()); - assert(values_ != nullptr); - if (create) { - Reset(); - } - } else { + status = CreateFile(default_env, EnvOptions(), expected_state_file_path_, + std::string(expected_values_size, '\0')); + if (!status.ok()) { + return status; + } + + status = CreateFile(default_env, EnvOptions(), + expected_persisted_seqno_file_path_, + std::string(sizeof(std::atomic), '\0')); + + if (!status.ok()) { + return status; + } + } + + status = MemoryMappedFile(default_env, expected_state_file_path_, + expected_state_mmap_buffer_, expected_values_size); + if (!status.ok()) { assert(values_ == nullptr); + return status; + } + + values_ = static_cast*>( + expected_state_mmap_buffer_->GetBase()); + assert(values_ != nullptr); + if (create) { + Reset(); + } + + // TODO(hx235): Find a way to mmap persisted seqno and expected state into the + // same LATEST file so we can obsolete the logic to handle this extra file for + // persisted seqno + status = MemoryMappedFile(default_env, expected_persisted_seqno_file_path_, + expected_persisted_seqno_mmap_buffer_, + sizeof(std::atomic)); + if (!status.ok()) { + assert(persisted_seqno_ == nullptr); + return status; + } + + persisted_seqno_ = static_cast*>( + expected_persisted_seqno_mmap_buffer_->GetBase()); + assert(persisted_seqno_ != nullptr); + if (create) { + persisted_seqno_->store(0, std::memory_order_relaxed); } + return status; } @@ -200,6 +226,9 @@ ExpectedStateManager::~ExpectedStateManager() = default; const std::string FileExpectedStateManager::kLatestBasename = "LATEST"; const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state"; const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace"; +const std::string FileExpectedStateManager::kPersistedSeqnoBasename = "PERSIST"; +const 
std::string FileExpectedStateManager::kPersistedSeqnoFilenameSuffix = + ".seqno"; const std::string FileExpectedStateManager::kTempFilenamePrefix = "."; const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp"; @@ -264,13 +293,17 @@ Status FileExpectedStateManager::Open() { std::string expected_state_file_path = GetPathForFilename(kLatestBasename + kStateFilenameSuffix); + std::string expected_persisted_seqno_file_path = GetPathForFilename( + kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix); bool found = false; if (s.ok()) { Status exists_status = Env::Default()->FileExists(expected_state_file_path); if (exists_status.ok()) { found = true; } else if (exists_status.IsNotFound()) { - found = false; + assert(Env::Default() + ->FileExists(expected_persisted_seqno_file_path) + .IsNotFound()); } else { s = exists_status; } @@ -282,8 +315,12 @@ Status FileExpectedStateManager::Open() { // the incomplete expected values file. std::string temp_expected_state_file_path = GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix); - FileExpectedState temp_expected_state(temp_expected_state_file_path, - max_key_, num_column_families_); + std::string temp_expected_persisted_seqno_file_path = + GetTempPathForFilename(kPersistedSeqnoBasename + + kPersistedSeqnoFilenameSuffix); + FileExpectedState temp_expected_state( + temp_expected_state_file_path, temp_expected_persisted_seqno_file_path, + max_key_, num_column_families_); if (s.ok()) { s = temp_expected_state.Open(true /* create */); } @@ -291,11 +328,17 @@ Status FileExpectedStateManager::Open() { s = Env::Default()->RenameFile(temp_expected_state_file_path, expected_state_file_path); } + if (s.ok()) { + s = Env::Default()->RenameFile(temp_expected_persisted_seqno_file_path, + expected_persisted_seqno_file_path); + } } if (s.ok()) { - latest_.reset(new FileExpectedState(std::move(expected_state_file_path), - max_key_, num_column_families_)); + latest_.reset( + new 
FileExpectedState(std::move(expected_state_file_path), + std::move(expected_persisted_seqno_file_path), + max_key_, num_column_families_)); s = latest_->Open(false /* create */); } return s; @@ -660,6 +703,9 @@ Status FileExpectedStateManager::Restore(DB* db) { Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path, &trace_reader); + std::string persisted_seqno_file_path = GetPathForFilename( + kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix); + if (s.ok()) { // We are going to replay on top of "`seqno`.state" to create a new // "LATEST.state". Start off by creating a tempfile so we can later make the @@ -674,7 +720,8 @@ Status FileExpectedStateManager::Restore(DB* db) { std::unique_ptr state; std::unique_ptr handler; if (s.ok()) { - state.reset(new FileExpectedState(latest_file_temp_path, max_key_, + state.reset(new FileExpectedState(latest_file_temp_path, + persisted_seqno_file_path, max_key_, num_column_families_)); s = state->Open(false /* create */); } @@ -720,7 +767,8 @@ Status FileExpectedStateManager::Restore(DB* db) { nullptr /* dbg */); } if (s.ok()) { - latest_.reset(new FileExpectedState(latest_file_path, max_key_, + latest_.reset(new FileExpectedState(latest_file_path, + persisted_seqno_file_path, max_key_, num_column_families_)); s = latest_->Open(false /* create */); } diff --git a/db_stress_tool/expected_state.h b/db_stress_tool/expected_state.h index bab546fa426..e72a80adeaa 100644 --- a/db_stress_tool/expected_state.h +++ b/db_stress_tool/expected_state.h @@ -38,6 +38,18 @@ class ExpectedState { // Requires external locking covering all keys in `cf`. 
void ClearColumnFamily(int cf); + // Requires external locking + void SetPersistedSeqno(SequenceNumber seqno) { + persisted_seqno_->store( + std::max(persisted_seqno_->load(std::memory_order_relaxed), seqno), + std::memory_order_relaxed); + } + + // Requires external locking + SequenceNumber GetPersistedSeqno() { + return persisted_seqno_->load(std::memory_order_relaxed); + } + // Prepare a Put that will be started but not finished yet // This is useful for crash-recovery testing when the process may crash // before updating the corresponding expected value @@ -123,21 +135,50 @@ class ExpectedState { void Reset(); std::atomic* values_; + std::atomic* persisted_seqno_; }; // A `FileExpectedState` implements `ExpectedState` backed by a file. class FileExpectedState : public ExpectedState { public: - explicit FileExpectedState(std::string expected_state_file_path, - size_t max_key, size_t num_column_families); + explicit FileExpectedState( + const std::string& expected_state_file_path, + const std::string& expected_persisted_seqno_file_path, size_t max_key, + size_t num_column_families); // Requires external locking preventing concurrent execution with any other // member function. 
Status Open(bool create) override; private: + static Status CreateFile(Env* env, const EnvOptions& options, + const std::string& file_path, + const std::string& content) { + std::unique_ptr wfile; + Status status = env->NewWritableFile(file_path, &wfile, options); + if (status.ok()) { + status = wfile->Append(content); + } + return status; + } + + static Status MemoryMappedFile( + Env* env, const std::string& file_path, + std::unique_ptr& memory_mapped_file_buffer, + std::size_t size) { + Status status = + env->NewMemoryMappedFileBuffer(file_path, &memory_mapped_file_buffer); + if (status.ok()) { + assert(memory_mapped_file_buffer->GetLen() == size); + } + (void)size; + return status; + } + const std::string expected_state_file_path_; + const std::string expected_persisted_seqno_file_path_; std::unique_ptr expected_state_mmap_buffer_; + std::unique_ptr expected_persisted_seqno_mmap_buffer_; }; // An `AnonExpectedState` implements `ExpectedState` backed by a memory @@ -195,6 +236,12 @@ class ExpectedStateManager { // Requires external locking covering all keys in `cf`. void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); } + void SetPersistedSeqno(SequenceNumber seqno) { + return latest_->SetPersistedSeqno(seqno); + } + + SequenceNumber GetPersistedSeqno() { return latest_->GetPersistedSeqno(); } + // See ExpectedState::PreparePut() PendingExpectedValue PreparePut(int cf, int64_t key) { return latest_->PreparePut(cf, key); @@ -289,6 +336,8 @@ class FileExpectedStateManager : public ExpectedStateManager { static const std::string kLatestBasename; static const std::string kStateFilenameSuffix; static const std::string kTraceFilenameSuffix; + static const std::string kPersistedSeqnoBasename; + static const std::string kPersistedSeqnoFilenameSuffix; static const std::string kTempFilenamePrefix; static const std::string kTempFilenameSuffix;