Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Dec 2, 2024
1 parent d7f43e8 commit 0531b4a
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 209 deletions.
3 changes: 1 addition & 2 deletions be/src/olap/metadata_adder.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,10 @@ class MetadataAdder {

virtual int64_t get_metadata_size() const { return sizeof(T); }

void update_metadata_size();
virtual void update_metadata_size();

MetadataAdder<T>& operator=(const MetadataAdder<T>& other) = default;

private:
int64_t _current_meta_size {0};

void add_mem_size(int64_t val);
Expand Down
18 changes: 14 additions & 4 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,17 @@ int64_t Segment::get_metadata_size() const {
(_pk_index_meta ? _pk_index_meta->ByteSizeLong() : 0);
}

void Segment::update_metadata_size() {
int64_t old_size = _current_meta_size;
_current_meta_size = get_metadata_size();
int64_t size_diff = _current_meta_size - old_size;

add_mem_size(size_diff);

g_segment_estimate_mem_bytes << _meta_mem_usage - _tracked_meta_mem_usage;
_tracked_meta_mem_usage = _meta_mem_usage;
}

Status Segment::_open() {
_footer_pb = std::make_unique<SegmentFooterPB>();
RETURN_IF_ERROR(_parse_footer(_footer_pb.get()));
Expand All @@ -195,17 +206,15 @@ Status Segment::_open() {
_meta_mem_usage += _pk_index_meta->ByteSizeLong();
}

update_metadata_size();

_meta_mem_usage += sizeof(*this);
_meta_mem_usage += _tablet_schema->num_columns() * config::estimated_mem_per_column_reader;

// 1024 comes from SegmentWriterOptions
_meta_mem_usage += (_num_rows + 1023) / 1024 * (36 + 4);
// 0.01 comes from PrimaryKeyIndexBuilder::init
_meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8;
g_segment_estimate_mem_bytes << _meta_mem_usage - _tracked_meta_mem_usage;
_tracked_meta_mem_usage = _meta_mem_usage;

update_metadata_size();

return Status::OK();
}
Expand Down Expand Up @@ -473,6 +482,7 @@ Status Segment::_load_pk_bloom_filter() {
// for BE UT "segment_cache_test"
return _load_pk_bf_once.call([this] {
_meta_mem_usage += 100;
update_metadata_size();
return Status::OK();
});
}
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
~Segment();

int64_t get_metadata_size() const override;
void update_metadata_size() override;

Status new_iterator(SchemaSPtr schema, const StorageReadOptions& read_options,
std::unique_ptr<RowwiseIterator>* iter);
Expand Down
18 changes: 7 additions & 11 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,24 +212,20 @@ std::string MemTrackerLimiter::print_address_sanitizers() {
RuntimeProfile* MemTrackerLimiter::make_profile(RuntimeProfile* profile) const {
RuntimeProfile* profile_snapshot = profile->create_child(
fmt::format("{}@{}@id={}", _label, type_string(_type), _uid.to_string()), true, false);
RuntimeProfile::Counter* current_usage_counter =
ADD_COUNTER(profile_snapshot, "CurrentUsage", TUnit::BYTES);
RuntimeProfile::Counter* peak_usage_counter =
ADD_COUNTER(profile_snapshot, "PeakUsage", TUnit::BYTES);
COUNTER_SET(current_usage_counter, consumption());
COUNTER_SET(peak_usage_counter, peak_consumption());
RuntimeProfile::HighWaterMarkCounter* usage_counter =
profile_snapshot->AddHighWaterMarkCounter("Memory", TUnit::BYTES);
COUNTER_SET(usage_counter, peak_consumption());
COUNTER_SET(usage_counter, consumption());
if (has_limit()) {
RuntimeProfile::Counter* limit_counter =
ADD_COUNTER(profile_snapshot, "Limit", TUnit::BYTES);
COUNTER_SET(limit_counter, _limit);
}
if (reserved_peak_consumption() != 0) {
RuntimeProfile::Counter* reserved_counter =
ADD_COUNTER(profile_snapshot, "ReservedMemory", TUnit::BYTES);
RuntimeProfile::Counter* reserved_peak_counter =
ADD_COUNTER(profile_snapshot, "ReservedPeakMemory", TUnit::BYTES);
RuntimeProfile::HighWaterMarkCounter* reserved_counter =
profile_snapshot->AddHighWaterMarkCounter("ReservedMemory", TUnit::BYTES);
COUNTER_SET(reserved_counter, reserved_peak_consumption());
COUNTER_SET(reserved_counter, reserved_consumption());
COUNTER_SET(reserved_peak_counter, reserved_peak_consumption());
}
return profile_snapshot;
}
Expand Down
271 changes: 100 additions & 171 deletions be/src/runtime/memory/memory_profile.cpp

Large diffs are not rendered by default.

46 changes: 38 additions & 8 deletions be/src/runtime/memory/memory_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ class MemoryProfile {
}

std::string print_global_memory_profile() const {
return return_memory_profile_str(_global_memory_profile.get());
return return_memory_profile_str(_global_memory_profile.get().get());
}

std::string print_metadata_memory_profile() const {
return return_memory_profile_str(_metadata_memory_profile.get());
return return_memory_profile_str(_metadata_memory_profile.get().get());
}

std::string print_cache_memory_profile() const {
return return_memory_profile_str(_cache_memory_profile.get());
return return_memory_profile_str(_cache_memory_profile.get().get());
}

std::string print_top_memory_tasks_profile() const {
return return_memory_profile_str(_top_memory_tasks_profile.get());
return return_memory_profile_str(_top_memory_tasks_profile.get().get());
}

std::string print_tasks_memory_profile() const {
return return_memory_profile_str(_tasks_memory_profile.get());
return return_memory_profile_str(_tasks_memory_profile.get().get());
}

static int64_t query_current_usage();
Expand All @@ -67,20 +67,50 @@ class MemoryProfile {
void print_log_process_usage();

private:
std::string return_memory_profile_str(
const std::shared_ptr<const RuntimeProfile>& profile) const {
std::string return_memory_profile_str(const RuntimeProfile* profile) const {
std::stringstream ss;
profile->pretty_print(&ss);
return ss.str();
}

MultiVersion<RuntimeProfile> _memory_overview_profile;
void init_memory_overview_counter();

std::unique_ptr<RuntimeProfile> _memory_overview_profile;
MultiVersion<RuntimeProfile> _global_memory_profile;
MultiVersion<RuntimeProfile> _metadata_memory_profile;
MultiVersion<RuntimeProfile> _cache_memory_profile;
MultiVersion<RuntimeProfile> _top_memory_tasks_profile;
MultiVersion<RuntimeProfile> _tasks_memory_profile;

// process memory counter
RuntimeProfile::HighWaterMarkCounter* _process_physical_memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _process_virtual_memory_usage_counter;

// untracked/tracked memory counter
RuntimeProfile::HighWaterMarkCounter* _untracked_memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _tracked_memory_usage_counter;

// Jemalloc memory counter
RuntimeProfile::HighWaterMarkCounter* _jemalloc_memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _jemalloc_cache_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _jemalloc_metadata_usage_counter;

// global/metadata/cache memory counter
RuntimeProfile::HighWaterMarkCounter* _global_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _metadata_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _cache_usage_counter;

// tasks memory counter
RuntimeProfile::HighWaterMarkCounter* _tasks_memory_usage_counter;
// reserved memory is the sum of all task reserved memory, is duplicated with all task memory counter.
RuntimeProfile::HighWaterMarkCounter* _reserved_memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _query_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _load_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _load_all_memtables_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _compaction_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _schema_change_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _other_usage_counter;

std::atomic<bool> _enable_print_log_process_usage {true};
};

Expand Down
4 changes: 2 additions & 2 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <type_traits>

#include "common/object_pool.h"
#include "util/container_util.hpp"
Expand Down Expand Up @@ -72,8 +73,7 @@ void RuntimeProfile::merge(RuntimeProfile* other) {
dst_iter = _counter_map.find(src_iter->first);

if (dst_iter == _counter_map.end()) {
_counter_map[src_iter->first] = _pool->add(
new Counter(src_iter->second->type(), src_iter->second->value()));
_counter_map[src_iter->first] = _pool->add(src_iter->second->clone());
} else {
DCHECK(dst_iter->second->type() == src_iter->second->type());

Expand Down
45 changes: 34 additions & 11 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class RuntimeProfile {
: _value(value), _type(type), _level(level) {}
virtual ~Counter() = default;

virtual Counter* clone() const { return new Counter(type(), value(), _level); }

virtual void update(int64_t delta) { _value.fetch_add(delta, std::memory_order_relaxed); }

void bit_or(int64_t delta) { _value.fetch_or(delta, std::memory_order_relaxed); }
Expand Down Expand Up @@ -137,7 +139,7 @@ class RuntimeProfile {

TUnit::type type() const { return _type; }

virtual int64_t level() { return _level; }
virtual int64_t level() const { return _level; }

private:
friend class RuntimeProfile;
Expand All @@ -151,8 +153,16 @@ class RuntimeProfile {
/// as value()) and the current value.
class HighWaterMarkCounter : public Counter {
public:
HighWaterMarkCounter(TUnit::type unit, int64_t level, const std::string& parent_name)
: Counter(unit, 0, level), current_value_(0), _parent_name(parent_name) {}
HighWaterMarkCounter(TUnit::type unit, int64_t level, const std::string& parent_name,
int64_t value = 0, int64_t current_value = 0)
: Counter(unit, value, level),
current_value_(current_value),
_parent_name(parent_name) {}

virtual Counter* clone() const override {
return new HighWaterMarkCounter(type(), level(), parent_name(), value(),
current_value());
}

void add(int64_t delta) {
current_value_.fetch_add(delta, std::memory_order_relaxed);
Expand Down Expand Up @@ -188,10 +198,9 @@ class RuntimeProfile {
virtual void pretty_print(std::ostream* s, const std::string& prefix,
const std::string& name) const override {
std::ostream& stream = *s;
stream << prefix << " - " << name << ": "
<< PrettyPrinter::print(current_value(), type()) << std::endl;
stream << prefix << " - " << name << "Peak: "
<< PrettyPrinter::print(_value.load(std::memory_order_relaxed), type())
stream << prefix << " - " << name
<< " Current: " << PrettyPrinter::print(current_value(), type()) << " (Peak: "
<< PrettyPrinter::print(_value.load(std::memory_order_relaxed), type()) << ")"
<< std::endl;
}

Expand All @@ -217,6 +226,8 @@ class RuntimeProfile {

int64_t current_value() const { return current_value_.load(std::memory_order_relaxed); }

std::string parent_name() const { return _parent_name; }

private:
/// Set '_value' to 'v' if 'v' is larger than '_value'. The entire operation is
/// atomic.
Expand Down Expand Up @@ -247,8 +258,13 @@ class RuntimeProfile {
// Do not call Set() and Update().
class DerivedCounter : public Counter {
public:
DerivedCounter(TUnit::type type, const DerivedCounterFunction& counter_fn)
: Counter(type, 0), _counter_fn(counter_fn) {}
DerivedCounter(TUnit::type type, const DerivedCounterFunction& counter_fn,
int64_t value = 0, int64_t level = 1)
: Counter(type, value, level), _counter_fn(counter_fn) {}

virtual Counter* clone() const override {
return new DerivedCounter(type(), _counter_fn, value(), level());
}

int64_t value() const override { return _counter_fn(); }

Expand All @@ -259,8 +275,13 @@ class RuntimeProfile {
// NonZeroCounter will not be converted to Thrift if the value is 0.
class NonZeroCounter : public Counter {
public:
NonZeroCounter(TUnit::type type, int64_t level, const std::string& parent_name)
: Counter(type, 0, level), _parent_name(parent_name) {}
NonZeroCounter(TUnit::type type, int64_t level, const std::string& parent_name,
int64_t value = 0)
: Counter(type, value, level), _parent_name(parent_name) {}

virtual Counter* clone() const override {
return new NonZeroCounter(type(), level(), parent_name(), value());
}

void to_thrift(const std::string& name, std::vector<TCounter>& tcounters,
std::map<std::string, std::set<std::string>>& child_counters_map) override {
Expand All @@ -272,6 +293,8 @@ class RuntimeProfile {
}
}

std::string parent_name() const { return _parent_name; }

private:
const std::string _parent_name;
};
Expand Down

0 comments on commit 0531b4a

Please sign in to comment.