Skip to content

Commit

Permalink
[branch-3.2][Feature] Add compaction grafana metric (backport #30446)
Browse files Browse the repository at this point in the history
Signed-off-by: 周沛辰 <[email protected]>
  • Loading branch information
choubenson authored and dirtysalt committed Aug 8, 2024
1 parent 797a071 commit b6869b3
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 9 deletions.
16 changes: 12 additions & 4 deletions be/src/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ Status CompactionAction::do_compaction(uint64_t tablet_id, const string& compact

auto* mem_tracker = GlobalEnv::GetInstance()->compaction_mem_tracker();
if (tablet->updates() != nullptr) {
StarRocksMetrics::instance()->update_compaction_request_total.increment(1);
StarRocksMetrics::instance()->running_update_compaction_task_num.increment(1);
DeferOp op([&] { StarRocksMetrics::instance()->running_update_compaction_task_num.increment(-1); });
Status res;
if (rowset_ids_string.empty()) {
RETURN_IF_ERROR(tablet->updates()->compaction(mem_tracker));
res = tablet->updates()->compaction(mem_tracker);
} else {
vector<string> id_str_list = strings::Split(rowset_ids_string, ",", strings::SkipEmpty());
vector<uint32_t> rowset_ids;
Expand All @@ -138,7 +142,13 @@ Status CompactionAction::do_compaction(uint64_t tablet_id, const string& compact
if (rowset_ids.empty()) {
return Status::InvalidArgument(fmt::format("empty argument. rowset_ids:{}", rowset_ids_string));
}
RETURN_IF_ERROR(tablet->updates()->compaction(mem_tracker, rowset_ids));
res = tablet->updates()->compaction(mem_tracker, rowset_ids);
}
if (!res.ok()) {
StarRocksMetrics::instance()->update_compaction_request_failed.increment(1);
LOG(WARNING) << "failed to perform update compaction. res=" << res.get_error_msg()
<< ", tablet=" << tablet->full_name();
return res;
}
return Status::OK();
}
Expand All @@ -149,7 +159,6 @@ Status CompactionAction::do_compaction(uint64_t tablet_id, const string& compact
}

if (compaction_type == to_string(CompactionType::CUMULATIVE_COMPACTION)) {
StarRocksMetrics::instance()->cumulative_compaction_request_total.increment(1);
if (config::enable_size_tiered_compaction_strategy) {
if (tablet->need_compaction()) {
auto compaction_task = tablet->create_compaction_task();
Expand Down Expand Up @@ -183,7 +192,6 @@ Status CompactionAction::do_compaction(uint64_t tablet_id, const string& compact
}
tablet->set_last_cumu_compaction_failure_time(0);
} else if (compaction_type == to_string(CompactionType::BASE_COMPACTION)) {
StarRocksMetrics::instance()->base_compaction_request_total.increment(1);
if (config::enable_size_tiered_compaction_strategy) {
if (tablet->force_base_compaction()) {
auto compaction_task = tablet->create_compaction_task();
Expand Down
13 changes: 12 additions & 1 deletion be/src/storage/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ Status BaseCompaction::compact() {
return Status::InvalidArgument("base compaction input parameter error.");
}

StarRocksMetrics::instance()->base_compaction_request_total.increment(1);
StarRocksMetrics::instance()->running_base_compaction_task_num.increment(1);
std::unique_lock lock(_tablet->get_base_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
return Status::OK();
}
int64_t start_time = UnixMillis();
TRACE("got base compaction lock");

// 1. pick rowsets to compact
Expand All @@ -44,7 +47,10 @@ Status BaseCompaction::compact() {
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());

MemTracker* prev_tracker = tls_thread_status.set_mem_tracker(_mem_tracker);
DeferOp op([&] { tls_thread_status.set_mem_tracker(prev_tracker); });
DeferOp op([&] {
tls_thread_status.set_mem_tracker(prev_tracker);
StarRocksMetrics::instance()->running_base_compaction_task_num.increment(-1);
});

// 2. do base compaction, merge rowsets
RETURN_IF_ERROR(do_compaction());
Expand All @@ -54,8 +60,13 @@ Status BaseCompaction::compact() {
_state = CompactionState::SUCCESS;

// 4. add metric to base compaction
int64_t end_time = UnixMillis();
int64_t cost_time = end_time - start_time;
StarRocksMetrics::instance()->base_compaction_deltas_total.increment(_input_rowsets.size());
StarRocksMetrics::instance()->base_compaction_bytes_total.increment(_input_rowsets_size);
StarRocksMetrics::instance()->base_compaction_task_cost_time_ms.set_value(cost_time);
StarRocksMetrics::instance()->base_compaction_task_byte_per_second.set_value(_input_rowsets_size /
(cost_time / 1000.0 + 1));
TRACE("save base compaction metrics");

return Status::OK();
Expand Down
26 changes: 26 additions & 0 deletions be/src/storage/compaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ void CompactionManager::update_candidates(std::vector<CompactionCandidate> candi
bool has_erase = false;
for (auto& candidate : candidates) {
if (candidate.tablet->tablet_id() == iter->tablet->tablet_id()) {
if (iter->type == CompactionType::BASE_COMPACTION) {
StarRocksMetrics::instance()->wait_base_compaction_task_num.increment(-1);
} else {
StarRocksMetrics::instance()->wait_cumulative_compaction_task_num.increment(-1);
}
iter = _compaction_candidates.erase(iter);
has_erase = true;
break;
Expand All @@ -159,6 +164,11 @@ void CompactionManager::update_candidates(std::vector<CompactionCandidate> candi
if (candidate.tablet->enable_compaction()) {
VLOG(1) << "update candidate " << candidate.tablet->tablet_id() << " type "
<< starrocks::to_string(candidate.type) << " score " << candidate.score;
if (candidate.type == CompactionType::BASE_COMPACTION) {
StarRocksMetrics::instance()->wait_base_compaction_task_num.increment(1);
} else {
StarRocksMetrics::instance()->wait_cumulative_compaction_task_num.increment(1);
}
_compaction_candidates.emplace(std::move(candidate));
}
}
Expand All @@ -176,6 +186,11 @@ void CompactionManager::remove_candidate(int64_t tablet_id) {
std::lock_guard lg(_candidates_mutex);
for (auto iter = _compaction_candidates.begin(); iter != _compaction_candidates.end();) {
if (tablet_id == iter->tablet->tablet_id()) {
if (iter->type == CompactionType::BASE_COMPACTION) {
StarRocksMetrics::instance()->wait_base_compaction_task_num.increment(-1);
} else {
StarRocksMetrics::instance()->wait_cumulative_compaction_task_num.increment(-1);
}
iter = _compaction_candidates.erase(iter);
break;
} else {
Expand Down Expand Up @@ -266,6 +281,11 @@ bool CompactionManager::pick_candidate(CompactionCandidate* candidate) {
*candidate = *iter;
_compaction_candidates.erase(iter);
_last_score = candidate->score;
if (candidate->type == CompactionType::BASE_COMPACTION) {
StarRocksMetrics::instance()->wait_base_compaction_task_num.increment(-1);
} else {
StarRocksMetrics::instance()->wait_cumulative_compaction_task_num.increment(-1);
}
return true;
}
iter++;
Expand Down Expand Up @@ -356,9 +376,13 @@ bool CompactionManager::register_task(CompactionTask* compaction_task) {
if (compaction_task->compaction_type() == CUMULATIVE_COMPACTION) {
_data_dir_to_cumulative_task_num_map[data_dir]++;
_cumulative_compaction_concurrency++;
StarRocksMetrics::instance()->cumulative_compaction_request_total.increment(1);
StarRocksMetrics::instance()->running_cumulative_compaction_task_num.increment(1);
} else {
_data_dir_to_base_task_num_map[data_dir]++;
_base_compaction_concurrency++;
StarRocksMetrics::instance()->base_compaction_request_total.increment(1);
StarRocksMetrics::instance()->running_base_compaction_task_num.increment(1);
}
return true;
}
Expand All @@ -378,9 +402,11 @@ void CompactionManager::unregister_task(CompactionTask* compaction_task) {
if (compaction_task->compaction_type() == CUMULATIVE_COMPACTION) {
_data_dir_to_cumulative_task_num_map[data_dir]--;
_cumulative_compaction_concurrency--;
StarRocksMetrics::instance()->running_cumulative_compaction_task_num.increment(-1);
} else {
_data_dir_to_base_task_num_map[data_dir]--;
_base_compaction_concurrency--;
StarRocksMetrics::instance()->running_base_compaction_task_num.increment(-1);
}
}
if (iter->second.empty()) {
Expand Down
9 changes: 9 additions & 0 deletions be/src/storage/compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ bool CompactionTask::should_stop() const {
void CompactionTask::_success_callback() {
set_compaction_task_state(COMPACTION_SUCCESS);
// for compatible, update compaction time
int64_t cost_time = UnixMillis() - _task_info.start_time;
if (_task_info.compaction_type == CUMULATIVE_COMPACTION) {
_tablet->set_last_cumu_compaction_success_time(UnixMillis());
_tablet->set_last_cumu_compaction_failure_status(TStatusCode::OK);
Expand All @@ -158,9 +159,15 @@ void CompactionTask::_success_callback() {
if (_task_info.compaction_type == CUMULATIVE_COMPACTION) {
StarRocksMetrics::instance()->cumulative_compaction_deltas_total.increment(_input_rowsets.size());
StarRocksMetrics::instance()->cumulative_compaction_bytes_total.increment(_task_info.input_rowsets_size);
StarRocksMetrics::instance()->cumulative_compaction_task_cost_time_ms.set_value(cost_time);
StarRocksMetrics::instance()->cumulative_compaction_task_byte_per_second.set_value(
_task_info.input_rowsets_size / (cost_time / 1000.0 + 1));
} else {
StarRocksMetrics::instance()->base_compaction_deltas_total.increment(_input_rowsets.size());
StarRocksMetrics::instance()->base_compaction_bytes_total.increment(_task_info.input_rowsets_size);
StarRocksMetrics::instance()->base_compaction_task_cost_time_ms.set_value(cost_time);
StarRocksMetrics::instance()->base_compaction_task_byte_per_second.set_value(_task_info.input_rowsets_size /
(cost_time / 1000.0 + 1));
}

// preload the rowset
Expand All @@ -178,8 +185,10 @@ void CompactionTask::_failure_callback(const Status& st) {
if (_task_info.compaction_type == CUMULATIVE_COMPACTION) {
_tablet->set_last_cumu_compaction_failure_time(UnixMillis());
_tablet->set_last_cumu_compaction_failure_status(st.code());
StarRocksMetrics::instance()->cumulative_compaction_request_failed.increment(1);
} else {
_tablet->set_last_base_compaction_failure_time(UnixMillis());
StarRocksMetrics::instance()->base_compaction_request_failed.increment(1);
}
LOG(WARNING) << "compaction task:" << _task_info.task_id << ", tablet:" << _task_info.tablet_id << " failed.";
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/storage/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ Status CumulativeCompaction::compact() {
return Status::InvalidArgument("cumulative compaction input parameter error.");
}

StarRocksMetrics::instance()->cumulative_compaction_request_total.increment(1);
StarRocksMetrics::instance()->running_cumulative_compaction_task_num.increment(1);
DeferOp op([&] { StarRocksMetrics::instance()->running_cumulative_compaction_task_num.increment(-1); });
std::unique_lock lock(_tablet->get_cumulative_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
return Status::OK();
}
int64_t start_time = UnixMillis();
TRACE("got cumulative compaction lock");

// 1.calculate cumulative point
Expand All @@ -62,8 +66,13 @@ Status CumulativeCompaction::compact() {
}

// 6. add metric to cumulative compaction
int64_t end_time = UnixMillis();
int64_t cost_time = end_time - start_time;
StarRocksMetrics::instance()->cumulative_compaction_deltas_total.increment(_input_rowsets.size());
StarRocksMetrics::instance()->cumulative_compaction_bytes_total.increment(_input_rowsets_size);
StarRocksMetrics::instance()->cumulative_compaction_task_cost_time_ms.set_value(cost_time);
StarRocksMetrics::instance()->cumulative_compaction_task_byte_per_second.set_value(_input_rowsets_size /
(cost_time / 1000.0 + 1));
TRACE("save cumulative compaction metrics");

return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/rowset_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@ class RowsetMergerImpl : public RowsetMerger {
&total_rows, &total_chunk, &stats);
}
timer.stop();

// update compaction metric
float divided = 1000 * 1000 * 1000;
StarRocksMetrics::instance()->update_compaction_task_cost_time_ns.set_value(timer.elapsed_time());
StarRocksMetrics::instance()->update_compaction_task_byte_per_second.set_value(
total_input_size / (timer.elapsed_time() / divided + 1));
StarRocksMetrics::instance()->update_compaction_deltas_total.increment(rowsets.size());
StarRocksMetrics::instance()->update_compaction_bytes_total.increment(total_input_size);
StarRocksMetrics::instance()->update_compaction_outputs_total.increment(1);
Expand Down
6 changes: 2 additions & 4 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -898,8 +898,6 @@ Status StorageEngine::_perform_cumulative_compaction(DataDir* data_dir,
}
TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id);

StarRocksMetrics::instance()->cumulative_compaction_request_total.increment(1);

std::unique_ptr<MemTracker> mem_tracker =
std::make_unique<MemTracker>(MemTracker::COMPACTION, -1, "", _options.compaction_mem_tracker);
CumulativeCompaction cumulative_compaction(mem_tracker.get(), best_tablet);
Expand Down Expand Up @@ -941,8 +939,6 @@ Status StorageEngine::_perform_base_compaction(DataDir* data_dir, std::pair<int3
}
TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id);

StarRocksMetrics::instance()->base_compaction_request_total.increment(1);

std::unique_ptr<MemTracker> mem_tracker =
std::make_unique<MemTracker>(MemTracker::COMPACTION, -1, "", _options.compaction_mem_tracker);
BaseCompaction base_compaction(mem_tracker.get(), best_tablet);
Expand Down Expand Up @@ -1003,13 +999,15 @@ Status StorageEngine::_perform_update_compaction(DataDir* data_dir) {
int64_t duration_ns = 0;
{
StarRocksMetrics::instance()->update_compaction_request_total.increment(1);
StarRocksMetrics::instance()->running_update_compaction_task_num.increment(1);
SCOPED_RAW_TIMER(&duration_ns);

std::unique_ptr<MemTracker> mem_tracker =
std::make_unique<MemTracker>(MemTracker::COMPACTION, -1, "", _options.compaction_mem_tracker);
res = best_tablet->updates()->compaction(mem_tracker.get());
}
StarRocksMetrics::instance()->update_compaction_duration_us.increment(duration_ns / 1000);
StarRocksMetrics::instance()->running_update_compaction_task_num.increment(-1);
if (!res.ok()) {
StarRocksMetrics::instance()->update_compaction_request_failed.increment(1);
LOG(WARNING) << "failed to perform update compaction. res=" << res.to_string()
Expand Down
13 changes: 13 additions & 0 deletions be/src/util/starrocks_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ StarRocksMetrics::StarRocksMetrics() : _metrics(_s_registry_name) {
REGISTER_ENGINE_REQUEST_METRIC(base_compaction, failed, base_compaction_request_failed);
REGISTER_ENGINE_REQUEST_METRIC(cumulative_compaction, total, cumulative_compaction_request_total);
REGISTER_ENGINE_REQUEST_METRIC(cumulative_compaction, failed, cumulative_compaction_request_failed);
REGISTER_ENGINE_REQUEST_METRIC(update_compaction, total, update_compaction_request_total);
REGISTER_ENGINE_REQUEST_METRIC(update_compaction, failed, update_compaction_request_failed);

REGISTER_ENGINE_REQUEST_METRIC(publish, total, publish_task_request_total);
REGISTER_ENGINE_REQUEST_METRIC(publish, failed, publish_task_failed_total);
Expand Down Expand Up @@ -196,6 +198,17 @@ StarRocksMetrics::StarRocksMetrics() : _metrics(_s_registry_name) {
REGISTER_STARROCKS_METRIC(tablet_cumulative_max_compaction_score);
REGISTER_STARROCKS_METRIC(tablet_base_max_compaction_score);
REGISTER_STARROCKS_METRIC(tablet_update_max_compaction_score);
REGISTER_STARROCKS_METRIC(wait_cumulative_compaction_task_num);
REGISTER_STARROCKS_METRIC(wait_base_compaction_task_num);
REGISTER_STARROCKS_METRIC(running_cumulative_compaction_task_num);
REGISTER_STARROCKS_METRIC(running_base_compaction_task_num);
REGISTER_STARROCKS_METRIC(running_update_compaction_task_num);
REGISTER_STARROCKS_METRIC(cumulative_compaction_task_cost_time_ms);
REGISTER_STARROCKS_METRIC(base_compaction_task_cost_time_ms);
REGISTER_STARROCKS_METRIC(update_compaction_task_cost_time_ns);
REGISTER_STARROCKS_METRIC(base_compaction_task_byte_per_second);
REGISTER_STARROCKS_METRIC(cumulative_compaction_task_byte_per_second);
REGISTER_STARROCKS_METRIC(update_compaction_task_byte_per_second);

REGISTER_STARROCKS_METRIC(max_tablet_rowset_num);

Expand Down
Loading

0 comments on commit b6869b3

Please sign in to comment.