From 787730c859d83fd82b3087ae2edf189f8f2e698f Mon Sep 17 00:00:00 2001 From: Changyu Bi Date: Wed, 16 Oct 2024 14:11:22 -0700 Subject: [PATCH] Add an ingestion option to not fill block cache (#13067) Summary: add `IngestExternalFileOptions::fill_cache` to allow users to ingest files without loading index/filter/data and other blocks into block cache during file ingestion. This can be useful when users are ingesting files into a CF that is not available to readers yet. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13067 Test Plan: * unit test: `ExternalSSTFileTest.NoBlockCache` * ran one round of crash test with fill_cache disabled: `python3 ./tools/db_crashtest.py --simple blackbox --ops_per_thread=1000000 --interval=30 --ingest_external_file_one_in=200 --level0_stop_writes_trigger=200 --level0_slowdown_writes_trigger=100 --sync_fault_injection=0 --disable_wal=0 --manual_wal_flush_one_in=0` Reviewed By: jowlyzhang Differential Revision: D64356424 Pulled By: cbi42 fbshipit-source-id: b380c26f5987238e1ed7d42ceef0390cfaa0b8e2 --- db/db_impl/db_impl.cc | 7 ++- db/external_sst_file_ingestion_job.cc | 20 ++++--- db/external_sst_file_test.cc | 58 +++++++++++++++++-- db_stress_tool/no_batched_ops_stress.cc | 4 +- include/rocksdb/options.h | 8 +++ table/block_based/block_based_table_reader.cc | 1 + table/block_based/block_based_table_reader.h | 2 + .../new_features/ingest-fill-cache.md | 1 + 8 files changed, 85 insertions(+), 16 deletions(-) create mode 100644 unreleased_history/new_features/ingest-fill-cache.md diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 261593423ad..3262be71902 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -5829,7 +5829,6 @@ Status DBImpl::IngestExternalFile( Status DBImpl::IngestExternalFiles( const std::vector& args) { // TODO: plumb Env::IOActivity, Env::IOPriority - const ReadOptions read_options; const WriteOptions write_options; if (args.empty()) { @@ -5855,6 +5854,10 @@ Status DBImpl::IngestExternalFiles( snprintf(err_msg, 128, "external_files[%zu] is empty", i); return Status::InvalidArgument(err_msg); } + if (i && args[i].options.fill_cache != args[i - 1].options.fill_cache) { + return Status::InvalidArgument( + "fill_cache should be the same across ingestion options."); + } } for (const auto& arg : args) { const IngestExternalFileOptions& ingest_opts = arg.options; @@ -6042,6 +6045,8 @@ Status DBImpl::IngestExternalFiles( } } if (status.ok()) { + ReadOptions read_options; + read_options.fill_cache = args[0].options.fill_cache; autovector cfds_to_commit; autovector mutable_cf_options_list; autovector> edit_lists; diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 7bf8e7849bc..8e429e397ae 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -567,13 +567,9 @@ void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() { file_ingesting_compactions_.push_back(new Compaction( cfd_->current()->storage_info(), *cfd_->ioptions(), mutable_cf_options, mutable_db_options_, {input}, output_level, - MaxFileSizeForLevel( - mutable_cf_options, output_level, - cfd_->ioptions()->compaction_style) /* output file size - limit, - * not applicable - */ - , + /* output file size limit not applicable */ + MaxFileSizeForLevel(mutable_cf_options, output_level, + cfd_->ioptions()->compaction_style), LLONG_MAX /* max compaction bytes, not applicable */, 0 /* output path ID, not applicable */, mutable_cf_options.compression, mutable_cf_options.compression_opts, @@ -727,7 +723,10 @@ Status ExternalSstFileIngestionJob::ResetTableReader( new RandomAccessFileReader(std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_)); table_reader->reset(); + ReadOptions ro; + ro.fill_cache = ingestion_options_.fill_cache; status = cfd_->ioptions()->table_factory->NewTableReader( + ro, TableReaderOptions( *cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor, env_options_, cfd_->internal_comparator(), @@ -739,7 +738,9 @@ Status ExternalSstFileIngestionJob::ResetTableReader( /*cur_file_num*/ new_file_number, /* unique_id */ {}, /* largest_seqno */ 0, /* tail_size */ 0, user_defined_timestamps_persisted), - std::move(sst_file_reader), file_to_ingest->file_size, table_reader); + std::move(sst_file_reader), file_to_ingest->file_size, table_reader, + // No need to prefetch index/filter if caching is not needed. + /*prefetch_index_and_filter_in_cache=*/ingestion_options_.fill_cache); return status; } @@ -885,6 +886,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( // TODO: plumb Env::IOActivity, Env::IOPriority ReadOptions ro; ro.readahead_size = ingestion_options_.verify_checksums_readahead_size; + ro.fill_cache = ingestion_options_.fill_cache; status = table_reader->VerifyChecksum( ro, TableReaderCaller::kExternalSSTIngestion); if (!status.ok()) { @@ -895,6 +897,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( ParsedInternalKey key; // TODO: plumb Env::IOActivity, Env::IOPriority ReadOptions ro; + ro.fill_cache = ingestion_options_.fill_cache; std::unique_ptr iter(table_reader->NewIterator( ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion)); @@ -1064,6 +1067,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( Arena arena; // TODO: plumb Env::IOActivity, Env::IOPriority ReadOptions ro; + ro.fill_cache = ingestion_options_.fill_cache; ro.total_order_seek = true; int target_level = 0; auto* vstorage = cfd_->current()->storage_info(); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index ce01455d0ce..de261af7a01 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -3,6 +3,8 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include + #include #include @@ -150,7 +152,7 @@ class ExternalSSTFileTest bool verify_checksums_before_ingest = true, bool ingest_behind = false, bool sort_data = false, std::map* true_data = nullptr, - ColumnFamilyHandle* cfh = nullptr) { + ColumnFamilyHandle* cfh = nullptr, bool fill_cache = false) { // Generate a file id if not provided if (file_id == -1) { file_id = last_file_id_ + 1; @@ -194,6 +196,7 @@ class ExternalSSTFileTest ifo.write_global_seqno = allow_global_seqno ? write_global_seqno : false; ifo.verify_checksums_before_ingest = verify_checksums_before_ingest; ifo.ingest_behind = ingest_behind; + ifo.fill_cache = fill_cache; if (cfh) { s = db_->IngestExternalFile(cfh, {file_path}, ifo); } else { @@ -267,15 +270,15 @@ class ExternalSSTFileTest bool verify_checksums_before_ingest = true, bool ingest_behind = false, bool sort_data = false, std::map* true_data = nullptr, - ColumnFamilyHandle* cfh = nullptr) { + ColumnFamilyHandle* cfh = nullptr, bool fill_cache = false) { std::vector> file_data; for (auto& k : keys) { file_data.emplace_back(Key(k), Key(k) + std::to_string(file_id)); } - return GenerateAndAddExternalFile(options, file_data, file_id, - allow_global_seqno, write_global_seqno, - verify_checksums_before_ingest, - ingest_behind, sort_data, true_data, cfh); + return GenerateAndAddExternalFile( + options, file_data, file_id, allow_global_seqno, write_global_seqno, + verify_checksums_before_ingest, ingest_behind, sort_data, true_data, + cfh, fill_cache); } Status DeprecatedAddFile(const std::vector& files, @@ -314,6 +317,49 @@ TEST_F(ExternalSSTFileTest, ComparatorMismatch) { ASSERT_NOK(DeprecatedAddFile({file})); } +TEST_F(ExternalSSTFileTest, NoBlockCache) { + LRUCacheOptions co; + co.capacity = 32 << 20; + std::shared_ptr cache = NewLRUCache(co); + BlockBasedTableOptions table_options; + table_options.block_cache = cache; + table_options.filter_policy.reset(NewBloomFilterPolicy(10)); + table_options.cache_index_and_filter_blocks = true; + Options options = CurrentOptions(); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + Reopen(options); + + size_t usage_before_ingestion = cache->GetUsage(); + std::map true_data; + // Ingest with fill_cache = true + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, false, false, true, + false, false, &true_data, nullptr, + /*fill_cache=*/true)); + ASSERT_EQ(FilesPerLevel(), "0,0,0,0,0,0,1"); + EXPECT_GT(cache->GetUsage(), usage_before_ingestion); + + TablePropertiesCollection tp; + ASSERT_OK(db_->GetPropertiesOfAllTables(&tp)); + for (const auto& entry : tp) { + EXPECT_GT(entry.second->index_size, 0); + EXPECT_GT(entry.second->filter_size, 0); + } + + usage_before_ingestion = cache->GetUsage(); + // Ingest with fill_cache = false + ASSERT_OK(GenerateAndAddExternalFile(options, {3, 4}, -1, false, false, true, + false, false, &true_data, nullptr, + /*fill_cache=*/false)); + EXPECT_EQ(usage_before_ingestion, cache->GetUsage()); + + tp.clear(); + ASSERT_OK(db_->GetPropertiesOfAllTables(&tp)); + for (const auto& entry : tp) { + EXPECT_GT(entry.second->index_size, 0); + EXPECT_GT(entry.second->filter_size, 0); + } +} + TEST_F(ExternalSSTFileTest, Basic) { do { Options options = CurrentOptions(); diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 1e628d7d2f6..7ee567cb88e 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -2057,11 +2057,13 @@ class NonBatchedOpsStressTest : public StressTest { ingest_options.verify_checksums_before_ingest = thread->rand.OneInOpt(2); ingest_options.verify_checksums_readahead_size = thread->rand.OneInOpt(2) ? 1024 * 1024 : 0; + ingest_options.fill_cache = thread->rand.OneInOpt(4); ingest_options_oss << "move_files: " << ingest_options.move_files << ", verify_checksums_before_ingest: " << ingest_options.verify_checksums_before_ingest << ", verify_checksums_readahead_size: " - << ingest_options.verify_checksums_readahead_size; + << ingest_options.verify_checksums_readahead_size + << ", fill_cache: " << ingest_options.fill_cache; s = db_->IngestExternalFile(column_families_[column_family], {sst_filename}, ingest_options); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index eabab112bef..1ead96c2a16 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2271,6 +2271,14 @@ struct IngestExternalFileOptions { // RepairDB() may not recover these files correctly, potentially leading to // data loss. bool allow_db_generated_files = false; + + // Controls whether data and metadata blocks (e.g. index, filter) read during + // file ingestion will be added to block cache. + // Users may wish to set this to false when bulk loading into a CF that is not + // available for reads yet. + // When ingesting to multiple families, this option should be the same across + // ingestion options. + bool fill_cache = true; }; enum TraceFilterType : uint64_t { diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index d07fd4ef50f..35d387ae9f1 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -653,6 +653,7 @@ Status BlockBasedTable::Open( ro.rate_limiter_priority = read_options.rate_limiter_priority; ro.verify_checksums = read_options.verify_checksums; ro.io_activity = read_options.io_activity; + ro.fill_cache = read_options.fill_cache; // prefetch both index and filters, down to all partitions const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0; diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 1aadc62e017..34519f2572c 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -500,6 +500,8 @@ class BlockBasedTable : public TableReader { InternalIterator* meta_iter, const InternalKeyComparator& internal_comparator, BlockCacheLookupContext* lookup_context); + // If index and filter blocks do not need to be pinned, `prefetch_all` + // determines whether they will be read and add to cache. Status PrefetchIndexAndFilterBlocks( const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, BlockBasedTable* new_table, diff --git a/unreleased_history/new_features/ingest-fill-cache.md b/unreleased_history/new_features/ingest-fill-cache.md new file mode 100644 index 00000000000..f20d6c09d27 --- /dev/null +++ b/unreleased_history/new_features/ingest-fill-cache.md @@ -0,0 +1 @@ +* Add a new file ingestion option `IngestExternalFileOptions::fill_cache` to support not adding blocks from ingested files into block cache during file ingestion. \ No newline at end of file