Skip to content

Commit

Permalink
Merge branch 'master' into DORIS-17763-2
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Jan 22, 2025
2 parents cf43ffc + d7c99f8 commit e5c3d03
Show file tree
Hide file tree
Showing 2,148 changed files with 54,933 additions and 13,593 deletions.
8 changes: 5 additions & 3 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ github:
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
- cloud_p0 (Doris Cloud Regression)
#required_pull_request_reviews:
# dismiss_stale_reviews: true
# required_approving_review_count: 1

required_pull_request_reviews:
require_code_owner_reviews: true
required_approving_review_count: 1
dismiss_stale_reviews: true

branch-2.1:
required_status_checks:
Expand Down
18 changes: 17 additions & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
be/src/io/* @platoneko @gavinchou @dataroaring

# for more syntax help and usage example
# https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners#codeowners-syntax

be/src/io/ @gavinchou @dataroaring
be/src/agent/be_exec_version_manager.cpp @BiteTheDDDDt
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @dataroaring @CalvinKirs @morningman
**/pom.xml @CalvinKirs @morningman
fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @dataroaring @morningman @yiguolei @xiaokang
fe/fe-core/src/main/java/org/apache/doris/cloud/ @luwei16 @gavinchou @dearding @dataroaring
cloud/src/meta-service/*job* @luwei16 @gavinchou @dataroaring
cloud/src/meta-service/*meta_service_txn* @swjtu-zhanglei @gavinchou @dataroaring
cloud/src/meta-service/*resource* @deardeng @gavinchou @dataroaring
cloud/src/resource-manager/* @deardeng @gavinchou @dataroaring
cloud/src/recycler/* @swjtu-zhanglei @gavinchou @dataroaring
be/src/cloud/*compact* @luwei16 @gavinchou @dataroaring
be/src/cloud/*schema* @luwei16 @gavinchou @dataroaring
cloud/ @gavinchou @dataroaring @w41ter
be/src/cloud/ @gavinchou @dataroaring
gensrc/proto/olap_file.proto @gavinchou @dataroaring @yiguolei
gensrc/proto/cloud.proto @gavinchou @dataroaring @w41ter
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
sh_checker_comment: true
sh_checker_exclude: .git .github ^docker ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples
sh_checker_exclude: .git .github ^docker/compilation ^docker/runtime ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples

preparation:
name: "Clang Tidy Preparation"
Expand Down
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ cloud/cmake-build*/
cloud/ut_build*/

## tools
tools/ssb-tools/ssb-data/
tools/ssb-tools/ssb-dbgen/
tools/ssb-tools/bin/ssb-data/
tools/ssb-tools/bin/ssb-dbgen/
tools/ssb-tools/bin/*.tar.gz
tools/**/TPC-H_Tools_v*.zip
tools/**/TPC-H_Tools_v*/
tools/**/tpc-h_v*.docx
Expand Down
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache Doris
Copyright 2018-2024 The Apache Software Foundation
Copyright 2018-2025 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ See how to compile 🔗[Compilation](https://doris.apache.org/docs/dev/install/

### 📮 Install

See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/standard-deployment)
See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/cluster-deployment/standard-deployment)

## 🧩 Components

Expand Down
2 changes: 0 additions & 2 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@

namespace doris {

constexpr inline int BITMAP_SERDE = 3;
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility issues, see pr #32299
constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable property: PR #37215
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413
constexpr inline int AGGREGATION_2_1_VERSION =
Expand Down
45 changes: 41 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
Expand Down Expand Up @@ -481,6 +482,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand Down Expand Up @@ -1657,11 +1659,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r
.tag("tablet_id", drop_tablet_req.tablet_id);
return;
});
// 1. erase lru from tablet mgr
// TODO(dx) clean tablet file cache
// get tablet's info(such as cachekey, tablet id, rsid)
MonotonicStopWatch watch;
watch.start();
auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
std::ostringstream rowset_ids_stream;
bool found = false;
for (auto& weak_tablet : weak_tablets) {
auto tablet = weak_tablet.lock();
if (tablet == nullptr) {
continue;
}
if (tablet->tablet_id() != drop_tablet_req.tablet_id) {
continue;
}
found = true;
auto clean_rowsets = tablet->get_snapshot_rowset(true);
// Get first 10 rowset IDs as comma-separated string, just for log
int count = 0;
for (const auto& rowset : clean_rowsets) {
if (count >= 10) break;
if (count > 0) {
rowset_ids_stream << ",";
}
rowset_ids_stream << rowset->rowset_id().to_string();
count++;
}

CloudTablet::recycle_cached_data(std::move(clean_rowsets));
break;
}

if (!found) {
LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id
<< ", cost " << static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
// 2. gen clean file cache task
LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
<< " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost "
<< static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

Expand Down
146 changes: 9 additions & 137 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
Expand Down Expand Up @@ -221,146 +222,17 @@ int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
size_t* compaction_score, bool allow_delete) {
if (tablet->tablet_state() == TABLET_NOTREADY) {
return 0;
}

input_rowsets->clear();
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();

int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;

for (const auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we meet a delete version, and there were other versions before.
// we should compact those version before handling them over to base compaction
break;
} else {
// we meet a delete version, and no other versions before, skip it and continue
input_rowsets->clear();
*compaction_score = 0;
transient_size = 0;
total_size = 0;
continue;
}
}

*compaction_score += rowset->rowset_meta()->get_compaction_score();
total_size += rowset->rowset_meta()->total_disk_size();

transient_size += 1;
input_rowsets->push_back(rowset);

// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
// Only 1 non-overlapping rowset, skip it
input_rowsets->clear();
*compaction_score = 0;
total_size = 0;
continue;
}
return transient_size;
} else if (
*compaction_score >=
config::compaction_max_rowset_count) { // If the number of rowsets is too large: FDB_ERROR_CODE_TXN_TOO_LARGE
return transient_size;
}
}

// if there is delete version, do compaction directly
if (last_delete_version->first != -1) {
// if there is only one rowset and not overlapping,
// we do not need to do cumulative compaction
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
input_rowsets->clear();
*compaction_score = 0;
}
return transient_size;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (*compaction_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return transient_size;
}

// Condition 3: level1 achieve compaction_goal_size
std::vector<RowsetSharedPtr> level1_rowsets;
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
int64_t continuous_size = 0;
for (const auto& rowset : candidate_rowsets) {
const auto& rs_meta = rowset->rowset_meta();
if (rs_meta->compaction_level() == 0) {
break;
}
level1_rowsets.push_back(rowset);
continuous_size += rs_meta->total_disk_size();
if (level1_rowsets.size() >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
return transient_size;
}
}

input_rowsets->clear();
// Condition 5: If their are many empty rowsets, maybe should be compacted
tablet->calc_consecutive_empty_rowsets(
input_rowsets, candidate_rowsets,
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
if (!input_rowsets->empty()) {
VLOG_NOTICE << "tablet is " << tablet->tablet_id()
<< ", there are too many consecutive empty rowsets, size is "
<< input_rowsets->size();
return 0;
}
*compaction_score = 0;

return 0;
return TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
tablet, last_cumu, candidate_rowsets, max_compaction_score, min_compaction_score,
input_rowsets, last_delete_version, compaction_score, allow_delete);
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(
const std::vector<RowsetSharedPtr>& input_rowsets) {
int64_t first_level = 0;
for (size_t i = 0; i < input_rowsets.size(); i++) {
int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level();
if (i == 0) {
first_level = cur_level;
} else {
if (first_level != cur_level) {
LOG(ERROR) << "Failed to check compaction level, first_level: " << first_level
<< ", cur_level: " << cur_level;
}
}
}
return first_level + 1;
int64_t CloudTimeSeriesCumulativeCompactionPolicy::get_compaction_level(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) {
return TimeSeriesCumulativeCompactionPolicy::get_compaction_level((BaseTablet*)tablet,
input_rowsets, output_rowset);
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
Expand Down
18 changes: 15 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class CloudCumulativeCompactionPolicy {
Version& last_delete_version,
int64_t last_cumulative_point) = 0;

virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0;
virtual int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) = 0;

virtual int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
Expand All @@ -52,6 +54,8 @@ class CloudCumulativeCompactionPolicy {
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) = 0;

virtual std::string name() = 0;
};

class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy {
Expand All @@ -68,7 +72,9 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override {
int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override {
return 0;
}

Expand All @@ -80,6 +86,8 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

std::string name() override { return "size_based"; }

private:
int64_t _level_size(const int64_t size);

Expand All @@ -105,7 +113,9 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override;
int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override;

int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
Expand All @@ -114,6 +124,8 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

std::string name() override { return "time_series"; }
};

#include "common/compile_check_end.h"
Expand Down
Loading

0 comments on commit e5c3d03

Please sign in to comment.