Skip to content

Commit

Permalink
add code
Browse files Browse the repository at this point in the history
  • Loading branch information
csun5285 committed Nov 14, 2024
1 parent db0288b commit cf0aa2f
Show file tree
Hide file tree
Showing 30 changed files with 701 additions and 206 deletions.
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ Status CloudFullCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
int64_t num_output_rows = 0;
int64_t size_output_rowsets = 0;
int64_t num_output_segments = 0;
int64_t index_size_output_rowsets = 0;
int64_t segment_size_output_rowsets = 0;
for (auto& rs : _output_rowsets) {
sc_job->add_txn_ids(rs->txn_id());
sc_job->add_output_versions(rs->end_version());
Expand All @@ -351,6 +353,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
sc_job->set_size_output_rowsets(size_output_rowsets);
sc_job->set_num_output_segments(num_output_segments);
sc_job->set_num_output_rowsets(_output_rowsets.size());
sc_job->set_index_size_output_rowsets(index_size_output_rowsets);
sc_job->set_segment_size_output_rowsets(segment_size_output_rowsets);
}
_output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
sc_job->set_output_cumulative_point(_output_cumulative_point);
Expand Down
16 changes: 14 additions & 2 deletions be/src/exec/schema_scanner/schema_tables_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
std::vector<int64_t> srcs(table_num);
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.avg_row_length) {
if (tbl_status.__isset.data_length) {
srcs[i] = tbl_status.data_length;
datas[i] = srcs.data() + i;
} else {
Expand All @@ -248,7 +248,19 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
// max_data_length
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 10, null_datas)); }
// index_length
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, null_datas)); }
{
std::vector<int64_t> srcs(table_num);
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.index_length) {
srcs[i] = tbl_status.index_length;
datas[i] = srcs.data() + i;
} else {
datas[i] = nullptr;
}
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
}
// data_free
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, null_datas)); }
// auto_increment
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
// tablet may not have cooldowned data, but the storage policy is set
tablet_info->__set_cooldown_term(_cooldown_conf.term);
}
tablet_info->__set_local_index_size(_tablet_meta->tablet_local_index_size());
tablet_info->__set_local_segment_size(_tablet_meta->tablet_local_segment_size());
tablet_info->__set_remote_index_size(_tablet_meta->tablet_remote_index_size());
tablet_info->__set_remote_segment_size(_tablet_meta->tablet_remote_segment_size());
}

void Tablet::report_error(const Status& st) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,10 @@ void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
t_tablet_stat.__set_total_version_count(tablet_info.total_version_count);
t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count);
t_tablet_stat.__set_visible_version(tablet_info.version);
t_tablet_stat.__set_local_index_size(tablet_info.local_index_size);
t_tablet_stat.__set_local_segment_size(tablet_info.local_segment_size);
t_tablet_stat.__set_remote_index_size(tablet_info.remote_index_size);
t_tablet_stat.__set_remote_segment_size(tablet_info.remote_segment_size);
};
for_each_tablet(handler, filter_all_tablets);

Expand Down
46 changes: 46 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ class TabletMeta : public MetadataAdder<TabletMeta> {
size_t tablet_local_size() const;
// Remote disk space occupied by tablet.
size_t tablet_remote_size() const;

size_t tablet_local_index_size() const;
size_t tablet_local_segment_size() const;
size_t tablet_remote_index_size() const;
size_t tablet_remote_segment_size() const;

size_t version_count() const;
size_t stale_version_count() const;
size_t version_count_cross_with_range(const Version& range) const;
Expand Down Expand Up @@ -667,6 +673,46 @@ inline size_t TabletMeta::tablet_remote_size() const {
return total_size;
}

inline size_t TabletMeta::tablet_local_index_size() const {
size_t total_size = 0;
for (auto& rs : _rs_metas) {
if (rs->is_local()) {
total_size += rs->index_disk_size();
}
}
return total_size;
}

inline size_t TabletMeta::tablet_local_segment_size() const {
size_t total_size = 0;
for (auto& rs : _rs_metas) {
if (rs->is_local()) {
total_size += rs->data_disk_size();
}
}
return total_size;
}

inline size_t TabletMeta::tablet_remote_index_size() const {
size_t total_size = 0;
for (auto& rs : _rs_metas) {
if (!rs->is_local()) {
total_size += rs->index_disk_size();
}
}
return total_size;
}

inline size_t TabletMeta::tablet_remote_segment_size() const {
size_t total_size = 0;
for (auto& rs : _rs_metas) {
if (!rs->is_local()) {
total_size += rs->data_disk_size();
}
}
return total_size;
}

inline size_t TabletMeta::version_count() const {
return _rs_metas.size();
}
Expand Down
8 changes: 8 additions & 0 deletions cloud/src/meta-service/keys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,14 @@ void stats_tablet_num_segs_key(const StatsTabletKeyInfo& in, std::string* out) {
stats_tablet_key(in, out);
encode_bytes(STATS_KEY_SUFFIX_NUM_SEGS, out);
}
void stats_tablet_index_size_key(const StatsTabletKeyInfo& in, std::string* out) {
stats_tablet_key(in, out);
encode_bytes(STATS_KEY_SUFFIX_INDEX_SIZE, out);
}
void stats_tablet_segment_size_key(const StatsTabletKeyInfo& in, std::string* out) {
stats_tablet_key(in, out);
encode_bytes(STATS_KEY_SUFFIX_SEGMENT_SIZE, out);
}

//==============================================================================
// Job keys
Expand Down
6 changes: 6 additions & 0 deletions cloud/src/meta-service/keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "num_rows" -> int64
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "num_rowsets" -> int64
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "num_segs" -> int64
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "index_size" -> int64
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "segment_size"-> int64
//
// 0x01 "recycle" ${instance_id} "index" ${index_id} -> RecycleIndexPB
// 0x01 "recycle" ${instance_id} "partition" ${partition_id} -> RecyclePartitionPB
Expand Down Expand Up @@ -83,6 +85,8 @@ static constexpr std::string_view STATS_KEY_SUFFIX_DATA_SIZE = "data_size";
static constexpr std::string_view STATS_KEY_SUFFIX_NUM_ROWS = "num_rows";
static constexpr std::string_view STATS_KEY_SUFFIX_NUM_ROWSETS = "num_rowsets";
static constexpr std::string_view STATS_KEY_SUFFIX_NUM_SEGS = "num_segs";
static constexpr std::string_view STATS_KEY_SUFFIX_INDEX_SIZE = "index_size";
static constexpr std::string_view STATS_KEY_SUFFIX_SEGMENT_SIZE = "segment_size";

// clang-format off
/**
Expand Down Expand Up @@ -247,6 +251,8 @@ void stats_tablet_data_size_key(const StatsTabletKeyInfo& in, std::string* out);
void stats_tablet_num_rows_key(const StatsTabletKeyInfo& in, std::string* out);
void stats_tablet_num_rowsets_key(const StatsTabletKeyInfo& in, std::string* out);
void stats_tablet_num_segs_key(const StatsTabletKeyInfo& in, std::string* out);
void stats_tablet_index_size_key(const StatsTabletKeyInfo& in, std::string* out);
void stats_tablet_segment_size_key(const StatsTabletKeyInfo& in, std::string* out);
static inline std::string stats_tablet_key(const StatsTabletKeyInfo& in) { std::string s; stats_tablet_key(in, &s); return s; }

void job_recycle_key(const JobRecycleKeyInfo& in, std::string* out);
Expand Down
2 changes: 2 additions & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,8 @@ void MetaServiceImpl::get_tablet_stats(::google::protobuf::RpcController* contro
#ifdef NDEBUG
// Force data size >= 0 to reduce the losses caused by bugs
if (tablet_stats->data_size() < 0) tablet_stats->set_data_size(0);
if (tablet_stats->index_size() < 0) tablet_stats->set_index_size(0);
if (tablet_stats->segment_size() < 0) tablet_stats->set_segment_size(0);
#endif
}
}
Expand Down
20 changes: 20 additions & 0 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,8 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
stats->set_last_cumu_compaction_time_ms(now * 1000);
// clang-format on
} else if (compaction.type() == TabletCompactionJobPB::BASE) {
Expand All @@ -710,6 +712,8 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
stats->set_last_base_compaction_time_ms(now * 1000);
// clang-format on
} else if (compaction.type() == TabletCompactionJobPB::FULL) {
Expand All @@ -724,6 +728,8 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
stats->set_last_full_compaction_time_ms(now * 1000);
// clang-format on
} else {
Expand All @@ -738,10 +744,14 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
<< " stats.data_size=" << stats->data_size()
<< " stats.num_rowsets=" << stats->num_rowsets()
<< " stats.num_segments=" << stats->num_segments()
<< " stats.index_size=" << stats->index_size()
<< " stats.segment_size=" << stats->segment_size()
<< " detached_stats.num_rows=" << detached_stats.num_rows
<< " detached_stats.data_size=" << detached_stats.data_size
<< " detached_stats.num_rowset=" << detached_stats.num_rowsets
<< " detached_stats.num_segments=" << detached_stats.num_segs
<< " detached_stats.index_size=" << detached_stats.index_size
<< " detached_stats.segment_size=" << detached_stats.segment_size
<< " compaction.size_output_rowsets=" << compaction.size_output_rowsets()
<< " compaction.size_input_rowsets=" << compaction.size_input_rowsets();
txn->put(stats_key, stats_val);
Expand All @@ -752,10 +762,14 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
<< " stats.data_size=" << stats->data_size()
<< " stats.num_rowsets=" << stats->num_rowsets()
<< " stats.num_segments=" << stats->num_segments()
<< " stats.index_size=" << stats->index_size()
<< " stats.segment_size=" << stats->segment_size()
<< " detached_stats.num_rows=" << detached_stats.num_rows
<< " detached_stats.data_size=" << detached_stats.data_size
<< " detached_stats.num_rowset=" << detached_stats.num_rowsets
<< " detached_stats.num_segments=" << detached_stats.num_segs
<< " detached_stats.index_size=" << detached_stats.index_size
<< " detached_stats.segment_size=" << detached_stats.segment_size
<< " compaction.size_output_rowsets="
<< compaction.size_output_rowsets()
<< " compaction.size_input_rowsets=" << compaction.size_input_rowsets();
Expand Down Expand Up @@ -1133,6 +1147,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
int64_t size_remove_rowsets = 0;
int64_t num_remove_rowsets = 0;
int64_t num_remove_segments = 0;
int64_t index_size_remove_rowsets = 0;
int64_t segment_size_remove_rowsets = 0;

auto rs_start = meta_rowset_key({instance_id, new_tablet_id, 2});
auto rs_end = meta_rowset_key({instance_id, new_tablet_id, schema_change.alter_version() + 1});
Expand Down Expand Up @@ -1165,6 +1181,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
size_remove_rowsets += rs.data_disk_size();
++num_remove_rowsets;
num_remove_segments += rs.num_segments();
index_size_remove_rowsets += rs.index_disk_size();
segment_size_remove_rowsets += rs.data_disk_size();

auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id, rs.rowset_id_v2()});
RecycleRowsetPB recycle_rowset;
Expand Down Expand Up @@ -1199,6 +1217,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
stats->set_data_size(stats->data_size() + (schema_change.size_output_rowsets() - size_remove_rowsets));
stats->set_num_rowsets(stats->num_rowsets() + (schema_change.num_output_rowsets() - num_remove_rowsets));
stats->set_num_segments(stats->num_segments() + (schema_change.num_output_segments() - num_remove_segments));
stats->set_index_size(stats->index_size() + (schema_change.index_size_output_rowsets() - index_size_remove_rowsets));
stats->set_segment_size(stats->segment_size() + (schema_change.segment_size_output_rowsets() - segment_size_remove_rowsets));
// clang-format on
auto stats_key = stats_tablet_key(
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
Expand Down
Loading

0 comments on commit cf0aa2f

Please sign in to comment.