Merge branch 'master' into compiler_check
suxiaogang223 authored Jan 6, 2025
2 parents 7ae0ef6 + a539e22 commit 15cae01
Showing 217 changed files with 18,676 additions and 960 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -202,7 +202,7 @@ See how to compile 🔗[Compilation](https://doris.apache.org/docs/dev/install/

### 📮 Install

See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/standard-deployment)
See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/cluster-deployment/standard-deployment)

## 🧩 Components

146 changes: 9 additions & 137 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -26,6 +26,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
@@ -221,146 +222,17 @@ int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
size_t* compaction_score, bool allow_delete) {
if (tablet->tablet_state() == TABLET_NOTREADY) {
return 0;
}

input_rowsets->clear();
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();

int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;

for (const auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we meet a delete version, and there were other versions before it.
// we should compact those versions before handing them over to base compaction
break;
} else {
// we meet a delete version, and no other versions before, skip it and continue
input_rowsets->clear();
*compaction_score = 0;
transient_size = 0;
total_size = 0;
continue;
}
}

*compaction_score += rowset->rowset_meta()->get_compaction_score();
total_size += rowset->rowset_meta()->total_disk_size();

transient_size += 1;
input_rowsets->push_back(rowset);

// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
// Only 1 non-overlapping rowset, skip it
input_rowsets->clear();
*compaction_score = 0;
total_size = 0;
continue;
}
return transient_size;
} else if (
*compaction_score >=
config::compaction_max_rowset_count) { // If the number of rowsets is too large: FDB_ERROR_CODE_TXN_TOO_LARGE
return transient_size;
}
}

// if there is a delete version, do compaction directly
if (last_delete_version->first != -1) {
// if there is only one rowset and not overlapping,
// we do not need to do cumulative compaction
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
input_rowsets->clear();
*compaction_score = 0;
}
return transient_size;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (*compaction_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return transient_size;
}

// Condition 3: level1 reaches compaction_goal_size
std::vector<RowsetSharedPtr> level1_rowsets;
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
int64_t continuous_size = 0;
for (const auto& rowset : candidate_rowsets) {
const auto& rs_meta = rowset->rowset_meta();
if (rs_meta->compaction_level() == 0) {
break;
}
level1_rowsets.push_back(rowset);
continuous_size += rs_meta->total_disk_size();
if (level1_rowsets.size() >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
return transient_size;
}
}

input_rowsets->clear();
// Condition 5: if there are many consecutive empty rowsets, they may need to be compacted
tablet->calc_consecutive_empty_rowsets(
input_rowsets, candidate_rowsets,
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
if (!input_rowsets->empty()) {
VLOG_NOTICE << "tablet is " << tablet->tablet_id()
<< ", there are too many consecutive empty rowsets, size is "
<< input_rowsets->size();
return 0;
}
*compaction_score = 0;

return 0;
return TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
tablet, last_cumu, candidate_rowsets, max_compaction_score, min_compaction_score,
input_rowsets, last_delete_version, compaction_score, allow_delete);
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(
const std::vector<RowsetSharedPtr>& input_rowsets) {
int64_t first_level = 0;
for (size_t i = 0; i < input_rowsets.size(); i++) {
int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level();
if (i == 0) {
first_level = cur_level;
} else {
if (first_level != cur_level) {
LOG(ERROR) << "Failed to check compaction level, first_level: " << first_level
<< ", cur_level: " << cur_level;
}
}
}
return first_level + 1;
int64_t CloudTimeSeriesCumulativeCompactionPolicy::get_compaction_level(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) {
return TimeSeriesCumulativeCompactionPolicy::get_compaction_level((BaseTablet*)tablet,
input_rowsets, output_rowset);
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
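The change above collapses the cloud time-series policy into a thin wrapper: the duplicated rowset-selection loop and the old new_compaction_level() are gone, and both pick_input_rowsets() and the level computation now forward to the shared TimeSeriesCumulativeCompactionPolicy. A minimal sketch of that delegation shape, using simplified stand-in types rather than the real Doris classes, might look like this:

```cpp
// Minimal sketch (not the real Doris code) of the delegation pattern above:
// the cloud policy keeps no selection logic of its own and forwards to the
// shared time-series implementation.
#include <cstdint>
#include <memory>
#include <vector>

struct Rowset {};
using RowsetSharedPtr = std::shared_ptr<Rowset>;

// Stand-in for the shared TimeSeriesCumulativeCompactionPolicy, which owns the
// single authoritative implementation of rowset selection and level assignment.
struct SharedTimeSeriesPolicy {
    static int64_t pick_input_rowsets(const std::vector<RowsetSharedPtr>& candidates,
                                      std::vector<RowsetSharedPtr>* inputs) {
        *inputs = candidates;  // placeholder: the real policy applies size/count/time rules
        return static_cast<int64_t>(inputs->size());
    }
    static int64_t get_compaction_level(const std::vector<RowsetSharedPtr>& inputs) {
        return inputs.empty() ? 0 : 1;  // placeholder for the real level computation
    }
};

// Stand-in for CloudTimeSeriesCumulativeCompactionPolicy: a thin forwarding layer.
struct CloudTimeSeriesPolicySketch {
    int64_t pick_input_rowsets(const std::vector<RowsetSharedPtr>& candidates,
                               std::vector<RowsetSharedPtr>* inputs) const {
        return SharedTimeSeriesPolicy::pick_input_rowsets(candidates, inputs);
    }
    int64_t get_compaction_level(const std::vector<RowsetSharedPtr>& inputs) const {
        return SharedTimeSeriesPolicy::get_compaction_level(inputs);
    }
};
```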
18 changes: 15 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
@@ -43,7 +43,9 @@ class CloudCumulativeCompactionPolicy {
Version& last_delete_version,
int64_t last_cumulative_point) = 0;

virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0;
virtual int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) = 0;

virtual int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
@@ -52,6 +54,8 @@
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) = 0;

virtual std::string name() = 0;
};

class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy {
@@ -68,7 +72,9 @@ class CloudSizeBasedCumulativeCompactionPolicy
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override {
int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override {
return 0;
}

@@ -80,6 +86,8 @@ class CloudSizeBasedCumulativeCompactionPolicy
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

std::string name() override { return "size_based"; }

private:
int64_t _level_size(const int64_t size);

@@ -105,7 +113,9 @@ class CloudTimeSeriesCumulativeCompactionPolicy
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override;
int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override;

int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
@@ -114,6 +124,8 @@
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

std::string name() override { return "time_series"; }
};

#include "common/compile_check_end.h"
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
@@ -226,7 +226,11 @@ class MetaServiceProxy {
static size_t num_proxies = 1;
static std::atomic<size_t> index(0);
static std::unique_ptr<MetaServiceProxy[]> proxies;

if (config::meta_service_endpoint.empty()) {
return Status::InvalidArgument(
"Meta service endpoint is empty. Please configure manually or wait for "
"heartbeat to obtain.");
}
std::call_once(
proxies_flag, +[]() {
if (config::meta_service_connection_pooled) {
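The added guard returns an error before the proxy pool is ever initialized when no meta-service endpoint has been configured. A self-contained sketch of the same fail-fast idea, with simplified stand-ins for the real Status and config types, could look like:

```cpp
// Self-contained sketch (simplified stand-ins, not the real Doris types) of the
// fail-fast guard: refuse to hand out a proxy while the endpoint is unconfigured.
#include <iostream>
#include <string>
#include <utility>

namespace config {
std::string meta_service_endpoint;  // empty until set manually or via heartbeat
}  // namespace config

struct Status {
    bool ok;
    std::string msg;
    static Status OK() { return {true, ""}; }
    static Status InvalidArgument(std::string m) { return {false, std::move(m)}; }
};

Status get_proxy() {
    if (config::meta_service_endpoint.empty()) {
        return Status::InvalidArgument(
                "Meta service endpoint is empty. Please configure it manually or wait for "
                "the heartbeat to obtain it.");
    }
    // ... lazily initialize the connection pool and return a usable proxy ...
    return Status::OK();
}

int main() {
    std::cout << get_proxy().msg << "\n";              // error while unconfigured
    config::meta_service_endpoint = "127.0.0.1:5000";  // hypothetical address
    std::cout << (get_proxy().ok ? "ok" : "error") << "\n";
    return 0;
}
```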
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
@@ -84,6 +84,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_has_variant_type_in_schema(in.has_has_variant_type_in_schema());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->set_compaction_level(in.compaction_level());
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
}

@@ -136,6 +137,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_has_variant_type_in_schema(in.has_variant_type_in_schema());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->set_compaction_level(in.compaction_level());
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
}

@@ -234,6 +236,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
}
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->set_compaction_level(in.compaction_level());
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
}

@@ -287,6 +290,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
}
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->set_compaction_level(in.compaction_level());
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
}

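The four set_compaction_level() calls added above ensure the new field survives conversion in both directions between the local and cloud rowset metadata. A toy round trip illustrating the intent, with plain structs standing in for the generated RowsetMetaPB / RowsetMetaCloudPB protobuf classes:

```cpp
// Toy round trip (plain structs in place of the generated protobufs) showing what
// the added set_compaction_level() calls preserve: the level assigned at compaction
// commit survives the local -> cloud -> local conversion.
#include <cassert>
#include <cstdint>

struct RowsetMetaPB      { int64_t compaction_level = 0; };
struct RowsetMetaCloudPB { int64_t compaction_level = 0; };

void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in) {
    out->compaction_level = in.compaction_level;  // field now forwarded by this commit
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in) {
    out->compaction_level = in.compaction_level;  // and in the reverse direction
}

int main() {
    RowsetMetaPB local;
    local.compaction_level = 2;

    RowsetMetaCloudPB cloud;
    doris_rowset_meta_to_cloud(&cloud, local);

    RowsetMetaPB back;
    cloud_rowset_meta_to_doris(&back, cloud);
    assert(back.compaction_level == 2);  // the level is no longer silently dropped
    return 0;
}
```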
2 changes: 1 addition & 1 deletion be/src/clucene
30 changes: 25 additions & 5 deletions be/src/olap/compaction.cpp
@@ -469,6 +469,9 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

// Currently, the compaction level is only updated under the time_series policy.
update_compaction_level();

RETURN_IF_ERROR(modify_rowsets());

auto* cumu_policy = tablet()->cumulative_compaction_policy();
@@ -1317,6 +1320,15 @@ bool CompactionMixin::_check_if_includes_input_rowsets(
input_rowset_ids.begin(), input_rowset_ids.end());
}

void CompactionMixin::update_compaction_level() {
auto* cumu_policy = tablet()->cumulative_compaction_policy();
if (cumu_policy && cumu_policy->name() == CUMULATIVE_TIME_SERIES_POLICY) {
int64_t compaction_level =
cumu_policy->get_compaction_level(tablet(), _input_rowsets, _output_rowset);
_output_rowset->rowset_meta()->set_compaction_level(compaction_level);
}
}

Status Compaction::check_correctness() {
// 1. check row number
if (_input_row_num != _output_rowset->num_rows() + _stats.merged_rows + _stats.filtered_rows) {
@@ -1395,6 +1407,9 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
<< _output_rowset->rowset_meta()->rowset_id().to_string();
})

// Currently, the compaction level is only updated under the time_series policy.
update_compaction_level();

RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));

// 4. modify rowsets in memory
@@ -1440,11 +1455,6 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
ctx.newest_write_timestamp = _newest_write_timestamp;
ctx.write_type = DataWriteType::TYPE_COMPACTION;

auto compaction_policy = _tablet->tablet_meta()->compaction_policy();
if (_tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
ctx.compaction_level = _engine.cumu_compaction_policy(compaction_policy)
->new_compaction_level(_input_rowsets);
}
// We presume that the data involved in cumulative compaction is sufficiently 'hot'
// and should always be retained in the cache.
// TODO(gavin): Ensure that the retention of hot data is implemented with precision.
@@ -1472,4 +1482,14 @@ void CloudCompactionMixin::garbage_collection() {
}
}

void CloudCompactionMixin::update_compaction_level() {
auto compaction_policy = _tablet->tablet_meta()->compaction_policy();
auto cumu_policy = _engine.cumu_compaction_policy(compaction_policy);
if (cumu_policy && cumu_policy->name() == CUMULATIVE_TIME_SERIES_POLICY) {
int64_t compaction_level =
cumu_policy->get_compaction_level(cloud_tablet(), _input_rowsets, _output_rowset);
_output_rowset->rowset_meta()->set_compaction_level(compaction_level);
}
}

} // namespace doris
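Both execute paths now call update_compaction_level() after the input rowsets are merged and before the output rowset is committed, and the update is gated on the policy's name() rather than on the level-threshold setting. A simplified sketch of that flow (stand-in types and a placeholder level computation, not the real Doris classes):

```cpp
// Simplified sketch of where update_compaction_level() now sits in the compaction
// flow and how it is gated on the policy name.
#include <cstdint>
#include <string>
#include <vector>

constexpr const char* CUMULATIVE_TIME_SERIES_POLICY = "time_series";

struct RowsetMeta { int64_t compaction_level = 0; };
struct Rowset { RowsetMeta meta; };

struct Policy {
    std::string policy_name;
    int64_t get_compaction_level(const std::vector<Rowset>& inputs) const {
        return inputs.empty() ? 0 : 1;  // placeholder for the real computation
    }
};

struct CompactionSketch {
    const Policy* policy = nullptr;
    std::vector<Rowset> input_rowsets;
    Rowset output_rowset;

    void update_compaction_level() {
        // Only the time-series policy assigns levels; others keep the default of 0.
        if (policy != nullptr && policy->policy_name == CUMULATIVE_TIME_SERIES_POLICY) {
            output_rowset.meta.compaction_level = policy->get_compaction_level(input_rowsets);
        }
    }

    void execute_compact() {
        // merge_input_rowsets();     // 1. merge the selected rowsets
        update_compaction_level();    // 2. stamp the level before the rowset is committed
        // modify_rowsets();          // 3. commit / publish the output rowset
    }
};
```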
4 changes: 4 additions & 0 deletions be/src/olap/compaction.h
@@ -176,6 +176,8 @@ class CompactionMixin : public Compaction {

bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;

void update_compaction_level();

PendingRowsetGuard _pending_rs_guard;
};

@@ -209,6 +211,8 @@ class CloudCompactionMixin : public Compaction {
virtual Status modify_rowsets();

int64_t get_compaction_permits();

void update_compaction_level();
};

} // namespace doris
4 changes: 0 additions & 4 deletions be/src/olap/cumulative_compaction.cpp
@@ -124,10 +124,6 @@ Status CumulativeCompaction::execute_compact() {

RETURN_IF_ERROR(CompactionMixin::execute_compact());
DCHECK_EQ(_state, CompactionState::SUCCESS);
if (tablet()->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
tablet()->cumulative_compaction_policy()->update_compaction_level(tablet(), _input_rowsets,
_output_rowset);
}

tablet()->cumulative_compaction_policy()->update_cumulative_point(
tablet(), _input_rowsets, _output_rowset, _last_delete_version);
8 changes: 5 additions & 3 deletions be/src/olap/cumulative_compaction_policy.h
@@ -97,7 +97,7 @@ class CumulativeCompactionPolicy {
int64_t* cumulative_point) = 0;

// Updates the compaction level of a tablet after a compaction operation.
virtual void update_compaction_level(Tablet* tablet,
virtual int64_t get_compaction_level(Tablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) = 0;

@@ -154,8 +154,10 @@ class SizeBasedCumulativeCompactionPolicy
/// Its main policy is calculating the cumulative compaction score after the current cumulative_point in the tablet.
uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;

void update_compaction_level(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override {}
int64_t get_compaction_level(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override {
return 0;
}

std::string_view name() override { return CUMULATIVE_SIZE_BASED_POLICY; }

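The interface change in cumulative_compaction_policy.h is more than a rename: update_compaction_level() let each policy apply the level itself (taking the output rowset as a parameter), while get_compaction_level() is a value-returning query whose result the caller applies to the output rowset in one place. A small sketch of the new contract, assuming simplified stand-in types:

```cpp
// Sketch of the new contract (simplified types): the policy only computes a level,
// and the caller applies it to the output rowset in one place.
#include <cstdint>

struct RowsetMeta {
    int64_t compaction_level = 0;
    void set_compaction_level(int64_t level) { compaction_level = level; }
};

struct SizeBasedPolicySketch {
    int64_t get_compaction_level() const { return 0; }  // size-based policy does not use levels
};

struct TimeSeriesPolicySketch {
    int64_t get_compaction_level() const { return 1; }  // placeholder for the real computation
};

template <typename Policy>
void apply_level(const Policy& policy, RowsetMeta& output_meta) {
    // Caller-side application replaces the old policy-side update_compaction_level().
    output_meta.set_compaction_level(policy.get_compaction_level());
}
```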
