From dc34a0ff1eb6fa5d2acbf830588850dcd3451a30 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Tue, 5 Nov 2024 15:44:56 -0800 Subject: [PATCH] Add some checks for the file ingestion flow (#13100) Summary: This PR makes a few miscellaneous changes to the file ingestion flow: - Add an invalid argument status return for the combination of `allow_global_seqno = false` and external files' key range overlap in the `Prepare` stage. - Add a MemTables status check for when the column family is flushed before `Run`. - Replace the column-family-dropped check with an assertion between the point where the thread enters the write queue and the point where it exits the write queue, since dropping a column family can also only happen inside the single-threaded write queue and we already check once after entering the write queue. - Add an `ExternalSstFileIngestionJob::GetColumnFamilyData` API. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13100 Test Plan: Added unit tests, and stress tested the ingestion path Reviewed By: hx235 Differential Revision: D65180472 Pulled By: jowlyzhang fbshipit-source-id: 180145dd248a7507a13a543481b135e5a31ebe2d --- db/db_impl/db_impl.cc | 62 +++++++++++++-------------- db/external_sst_file_basic_test.cc | 54 +++++++++++++++++++++++ db/external_sst_file_ingestion_job.cc | 18 +++++++- db/external_sst_file_ingestion_job.h | 8 ++++ db/memtable_list.cc | 6 +-- db/memtable_list.h | 4 ++ 6 files changed, 114 insertions(+), 38 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index d9cd321f37c..dbf6e46a233 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -5961,9 +5961,9 @@ Status DBImpl::IngestExternalFiles( uint64_t start_file_number = next_file_number; for (size_t i = 1; i != num_cfs; ++i) { start_file_number += args[i - 1].external_files.size(); - auto* cfd = - static_cast(args[i].column_family)->cfd(); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); + SuperVersion* super_version = + 
ingestion_jobs[i].GetColumnFamilyData()->GetReferencedSuperVersion( + this); Status es = ingestion_jobs[i].Prepare( args[i].external_files, args[i].files_checksums, args[i].files_checksum_func_names, args[i].file_temperature, @@ -5977,9 +5977,9 @@ Status DBImpl::IngestExternalFiles( TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0"); TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"); { - auto* cfd = - static_cast(args[0].column_family)->cfd(); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); + SuperVersion* super_version = + ingestion_jobs[0].GetColumnFamilyData()->GetReferencedSuperVersion( + this); Status es = ingestion_jobs[0].Prepare( args[0].external_files, args[0].files_checksums, args[0].files_checksum_func_names, args[0].file_temperature, @@ -6030,8 +6030,7 @@ Status DBImpl::IngestExternalFiles( bool at_least_one_cf_need_flush = false; std::vector need_flush(num_cfs, false); for (size_t i = 0; i != num_cfs; ++i) { - auto* cfd = - static_cast(args[i].column_family)->cfd(); + auto* cfd = ingestion_jobs[i].GetColumnFamilyData(); if (cfd->IsDropped()) { // TODO (yanqin) investigate whether we should abort ingestion or // proceed with other non-dropped column families. 
@@ -6063,12 +6062,10 @@ Status DBImpl::IngestExternalFiles( for (size_t i = 0; i != num_cfs; ++i) { if (need_flush[i]) { mutex_.Unlock(); - auto* cfd = - static_cast(args[i].column_family) - ->cfd(); - status = FlushMemTable(cfd, flush_opts, - FlushReason::kExternalFileIngestion, - true /* entered_write_thread */); + status = + FlushMemTable(ingestion_jobs[i].GetColumnFamilyData(), + flush_opts, FlushReason::kExternalFileIngestion, + true /* entered_write_thread */); mutex_.Lock(); if (!status.ok()) { break; @@ -6076,6 +6073,13 @@ Status DBImpl::IngestExternalFiles( } } } + if (status.ok()) { + for (size_t i = 0; i != num_cfs; ++i) { + if (immutable_db_options_.atomic_flush || need_flush[i]) { + ingestion_jobs[i].SetFlushedBeforeRun(); + } + } + } } // Run ingestion jobs. if (status.ok()) { @@ -6096,11 +6100,8 @@ Status DBImpl::IngestExternalFiles( autovector> edit_lists; uint32_t num_entries = 0; for (size_t i = 0; i != num_cfs; ++i) { - auto* cfd = - static_cast(args[i].column_family)->cfd(); - if (cfd->IsDropped()) { - continue; - } + auto* cfd = ingestion_jobs[i].GetColumnFamilyData(); + assert(!cfd->IsDropped()); cfds_to_commit.push_back(cfd); mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); autovector edit_list; @@ -6150,20 +6151,16 @@ Status DBImpl::IngestExternalFiles( if (status.ok()) { for (size_t i = 0; i != num_cfs; ++i) { - auto* cfd = - static_cast(args[i].column_family)->cfd(); - if (!cfd->IsDropped()) { - InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i], - *cfd->GetLatestMutableCFOptions()); + auto* cfd = ingestion_jobs[i].GetColumnFamilyData(); + assert(!cfd->IsDropped()); + InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i], + *cfd->GetLatestMutableCFOptions()); #ifndef NDEBUG - if (0 == i && num_cfs > 1) { - TEST_SYNC_POINT( - "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0"); - TEST_SYNC_POINT( - "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"); - } -#endif // !NDEBUG + if (0 == i && num_cfs > 1) { + 
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:0"); + TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"); } +#endif // !NDEBUG } } else if (versions_->io_status().IsIOError()) { // Error while writing to MANIFEST. @@ -6205,8 +6202,7 @@ Status DBImpl::IngestExternalFiles( } if (status.ok()) { for (size_t i = 0; i != num_cfs; ++i) { - auto* cfd = - static_cast(args[i].column_family)->cfd(); + auto* cfd = ingestion_jobs[i].GetColumnFamilyData(); if (!cfd->IsDropped()) { NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]); } diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 1c57102c3c8..cccf84efdee 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -1818,6 +1818,9 @@ TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) { } IngestExternalFileOptions ifo; + ifo.allow_global_seqno = false; + ASSERT_NOK(db_->IngestExternalFile(files, ifo)); + ifo.allow_global_seqno = true; ASSERT_OK(db_->IngestExternalFile(files, ifo)); ASSERT_EQ(Get("a"), "a1"); ASSERT_EQ(Get("i"), "i2"); @@ -2575,6 +2578,57 @@ TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) { ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1); } +TEST_F(ExternalSSTFileBasicTest, ConcurrentIngestionAndDropColumnFamily) { + int kNumCFs = 10; + Options options = CurrentOptions(); + CreateColumnFamilies({"cf_0", "cf_1", "cf_2", "cf_3", "cf_4", "cf_5", "cf_6", + "cf_7", "cf_8", "cf_9"}, + options); + + IngestExternalFileArg ingest_arg; + IngestExternalFileOptions ifo; + std::string external_file = sst_files_dir_ + "/file_to_ingest.sst"; + SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()}; + ASSERT_OK(sst_file_writer.Open(external_file)); + ASSERT_OK(sst_file_writer.Put("key", "value")); + ASSERT_OK(sst_file_writer.Finish()); + ifo.move_files = false; + ingest_arg.external_files = {external_file}; + ingest_arg.options = ifo; + + std::vector threads; + 
threads.reserve(2 * kNumCFs); + std::atomic success_ingestion_count = 0; + std::atomic failed_ingestion_count = 0; + for (int i = 0; i < kNumCFs; i++) { + threads.emplace_back( + [this, i]() { ASSERT_OK(db_->DropColumnFamily(handles_[i])); }); + threads.emplace_back([this, i, ingest_arg, &success_ingestion_count, + &failed_ingestion_count]() { + IngestExternalFileArg arg_copy = ingest_arg; + arg_copy.column_family = handles_[i]; + Status s = db_->IngestExternalFiles({arg_copy}); + ReadOptions ropts; + std::string value; + if (s.ok()) { + ASSERT_OK(db_->Get(ropts, handles_[i], "key", &value)); + ASSERT_EQ("value", value); + success_ingestion_count.fetch_add(1); + } else { + ASSERT_TRUE(db_->Get(ropts, handles_[i], "key", &value).IsNotFound()); + failed_ingestion_count.fetch_add(1); + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + ASSERT_EQ(kNumCFs, success_ingestion_count + failed_ingestion_count); + Close(); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, testing::Values(std::make_tuple(true, true), std::make_tuple(true, false), diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 63a5f6fc8d2..61588b2453f 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -95,6 +95,14 @@ Status ExternalSstFileIngestionJob::Prepare( "behind mode."); } + // Overlapping files need at least two different sequence numbers. If settings + // disable global seqno, ingestion will fail anyway, so fail fast in prepare. 
+ if (!ingestion_options_.allow_global_seqno && files_overlap_) { + return Status::InvalidArgument( + "Global seqno is required, but disabled (because external files key " + "range overlaps)."); + } + if (ucmp_->timestamp_size() > 0 && files_overlap_) { return Status::NotSupported( "Files with overlapping ranges cannot be ingested to column " @@ -387,8 +395,16 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed, // REQUIRES: we have become the only writer by entering both write_thread_ and // nonmem_write_thread_ Status ExternalSstFileIngestionJob::Run() { - Status status; SuperVersion* super_version = cfd_->GetSuperVersion(); + // If column family is flushed after Prepare and before Run, we should have a + // specific state of Memtables. The mutable Memtable should be empty, and the + // immutable Memtable list should be empty. + if (flushed_before_run_ && (super_version->imm->NumNotFlushed() != 0 || + super_version->mem->GetDataSize() != 0)) { + return Status::TryAgain( + "Inconsistent memtable state detected when flushed before run."); + } + Status status; #ifndef NDEBUG // We should never run the job with a memtable that is overlapping // with the files we are ingesting diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index df66e9e918a..4a853afed97 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -212,6 +212,8 @@ class ExternalSstFileIngestionJob { ~ExternalSstFileIngestionJob() { UnregisterRange(); } + ColumnFamilyData* GetColumnFamilyData() const { return cfd_; } + // Prepare the job by copying external files into the DB. 
Status Prepare(const std::vector& external_files_paths, const std::vector& files_checksums, @@ -229,6 +231,8 @@ class ExternalSstFileIngestionJob { // Thread-safe Status NeedsFlush(bool* flush_needed, SuperVersion* super_version); + void SetFlushedBeforeRun() { flushed_before_run_ = true; } + // Will execute the ingestion job and prepare edit() to be applied. // REQUIRES: Mutex held Status Run(); @@ -371,6 +375,10 @@ class ExternalSstFileIngestionJob { bool need_generate_file_checksum_{true}; std::shared_ptr io_tracer_; + // Flag indicating whether the column family is flushed after `Prepare` and + // before `Run`. + bool flushed_before_run_{false}; + // Below are variables used in (un)registering range for this ingestion job // // FileMetaData used in inputs of compactions equivalent to this ingestion diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 5f4029af2e6..4e1b38b81fc 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -92,14 +92,12 @@ void MemTableListVersion::Unref(autovector* to_delete) { } int MemTableList::NumNotFlushed() const { - int size = static_cast(current_->memlist_.size()); + int size = current_->NumNotFlushed(); assert(num_flush_not_started_ <= size); return size; } -int MemTableList::NumFlushed() const { - return static_cast(current_->memlist_history_.size()); -} +int MemTableList::NumFlushed() const { return current_->NumFlushed(); } // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. diff --git a/db/memtable_list.h b/db/memtable_list.h index a1e9311aff8..48075768cfc 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -146,6 +146,10 @@ class MemTableListVersion { uint64_t GetID() const { return id_; } + int NumNotFlushed() const { return static_cast(memlist_.size()); } + + int NumFlushed() const { return static_cast(memlist_history_.size()); } + private: friend class MemTableList;