Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] avoid BE OOM when handle large number of tablet writes (backport #48495) #48645

Merged
merged 1 commit into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,20 @@ CONF_mInt64(write_buffer_size, "104857600");
CONF_Int32(query_max_memory_limit_percent, "90");
CONF_Double(query_pool_spill_mem_limit_threshold, "1.0");
CONF_Int64(load_process_max_memory_limit_bytes, "107374182400"); // 100GB
CONF_Int32(load_process_max_memory_limit_percent, "30"); // 30%
CONF_mBool(enable_new_load_on_memory_limit_exceeded, "true");
// It's is a soft limit, when this limit is hit,
// memtable in delta writer will be flush to reduce memory cost.
// Load memory beyond this limit is allowed.
CONF_Int32(load_process_max_memory_limit_percent, "30"); // 30%
// It's hard limit ratio, when this limit is hit, new loading task will be rejected.
// we can caculate and got the hard limit percent.
// E.g.
// load_process_max_memory_limit_percent is 30%,
// load_process_max_memory_hard_limit_ratio is 2.
// then hard limit percent is 30% * 2 = 60%.
// And when hard limit percent is larger than process limit percent,
// use process limit percent as hard limit percent.
CONF_mDouble(load_process_max_memory_hard_limit_ratio, "2");
CONF_mBool(enable_new_load_on_memory_limit_exceeded, "false");
CONF_Int64(compaction_max_memory_limit, "-1");
CONF_Int32(compaction_max_memory_limit_percent, "100");
CONF_Int64(compaction_memory_limit_per_worker, "2147483648"); // 2GB
Expand Down
8 changes: 5 additions & 3 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "runtime/load_channel.h"
#include "runtime/mem_tracker.h"
#include "storage/lake/tablet_manager.h"
#include "storage/utils.h"
#include "util/starrocks_metrics.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
Expand Down Expand Up @@ -117,7 +118,9 @@ void LoadChannelMgr::open(brpc::Controller* cntl, const PTabletWriterOpenRequest
auto it = _load_channels.find(load_id);
if (it != _load_channels.end()) {
channel = it->second;
} else if (!_mem_tracker->limit_exceeded() || config::enable_new_load_on_memory_limit_exceeded) {
} else if (!is_tracker_hit_hard_limit(_mem_tracker, config::load_process_max_memory_hard_limit_ratio) ||
config::enable_new_load_on_memory_limit_exceeded) {
// When loading memory usage is larger than hard limit, we will reject new loading task.
int64_t mem_limit_in_req = request.has_load_mem_limit() ? request.load_mem_limit() : -1;
int64_t job_max_memory = calc_job_max_load_memory(mem_limit_in_req, _mem_tracker->limit());

Expand All @@ -135,8 +138,7 @@ void LoadChannelMgr::open(brpc::Controller* cntl, const PTabletWriterOpenRequest
response->mutable_status()->set_status_code(TStatusCode::MEM_LIMIT_EXCEEDED);
response->mutable_status()->add_error_msgs(
"memory limit exceeded, please reduce load frequency or increase config "
"`load_process_max_memory_limit_percent` or `load_process_max_memory_limit_bytes` "
"or add more BE nodes");
"`load_process_max_memory_hard_limit_ratio` or add more BE nodes");
return;
}
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ Status DeltaWriter::write(const Chunk& chunk, const uint32_t* indexes, uint32_t
// Delay the creation memtables until we write data.
// Because for the tablet which doesn't have any written data, we will not use their memtables.
if (_mem_table == nullptr) {
// When loading memory usage is larger than hard limit, we will reject new loading task.
if (!config::enable_new_load_on_memory_limit_exceeded &&
is_tracker_hit_hard_limit(GlobalEnv::GetInstance()->load_mem_tracker(),
config::load_process_max_memory_hard_limit_ratio)) {
return Status::MemoryLimitExceeded(
"memory limit exceeded, please reduce load frequency or increase config "
"`load_process_max_memory_hard_limit_ratio` or add more BE nodes");
}
_reset_mem_table();
}
auto state = get_state();
Expand Down
8 changes: 8 additions & 0 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,14 @@ Status DeltaWriterImpl::write(const Chunk& chunk, const uint32_t* indexes, uint3
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);

if (_mem_table == nullptr) {
// When loading memory usage is larger than hard limit, we will reject new loading task.
if (!config::enable_new_load_on_memory_limit_exceeded &&
is_tracker_hit_hard_limit(GlobalEnv::GetInstance()->load_mem_tracker(),
config::load_process_max_memory_hard_limit_ratio)) {
return Status::MemoryLimitExceeded(
"memory limit exceeded, please reduce load frequency or increase config "
"`load_process_max_memory_hard_limit_ratio` or add more BE nodes");
}
RETURN_IF_ERROR(reset_memtable());
}
RETURN_IF_ERROR(check_partial_update_with_sort_key(chunk));
Expand Down
7 changes: 7 additions & 0 deletions be/src/storage/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "fs/fs.h"
#include "fs/fs_util.h"
#include "gutil/strings/substitute.h"
#include "runtime/mem_tracker.h"
#include "storage/olap_define.h"
#include "util/errno.h"
#include "util/string_parser.hpp"
Expand Down Expand Up @@ -390,4 +391,10 @@ std::string file_name(const std::string& fullpath) {
return path.filename().string();
}

bool is_tracker_hit_hard_limit(MemTracker* tracker, double hard_limit_ratio) {
hard_limit_ratio = std::max(hard_limit_ratio, 1.0);
return tracker->limit_exceeded_by_ratio((int64_t)(hard_limit_ratio * 100)) ||
(tracker->parent() != nullptr && tracker->parent()->limit_exceeded());
}

} // namespace starrocks
4 changes: 4 additions & 0 deletions be/src/storage/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@

namespace starrocks {

class MemTracker;

const static int32_t g_power_table[] = {1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000};

class OlapStopWatch {
Expand Down Expand Up @@ -162,6 +164,8 @@ bool valid_bool(const std::string& value_str);
std::string parent_name(const std::string& fullpath);
std::string file_name(const std::string& fullpath);

bool is_tracker_hit_hard_limit(MemTracker* tracker, double hard_limit_ratio);

// Util used to get string name of thrift enum item
#define EnumToString(enum_type, index, out) \
do { \
Expand Down
31 changes: 31 additions & 0 deletions be/test/storage/lake/delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,4 +609,35 @@ TEST_F(LakeDeltaWriterTest, test_memtable_full) {
ASSERT_GT(txnlog->op_write().rowset().data_size(), 0);
}

TEST_F(LakeDeltaWriterTest, test_write_oom) {
// Prepare data for writing
static const int kChunkSize = 128;
auto chunk0 = generate_data(kChunkSize);
auto indexes = std::vector<uint32_t>(kChunkSize);
for (int i = 0; i < kChunkSize; i++) {
indexes[i] = i;
}

// Create and open DeltaWriter
auto txn_id = next_id();
auto tablet_id = _tablet_metadata->id();
int64_t old_limit = GlobalEnv::GetInstance()->load_mem_tracker()->limit();
GlobalEnv::GetInstance()->load_mem_tracker()->set_limit(1);
GlobalEnv::GetInstance()->load_mem_tracker()->consume(100);
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_immutable_tablet_size(1)
.build());
ASSERT_OK(delta_writer->open());
// Write and flush
ASSERT_ERROR(delta_writer->write(chunk0, indexes.data(), indexes.size()));
GlobalEnv::GetInstance()->load_mem_tracker()->release(100);
GlobalEnv::GetInstance()->load_mem_tracker()->set_limit(old_limit);
}

} // namespace starrocks::lake
16 changes: 16 additions & 0 deletions be/test/storage/utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <gtest/gtest.h>

#include "runtime/mem_tracker.h"

namespace starrocks {
class TestUtils : public ::testing::Test {};
TEST_F(TestUtils, test_valid_decimal) {
Expand All @@ -41,4 +43,18 @@ TEST_F(TestUtils, test_valid_decimal) {
ASSERT_TRUE(valid_decimal("31.4", 3, 1));
ASSERT_TRUE(valid_decimal("314.15925", 8, 5));
}

TEST_F(TestUtils, test_is_tracker_hit_hard_limit) {
std::unique_ptr<MemTracker> tracker = std::make_unique<MemTracker>(1000, "test", nullptr);
tracker->consume(2000);
ASSERT_TRUE(is_tracker_hit_hard_limit(tracker.get(), 0.1));
ASSERT_TRUE(is_tracker_hit_hard_limit(tracker.get(), 1.1));
ASSERT_TRUE(is_tracker_hit_hard_limit(tracker.get(), 1.5));
ASSERT_TRUE(is_tracker_hit_hard_limit(tracker.get(), 1.7));
ASSERT_TRUE(!is_tracker_hit_hard_limit(tracker.get(), 2));
ASSERT_TRUE(!is_tracker_hit_hard_limit(tracker.get(), 2.5));
ASSERT_TRUE(!is_tracker_hit_hard_limit(tracker.get(), 3));
ASSERT_TRUE(!is_tracker_hit_hard_limit(tracker.get(), 4));
}

} // namespace starrocks
Loading