diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp index 6fa4b8d014313f0..64e51c77641311a 100644 --- a/be/src/olap/cumulative_compaction_time_series_policy.cpp +++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp @@ -27,11 +27,14 @@ namespace doris { uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) { uint32_t score = 0; + uint32_t level0_score = 0; bool base_rowset_exist = false; const int64_t point = tablet->cumulative_layer_point(); + int64_t level0_total_size = 0; RowsetMetaSharedPtr first_meta; int64_t first_version = INT64_MAX; + std::list checked_rs_metas; // NOTE: tablet._meta_lock is hold auto& rs_metas = tablet->tablet_meta()->all_rs_metas(); // check the base rowset and collect the rowsets of cumulative part @@ -50,6 +53,12 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( } else { // collect the rowsets of cumulative part score += rs_meta->get_compaction_score(); + if (rs_meta->compaction_level() == 0) { + level0_total_size += rs_meta->total_disk_size(); + level0_score += rs_meta->get_compaction_score(); + } else { + checked_rs_metas.push_back(rs_meta); + } } } @@ -64,7 +73,64 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( return 0; } - return score; + // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size + int64_t compaction_goal_size_mbytes = + tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); + if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) { + return score; + } + + // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold + if (level0_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) { + return score; + } + + // Condition 3 (checked only when the compaction level threshold is >= 2): at least two rowsets above level 0 together reach compaction_goal_size + if 
(tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { + checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) { + return a->version().first < b->version().first; + }); + int32_t rs_meta_count = 0; + int64_t continuous_size = 0; + for (const auto& rs_meta : checked_rs_metas) { + rs_meta_count++; + continuous_size += rs_meta->total_disk_size(); + if (rs_meta_count >= 2) { + if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) { + return score; + } + } + } + } + + int64_t now = UnixMillis(); + int64_t last_cumu = tablet->last_cumu_compaction_success_time(); + if (last_cumu != 0) { + int64_t cumu_interval = now - last_cumu; + + // Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_seconds + if (cumu_interval > + (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) { + return score; + } + } else if (score > 0) { + // If the compaction process has not been successfully executed, + // the condition for triggering compaction based on the last successful compaction time (condition 4) will never be met, so record the current time as the baseline + tablet->set_last_cumu_compaction_success_time(now); + } + + // Condition 5: If there is a continuous set of empty rowsets, prioritize merging. + std::vector input_rowsets; + std::vector candidate_rowsets = + tablet->pick_candidate_rowsets_to_cumulative_compaction(); + tablet->calc_consecutive_empty_rowsets( + &input_rowsets, candidate_rowsets, + tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); + if (!input_rowsets.empty()) { + return score; + } + + return 0; } void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(