From 273b3eadf0ad06acaaeaf30efc35be5ab7588a9c Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Mon, 19 Aug 2024 11:22:43 -0700 Subject: [PATCH] Add Remote Compaction Installation Callback Function (#12940) Summary: Add an optional callback function upon remote compaction temp output installation. This will be internally used for setting the final status in the Offload Infra. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12940 Test Plan: Unit Test added ``` ./compaction_service_test ``` _Also internally tested by manually merging into internal code base_ Reviewed By: anand1976 Differential Revision: D61419157 Pulled By: jaykorean fbshipit-source-id: 66831685bc403949c26bfc65840dd1900d2a5a67 --- db/compaction/compaction_service_job.cc | 28 +++++++++++-------- db/compaction/compaction_service_test.cc | 15 ++++++++++ include/rocksdb/options.h | 4 +++ .../callback_fn_remote_compaction.md | 2 ++ 4 files changed, 38 insertions(+), 11 deletions(-) create mode 100644 unreleased_history/public_api_changes/callback_fn_remote_compaction.md diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 7f6b872cb2b..3b56d057b46 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -140,9 +140,13 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( return compaction_status; } + // CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to + // read the result. Consider this as an installation failure if (!s.ok()) { sub_compact->status = s; compaction_result.status.PermitUncheckedError(); + db_options_.compaction_service->OnInstallation( + response.scheduled_job_id, CompactionServiceJobStatus::kFailure); return CompactionServiceJobStatus::kFailure; } sub_compact->status = compaction_result.status; @@ -154,18 +158,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( is_first_one = false; } - ROCKS_LOG_INFO(db_options_.info_log, - "[%s] [JOB %d] Receive remote compaction result, output path: " - "%s, files: %s", - compaction_input.column_family.name.c_str(), job_id_, - compaction_result.output_path.c_str(), - output_files_oss.str().c_str()); - - if (!s.ok()) { - sub_compact->status = s; - return CompactionServiceJobStatus::kFailure; - } + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Received remote compaction result, output path: " + "%s, files: %s", + compaction_input.column_family.name.c_str(), job_id_, + compaction_result.output_path.c_str(), output_files_oss.str().c_str()); + // Installation Starts for (const auto& file : compaction_result.output_files) { uint64_t file_num = versions_->NewFileNumber(); auto src_file = compaction_result.output_path + "/" + file.file_name; @@ -174,6 +174,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr); if (!s.ok()) { sub_compact->status = s; + db_options_.compaction_service->OnInstallation( + response.scheduled_job_id, CompactionServiceJobStatus::kFailure); return CompactionServiceJobStatus::kFailure; } @@ -182,6 +184,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr); if (!s.ok()) { sub_compact->status = s; + db_options_.compaction_service->OnInstallation( + response.scheduled_job_id, CompactionServiceJobStatus::kFailure); return CompactionServiceJobStatus::kFailure; } meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size, @@ -206,6 +210,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read); RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES, compaction_result.bytes_written); + db_options_.compaction_service->OnInstallation( + response.scheduled_job_id, CompactionServiceJobStatus::kSuccess); return CompactionServiceJobStatus::kSuccess; } diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 812a658dcff..8aacf2b6d2e 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -108,6 +108,11 @@ class MyTestCompactionService : public CompactionService { } } + void OnInstallation(const std::string& /*scheduled_job_id*/, + CompactionServiceJobStatus status) override { + final_updated_status_ = status; + } + int GetCompactionNum() { return compaction_num_.load(); } CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; } @@ -136,6 +141,10 @@ class MyTestCompactionService : public CompactionService { void SetCanceled(bool canceled) { canceled_ = canceled; } + CompactionServiceJobStatus GetFinalCompactionServiceJobStatus() { + return final_updated_status_.load(); + } + private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; @@ -158,6 +167,8 @@ class MyTestCompactionService : public CompactionService { std::vector> table_properties_collector_factories_; std::atomic_bool canceled_{false}; + std::atomic final_updated_status_{ + CompactionServiceJobStatus::kUseLocal}; }; class CompactionServiceTest : public DBTestBase { @@ -255,6 +266,8 @@ TEST_F(CompactionServiceTest, BasicCompactions) { auto my_cs = GetCompactionService(); ASSERT_GE(my_cs->GetCompactionNum(), 1); + ASSERT_EQ(CompactionServiceJobStatus::kSuccess, + my_cs->GetFinalCompactionServiceJobStatus()); // make sure the compaction statistics is only recorded on the remote side ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1); @@ -437,6 +450,8 @@ TEST_F(CompactionServiceTest, InvalidResult) { Slice end(end_str); Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); ASSERT_FALSE(s.ok()); + ASSERT_EQ(CompactionServiceJobStatus::kFailure, + my_cs->GetFinalCompactionServiceJobStatus()); } TEST_F(CompactionServiceTest, SubCompaction) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index db2079ac693..8223c4a1338 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -512,6 +512,10 @@ class CompactionService : public Customizable { return CompactionServiceJobStatus::kUseLocal; } + // Optional callback function upon Installation. + virtual void OnInstallation(const std::string& /*scheduled_job_id*/, + CompactionServiceJobStatus /*status*/) {} + // Deprecated. Please implement Schedule() and Wait() API to handle remote // compaction diff --git a/unreleased_history/public_api_changes/callback_fn_remote_compaction.md b/unreleased_history/public_api_changes/callback_fn_remote_compaction.md new file mode 100644 index 00000000000..be21461cd0f --- /dev/null +++ b/unreleased_history/public_api_changes/callback_fn_remote_compaction.md @@ -0,0 +1,2 @@ +Adds optional installation callback function for remote compaction +