Skip to content

Commit

Permalink
Add Remote Compaction Installation Callback Function (#12940)
Browse files Browse the repository at this point in the history
Summary:
Add an optional callback function upon remote compaction temp output installation. This will be internally used for setting the final status in the Offload Infra.

Pull Request resolved: #12940

Test Plan:
Unit Test added
```
./compaction_service_test
```

_Also internally tested by manually merging into internal code base_

Reviewed By: anand1976

Differential Revision: D61419157

Pulled By: jaykorean

fbshipit-source-id: 66831685bc403949c26bfc65840dd1900d2a5a67
  • Loading branch information
jaykorean authored and facebook-github-bot committed Aug 19, 2024
1 parent 295326b commit 273b3ea
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
28 changes: 17 additions & 11 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,13 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
return compaction_status;
}

// CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to
// read the result. Consider this as an installation failure
if (!s.ok()) {
sub_compact->status = s;
compaction_result.status.PermitUncheckedError();
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}
sub_compact->status = compaction_result.status;
Expand All @@ -154,18 +158,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
is_first_one = false;
}

ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Receive remote compaction result, output path: "
"%s, files: %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction_result.output_path.c_str(),
output_files_oss.str().c_str());

if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
}
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Received remote compaction result, output path: "
"%s, files: %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction_result.output_path.c_str(), output_files_oss.str().c_str());

// Installation Starts
for (const auto& file : compaction_result.output_files) {
uint64_t file_num = versions_->NewFileNumber();
auto src_file = compaction_result.output_path + "/" + file.file_name;
Expand All @@ -174,6 +174,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
if (!s.ok()) {
sub_compact->status = s;
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}

Expand All @@ -182,6 +184,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
if (!s.ok()) {
sub_compact->status = s;
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}
meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
Expand All @@ -206,6 +210,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
compaction_result.bytes_written);
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kSuccess);
return CompactionServiceJobStatus::kSuccess;
}

Expand Down
15 changes: 15 additions & 0 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ class MyTestCompactionService : public CompactionService {
}
}

void OnInstallation(const std::string& /*scheduled_job_id*/,
CompactionServiceJobStatus status) override {
final_updated_status_ = status;
}

int GetCompactionNum() { return compaction_num_.load(); }

CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
Expand Down Expand Up @@ -136,6 +141,10 @@ class MyTestCompactionService : public CompactionService {

void SetCanceled(bool canceled) { canceled_ = canceled; }

CompactionServiceJobStatus GetFinalCompactionServiceJobStatus() {
return final_updated_status_.load();
}

private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
Expand All @@ -158,6 +167,8 @@ class MyTestCompactionService : public CompactionService {
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
table_properties_collector_factories_;
std::atomic_bool canceled_{false};
std::atomic<CompactionServiceJobStatus> final_updated_status_{
CompactionServiceJobStatus::kUseLocal};
};

class CompactionServiceTest : public DBTestBase {
Expand Down Expand Up @@ -255,6 +266,8 @@ TEST_F(CompactionServiceTest, BasicCompactions) {

auto my_cs = GetCompactionService();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
ASSERT_EQ(CompactionServiceJobStatus::kSuccess,
my_cs->GetFinalCompactionServiceJobStatus());

// make sure the compaction statistics is only recorded on the remote side
ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
Expand Down Expand Up @@ -437,6 +450,8 @@ TEST_F(CompactionServiceTest, InvalidResult) {
Slice end(end_str);
Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
ASSERT_FALSE(s.ok());
ASSERT_EQ(CompactionServiceJobStatus::kFailure,
my_cs->GetFinalCompactionServiceJobStatus());
}

TEST_F(CompactionServiceTest, SubCompaction) {
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,10 @@ class CompactionService : public Customizable {
return CompactionServiceJobStatus::kUseLocal;
}

// Optional callback function upon Installation.
virtual void OnInstallation(const std::string& /*scheduled_job_id*/,
CompactionServiceJobStatus /*status*/) {}

// Deprecated. Please implement Schedule() and Wait() API to handle remote
// compaction

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Adds optional installation callback function for remote compaction

0 comments on commit 273b3ea

Please sign in to comment.