Skip to content

Commit

Permalink
Fix a bug that can retain old WAL longer than needed (#13127)
Browse files Browse the repository at this point in the history
Summary:
The bug only happens for transaction db with 2pc. The main change is in `MemTableList::TryInstallMemtableFlushResults`. Before this fix, `memtables_to_flush` may not include all flushed memtables, and it causes the min_log_number for the flush to be incorrect. The code path for calculating min_log_number is `MemTableList::TryInstallMemtableFlushResults() -> GetDBRecoveryEditForObsoletingMemTables() -> PrecomputeMinLogNumberToKeep2PC() -> FindMinPrepLogReferencedByMemTable()`. Inside `FindMinPrepLogReferencedByMemTable()`, we need to exclude all memtables being flushed.

The PR also includes some documentation changes.

Pull Request resolved: #13127

Test Plan: added a new unit test that fails before this change.

Reviewed By: ltamasi

Differential Revision: D65679270

Pulled By: cbi42

fbshipit-source-id: 611f34bd6ef4cba51f8b54cb1be416887b5a9c5e
  • Loading branch information
cbi42 authored and facebook-github-bot committed Nov 11, 2024
1 parent 1f0ccd9 commit 925435b
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 14 deletions.
1 change: 1 addition & 0 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class FlushJob {
// Variables below are set by PickMemTable():
FileMetaData meta_;
// Memtables to be flushed by this job.
// Ordered by increasing memtable id, i.e., oldest memtable first.
autovector<ReadOnlyMemTable*> mems_;
VersionEdit* edit_;
Version* base_;
Expand Down
25 changes: 14 additions & 11 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
// (in that order) that have finished flushing. Memtables
// are always committed in the order that they were created.
uint64_t batch_file_number = 0;
size_t batch_count = 0;
autovector<VersionEdit*> edit_list;
autovector<ReadOnlyMemTable*> memtables_to_flush;
// Enumerate from the last (earliest) element to see how many batches have finished
Expand All @@ -563,6 +562,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
break;
}
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
// Oldest memtable in a new batch.
batch_file_number = m->file_number_;
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
Expand All @@ -578,17 +578,17 @@ Status MemTableList::TryInstallMemtableFlushResults(
}

edit_list.push_back(&m->edit_);
memtables_to_flush.push_back(m);
std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
if (info != nullptr) {
committed_flush_jobs_info->push_back(std::move(info));
}
}
batch_count++;
memtables_to_flush.push_back(m);
}

size_t num_mem_to_flush = memtables_to_flush.size();
// TODO(myabandeh): Not sure how num_mem_to_flush could be 0 here.
if (batch_count > 0) {
if (num_mem_to_flush > 0) {
VersionEdit edit;
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (memtables_to_flush.size() == memlist.size()) {
Expand All @@ -612,9 +612,9 @@ Status MemTableList::TryInstallMemtableFlushResults(
nullptr);
edit_list.push_back(&edit);

const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
const auto manifest_write_cb = [this, cfd, num_mem_to_flush, log_buffer,
to_delete, mu](const Status& status) {
RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer,
RemoveMemTablesOrRestoreFlags(status, cfd, num_mem_to_flush, log_buffer,
to_delete, mu);
};
if (write_edits) {
Expand All @@ -627,7 +627,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
// If write_edits is false (e.g., successful mempurge),
// then remove old memtables, wake up manifest write queue threads,
// and don't commit anything to the manifest file.
RemoveMemTablesOrRestoreFlags(s, cfd, batch_count, log_buffer,
RemoveMemTablesOrRestoreFlags(s, cfd, num_mem_to_flush, log_buffer,
to_delete, mu);
// Note: cfd->SetLogNumber is only called when a VersionEdit
// is written to MANIFEST. When mempurge is successful, we skip
Expand Down Expand Up @@ -735,7 +735,7 @@ void MemTableList::InstallNewVersion() {
}

void MemTableList::RemoveMemTablesOrRestoreFlags(
const Status& s, ColumnFamilyData* cfd, size_t batch_count,
const Status& s, ColumnFamilyData* cfd, size_t num_mem_to_flush,
LogBuffer* log_buffer, autovector<ReadOnlyMemTable*>* to_delete,
InstrumentedMutex* mu) {
assert(mu);
Expand Down Expand Up @@ -764,8 +764,11 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
// read full data as long as column family handle is not deleted, even if
// the column family is dropped.
if (s.ok() && !cfd->IsDropped()) { // commit new state
while (batch_count-- > 0) {
while (num_mem_to_flush-- > 0) {
ReadOnlyMemTable* m = current_->memlist_.back();
// TODO: The logging can be redundant when we flush multiple memtables
// into one SST file. We should only check the edit_ of the oldest
// memtable in the group in that case.
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit flush result of table #%" PRIu64
Expand All @@ -787,7 +790,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
for (auto it = current_->memlist_.rbegin(); num_mem_to_flush-- > 0; ++it) {
ReadOnlyMemTable* m = *it;
// commit failed. setup state so that we can flush again.
if (m->edit_.GetBlobFileAdditions().empty()) {
Expand Down Expand Up @@ -816,7 +819,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
}

uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush) {
const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush) const {
uint64_t min_log = 0;

for (auto& m : current_->memlist_) {
Expand Down
8 changes: 5 additions & 3 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,13 @@ class MemTableList {

size_t* current_memory_usage() { return &current_memory_usage_; }

// Returns the min log containing the prep section after memtables listsed in
// `memtables_to_flush` are flushed and their status is persisted in manifest.
// Returns the WAL number of the oldest WAL that contains a prepared
// transaction that corresponds to the content in this MemTableList,
// after memtables listed in `memtables_to_flush` are flushed and their
// status is persisted in manifest.
uint64_t PrecomputeMinLogContainingPrepSection(
const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush =
nullptr);
nullptr) const;

uint64_t GetEarliestMemTableID() const {
auto& memlist = current_->memlist_;
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/bug_fixes/old_wal_2pc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Fix a bug for transaction db with 2pc where an old WAL may be retained longer than needed (#13127).
40 changes: 40 additions & 0 deletions utilities/transactions/transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2202,6 +2202,46 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
delete cfa;
delete cfb;
}

TEST_P(TransactionTest, TwoPhaseLogMultiMemtableFlush) {
  // Checks that the minimum WAL number to keep is tracked correctly when
  // more than one memtable is flushed together.
  DBImpl* const root_db = static_cast_with_check<DBImpl>(db->GetRootDB());
  // Raise the write buffer limit so that two immutable memtables won't
  // stall writes.
  ASSERT_OK(db->SetOptions({{"max_write_buffer_number", "4"}}));
  // Pause background work so that no flush runs until it is resumed below.
  ASSERT_OK(db->PauseBackgroundWork());

  WriteOptions write_opts;
  write_opts.disableWAL = false;
  write_opts.sync = true;
  TransactionOptions txn_opts;

  // One named two-phase transaction per entry. The memtable is switched
  // after each commit.
  struct TxnSpec {
    const char* key;
    const char* value;
    const char* name;
  };
  const TxnSpec specs[] = {
      {"key1", "val1", "xid1"},
      {"key2", "val2", "xid2"},
  };

  for (const TxnSpec& spec : specs) {
    Transaction* txn = db->BeginTransaction(write_opts, txn_opts);
    ASSERT_OK(txn->Put(spec.key, spec.value));
    ASSERT_OK(txn->SetName(spec.name));
    ASSERT_OK(txn->Prepare());
    ASSERT_OK(txn->Commit());
    delete txn;

    ASSERT_OK(root_db->TEST_SwitchMemtable());
  }

  // Resume background work and flush.
  ASSERT_OK(db->ContinueBackgroundWork());
  ASSERT_OK(db->Flush({}));

  // Every WAL other than the active one should now be obsolete, so the
  // minimum WAL number to keep must equal the current WAL number.
  const uint64_t active_wal_num = root_db->TEST_GetCurrentLogNumber();
  ASSERT_EQ(active_wal_num, root_db->MinLogNumberToKeep());
}

/*
* 1) use prepare to keep first log around to determine starting sequence
* during recovery.
Expand Down

0 comments on commit 925435b

Please sign in to comment.