Skip to content

Commit

Permalink
Merge branch 'master' into drop_file
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Nov 27, 2024
2 parents d64ad9c + 805520b commit 20511bd
Show file tree
Hide file tree
Showing 199 changed files with 4,402 additions and 2,422 deletions.
6 changes: 1 addition & 5 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ github:
- cloud_p0 (Doris Cloud Regression)
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
- ShellCheck
- Build Broker
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- Build Third Party Libraries (macOS-arm64)
Expand All @@ -80,7 +79,6 @@ github:
- Clang Formatter
- CheckStyle
- Build Broker
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- FE UT (Doris FE UT)
Expand All @@ -103,7 +101,6 @@ github:
- Clang Formatter
- CheckStyle
- Build Broker
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- COMPILE (DORIS_COMPILE)
Expand All @@ -128,7 +125,6 @@ github:
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- COMPILE (DORIS_COMPILE)
Expand Down
108 changes: 90 additions & 18 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
auto tablet_id = partition.tablet_ids[i];
auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
_engine, this, tablet_id, transaction_id, version);
_engine, this, tablet_id, transaction_id, version, partition.sub_txn_ids);
if (has_compaction_stats) {
tablet_calc_delete_bitmap_ptr->set_compaction_stats(
partition.base_compaction_cnts[i], partition.cumulative_compaction_cnts[i],
Expand Down Expand Up @@ -107,12 +107,13 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {

CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
int64_t transaction_id, int64_t version)
int64_t transaction_id, int64_t version, const std::vector<int64_t>& sub_txn_ids)
: _engine(engine),
_engine_calc_delete_bitmap_task(engine_task),
_tablet_id(tablet_id),
_transaction_id(transaction_id),
_version(version) {
_version(version),
_sub_txn_ids(sub_txn_ids) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}", _transaction_id));
Expand Down Expand Up @@ -189,6 +190,60 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
return error_st;
}

int64_t t3 = MonotonicMicros();
Status status;
if (_sub_txn_ids.empty()) {
status = _handle_rowset(tablet, _version);
} else {
std::stringstream ss;
for (const auto& sub_txn_id : _sub_txn_ids) {
ss << sub_txn_id << ", ";
}
LOG(INFO) << "start calc delete bitmap for txn_id=" << _transaction_id << ", sub_txn_ids=["
<< ss.str() << "], table_id=" << tablet->table_id()
<< ", partition_id=" << tablet->partition_id() << ", tablet_id=" << _tablet_id
<< ", start_version=" << _version;
std::vector<RowsetSharedPtr> invisible_rowsets;
DeleteBitmapPtr tablet_delete_bitmap =
std::make_shared<DeleteBitmap>(tablet->tablet_meta()->delete_bitmap());
for (int i = 0; i < _sub_txn_ids.size(); ++i) {
int64_t sub_txn_id = _sub_txn_ids[i];
int64_t version = _version + i;
LOG(INFO) << "start calc delete bitmap for txn_id=" << _transaction_id
<< ", sub_txn_id=" << sub_txn_id << ", table_id=" << tablet->table_id()
<< ", partition_id=" << tablet->partition_id() << ", tablet_id=" << _tablet_id
<< ", start_version=" << _version << ", cur_version=" << version;
status = _handle_rowset(tablet, version, sub_txn_id, &invisible_rowsets,
tablet_delete_bitmap);
if (!status.ok()) {
LOG(INFO) << "failed to calculate delete bitmap on tablet"
<< ", table_id=" << tablet->table_id()
<< ", transaction_id=" << _transaction_id << ", sub_txn_id=" << sub_txn_id
<< ", tablet_id=" << tablet->tablet_id() << ", start version=" << _version
<< ", cur_version=" << version << ", status=" << status;
return status;
}
DCHECK(invisible_rowsets.size() == i + 1);
}
}
auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3;
LOG(INFO) << "calculate delete bitmap successfully on tablet"
<< ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id
<< ", tablet_id=" << tablet->tablet_id()
<< ", get_tablet_time_us=" << get_tablet_time_us
<< ", sync_rowset_time_us=" << sync_rowset_time_us
<< ", total_update_delete_bitmap_time_us=" << total_update_delete_bitmap_time_us
<< ", res=" << status;
return status;
}

Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
std::shared_ptr<CloudTablet> tablet, int64_t version, int64_t sub_txn_id,
std::vector<RowsetSharedPtr>* invisible_rowsets,
DeleteBitmapPtr tablet_delete_bitmap) const {
int64_t transaction_id = sub_txn_id == -1 ? _transaction_id : sub_txn_id;
std::string txn_str = "txn_id=" + std::to_string(_transaction_id) +
(sub_txn_id == -1 ? "" : ", sub_txn_id=" + std::to_string(sub_txn_id));
RowsetSharedPtr rowset;
DeleteBitmapPtr delete_bitmap;
RowsetIdUnorderedSet rowset_ids;
Expand All @@ -197,59 +252,76 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
int64_t txn_expiration;
TxnPublishInfo previous_publish_info;
Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
_transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration,
transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration,
&partial_update_info, &publish_status, &previous_publish_info);
if (status != Status::OK()) {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << status;
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id << ", " << txn_str
<< ", status=" << status;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
return status;
}

int64_t t3 = MonotonicMicros();
rowset->set_version(Version(_version, _version));
rowset->set_version(Version(version, version));
TabletTxnInfo txn_info;
txn_info.rowset = rowset;
txn_info.delete_bitmap = delete_bitmap;
txn_info.rowset_ids = rowset_ids;
txn_info.partial_update_info = partial_update_info;
txn_info.publish_status = publish_status;
txn_info.publish_info = {.publish_version = _version,
txn_info.publish_info = {.publish_version = version,
.base_compaction_cnt = _ms_base_compaction_cnt,
.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt,
.cumulative_point = _ms_cumulative_point};
auto update_delete_bitmap_time_us = 0;
int64_t update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) &&
_version == previous_publish_info.publish_version &&
version == previous_publish_info.publish_version &&
_ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
_ms_cumulative_compaction_cnt == previous_publish_info.cumulative_compaction_cnt &&
_ms_cumulative_point == previous_publish_info.cumulative_point) {
// if version or compaction stats can't match, it means that this is a retry and there are
// compaction or other loads finished successfully on the same tablet. So the previous publish
// is stale and we should re-calculate the delete bitmap
LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
} else {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id,
txn_expiration);
if (invisible_rowsets == nullptr) {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id,
txn_expiration);
} else {
txn_info.is_txn_load = true;
txn_info.invisible_rowsets = *invisible_rowsets;
txn_info.lock_id = _transaction_id;
txn_info.next_visible_version = _version;
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id,
txn_expiration, tablet_delete_bitmap);
}
update_delete_bitmap_time_us = MonotonicMicros() - t3;
}
if (status != Status::OK()) {
LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
<< ", tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id
<< ", status=" << status;
<< ", tablet_id=" << _tablet_id << ", " << txn_str << ", status=" << status;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
return status;
}

_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "calculate delete bitmap successfully on tablet"
<< ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id
<< ", table_id=" << tablet->table_id() << ", " << txn_str
<< ", tablet_id=" << tablet->tablet_id() << ", num_rows=" << rowset->num_rows()
<< ", get_tablet_time_us=" << get_tablet_time_us
<< ", sync_rowset_time_us=" << sync_rowset_time_us
<< ", update_delete_bitmap_time_us=" << update_delete_bitmap_time_us
<< ", res=" << status;
if (invisible_rowsets != nullptr) {
invisible_rowsets->push_back(rowset);
// see CloudTablet::save_delete_bitmap
auto dm = txn_info.delete_bitmap->delete_bitmap;
for (auto it = dm.begin(); it != dm.end(); ++it) {
if (std::get<1>(it->first) != DeleteBitmap::INVALID_SEGMENT_ID) {
tablet_delete_bitmap->merge(
{std::get<0>(it->first), std::get<1>(it->first), version}, it->second);
}
}
}
return status;
}

Expand Down
9 changes: 8 additions & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class CloudTabletCalcDeleteBitmapTask {
public:
CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
int64_t transaction_id, int64_t version);
int64_t transaction_id, int64_t version,
const std::vector<int64_t>& sub_txn_ids);
~CloudTabletCalcDeleteBitmapTask() = default;

void set_compaction_stats(int64_t ms_base_compaction_cnt, int64_t ms_cumulative_compaction_cnt,
Expand All @@ -43,12 +44,18 @@ class CloudTabletCalcDeleteBitmapTask {
Status handle() const;

private:
Status _handle_rowset(std::shared_ptr<CloudTablet> tablet, int64_t version,
int64_t sub_txn_id = -1,
std::vector<RowsetSharedPtr>* invisible_rowsets = nullptr,
DeleteBitmapPtr tablet_delete_bitmap = nullptr) const;

CloudStorageEngine& _engine;
CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;

int64_t _tablet_id;
int64_t _transaction_id;
int64_t _version;
std::vector<int64_t> _sub_txn_ids;

int64_t _ms_base_compaction_cnt {-1};
int64_t _ms_cumulative_compaction_cnt {-1};
Expand Down
10 changes: 6 additions & 4 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ int CloudTablet::delete_expired_stale_rowsets() {
}

for (int64_t path_id : path_ids) {
int start_version = -1;
int end_version = -1;
int64_t start_version = -1;
int64_t end_version = -1;
// delete stale versions in version graph
auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
for (auto& v_ts : version_path->timestamped_versions()) {
Expand Down Expand Up @@ -690,7 +690,8 @@ CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {

Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) {
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying again
Expand All @@ -714,8 +715,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
}
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
*this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ class CloudTablet final : public BaseTablet {

Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) override;
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id = -1) override;

Status calc_delete_bitmap_for_compaction(const std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr& output_rowset,
Expand Down
23 changes: 19 additions & 4 deletions be/src/common/cgroup_memory_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "common/status.h"
#include "util/cgroup_util.h"
#include "util/error_util.h"

namespace doris {

Expand Down Expand Up @@ -84,14 +85,23 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
: _mount_file_dir(std::move(mount_file_dir)) {}

Status read_memory_limit(int64_t* value) override {
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file((_mount_file_dir / "memory.max"),
value));
std::filesystem::path file_path = _mount_file_dir / "memory.max";
std::string line;
std::ifstream file_stream(file_path, std::ios::in);
getline(file_stream, line);
if (file_stream.fail() || file_stream.bad()) {
return Status::CgroupError("Error reading {}: {}", file_path.string(),
get_str_err_msg());
}
if (line == "max") {
*value = std::numeric_limits<int64_t>::max();
return Status::OK();
}
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(file_path, value));
return Status::OK();
}

Status read_memory_usage(int64_t* value) override {
// memory.current contains a single number
// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(
(_mount_file_dir / "memory.current"), value));
std::unordered_map<std::string, int64_t> metrics_map;
Expand All @@ -100,7 +110,12 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
if (*value < metrics_map["inactive_file"]) {
return Status::CgroupError("CgroupsV2Reader read_memory_usage negative memory usage");
}
// the reason why we subtract inactive_file described here:
// https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
*value -= metrics_map["inactive_file"];
// Part of "slab" that might be reclaimed, such as dentries and inodes.
// https://arthurchiao.art/blog/cgroupv2-zh/
*value -= metrics_map["slab_reclaimable"];
return Status::OK();
}

Expand Down
Loading

0 comments on commit 20511bd

Please sign in to comment.