Skip to content

Commit

Permalink
Merge branch 'master' into increase-max_broker_concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Dec 9, 2024
2 parents 7de9574 + 2c279f2 commit a92c919
Show file tree
Hide file tree
Showing 509 changed files with 22,014 additions and 3,562 deletions.
4 changes: 3 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1630,11 +1630,13 @@ void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
dropped_tablet->tablet_uid());
LOG_INFO("successfully drop tablet")
.tag("signature", req.signature)
.tag("tablet_id", drop_tablet_req.tablet_id);
.tag("tablet_id", drop_tablet_req.tablet_id)
.tag("replica_id", drop_tablet_req.replica_id);
} else {
LOG_WARNING("failed to drop tablet")
.tag("signature", req.signature)
.tag("tablet_id", drop_tablet_req.tablet_id)
.tag("replica_id", drop_tablet_req.replica_id)
.error(status);
}

Expand Down
10 changes: 6 additions & 4 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "util/uuid_generator.h"

namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
Expand Down Expand Up @@ -375,11 +376,9 @@ Status CloudCumulativeCompaction::modify_rowsets() {
Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
// agg previously rowset old version delete bitmap
std::vector<RowsetSharedPtr> pre_rowsets {};
std::vector<std::string> pre_rowset_ids {};
for (const auto& it : cloud_tablet()->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
Expand Down Expand Up @@ -490,8 +489,10 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
double process_memory_usage =
cast_set<double>(doris::GlobalMemoryArbitrator::process_memory_usage());
bool memory_usage_high =
process_memory_usage > cast_set<double>(MemInfo::soft_mem_limit()) * 0.8;
if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
Expand Down Expand Up @@ -621,4 +622,5 @@ void CloudCumulativeCompaction::do_lease() {
}
}

#include "common/compile_check_end.h"
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/compaction.h"

namespace doris {
#include "common/compile_check_begin.h"

class CloudCumulativeCompaction : public CloudCompactionMixin {
public:
Expand Down Expand Up @@ -60,4 +61,5 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {
Version _last_delete_version {-1, -1};
};

#include "common/compile_check_end.h"
} // namespace doris
12 changes: 7 additions & 5 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "olap/tablet_meta.h"

namespace doris {
#include "common/compile_check_begin.h"

CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
Expand All @@ -48,7 +49,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size
return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
}

int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
Expand Down Expand Up @@ -114,8 +115,8 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
size_t new_compaction_score = *compaction_score;
while (rs_begin != input_rowsets->end()) {
auto& rs_meta = (*rs_begin)->rowset_meta();
int current_level = _level_size(rs_meta->total_disk_size());
int remain_level = _level_size(total_size - rs_meta->total_disk_size());
int64_t current_level = _level_size(rs_meta->total_disk_size());
int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
// if current level less then remain level, input rowsets contain current rowset
// and process return; otherwise, input rowsets do not contain current rowset.
if (current_level <= remain_level) {
Expand Down Expand Up @@ -185,7 +186,7 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
}

int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const {
int64_t promotion_size = int64_t(t->base_size() * _promotion_ratio);
int64_t promotion_size = int64_t(cast_set<double>(t->base_size()) * _promotion_ratio);
// promotion_size is between _size_based_promotion_size and _size_based_promotion_min_size
return promotion_size > _promotion_size ? _promotion_size
: promotion_size < _promotion_min_size ? _promotion_min_size
Expand Down Expand Up @@ -215,7 +216,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
: last_cumulative_point;
}

int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
Expand Down Expand Up @@ -377,4 +378,5 @@ int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
return output_rowset->end_version() + 1;
}

#include "common/compile_check_end.h"
} // namespace doris
8 changes: 5 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "olap/rowset/rowset_meta.h"

namespace doris {
#include "common/compile_check_begin.h"

class Tablet;
struct Version;
Expand All @@ -44,7 +45,7 @@ class CloudCumulativeCompactionPolicy {

virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0;

virtual int32_t pick_input_rowsets(CloudTablet* tablet,
virtual int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand All @@ -71,7 +72,7 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
return 0;
}

int32_t pick_input_rowsets(CloudTablet* tablet,
int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand Down Expand Up @@ -106,7 +107,7 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override;

int32_t pick_input_rowsets(CloudTablet* tablet,
int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand All @@ -115,4 +116,5 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
bool allow_delete = false) override;
};

#include "common/compile_check_end.h"
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_delete_bitmap_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "util/stopwatch.hpp"

namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

namespace {
Expand Down Expand Up @@ -177,4 +178,5 @@ void CloudDeleteBitmapAction::handle(HttpRequest* req) {
}
}

#include "common/compile_check_end.h"
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_delete_bitmap_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "olap/tablet.h"

namespace doris {
#include "common/compile_check_begin.h"
class HttpRequest;

class ExecEnv;
Expand All @@ -52,4 +53,5 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
CloudStorageEngine& _engine;
DeleteBitmapActionType _delete_bitmap_action_type;
};
#include "common/compile_check_end.h"
} // namespace doris
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "runtime/memory/mem_tracker_limiter.h"

namespace doris {
#include "common/compile_check_begin.h"

CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask(
CloudStorageEngine& engine, const TCalcDeleteBitmapRequest& cal_delete_bitmap_req,
Expand Down Expand Up @@ -227,7 +228,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
}
}
auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3;
LOG(INFO) << "calculate delete bitmap successfully on tablet"
LOG(INFO) << "finish calculate delete bitmap on tablet"
<< ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id
<< ", tablet_id=" << tablet->tablet_id()
<< ", get_tablet_time_us=" << get_tablet_time_us
Expand Down Expand Up @@ -325,4 +326,5 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
return status;
}

#include "common/compile_check_end.h"
} // namespace doris
8 changes: 5 additions & 3 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "util/thrift_rpc_helper.h"

namespace doris::cloud {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) {
Expand Down Expand Up @@ -717,7 +718,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
"rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size());
}
for (size_t i = 0; i < rowset_ids.size(); i++) {
for (int i = 0; i < rowset_ids.size(); i++) {
RowsetId rst_id;
rst_id.init(rowset_ids[i]);
delete_bitmap->merge(
Expand Down Expand Up @@ -757,10 +758,10 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
Status st = retry_rpc("prepare rowset", req, &resp, &MetaService_Stub::prepare_rowset);
if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
RowsetMetaPB doris_rs_meta =
RowsetMetaPB doris_rs_meta_tmp =
cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
*existed_rs_meta = std::make_shared<RowsetMeta>();
(*existed_rs_meta)->init_from_pb(doris_rs_meta);
(*existed_rs_meta)->init_from_pb(doris_rs_meta_tmp);
}
return Status::AlreadyExist("failed to prepare rowset: {}", resp.status().msg());
}
Expand Down Expand Up @@ -1286,4 +1287,5 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
return total_inverted_index_size;
}

#include "common/compile_check_end.h"
} // namespace doris::cloud
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "util/s3_util.h"

namespace doris {
#include "common/compile_check_begin.h"

class DeleteBitmap;
class StreamLoadContext;
Expand Down Expand Up @@ -124,4 +125,5 @@ class CloudMetaMgr {
};

} // namespace cloud
#include "common/compile_check_end.h"
} // namespace doris
34 changes: 19 additions & 15 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "util/parse_util.h"

namespace doris {
#include "common/compile_check_begin.h"

using namespace std::literals;

Expand Down Expand Up @@ -166,7 +167,8 @@ Status CloudStorageEngine::open() {

_memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
// Use file cache disks number
_memtable_flush_executor->init(io::FileCacheFactory::instance()->get_cache_instance_size());
_memtable_flush_executor->init(
cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size()));

_calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
_calc_delete_bitmap_executor->init();
Expand Down Expand Up @@ -321,7 +323,7 @@ void CloudStorageEngine::_check_file_cache_ttl_block_valid() {
for (const auto& rowset : rowsets) {
int64_t ttl_seconds = tablet->tablet_meta()->ttl_seconds();
if (rowset->newest_write_timestamp() + ttl_seconds <= UnixSeconds()) continue;
for (int64_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) {
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) {
auto hash = Segment::file_cache_key(rowset->rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(hash);
file_cache->update_ttl_atime(hash);
Expand Down Expand Up @@ -350,11 +352,11 @@ void CloudStorageEngine::sync_storage_vault() {

for (auto& [id, vault_info, path_format] : vault_infos) {
auto fs = get_filesystem(id);
auto st = (fs == nullptr)
? std::visit(VaultCreateFSVisitor {id, path_format}, vault_info)
: std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format},
vault_info);
if (!st.ok()) [[unlikely]] {
auto status = (fs == nullptr)
? std::visit(VaultCreateFSVisitor {id, path_format}, vault_info)
: std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format},
vault_info);
if (!status.ok()) [[unlikely]] {
LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
}
}
Expand Down Expand Up @@ -504,13 +506,13 @@ void CloudStorageEngine::_compaction_tasks_producer_callback() {
/// If it is not cleaned up, the reference count of the tablet will always be greater than 1,
/// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep)
for (const auto& tablet : tablets_compaction) {
Status st = submit_compaction_task(tablet, compaction_type);
if (st.ok()) continue;
if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
!st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
Status status = submit_compaction_task(tablet, compaction_type);
if (status.ok()) continue;
if ((!status.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
!status.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
VLOG_DEBUG_IS_ON) {
LOG(WARNING) << "failed to submit compaction task for tablet: "
<< tablet->tablet_id() << ", err: " << st;
<< tablet->tablet_id() << ", err: " << status;
}
}
interval = config::generate_compaction_tasks_interval_ms;
Expand Down Expand Up @@ -544,7 +546,8 @@ std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_task
int num_cumu =
std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0,
[](int a, auto& b) { return a + b.second.size(); });
int num_base = submitted_base_compactions.size() + submitted_full_compactions.size();
int num_base =
cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size());
int n = thread_per_disk - num_cumu - num_base;
if (compaction_type == CompactionType::BASE_COMPACTION) {
// We need to reserve at least one thread for cumulative compaction,
Expand Down Expand Up @@ -822,7 +825,7 @@ Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
// cumu
std::string_view cumu = "CumulativeCompaction";
rapidjson::Value cumu_key;
cumu_key.SetString(cumu.data(), cumu.length(), root.GetAllocator());
cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator());
rapidjson::Document cumu_arr;
cumu_arr.SetArray();
for (auto& [tablet_id, v] : _submitted_cumu_compactions) {
Expand All @@ -834,7 +837,7 @@ Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
// base
std::string_view base = "BaseCompaction";
rapidjson::Value base_key;
base_key.SetString(base.data(), base.length(), root.GetAllocator());
base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator());
rapidjson::Document base_arr;
base_arr.SetArray();
for (auto& [tablet_id, _] : _submitted_base_compactions) {
Expand All @@ -857,4 +860,5 @@ std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compac
return _cumulative_compaction_policies.at(compaction_policy);
}

#include "common/compile_check_end.h"
} // namespace doris
Loading

0 comments on commit a92c919

Please sign in to comment.