Skip to content

Commit

Permalink
Add some checks for the file ingestion flow (#13100)
Browse files Browse the repository at this point in the history
Summary:
This PR does a few misc things for file ingestion flow:

- Add an invalid argument status return for the combination of `allow_global_seqno = false` and external files' key range overlap in `Prepare` stage.
- Add a MemTables status check for when column family is flushed before `Run`.
- Replace the column family dropped check with an assertion after thread enters the write queue and before it exits the write queue, since dropping column family can only happen in the single threaded write queue too and we already checked once after enter write queue.
- Add an `ExternalSstFileIngestionJob::GetColumnFamilyData` API.

Pull Request resolved: #13100

Test Plan: Added unit tests, and stress tested the ingestion path

Reviewed By: hx235

Differential Revision: D65180472

Pulled By: jowlyzhang

fbshipit-source-id: 180145dd248a7507a13a543481b135e5a31ebe2d
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Nov 5, 2024
1 parent 8089eae commit dc34a0f
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 38 deletions.
62 changes: 29 additions & 33 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5961,9 +5961,9 @@ Status DBImpl::IngestExternalFiles(
uint64_t start_file_number = next_file_number;
for (size_t i = 1; i != num_cfs; ++i) {
start_file_number += args[i - 1].external_files.size();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
SuperVersion* super_version =
ingestion_jobs[i].GetColumnFamilyData()->GetReferencedSuperVersion(
this);
Status es = ingestion_jobs[i].Prepare(
args[i].external_files, args[i].files_checksums,
args[i].files_checksum_func_names, args[i].file_temperature,
Expand All @@ -5977,9 +5977,9 @@ Status DBImpl::IngestExternalFiles(
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
{
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
SuperVersion* super_version =
ingestion_jobs[0].GetColumnFamilyData()->GetReferencedSuperVersion(
this);
Status es = ingestion_jobs[0].Prepare(
args[0].external_files, args[0].files_checksums,
args[0].files_checksum_func_names, args[0].file_temperature,
Expand Down Expand Up @@ -6030,8 +6030,7 @@ Status DBImpl::IngestExternalFiles(
bool at_least_one_cf_need_flush = false;
std::vector<bool> need_flush(num_cfs, false);
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
if (cfd->IsDropped()) {
// TODO (yanqin) investigate whether we should abort ingestion or
// proceed with other non-dropped column families.
Expand Down Expand Up @@ -6063,19 +6062,24 @@ Status DBImpl::IngestExternalFiles(
for (size_t i = 0; i != num_cfs; ++i) {
if (need_flush[i]) {
mutex_.Unlock();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
->cfd();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* entered_write_thread */);
status =
FlushMemTable(ingestion_jobs[i].GetColumnFamilyData(),
flush_opts, FlushReason::kExternalFileIngestion,
true /* entered_write_thread */);
mutex_.Lock();
if (!status.ok()) {
break;
}
}
}
}
if (status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
if (immutable_db_options_.atomic_flush || need_flush[i]) {
ingestion_jobs[i].SetFlushedBeforeRun();
}
}
}
}
// Run ingestion jobs.
if (status.ok()) {
Expand All @@ -6096,11 +6100,8 @@ Status DBImpl::IngestExternalFiles(
autovector<autovector<VersionEdit*>> edit_lists;
uint32_t num_entries = 0;
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (cfd->IsDropped()) {
continue;
}
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
assert(!cfd->IsDropped());
cfds_to_commit.push_back(cfd);
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
autovector<VersionEdit*> edit_list;
Expand Down Expand Up @@ -6150,20 +6151,16 @@ Status DBImpl::IngestExternalFiles(

if (status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (!cfd->IsDropped()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
*cfd->GetLatestMutableCFOptions());
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
assert(!cfd->IsDropped());
InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
*cfd->GetLatestMutableCFOptions());
#ifndef NDEBUG
if (0 == i && num_cfs > 1) {
TEST_SYNC_POINT(
"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
TEST_SYNC_POINT(
"DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
}
#endif // !NDEBUG
if (0 == i && num_cfs > 1) {
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
}
#endif // !NDEBUG
}
} else if (versions_->io_status().IsIOError()) {
// Error while writing to MANIFEST.
Expand Down Expand Up @@ -6205,8 +6202,7 @@ Status DBImpl::IngestExternalFiles(
}
if (status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
if (!cfd->IsDropped()) {
NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
}
Expand Down
54 changes: 54 additions & 0 deletions db/external_sst_file_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1818,6 +1818,9 @@ TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
}

IngestExternalFileOptions ifo;
ifo.allow_global_seqno = false;
ASSERT_NOK(db_->IngestExternalFile(files, ifo));
ifo.allow_global_seqno = true;
ASSERT_OK(db_->IngestExternalFile(files, ifo));
ASSERT_EQ(Get("a"), "a1");
ASSERT_EQ(Get("i"), "i2");
Expand Down Expand Up @@ -2575,6 +2578,57 @@ TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) {
ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1);
}

TEST_F(ExternalSSTFileBasicTest, ConcurrentIngestionAndDropColumnFamily) {
int kNumCFs = 10;
Options options = CurrentOptions();
CreateColumnFamilies({"cf_0", "cf_1", "cf_2", "cf_3", "cf_4", "cf_5", "cf_6",
"cf_7", "cf_8", "cf_9"},
options);

IngestExternalFileArg ingest_arg;
IngestExternalFileOptions ifo;
std::string external_file = sst_files_dir_ + "/file_to_ingest.sst";
SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
ASSERT_OK(sst_file_writer.Open(external_file));
ASSERT_OK(sst_file_writer.Put("key", "value"));
ASSERT_OK(sst_file_writer.Finish());
ifo.move_files = false;
ingest_arg.external_files = {external_file};
ingest_arg.options = ifo;

std::vector<std::thread> threads;
threads.reserve(2 * kNumCFs);
std::atomic<int> success_ingestion_count = 0;
std::atomic<int> failed_ingestion_count = 0;
for (int i = 0; i < kNumCFs; i++) {
threads.emplace_back(
[this, i]() { ASSERT_OK(db_->DropColumnFamily(handles_[i])); });
threads.emplace_back([this, i, ingest_arg, &success_ingestion_count,
&failed_ingestion_count]() {
IngestExternalFileArg arg_copy = ingest_arg;
arg_copy.column_family = handles_[i];
Status s = db_->IngestExternalFiles({arg_copy});
ReadOptions ropts;
std::string value;
if (s.ok()) {
ASSERT_OK(db_->Get(ropts, handles_[i], "key", &value));
ASSERT_EQ("value", value);
success_ingestion_count.fetch_add(1);
} else {
ASSERT_TRUE(db_->Get(ropts, handles_[i], "key", &value).IsNotFound());
failed_ingestion_count.fetch_add(1);
}
});
}

for (auto& t : threads) {
t.join();
}

ASSERT_EQ(kNumCFs, success_ingestion_count + failed_ingestion_count);
Close();
}

INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),
Expand Down
18 changes: 17 additions & 1 deletion db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ Status ExternalSstFileIngestionJob::Prepare(
"behind mode.");
}

// Overlapping files need at least two different sequence numbers. If settings
// disables global seqno, ingestion will fail anyway, so fail fast in prepare.
if (!ingestion_options_.allow_global_seqno && files_overlap_) {
return Status::InvalidArgument(
"Global seqno is required, but disabled (because external files key "
"range overlaps).");
}

if (ucmp_->timestamp_size() > 0 && files_overlap_) {
return Status::NotSupported(
"Files with overlapping ranges cannot be ingested to column "
Expand Down Expand Up @@ -387,8 +395,16 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
// REQUIRES: we have become the only writer by entering both write_thread_ and
// nonmem_write_thread_
Status ExternalSstFileIngestionJob::Run() {
Status status;
SuperVersion* super_version = cfd_->GetSuperVersion();
// If column family is flushed after Prepare and before Run, we should have a
// specific state of Memtables. The mutable Memtable should be empty, and the
// immutable Memtable list should be empty.
if (flushed_before_run_ && (super_version->imm->NumNotFlushed() != 0 ||
super_version->mem->GetDataSize() != 0)) {
return Status::TryAgain(
"Inconsistent memtable state detected when flushed before run.");
}
Status status;
#ifndef NDEBUG
// We should never run the job with a memtable that is overlapping
// with the files we are ingesting
Expand Down
8 changes: 8 additions & 0 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ class ExternalSstFileIngestionJob {

~ExternalSstFileIngestionJob() { UnregisterRange(); }

ColumnFamilyData* GetColumnFamilyData() const { return cfd_; }

// Prepare the job by copying external files into the DB.
Status Prepare(const std::vector<std::string>& external_files_paths,
const std::vector<std::string>& files_checksums,
Expand All @@ -229,6 +231,8 @@ class ExternalSstFileIngestionJob {
// Thread-safe
Status NeedsFlush(bool* flush_needed, SuperVersion* super_version);

void SetFlushedBeforeRun() { flushed_before_run_ = true; }

// Will execute the ingestion job and prepare edit() to be applied.
// REQUIRES: Mutex held
Status Run();
Expand Down Expand Up @@ -371,6 +375,10 @@ class ExternalSstFileIngestionJob {
bool need_generate_file_checksum_{true};
std::shared_ptr<IOTracer> io_tracer_;

// Flag indicating whether the column family is flushed after `Prepare` and
// before `Run`.
bool flushed_before_run_{false};

// Below are variables used in (un)registering range for this ingestion job
//
// FileMetaData used in inputs of compactions equivalent to this ingestion
Expand Down
6 changes: 2 additions & 4 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,12 @@ void MemTableListVersion::Unref(autovector<ReadOnlyMemTable*>* to_delete) {
}

int MemTableList::NumNotFlushed() const {
int size = static_cast<int>(current_->memlist_.size());
int size = current_->NumNotFlushed();
assert(num_flush_not_started_ <= size);
return size;
}

int MemTableList::NumFlushed() const {
return static_cast<int>(current_->memlist_history_.size());
}
int MemTableList::NumFlushed() const { return current_->NumFlushed(); }

// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
Expand Down
4 changes: 4 additions & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ class MemTableListVersion {

uint64_t GetID() const { return id_; }

int NumNotFlushed() const { return static_cast<int>(memlist_.size()); }

int NumFlushed() const { return static_cast<int>(memlist_history_.size()); }

private:
friend class MemTableList;

Expand Down

0 comments on commit dc34a0f

Please sign in to comment.