Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect WAL hole #13226

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2071,23 +2071,26 @@ class DBImpl : public DB {
bool read_only, int job_id, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
PredecessorWALInfo& predecessor_wal_info);

void SetupLogFileProcessing(uint64_t wal_number);

Status InitializeLogReader(uint64_t wal_number, bool is_retry,
std::string& fname, bool* const old_log_record,
Status* const reporter_status,
DBOpenLogReporter* reporter,
std::unique_ptr<log::Reader>& reader);
Status InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,

bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info,
bool* const old_log_record, Status* const reporter_status,
DBOpenLogReporter* reporter, std::unique_ptr<log::Reader>& reader);
Status ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);

Status InitializeWriteBatchForLogRecord(
Expand Down Expand Up @@ -2116,8 +2119,13 @@ class DBImpl : public DB {
bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found);

void FinishLogFileProcessing(SequenceNumber const* const next_sequence,
const Status& status);
Status UpdatePredecessorWALInfo(uint64_t wal_number,
const SequenceNumber last_seqno_observed,
const std::string& fname,
PredecessorWALInfo& predecessor_wal_info);

void FinishLogFileProcessing(const Status& status,
const SequenceNumber* next_sequence);

// Return `Status::Corruption()` when `stop_replay_for_corruption == true` and
// exits inconsistency between SST and WAL data
Expand Down Expand Up @@ -2309,7 +2317,8 @@ class DBImpl : public DB {
const WriteOptions& write_options,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
LogFileNumberSize& log_file_number_size);
LogFileNumberSize& log_file_number_size,
SequenceNumber sequence);

IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
Expand Down Expand Up @@ -2554,6 +2563,7 @@ class DBImpl : public DB {

IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num,
uint64_t recycle_log_number, size_t preallocate_block_size,
const PredecessorWALInfo& predecessor_wal_info,
log::Writer** new_log);

// Validate self-consistency of DB options
Expand Down
126 changes: 94 additions & 32 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ Status DBImpl::Recover(
}
}

if (immutable_db_options_.track_and_verify_wals && !is_new_db &&
!immutable_db_options_.best_efforts_recovery && wal_files.empty()) {
return Status::Corruption("Opening an existing DB with no WAL files");
}

if (immutable_db_options_.track_and_verify_wals_in_manifest) {
if (!immutable_db_options_.best_efforts_recovery) {
// Verify WALs in MANIFEST.
Expand Down Expand Up @@ -1189,14 +1194,15 @@ Status DBImpl::ProcessLogFiles(
bool stop_replay_for_corruption = false;
bool flushed = false;
uint64_t corrupted_wal_number = kMaxSequenceNumber;
PredecessorWALInfo predecessor_wal_info;

for (auto wal_number : wal_numbers) {
if (status.ok()) {
status =
ProcessLogFile(wal_number, min_wal_number, is_retry, read_only,
job_id, next_sequence, &stop_replay_for_corruption,
&stop_replay_by_wal_filter, &corrupted_wal_number,
corrupted_wal_found, version_edits, &flushed);
status = ProcessLogFile(
wal_number, min_wal_number, is_retry, read_only, job_id,
next_sequence, &stop_replay_for_corruption,
&stop_replay_by_wal_filter, &corrupted_wal_number,
corrupted_wal_found, version_edits, &flushed, predecessor_wal_info);
}
}

Expand All @@ -1217,7 +1223,8 @@ Status DBImpl::ProcessLogFile(
int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
PredecessorWALInfo& predecessor_wal_info) {
assert(stop_replay_by_wal_filter);

// Variable initialization starts
Expand All @@ -1244,6 +1251,11 @@ Status DBImpl::ProcessLogFile(
uint64_t record_checksum;
const UnorderedMap<uint32_t, size_t>& running_ts_sz =
versions_->GetRunningColumnFamiliesTimestampSize();

// We need to track `last_seqno_observed` in addition to `next_sequence` since
// `last_seqno_observed != *next_sequence` when there are multiple key-value
// pairs in one WAL entry
SequenceNumber last_seqno_observed = 0;
// Variable initialization ends

if (wal_number < min_wal_number) {
Expand All @@ -1264,14 +1276,19 @@ Status DBImpl::ProcessLogFile(
}

Status init_status = InitializeLogReader(
wal_number, is_retry, fname, &old_log_record, &status, &reporter, reader);
wal_number, is_retry, fname, *stop_replay_for_corruption, min_wal_number,
predecessor_wal_info, &old_log_record, &status, &reporter, reader);

// FIXME(hx235): Consolidate `!init_status.ok()` and `reader == nullptr` cases
if (!init_status.ok()) {
assert(status.ok());
status.PermitUncheckedError();
return init_status;
} else if (reader == nullptr) {
// TODO(hx235): remove this case since it's confusing
assert(status.ok());
// Fail initializing log reader for one log file with an ok status.
// Try next one.
return status;
}

Expand All @@ -1296,9 +1313,9 @@ Status DBImpl::ProcessLogFile(
// FIXME(hx235): consolidate `process_status` and `status`
Status process_status = ProcessLogRecord(
record, reader, running_ts_sz, wal_number, fname, read_only, job_id,
logFileDropped, &reporter, &record_checksum, next_sequence,
stop_replay_for_corruption, &status, stop_replay_by_wal_filter,
version_edits, flushed);
logFileDropped, &reporter, &record_checksum, &last_seqno_observed,
next_sequence, stop_replay_for_corruption, &status,
stop_replay_by_wal_filter, version_edits, flushed);

if (!process_status.ok()) {
return process_status;
Expand All @@ -1311,13 +1328,19 @@ Status DBImpl::ProcessLogFile(
"Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number,
*next_sequence);

if (status.ok()) {
status = UpdatePredecessorWALInfo(wal_number, last_seqno_observed, fname,
predecessor_wal_info);
}

if (!status.ok() || old_log_record) {
status = HandleNonOkStatusOrOldLogRecord(
wal_number, next_sequence, status, &old_log_record,
stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found);
}

FinishLogFileProcessing(next_sequence, status);
FinishLogFileProcessing(status, next_sequence);

return status;
}

Expand All @@ -1332,12 +1355,12 @@ void DBImpl::SetupLogFileProcessing(uint64_t wal_number) {
static_cast<int>(immutable_db_options_.wal_recovery_mode));
}

Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry,
std::string& fname,
bool* const old_log_record,
Status* const reporter_status,
DBOpenLogReporter* reporter,
std::unique_ptr<log::Reader>& reader) {
Status DBImpl::InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,
bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record,
Status* const reporter_status, DBOpenLogReporter* reporter,
std::unique_ptr<log::Reader>& reader) {
assert(old_log_record);
assert(reporter_status);
assert(reporter);
Expand Down Expand Up @@ -1375,9 +1398,11 @@ Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry,
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
reader.reset(new log::Reader(immutable_db_options_.info_log,
std::move(file_reader), reporter,
true /*checksum*/, wal_number));
reader.reset(new log::Reader(
immutable_db_options_.info_log, std::move(file_reader), reporter,
true /*checksum*/, wal_number,
immutable_db_options_.track_and_verify_wals, stop_replay_for_corruption,
min_wal_number, predecessor_wal_info));
return status;
}

Expand All @@ -1386,11 +1411,12 @@ Status DBImpl::ProcessLogRecord(
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
assert(reporter);
assert(last_seqno_observed);
assert(stop_replay_for_corruption);
assert(status);
assert(stop_replay_by_wal_filter);
Expand All @@ -1416,17 +1442,18 @@ Status DBImpl::ProcessLogRecord(
}
assert(batch_to_use);

SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use);
if (sequence > kMaxSequenceNumber) {
*last_seqno_observed = WriteBatchInternal::Sequence(batch_to_use);

if (*last_seqno_observed > kMaxSequenceNumber) {
reporter->Corruption(
record.size(),
Status::Corruption("sequence " + std::to_string(sequence) +
Status::Corruption("sequence " + std::to_string(*last_seqno_observed) +
" is too large"));
assert(process_status.ok());
return process_status;
}

MaybeReviseStopReplayForCorruption(sequence, next_sequence,
MaybeReviseStopReplayForCorruption(*last_seqno_observed, next_sequence,
stop_replay_for_corruption);
if (*stop_replay_for_corruption) {
logFileDropped();
Expand Down Expand Up @@ -1630,8 +1657,30 @@ Status DBImpl::HandleNonOkStatusOrOldLogRecord(
return status;
}
}
void DBImpl::FinishLogFileProcessing(SequenceNumber const* const next_sequence,
const Status& status) {

Status DBImpl::UpdatePredecessorWALInfo(
uint64_t wal_number, const SequenceNumber last_seqno_observed,
const std::string& fname, PredecessorWALInfo& predecessor_wal_info) {
uint64_t bytes;

Status s = env_->GetFileSize(fname, &bytes);
if (!s.ok()) {
return s;
}

SequenceNumber mock_seqno = kMaxSequenceNumber;
[[maybe_unused]] std::pair<uint64_t, SequenceNumber*> pair =
std::make_pair(wal_number, &mock_seqno);
TEST_SYNC_POINT_CALLBACK("DBImpl::UpdatePredecessorWALInfo", &pair);
predecessor_wal_info = PredecessorWALInfo(
wal_number, bytes,
mock_seqno != kMaxSequenceNumber ? mock_seqno : last_seqno_observed);

return s;
}

void DBImpl::FinishLogFileProcessing(const Status& status,
const SequenceNumber* next_sequence) {
if (status.ok()) {
assert(next_sequence);
flush_scheduler_.Clear();
Expand Down Expand Up @@ -2193,6 +2242,7 @@ Status DB::OpenAndTrimHistory(
IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size,
const PredecessorWALInfo& predecessor_wal_info,
log::Writer** new_log) {
IOStatus io_s;
std::unique_ptr<FSWritableFile> lfile;
Expand Down Expand Up @@ -2236,9 +2286,15 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush,
immutable_db_options_.wal_compression);
immutable_db_options_.wal_compression,
immutable_db_options_.track_and_verify_wals);
io_s = (*new_log)->AddCompressionTypeRecord(write_options);
if (io_s.ok()) {
io_s = (*new_log)->MaybeAddPredecessorWALInfo(write_options,
predecessor_wal_info);
}
}

return io_s;
}

Expand Down Expand Up @@ -2331,8 +2387,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
log::Writer* new_log = nullptr;
const size_t preallocate_block_size =
impl->GetWalPreallocateBlockSize(max_write_buffer_size);
// TODO(hx235): Pass in the correct `predecessor_wal_info` for the first WAL
// created during DB open with predecessor WALs from previous DB session due
// to `avoid_flush_during_recovery == true`. This can protect the last WAL
// recovered.
s = impl->CreateWAL(write_options, new_log_number, 0 /*recycle_log_number*/,
preallocate_block_size, &new_log);
preallocate_block_size,
PredecessorWALInfo() /* predecessor_wal_info */,
&new_log);
if (s.ok()) {
// Prevent log files created by previous instance from being recycled.
// They might be in alive_log_file_, and might get recycled otherwise.
Expand Down Expand Up @@ -2367,7 +2429,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
assert(log_writer->get_log_number() == log_file_number_size.number);
impl->mutex_.AssertHeld();
s = impl->WriteToWAL(empty_batch, write_options, log_writer, &log_used,
&log_size, log_file_number_size);
&log_size, log_file_number_size, recovered_seq);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(write_options, false);
Expand Down
20 changes: 15 additions & 5 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,8 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
const WriteOptions& write_options,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
LogFileNumberSize& log_file_number_size) {
LogFileNumberSize& log_file_number_size,
SequenceNumber sequence) {
assert(log_size != nullptr);

Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
Expand All @@ -1584,7 +1585,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
if (!io_s.ok()) {
return io_s;
}
io_s = log_writer->AddRecord(write_options, log_entry);
io_s = log_writer->AddRecord(write_options, log_entry, sequence);

if (UNLIKELY(needs_locking)) {
log_write_mutex_.Unlock();
Expand Down Expand Up @@ -1634,7 +1635,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
write_options.rate_limiter_priority =
write_group.leader->rate_limiter_priority;
io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used,
&log_size, log_file_number_size);
&log_size, log_file_number_size, sequence);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down Expand Up @@ -1760,7 +1761,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
write_options.rate_limiter_priority =
write_group.leader->rate_limiter_priority;
io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used,
&log_size, log_file_number_size);
&log_size, log_file_number_size, sequence);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down Expand Up @@ -2443,10 +2444,19 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
mutex_.Unlock();
if (creating_new_log) {
PredecessorWALInfo info;
log_write_mutex_.Lock();
if (!logs_.empty()) {
log::Writer* cur_log_writer = logs_.back().writer;
info = PredecessorWALInfo(cur_log_writer->get_log_number(),
cur_log_writer->file()->GetFileSize(),
cur_log_writer->GetLastSeqnoRecorded());
}
log_write_mutex_.Unlock();
// TODO: Write buffer size passed in should be max of all CF's instead
// of mutable_cf_options.write_buffer_size.
io_s = CreateWAL(write_options, new_log_number, recycle_log_number,
preallocate_block_size, &new_log);
preallocate_block_size, info, &new_log);
if (s.ok()) {
s = io_s;
}
Expand Down
Loading
Loading