Skip to content

Commit

Permalink
Add an ingestion option to not fill block cache (#13067)
Browse files Browse the repository at this point in the history
Summary:
add `IngestExternalFileOptions::fill_cache` to allow users to ingest files without loading index/filter/data and other blocks into block cache during file ingestion. This can be useful when users are ingesting files into a CF that is not available to readers yet.

Pull Request resolved: #13067

Test Plan:
* unit test: `ExternalSSTFileTest.NoBlockCache`
* ran one round of crash test with fill_cache disabled: `python3 ./tools/db_crashtest.py --simple blackbox --ops_per_thread=1000000 --interval=30 --ingest_external_file_one_in=200 --level0_stop_writes_trigger=200 --level0_slowdown_writes_trigger=100 --sync_fault_injection=0 --disable_wal=0 --manual_wal_flush_one_in=0`

Reviewed By: jowlyzhang

Differential Revision: D64356424

Pulled By: cbi42

fbshipit-source-id: b380c26f5987238e1ed7d42ceef0390cfaa0b8e2
  • Loading branch information
cbi42 authored and facebook-github-bot committed Oct 16, 2024
1 parent ecc084d commit 787730c
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 16 deletions.
7 changes: 6 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5829,7 +5829,6 @@ Status DBImpl::IngestExternalFile(
Status DBImpl::IngestExternalFiles(
const std::vector<IngestExternalFileArg>& args) {
// TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options;
const WriteOptions write_options;

if (args.empty()) {
Expand All @@ -5855,6 +5854,10 @@ Status DBImpl::IngestExternalFiles(
snprintf(err_msg, 128, "external_files[%zu] is empty", i);
return Status::InvalidArgument(err_msg);
}
if (i && args[i].options.fill_cache != args[i - 1].options.fill_cache) {
return Status::InvalidArgument(
"fill_cache should be the same across ingestion options.");
}
}
for (const auto& arg : args) {
const IngestExternalFileOptions& ingest_opts = arg.options;
Expand Down Expand Up @@ -6042,6 +6045,8 @@ Status DBImpl::IngestExternalFiles(
}
}
if (status.ok()) {
ReadOptions read_options;
read_options.fill_cache = args[0].options.fill_cache;
autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<autovector<VersionEdit*>> edit_lists;
Expand Down
20 changes: 12 additions & 8 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,13 +567,9 @@ void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() {
file_ingesting_compactions_.push_back(new Compaction(
cfd_->current()->storage_info(), *cfd_->ioptions(), mutable_cf_options,
mutable_db_options_, {input}, output_level,
MaxFileSizeForLevel(
mutable_cf_options, output_level,
cfd_->ioptions()->compaction_style) /* output file size
limit,
* not applicable
*/
,
/* output file size limit not applicable */
MaxFileSizeForLevel(mutable_cf_options, output_level,
cfd_->ioptions()->compaction_style),
LLONG_MAX /* max compaction bytes, not applicable */,
0 /* output path ID, not applicable */, mutable_cf_options.compression,
mutable_cf_options.compression_opts,
Expand Down Expand Up @@ -727,7 +723,10 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
new RandomAccessFileReader(std::move(sst_file), external_file,
nullptr /*Env*/, io_tracer_));
table_reader->reset();
ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
status = cfd_->ioptions()->table_factory->NewTableReader(
ro,
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
Expand All @@ -739,7 +738,9 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
/*cur_file_num*/ new_file_number,
/* unique_id */ {}, /* largest_seqno */ 0,
/* tail_size */ 0, user_defined_timestamps_persisted),
std::move(sst_file_reader), file_to_ingest->file_size, table_reader);
std::move(sst_file_reader), file_to_ingest->file_size, table_reader,
// No need to prefetch index/filter if caching is not needed.
/*prefetch_index_and_filter_in_cache=*/ingestion_options_.fill_cache);
return status;
}

Expand Down Expand Up @@ -885,6 +886,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
// TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions ro;
ro.readahead_size = ingestion_options_.verify_checksums_readahead_size;
ro.fill_cache = ingestion_options_.fill_cache;
status = table_reader->VerifyChecksum(
ro, TableReaderCaller::kExternalSSTIngestion);
if (!status.ok()) {
Expand All @@ -895,6 +897,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
ParsedInternalKey key;
// TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
Expand Down Expand Up @@ -1064,6 +1067,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
Arena arena;
// TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
ro.total_order_seek = true;
int target_level = 0;
auto* vstorage = cfd_->current()->storage_info();
Expand Down
58 changes: 52 additions & 6 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <table/block_based/block_based_table_factory.h>

#include <functional>
#include <memory>

Expand Down Expand Up @@ -150,7 +152,7 @@ class ExternalSSTFileTest
bool verify_checksums_before_ingest = true, bool ingest_behind = false,
bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
ColumnFamilyHandle* cfh = nullptr, bool fill_cache = false) {
// Generate a file id if not provided
if (file_id == -1) {
file_id = last_file_id_ + 1;
Expand Down Expand Up @@ -194,6 +196,7 @@ class ExternalSSTFileTest
ifo.write_global_seqno = allow_global_seqno ? write_global_seqno : false;
ifo.verify_checksums_before_ingest = verify_checksums_before_ingest;
ifo.ingest_behind = ingest_behind;
ifo.fill_cache = fill_cache;
if (cfh) {
s = db_->IngestExternalFile(cfh, {file_path}, ifo);
} else {
Expand Down Expand Up @@ -267,15 +270,15 @@ class ExternalSSTFileTest
bool verify_checksums_before_ingest = true, bool ingest_behind = false,
bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
ColumnFamilyHandle* cfh = nullptr, bool fill_cache = false) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& k : keys) {
file_data.emplace_back(Key(k), Key(k) + std::to_string(file_id));
}
return GenerateAndAddExternalFile(options, file_data, file_id,
allow_global_seqno, write_global_seqno,
verify_checksums_before_ingest,
ingest_behind, sort_data, true_data, cfh);
return GenerateAndAddExternalFile(
options, file_data, file_id, allow_global_seqno, write_global_seqno,
verify_checksums_before_ingest, ingest_behind, sort_data, true_data,
cfh, fill_cache);
}

Status DeprecatedAddFile(const std::vector<std::string>& files,
Expand Down Expand Up @@ -314,6 +317,49 @@ TEST_F(ExternalSSTFileTest, ComparatorMismatch) {
ASSERT_NOK(DeprecatedAddFile({file}));
}

TEST_F(ExternalSSTFileTest, NoBlockCache) {
LRUCacheOptions co;
co.capacity = 32 << 20;
std::shared_ptr<Cache> cache = NewLRUCache(co);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
table_options.cache_index_and_filter_blocks = true;
Options options = CurrentOptions();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);

size_t usage_before_ingestion = cache->GetUsage();
std::map<std::string, std::string> true_data;
// Ingest with fill_cache = true
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, false, false, true,
false, false, &true_data, nullptr,
/*fill_cache=*/true));
ASSERT_EQ(FilesPerLevel(), "0,0,0,0,0,0,1");
EXPECT_GT(cache->GetUsage(), usage_before_ingestion);

TablePropertiesCollection tp;
ASSERT_OK(db_->GetPropertiesOfAllTables(&tp));
for (const auto& entry : tp) {
EXPECT_GT(entry.second->index_size, 0);
EXPECT_GT(entry.second->filter_size, 0);
}

usage_before_ingestion = cache->GetUsage();
// Ingest with fill_cache = false
ASSERT_OK(GenerateAndAddExternalFile(options, {3, 4}, -1, false, false, true,
false, false, &true_data, nullptr,
/*fill_cache=*/false));
EXPECT_EQ(usage_before_ingestion, cache->GetUsage());

tp.clear();
ASSERT_OK(db_->GetPropertiesOfAllTables(&tp));
for (const auto& entry : tp) {
EXPECT_GT(entry.second->index_size, 0);
EXPECT_GT(entry.second->filter_size, 0);
}
}

TEST_F(ExternalSSTFileTest, Basic) {
do {
Options options = CurrentOptions();
Expand Down
4 changes: 3 additions & 1 deletion db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2057,11 +2057,13 @@ class NonBatchedOpsStressTest : public StressTest {
ingest_options.verify_checksums_before_ingest = thread->rand.OneInOpt(2);
ingest_options.verify_checksums_readahead_size =
thread->rand.OneInOpt(2) ? 1024 * 1024 : 0;
ingest_options.fill_cache = thread->rand.OneInOpt(4);
ingest_options_oss << "move_files: " << ingest_options.move_files
<< ", verify_checksums_before_ingest: "
<< ingest_options.verify_checksums_before_ingest
<< ", verify_checksums_readahead_size: "
<< ingest_options.verify_checksums_readahead_size;
<< ingest_options.verify_checksums_readahead_size
<< ", fill_cache: " << ingest_options.fill_cache;
s = db_->IngestExternalFile(column_families_[column_family],
{sst_filename}, ingest_options);
}
Expand Down
8 changes: 8 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2271,6 +2271,14 @@ struct IngestExternalFileOptions {
// RepairDB() may not recover these files correctly, potentially leading to
// data loss.
bool allow_db_generated_files = false;

// Controls whether data and metadata blocks (e.g. index, filter) read during
// file ingestion will be added to block cache.
// Users may wish to set this to false when bulk loading into a CF that is not
// available for reads yet.
// When ingesting to multiple families, this option should be the same across
// ingestion options.
bool fill_cache = true;
};

enum TraceFilterType : uint64_t {
Expand Down
1 change: 1 addition & 0 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ Status BlockBasedTable::Open(
ro.rate_limiter_priority = read_options.rate_limiter_priority;
ro.verify_checksums = read_options.verify_checksums;
ro.io_activity = read_options.io_activity;
ro.fill_cache = read_options.fill_cache;

// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
Expand Down
2 changes: 2 additions & 0 deletions table/block_based/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,8 @@ class BlockBasedTable : public TableReader {
InternalIterator* meta_iter,
const InternalKeyComparator& internal_comparator,
BlockCacheLookupContext* lookup_context);
// If index and filter blocks do not need to be pinned, `prefetch_all`
// determines whether they will be read and add to cache.
Status PrefetchIndexAndFilterBlocks(
const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter, BlockBasedTable* new_table,
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/new_features/ingest-fill-cache.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Add a new file ingestion option `IngestExternalFileOptions::fill_cache` to support not adding blocks from ingested files into block cache during file ingestion.

0 comments on commit 787730c

Please sign in to comment.