Skip to content

Commit

Permalink
Verify flushed data are recovered upon reopen in crash test (facebook…
Browse files Browse the repository at this point in the history
…#12787)

Summary:
**Context/Summary:**

This is to solve facebook#12152. We persist the largest flushed seqno before crash just like how we persist the ExpectedState. And we verify the db lates seqno after recovery is no smaller than this flushed seqno.

Pull Request resolved: facebook#12787

Test Plan:
- Manually observe that the persisted sequence after flush completion is used to verify db's latest sequence
- python3 tools/db_crashtest.py --simple blackbox --interval=30
- CI

Reviewed By: archang19

Differential Revision: D58860150

Pulled By: hx235

fbshipit-source-id: 99cb4403964d0737908855f92af7327867079e3e
  • Loading branch information
hx235 authored and facebook-github-bot committed Dec 25, 2024
1 parent 30d5162 commit e3024e7
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 42 deletions.
7 changes: 5 additions & 2 deletions db_stress_tool/db_stress_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ class DbStressListener : public EventListener {
DbStressListener(const std::string& db_name,
const std::vector<DbPath>& db_paths,
const std::vector<ColumnFamilyDescriptor>& column_families,
Env* env)
Env* env, SharedState* shared)
: db_name_(db_name),
db_paths_(db_paths),
column_families_(column_families),
num_pending_file_creations_(0),
unique_ids_(db_name, env) {}
unique_ids_(db_name, env),
shared_(shared) {}

const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "DBStressListener"; }
Expand All @@ -74,6 +75,7 @@ class DbStressListener : public EventListener {
if (fault_fs_guard) {
fault_fs_guard->DisableAllThreadLocalErrorInjection();
}
shared_->SetPersistedSeqno(info.largest_seqno);
}

void OnFlushBegin(DB* /*db*/,
Expand Down Expand Up @@ -358,6 +360,7 @@ class DbStressListener : public EventListener {
std::vector<ColumnFamilyDescriptor> column_families_;
std::atomic<int> num_pending_file_creations_;
UniqueIdVerifier unique_ids_;
SharedState* shared_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS
25 changes: 21 additions & 4 deletions db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,17 @@ class SharedState {
// expected state. Only then should we permit bypassing the below feature
// compatibility checks.
if (!FLAGS_expected_values_dir.empty()) {
if (!std::atomic<uint32_t>{}.is_lock_free()) {
status = Status::InvalidArgument(
"Cannot use --expected_values_dir on platforms without lock-free "
"std::atomic<uint32_t>");
if (!std::atomic<uint32_t>{}.is_lock_free() ||
!std::atomic<uint64_t>{}.is_lock_free()) {
std::ostringstream status_s;
status_s << "Cannot use --expected_values_dir on platforms without "
"lock-free "
<< (!std::atomic<uint32_t>{}.is_lock_free()
? "std::atomic<uint32_t>"
: "std::atomic<uint64_t>");
status = Status::InvalidArgument(status_s.str());
}

if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
status = Status::InvalidArgument(
"Cannot use --expected_values_dir on when "
Expand Down Expand Up @@ -260,6 +266,16 @@ class SharedState {
return expected_state_manager_->ClearColumnFamily(cf);
}

void SetPersistedSeqno(SequenceNumber seqno) {
MutexLock l(&persist_seqno_mu_);
return expected_state_manager_->SetPersistedSeqno(seqno);
}

SequenceNumber GetPersistedSeqno() {
MutexLock l(&persist_seqno_mu_);
return expected_state_manager_->GetPersistedSeqno();
}

// Prepare a Put that will be started but not finish yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down Expand Up @@ -396,6 +412,7 @@ class SharedState {

port::Mutex mu_;
port::CondVar cv_;
port::Mutex persist_seqno_mu_;
const uint32_t seed_;
const int64_t max_key_;
const uint32_t log2_keys_per_lock_;
Expand Down
14 changes: 12 additions & 2 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3472,8 +3472,9 @@ void StressTest::Open(SharedState* shared, bool reopen) {
}

options_.listeners.clear();
options_.listeners.emplace_back(new DbStressListener(
FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
options_.listeners.emplace_back(
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors,
db_stress_listener_env, shared));
RegisterAdditionalListeners();

// If this is for DB reopen, error injection may have been enabled.
Expand Down Expand Up @@ -3714,6 +3715,15 @@ void StressTest::Open(SharedState* shared, bool reopen) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1);
}

if (db_->GetLatestSequenceNumber() < shared->GetPersistedSeqno()) {
fprintf(stderr,
"DB of latest sequence number %" PRIu64
"did not recover to the persisted "
"sequence number %" PRIu64 " from last DB session\n",
db_->GetLatestSequenceNumber(), shared->GetPersistedSeqno());
exit(1);
}
}

void StressTest::Reopen(ThreadState* thread) {
Expand Down
112 changes: 80 additions & 32 deletions db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key,
}
}

FileExpectedState::FileExpectedState(std::string expected_state_file_path,
size_t max_key, size_t num_column_families)
FileExpectedState::FileExpectedState(
const std::string& expected_state_file_path,
const std::string& expected_persisted_seqno_file_path, size_t max_key,
size_t num_column_families)
: ExpectedState(max_key, num_column_families),
expected_state_file_path_(expected_state_file_path) {}
expected_state_file_path_(expected_state_file_path),
expected_persisted_seqno_file_path_(expected_persisted_seqno_file_path) {}

Status FileExpectedState::Open(bool create) {
size_t expected_values_size = GetValuesLen();
Expand All @@ -144,30 +147,53 @@ Status FileExpectedState::Open(bool create) {

Status status;
if (create) {
std::unique_ptr<WritableFile> wfile;
const EnvOptions soptions;
status = default_env->NewWritableFile(expected_state_file_path_, &wfile,
soptions);
if (status.ok()) {
std::string buf(expected_values_size, '\0');
status = wfile->Append(buf);
}
}
if (status.ok()) {
status = default_env->NewMemoryMappedFileBuffer(
expected_state_file_path_, &expected_state_mmap_buffer_);
}
if (status.ok()) {
assert(expected_state_mmap_buffer_->GetLen() == expected_values_size);
values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
assert(values_ != nullptr);
if (create) {
Reset();
}
} else {
status = CreateFile(default_env, EnvOptions(), expected_state_file_path_,
std::string(expected_values_size, '\0'));
if (!status.ok()) {
return status;
}

status = CreateFile(default_env, EnvOptions(),
expected_persisted_seqno_file_path_,
std::string(sizeof(std::atomic<SequenceNumber>), '\0'));

if (!status.ok()) {
return status;
}
}

status = MemoryMappedFile(default_env, expected_state_file_path_,
expected_state_mmap_buffer_, expected_values_size);
if (!status.ok()) {
assert(values_ == nullptr);
return status;
}

values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
assert(values_ != nullptr);
if (create) {
Reset();
}

// TODO(hx235): Find a way to mmap persisted seqno and expected state into the
// same LATEST file so we can obselete the logic to handle this extra file for
// persisted seqno
status = MemoryMappedFile(default_env, expected_persisted_seqno_file_path_,
expected_persisted_seqno_mmap_buffer_,
sizeof(std::atomic<SequenceNumber>));
if (!status.ok()) {
assert(persisted_seqno_ == nullptr);
return status;
}

persisted_seqno_ = static_cast<std::atomic<SequenceNumber>*>(
expected_persisted_seqno_mmap_buffer_->GetBase());
assert(persisted_seqno_ != nullptr);
if (create) {
persisted_seqno_->store(0, std::memory_order_relaxed);
}

return status;
}

Expand Down Expand Up @@ -200,6 +226,9 @@ ExpectedStateManager::~ExpectedStateManager() = default;
const std::string FileExpectedStateManager::kLatestBasename = "LATEST";
const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state";
const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace";
const std::string FileExpectedStateManager::kPersistedSeqnoBasename = "PERSIST";
const std::string FileExpectedStateManager::kPersistedSeqnoFilenameSuffix =
".seqno";
const std::string FileExpectedStateManager::kTempFilenamePrefix = ".";
const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp";

Expand Down Expand Up @@ -264,13 +293,17 @@ Status FileExpectedStateManager::Open() {

std::string expected_state_file_path =
GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
std::string expected_persisted_seqno_file_path = GetPathForFilename(
kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix);
bool found = false;
if (s.ok()) {
Status exists_status = Env::Default()->FileExists(expected_state_file_path);
if (exists_status.ok()) {
found = true;
} else if (exists_status.IsNotFound()) {
found = false;
assert(Env::Default()
->FileExists(expected_persisted_seqno_file_path)
.IsNotFound());
} else {
s = exists_status;
}
Expand All @@ -282,20 +315,30 @@ Status FileExpectedStateManager::Open() {
// the incomplete expected values file.
std::string temp_expected_state_file_path =
GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
FileExpectedState temp_expected_state(temp_expected_state_file_path,
max_key_, num_column_families_);
std::string temp_expected_persisted_seqno_file_path =
GetTempPathForFilename(kPersistedSeqnoBasename +
kPersistedSeqnoFilenameSuffix);
FileExpectedState temp_expected_state(
temp_expected_state_file_path, temp_expected_persisted_seqno_file_path,
max_key_, num_column_families_);
if (s.ok()) {
s = temp_expected_state.Open(true /* create */);
}
if (s.ok()) {
s = Env::Default()->RenameFile(temp_expected_state_file_path,
expected_state_file_path);
}
if (s.ok()) {
s = Env::Default()->RenameFile(temp_expected_persisted_seqno_file_path,
expected_persisted_seqno_file_path);
}
}

if (s.ok()) {
latest_.reset(new FileExpectedState(std::move(expected_state_file_path),
max_key_, num_column_families_));
latest_.reset(
new FileExpectedState(std::move(expected_state_file_path),
std::move(expected_persisted_seqno_file_path),
max_key_, num_column_families_));
s = latest_->Open(false /* create */);
}
return s;
Expand Down Expand Up @@ -660,6 +703,9 @@ Status FileExpectedStateManager::Restore(DB* db) {
Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path,
&trace_reader);

std::string persisted_seqno_file_path = GetPathForFilename(
kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix);

if (s.ok()) {
// We are going to replay on top of "`seqno`.state" to create a new
// "LATEST.state". Start off by creating a tempfile so we can later make the
Expand All @@ -674,7 +720,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
std::unique_ptr<ExpectedState> state;
std::unique_ptr<ExpectedStateTraceRecordHandler> handler;
if (s.ok()) {
state.reset(new FileExpectedState(latest_file_temp_path, max_key_,
state.reset(new FileExpectedState(latest_file_temp_path,
persisted_seqno_file_path, max_key_,
num_column_families_));
s = state->Open(false /* create */);
}
Expand Down Expand Up @@ -720,7 +767,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
nullptr /* dbg */);
}
if (s.ok()) {
latest_.reset(new FileExpectedState(latest_file_path, max_key_,
latest_.reset(new FileExpectedState(latest_file_path,
persisted_seqno_file_path, max_key_,
num_column_families_));
s = latest_->Open(false /* create */);
}
Expand Down
53 changes: 51 additions & 2 deletions db_stress_tool/expected_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ class ExpectedState {
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf);

// Requires external locking
void SetPersistedSeqno(SequenceNumber seqno) {
persisted_seqno_->store(
std::max(persisted_seqno_->load(std::memory_order_relaxed), seqno),
std::memory_order_relaxed);
}

// Requires external locking
SequenceNumber GetPersistedSeqno() {
return persisted_seqno_->load(std::memory_order_relaxed);
}

// Prepare a Put that will be started but not finished yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down Expand Up @@ -123,21 +135,50 @@ class ExpectedState {
void Reset();

std::atomic<uint32_t>* values_;
std::atomic<SequenceNumber>* persisted_seqno_;
};

// A `FileExpectedState` implements `ExpectedState` backed by a file.
class FileExpectedState : public ExpectedState {
public:
explicit FileExpectedState(std::string expected_state_file_path,
size_t max_key, size_t num_column_families);
explicit FileExpectedState(
const std::string& expected_state_file_path,
const std::string& expected_persisted_seqno_file_path, size_t max_key,
size_t num_column_families);

// Requires external locking preventing concurrent execution with any other
// member function.
Status Open(bool create) override;

private:
static Status CreateFile(Env* env, const EnvOptions& options,
const std::string& file_path,
const std::string& content) {
std::unique_ptr<WritableFile> wfile;
Status status = env->NewWritableFile(file_path, &wfile, options);
if (status.ok()) {
status = wfile->Append(content);
}
return status;
}

static Status MemoryMappedFile(
Env* env, const std::string& file_path,
std::unique_ptr<MemoryMappedFileBuffer>& memory_mapped_file_buffer,
std::size_t size) {
Status status =
env->NewMemoryMappedFileBuffer(file_path, &memory_mapped_file_buffer);
if (status.ok()) {
assert(memory_mapped_file_buffer->GetLen() == size);
}
(void)size;
return status;
}

const std::string expected_state_file_path_;
const std::string expected_persisted_seqno_file_path_;
std::unique_ptr<MemoryMappedFileBuffer> expected_state_mmap_buffer_;
std::unique_ptr<MemoryMappedFileBuffer> expected_persisted_seqno_mmap_buffer_;
};

// An `AnonExpectedState` implements `ExpectedState` backed by a memory
Expand Down Expand Up @@ -195,6 +236,12 @@ class ExpectedStateManager {
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); }

void SetPersistedSeqno(SequenceNumber seqno) {
return latest_->SetPersistedSeqno(seqno);
}

SequenceNumber GetPersistedSeqno() { return latest_->GetPersistedSeqno(); }

// See ExpectedState::PreparePut()
PendingExpectedValue PreparePut(int cf, int64_t key) {
return latest_->PreparePut(cf, key);
Expand Down Expand Up @@ -289,6 +336,8 @@ class FileExpectedStateManager : public ExpectedStateManager {
static const std::string kLatestBasename;
static const std::string kStateFilenameSuffix;
static const std::string kTraceFilenameSuffix;
static const std::string kPersistedSeqnoBasename;
static const std::string kPersistedSeqnoFilenameSuffix;
static const std::string kTempFilenamePrefix;
static const std::string kTempFilenameSuffix;

Expand Down

0 comments on commit e3024e7

Please sign in to comment.