Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
zddr committed Dec 4, 2024
2 parents 40ef90e + abff86a commit dd76ef3
Show file tree
Hide file tree
Showing 156 changed files with 3,865 additions and 632 deletions.
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Status CloudBaseCompaction::prepare_compact() {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
Expand Down Expand Up @@ -320,6 +321,10 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
.base_compaction_cnt = _ms_base_compaction_cnt,
.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt,
.cumulative_point = _ms_cumulative_point};
auto update_delete_bitmap_time_us = 0;
int64_t update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) &&
_version == previous_publish_info.publish_version &&
_ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ Status CloudFullCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,17 +340,23 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
int64_t num_output_rows = 0;
int64_t size_output_rowsets = 0;
int64_t num_output_segments = 0;
int64_t index_size_output_rowsets = 0;
int64_t segment_size_output_rowsets = 0;
for (auto& rs : _output_rowsets) {
sc_job->add_txn_ids(rs->txn_id());
sc_job->add_output_versions(rs->end_version());
num_output_rows += rs->num_rows();
size_output_rowsets += rs->total_disk_size();
num_output_segments += rs->num_segments();
index_size_output_rowsets += rs->index_disk_size();
segment_size_output_rowsets += rs->data_disk_size();
}
sc_job->set_num_output_rows(num_output_rows);
sc_job->set_size_output_rowsets(size_output_rowsets);
sc_job->set_num_output_segments(num_output_segments);
sc_job->set_num_output_rowsets(_output_rowsets.size());
sc_job->set_index_size_output_rowsets(index_size_output_rowsets);
sc_job->set_segment_size_output_rowsets(segment_size_output_rowsets);
}
_output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
sc_job->set_output_cumulative_point(_output_cumulative_point);
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ int CloudTablet::delete_expired_stale_rowsets() {
}

for (int64_t path_id : path_ids) {
int start_version = -1;
int end_version = -1;
int64_t start_version = -1;
int64_t end_version = -1;
// delete stale versions in version graph
auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
for (auto& v_ts : version_path->timestamped_versions()) {
Expand Down
23 changes: 19 additions & 4 deletions be/src/common/cgroup_memory_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "common/status.h"
#include "util/cgroup_util.h"
#include "util/error_util.h"

namespace doris {

Expand Down Expand Up @@ -84,14 +85,23 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
: _mount_file_dir(std::move(mount_file_dir)) {}

Status read_memory_limit(int64_t* value) override {
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file((_mount_file_dir / "memory.max"),
value));
std::filesystem::path file_path = _mount_file_dir / "memory.max";
std::string line;
std::ifstream file_stream(file_path, std::ios::in);
getline(file_stream, line);
if (file_stream.fail() || file_stream.bad()) {
return Status::CgroupError("Error reading {}: {}", file_path.string(),
get_str_err_msg());
}
if (line == "max") {
*value = std::numeric_limits<int64_t>::max();
return Status::OK();
}
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(file_path, value));
return Status::OK();
}

Status read_memory_usage(int64_t* value) override {
// memory.current contains a single number
// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(
(_mount_file_dir / "memory.current"), value));
std::unordered_map<std::string, int64_t> metrics_map;
Expand All @@ -100,7 +110,12 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
if (*value < metrics_map["inactive_file"]) {
return Status::CgroupError("CgroupsV2Reader read_memory_usage negative memory usage");
}
// the reason why we subtract inactive_file described here:
// https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
*value -= metrics_map["inactive_file"];
// Part of "slab" that might be reclaimed, such as dentries and inodes.
// https://arthurchiao.art/blog/cgroupv2-zh/
*value -= metrics_map["slab_reclaimable"];
return Status::OK();
}

Expand Down
4 changes: 0 additions & 4 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
SCOPED_ATTACH_TASK(state);
_dependency->block();
_async_thread_running = true;
_finish_dependency->block();
if (!_opened) {
_data_block = vectorized::Block::create_unique();
_init_block(_data_block.get());
Expand All @@ -144,9 +143,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
_eos = eos;
_async_thread_running = false;
_dependency->set_ready();
if (eos) {
_finish_dependency->set_ready();
}
}));
return Status::OK();
}
Expand Down
7 changes: 1 addition & 6 deletions be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,7 @@ class SchemaScanner {
// factory function
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
TSchemaTableType::type type() const { return _schema_table_type; }
void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> fin_dep) {
_dependency = dep;
_finish_dependency = fin_dep;
}
void set_dependency(std::shared_ptr<pipeline::Dependency> dep) { _dependency = dep; }
Status get_next_block_async(RuntimeState* state);

protected:
Expand Down Expand Up @@ -141,7 +137,6 @@ class SchemaScanner {
RuntimeProfile::Counter* _fill_block_timer = nullptr;

std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;

std::unique_ptr<vectorized::Block> _data_block;
AtomicStatus _scanner_status;
Expand Down
16 changes: 14 additions & 2 deletions be/src/exec/schema_scanner/schema_tables_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
std::vector<int64_t> srcs(table_num);
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.avg_row_length) {
if (tbl_status.__isset.data_length) {
srcs[i] = tbl_status.data_length;
datas[i] = srcs.data() + i;
} else {
Expand All @@ -248,7 +248,19 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
// max_data_length
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 10, null_datas)); }
// index_length
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, null_datas)); }
{
std::vector<int64_t> srcs(table_num);
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.index_length) {
srcs[i] = tbl_status.index_length;
datas[i] = srcs.data() + i;
} else {
datas[i] = nullptr;
}
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
}
// data_free
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, null_datas)); }
// auto_increment
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1987,6 +1987,12 @@ Status SegmentIterator::copy_column_data_by_selector(vectorized::IColumn* input_
return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, output_col);
}

void SegmentIterator::_clear_iterators() {
_column_iterators.clear();
_bitmap_index_iterators.clear();
_inverted_index_iterators.clear();
}

Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
bool is_mem_reuse = block->mem_reuse();
DCHECK(is_mem_reuse);
Expand Down Expand Up @@ -2093,6 +2099,8 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
}
}
block->clear_column_data();
// clear and release iterators memory footprint in advance
_clear_iterators();
return Status::EndOfFile("no more data in segment");
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ class SegmentIterator : public RowwiseIterator {

void _calculate_expr_in_remaining_conjunct_root();

void _clear_iterators();

class BitmapRangeIterator;
class BackwardBitmapRangeIterator;

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
// tablet may not have cooldowned data, but the storage policy is set
tablet_info->__set_cooldown_term(_cooldown_conf.term);
}
tablet_info->__set_local_index_size(_tablet_meta->tablet_local_index_size());
tablet_info->__set_local_segment_size(_tablet_meta->tablet_local_segment_size());
tablet_info->__set_remote_index_size(_tablet_meta->tablet_remote_index_size());
tablet_info->__set_remote_segment_size(_tablet_meta->tablet_remote_segment_size());
}

void Tablet::report_error(const Status& st) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,10 @@ void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
t_tablet_stat.__set_total_version_count(tablet_info.total_version_count);
t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count);
t_tablet_stat.__set_visible_version(tablet_info.version);
t_tablet_stat.__set_local_index_size(tablet_info.local_index_size);
t_tablet_stat.__set_local_segment_size(tablet_info.local_segment_size);
t_tablet_stat.__set_remote_index_size(tablet_info.remote_index_size);
t_tablet_stat.__set_remote_segment_size(tablet_info.remote_segment_size);
};
for_each_tablet(handler, filter_all_tablets);

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
RowsetId rst_id;
rst_id.init(tablet_meta_pb.delete_bitmap().rowset_ids(i));
auto seg_id = tablet_meta_pb.delete_bitmap().segment_ids(i);
uint32_t ver = tablet_meta_pb.delete_bitmap().versions(i);
auto ver = tablet_meta_pb.delete_bitmap().versions(i);
auto bitmap = tablet_meta_pb.delete_bitmap().segment_delete_bitmaps(i).data();
delete_bitmap().delete_bitmap[{rst_id, seg_id, ver}] = roaring::Roaring::read(bitmap);
}
Expand Down
46 changes: 46 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ class TabletMeta {
size_t tablet_local_size() const;
// Remote disk space occupied by tablet.
size_t tablet_remote_size() const;

size_t tablet_local_index_size() const;
size_t tablet_local_segment_size() const;
size_t tablet_remote_index_size() const;
size_t tablet_remote_segment_size() const;

size_t version_count() const;
size_t stale_version_count() const;
size_t version_count_cross_with_range(const Version& range) const;
Expand Down Expand Up @@ -668,6 +674,46 @@ inline size_t TabletMeta::tablet_remote_size() const {
return total_size;
}

inline size_t TabletMeta::tablet_local_index_size() const {
size_t total_size = 0;
for (auto& rs : _rs_metas) {
if (rs->is_local()) {
total_size += rs->index_disk_size();
}
}
return total_size;
}

inline size_t TabletMeta::tablet_local_segment_size() const {
size_t total_size = 0;
for (auto& rs : _rs_metas) {
if (rs->is_local()) {
total_size += rs->data_disk_size();
}
}
return total_size;
}

inline size_t TabletMeta::tablet_remote_index_size() const {
size_t total_size = 0;
for (auto& rs : _rs_metas) {
if (!rs->is_local()) {
total_size += rs->index_disk_size();
}
}
return total_size;
}

inline size_t TabletMeta::tablet_remote_segment_size() const {
size_t total_size = 0;
for (auto& rs : _rs_metas) {
if (!rs->is_local()) {
total_size += rs->data_disk_size();
}
}
return total_size;
}

inline size_t TabletMeta::version_count() const {
return _rs_metas.size();
}
Expand Down
31 changes: 18 additions & 13 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,29 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
return Base::close(state, exec_status);
}

if (_should_build_hash_table) {
if (state->get_task()->wake_up_by_downstream()) {
if (state->get_task()->wake_up_by_downstream()) {
if (_should_build_hash_table) {
// partitial ignore rf to make global rf work
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
// do not publish filter coz local rf not inited and useless
return Base::close(state, exec_status);
}
} else if (_should_build_hash_table) {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
return Base::close(state, exec_status);
Expand Down
5 changes: 1 addition & 4 deletions be/src/pipeline/exec/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
// new one scanner
_schema_scanner = SchemaScanner::create(schema_table->schema_table_type());

_schema_scanner->set_dependency(_data_dependency, _finish_dependency);
_schema_scanner->set_dependency(_data_dependency);
if (nullptr == _schema_scanner) {
return Status::InternalError("schema scanner get nullptr pointer.");
}
Expand Down Expand Up @@ -266,9 +266,6 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
} while (block->rows() == 0 && !*eos);

local_state.reached_limit(block, eos);
if (*eos) {
local_state._finish_dependency->set_always_ready();
}
return Status::OK();
}

Expand Down
Loading

0 comments on commit dd76ef3

Please sign in to comment.