Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <[email protected]>
  • Loading branch information
luohaha committed Jul 18, 2024
1 parent 9f12416 commit 8c91da0
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 6 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ CONF_Int32(query_max_memory_limit_percent, "90");
CONF_Double(query_pool_spill_mem_limit_threshold, "1.0");
CONF_Int64(load_process_max_memory_limit_bytes, "107374182400"); // 100GB
CONF_Int32(load_process_max_memory_limit_percent, "30"); // 30%
// It's hard limit for loading, when this limit is hit, new loading task will be rejected.
CONF_mInt32(load_process_max_memory_hard_limit_percent, "60"); // 60%
CONF_mBool(enable_new_load_on_memory_limit_exceeded, "false");
CONF_Int64(compaction_max_memory_limit, "-1");
CONF_Int32(compaction_max_memory_limit_percent, "100");
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ void LoadChannelMgr::open(brpc::Controller* cntl, const PTabletWriterOpenRequest
auto it = _load_channels.find(load_id);
if (it != _load_channels.end()) {
channel = it->second;
} else if (!_mem_tracker->limit_exceeded_by_ratio(200) || config::enable_new_load_on_memory_limit_exceeded) {
// If current loading memory usage is larger than 2x limit, reject new coming loading task.
} else if (!is_tracker_hit_hard_limit(_mem_tracker, config::load_process_max_memory_limit_percent,
config::load_process_max_memory_hard_limit_percent) ||
config::enable_new_load_on_memory_limit_exceeded) {
// When loading memory usage is larger than hard limit, we will reject new loading task.
int64_t mem_limit_in_req = request.has_load_mem_limit() ? request.load_mem_limit() : -1;
int64_t job_max_memory = calc_job_max_load_memory(mem_limit_in_req, _mem_tracker->limit());

Expand Down
7 changes: 5 additions & 2 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,11 @@ Status DeltaWriter::write(const Chunk& chunk, const uint32_t* indexes, uint32_t
// Delay the creation memtables until we write data.
// Because for the tablet which doesn't have any written data, we will not use their memtables.
if (_mem_table == nullptr) {
if (GlobalEnv::GetInstance()->load_mem_tracker()->limit_exceeded_by_ratio(200) &&
!config::enable_new_load_on_memory_limit_exceeded) {
// When loading memory usage is larger than hard limit, we will reject new loading task.
if (!config::enable_new_load_on_memory_limit_exceeded &&
is_tracker_hit_hard_limit(GlobalEnv::GetInstance()->load_mem_tracker(),
config::load_process_max_memory_limit_percent,
config::load_process_max_memory_hard_limit_percent)) {
return Status::MemoryLimitExceeded(
"memory limit exceeded, please reduce load frequency or increase config "
"`load_process_max_memory_limit_percent` or `load_process_max_memory_limit_bytes` "
Expand Down
7 changes: 5 additions & 2 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,11 @@ Status DeltaWriterImpl::write(const Chunk& chunk, const uint32_t* indexes, uint3
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);

if (_mem_table == nullptr) {
if (GlobalEnv::GetInstance()->load_mem_tracker()->limit_exceeded_by_ratio(200) &&
!config::enable_new_load_on_memory_limit_exceeded) {
// When loading memory usage is larger than hard limit, we will reject new loading task.
if (!config::enable_new_load_on_memory_limit_exceeded &&
is_tracker_hit_hard_limit(GlobalEnv::GetInstance()->load_mem_tracker(),
config::load_process_max_memory_limit_percent,
config::load_process_max_memory_hard_limit_percent)) {
return Status::MemoryLimitExceeded(
"memory limit exceeded, please reduce load frequency or increase config "
"`load_process_max_memory_limit_percent` or `load_process_max_memory_limit_bytes` "
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "fs/fs.h"
#include "fs/fs_util.h"
#include "gutil/strings/substitute.h"
#include "runtime/mem_tracker.h"
#include "storage/olap_define.h"
#include "util/errno.h"
#include "util/string_parser.hpp"
Expand Down Expand Up @@ -390,4 +391,9 @@ std::string file_name(const std::string& fullpath) {
return path.filename().string();
}

bool is_tracker_hit_hard_limit(MemTracker* tracker, int soft_limit_percent, int hard_limit_percent) {
int64_t limit_ratio = std::max(hard_limit_percent * 100 / soft_limit_percent, 100);
return tracker->limit_exceeded_by_ratio(limit_ratio);
}

} // namespace starrocks
4 changes: 4 additions & 0 deletions be/src/storage/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@

namespace starrocks {

class MemTracker;

const static int32_t g_power_table[] = {1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000};

class OlapStopWatch {
Expand Down Expand Up @@ -162,6 +164,8 @@ bool valid_bool(const std::string& value_str);
std::string parent_name(const std::string& fullpath);
std::string file_name(const std::string& fullpath);

bool is_tracker_hit_hard_limit(MemTracker* tracker, int soft_limit_percent, int hard_limit_percent);

// Util used to get string name of thrift enum item
#define EnumToString(enum_type, index, out) \
do { \
Expand Down
14 changes: 14 additions & 0 deletions be/test/storage/utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <gtest/gtest.h>

#include "runtime/mem_tracker.h"

namespace starrocks {
class TestUtils : public ::testing::Test {};
TEST_F(TestUtils, test_valid_decimal) {
Expand All @@ -41,4 +43,16 @@ TEST_F(TestUtils, test_valid_decimal) {
ASSERT_TRUE(valid_decimal("31.4", 3, 1));
ASSERT_TRUE(valid_decimal("314.15925", 8, 5));
}

TEST_F(TestUtils, test_is_tracker_hit_hard_limit) {
std::unique_ptr<MemTracker> tracker = std::make_unique<MemTracker>(1000, "test", nullptr);
tracker->consume(2000);
ASSERT_TRUE(!is_tracker_hit_hard_limit(tracker.get(), 30, 30));
ASSERT_TRUE(!is_tracker_hit_hard_limit(tracker.get(), 30, 40));
ASSERT_TRUE(!is_tracker_hit_hard_limit(tracker.get(), 30, 50));
ASSERT_TRUE(!is_tracker_hit_hard_limit(tracker.get(), 30, 60));
ASSERT_TRUE(is_tracker_hit_hard_limit(tracker.get(), 30, 65));
ASSERT_TRUE(is_tracker_hit_hard_limit(tracker.get(), 30, 70));
}

} // namespace starrocks

0 comments on commit 8c91da0

Please sign in to comment.