Skip to content

Commit

Permalink
[Refactor] Remove column pool (#51105)
Browse files Browse the repository at this point in the history
Why I'm doing:
ColumnPool is designed to address performance issues caused by frequent cross thread memory alloc/free.

Tcmalloc has a CentrlHeap, which results in high lock contention overhead. Currently it has been already been replaced by Jemalloc, which is now an independent arena and no longer has this issue. In version 3.2, ColumnPool` has been disabled by default, and there have been no performance issues in performance testing or user production environments.

So in main branch, it can be safely removed.

What I'm doing:
Remove column pool

Signed-off-by: trueeyu <[email protected]>
  • Loading branch information
trueeyu authored Sep 20, 2024
1 parent 1edc42b commit 37d72ac
Show file tree
Hide file tree
Showing 30 changed files with 37 additions and 740 deletions.
453 changes: 0 additions & 453 deletions be/src/column/column_pool.h

This file was deleted.

2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,6 @@ CONF_mBool(enable_bitmap_index_memory_page_cache, "false");
CONF_mBool(enable_zonemap_index_memory_page_cache, "false");
// whether to enable the ordinal index memory cache
CONF_mBool(enable_ordinal_index_memory_page_cache, "false");
// whether to disable column pool
CONF_Bool(disable_column_pool, "true");

CONF_mInt32(base_compaction_check_interval_seconds, "60");
CONF_mInt64(min_base_compaction_num_singleton_deltas, "5");
Expand Down
43 changes: 4 additions & 39 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

#include "block_cache/block_cache.h"
#include "column/column_helper.h"
#include "column/column_pool.h"
#include "common/config.h"
#include "common/minidump.h"
#include "exec/workgroup/work_group.h"
Expand Down Expand Up @@ -85,36 +84,6 @@ std::atomic<bool> k_starrocks_exit = false;
// but also waiting for all threads to exit gracefully.
std::atomic<bool> k_starrocks_exit_quick = false;

class ReleaseColumnPool {
public:
explicit ReleaseColumnPool(double ratio) : _ratio(ratio) {}

template <typename Pool>
void operator()() {
_freed_bytes += Pool::singleton()->release_free_columns(_ratio);
}

size_t freed_bytes() const { return _freed_bytes; }

private:
double _ratio;
size_t _freed_bytes = 0;
};

void gc_memory(void* arg_this) {
using namespace starrocks;
const static float kFreeRatio = 0.5;

auto* daemon = static_cast<Daemon*>(arg_this);
while (!daemon->stopped()) {
nap_sleep(config::memory_maintenance_sleep_time_s, [daemon] { return daemon->stopped(); });

ReleaseColumnPool releaser(kFreeRatio);
ForEach<ColumnPoolList>(releaser);
LOG_IF(INFO, releaser.freed_bytes() > 0) << "Released " << releaser.freed_bytes() << " bytes from column pool";
}
}

/*
* This thread will calculate some metrics at a fix interval(15 sec)
* 1. push bytes per second
Expand Down Expand Up @@ -198,15 +167,15 @@ void calculate_metrics(void* arg_this) {

LOG(INFO) << fmt::format(
"Current memory statistics: process({}), query_pool({}), load({}), "
"metadata({}), compaction({}), schema_change({}), column_pool({}), "
"metadata({}), compaction({}), schema_change({}), "
"page_cache({}), update({}), chunk_allocator({}), clone({}), consistency({}), "
"datacache({}), jit({})",
mem_metrics->process_mem_bytes.value(), mem_metrics->query_mem_bytes.value(),
mem_metrics->load_mem_bytes.value(), mem_metrics->metadata_mem_bytes.value(),
mem_metrics->compaction_mem_bytes.value(), mem_metrics->schema_change_mem_bytes.value(),
mem_metrics->column_pool_mem_bytes.value(), mem_metrics->storage_page_cache_mem_bytes.value(),
mem_metrics->update_mem_bytes.value(), mem_metrics->chunk_allocator_mem_bytes.value(),
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value(), datacache_mem_bytes,
mem_metrics->storage_page_cache_mem_bytes.value(), mem_metrics->update_mem_bytes.value(),
mem_metrics->chunk_allocator_mem_bytes.value(), mem_metrics->clone_mem_bytes.value(),
mem_metrics->consistency_mem_bytes.value(), datacache_mem_bytes,
mem_metrics->jit_cache_mem_bytes.value());

nap_sleep(15, [daemon] { return daemon->stopped(); });
Expand Down Expand Up @@ -307,10 +276,6 @@ void Daemon::init(bool as_cn, const std::vector<StorePath>& paths) {

TimezoneUtils::init_time_zones();

std::thread gc_thread(gc_memory, this);
Thread::set_thread_name(gc_thread, "gc_daemon");
_daemon_threads.emplace_back(std::move(gc_thread));

init_starrocks_metrics(paths);

if (config::enable_metric_calculator) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ void LakeDataSource::close(RuntimeState* state) {
}

Status LakeDataSource::get_next(RuntimeState* state, ChunkPtr* chunk) {
chunk->reset(ChunkHelper::new_chunk_pooled(_prj_iter->output_schema(), _runtime_state->chunk_size(),
_runtime_state->use_column_pool()));
chunk->reset(ChunkHelper::new_chunk_pooled(_prj_iter->output_schema(), _runtime_state->chunk_size()));
auto* chunk_ptr = chunk->get();

do {
Expand Down
15 changes: 4 additions & 11 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <thread>

#include "column/column_access_path.h"
#include "column/column_pool.h"
#include "column/type_traits.h"
#include "common/compiler_util.h"
#include "common/status.h"
Expand Down Expand Up @@ -159,7 +158,6 @@ Status OlapScanNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());

bool first_call = !_start;
if (!_start && _status.ok()) {
Status status = _start_scan(state);
_update_status(status);
Expand Down Expand Up @@ -210,7 +208,7 @@ Status OlapScanNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool* eos) {
// is the first time of calling `get_next`, pass the second argument of `_fill_chunk_pool` as
// true to ensure that the newly allocated column objects will be returned back into the column
// pool.
TRY_CATCH_BAD_ALLOC(_fill_chunk_pool(1, first_call && state->use_column_pool()));
TRY_CATCH_BAD_ALLOC(_fill_chunk_pool(1));
eval_join_runtime_filters(chunk);
_num_rows_returned += (*chunk)->num_rows();
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
Expand Down Expand Up @@ -256,11 +254,6 @@ void OlapScanNode::close(RuntimeState* state) {
chunk.reset();
}

if (runtime_state() != nullptr) {
// Reduce the memory usage if the the average string size is greater than 512.
release_large_columns<BinaryColumn>(runtime_state()->chunk_size() * 512);
}

for (const auto& rowsets_per_tablet : _tablet_rowsets) {
Rowset::release_readers(rowsets_per_tablet);
}
Expand All @@ -275,10 +268,10 @@ OlapScanNode::~OlapScanNode() {
DCHECK(is_closed());
}

void OlapScanNode::_fill_chunk_pool(int count, bool force_column_pool) {
void OlapScanNode::_fill_chunk_pool(int count) {
const size_t capacity = runtime_state()->chunk_size();
for (int i = 0; i < count; i++) {
ChunkPtr chunk(ChunkHelper::new_chunk_pooled(*_chunk_schema, capacity, force_column_pool));
ChunkPtr chunk(ChunkHelper::new_chunk_pooled(*_chunk_schema, capacity));
{
std::lock_guard<std::mutex> l(_mtx);
_chunk_pool.push(std::move(chunk));
Expand Down Expand Up @@ -736,7 +729,7 @@ Status OlapScanNode::_start_scan_thread(RuntimeState* state) {
COUNTER_SET(_task_concurrency, (int64_t)concurrency);
int chunks = _chunks_per_scanner * concurrency;
_chunk_pool.reserve(chunks);
TRY_CATCH_BAD_ALLOC(_fill_chunk_pool(chunks, state->use_column_pool()));
TRY_CATCH_BAD_ALLOC(_fill_chunk_pool(chunks));
std::lock_guard<std::mutex> l(_mtx);
for (int i = 0; i < concurrency; i++) {
CHECK(_submit_scanner(_pending_scanners.pop(), true));
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class OlapScanNode final : public starrocks::ScanNode {
void _update_status(const Status& status);
Status _get_status();

void _fill_chunk_pool(int count, bool force_column_pool);
void _fill_chunk_pool(int count);
bool _submit_scanner(TabletScanner* scanner, bool blockable);
void _close_pending_scanners();

Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,7 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
}

Status OlapChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) {
chunk->reset(ChunkHelper::new_chunk_pooled(_prj_iter->output_schema(), _runtime_state->chunk_size(),
_runtime_state->use_column_pool()));
chunk->reset(ChunkHelper::new_chunk_pooled(_prj_iter->output_schema(), _runtime_state->chunk_size()));
auto scope = IOProfiler::scope(IOProfiler::TAG_QUERY, _tablet->tablet_id());
return _read_chunk_from_storage(_runtime_state, (*chunk).get());
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/exec/tablet_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <memory>
#include <utility>

#include "column/column_pool.h"
#include "column/vectorized_fwd.h"
#include "common/status.h"
#include "exec/olap_scan_node.h"
Expand Down Expand Up @@ -118,8 +117,6 @@ void TabletScanner::close(RuntimeState* state) {
_reader.reset();
_predicate_free_pool.clear();
Expr::close(_conjunct_ctxs, state);
// Reduce the memory usage if the the average string size is greater than 512.
release_large_columns<BinaryColumn>(state->chunk_size() * 512);
_is_closed = true;
}

Expand Down
1 change: 0 additions & 1 deletion be/src/http/action/memory_metrics_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ void MemoryMetricsAction::handle(HttpRequest* req) {
"bloom_filter_index",
"compaction",
"schema_change",
"column_pool",
"page_cache",
"datacache",
"update",
Expand Down
3 changes: 0 additions & 3 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,6 @@ void mem_tracker_handler(MemTracker* mem_tracker, const WebPageHandler::Argument
} else if (iter->second == "clone") {
start_mem_tracker = GlobalEnv::GetInstance()->clone_mem_tracker();
cur_level = 2;
} else if (iter->second == "column_pool") {
start_mem_tracker = GlobalEnv::GetInstance()->column_pool_mem_tracker();
cur_level = 2;
} else if (iter->second == "page_cache") {
start_mem_tracker = GlobalEnv::GetInstance()->page_cache_mem_tracker();
cur_level = 2;
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "agent/agent_server.h"
#include "agent/master_info.h"
#include "block_cache/block_cache.h"
#include "column/column_pool.h"
#include "common/config.h"
#include "common/configbase.h"
#include "common/logging.h"
Expand Down Expand Up @@ -234,7 +233,6 @@ Status GlobalEnv::_init_mem_tracker() {
int64_t compaction_mem_limit = calc_max_compaction_memory(_process_mem_tracker->limit());
_compaction_mem_tracker = regist_tracker(compaction_mem_limit, "compaction", _process_mem_tracker.get());
_schema_change_mem_tracker = regist_tracker(-1, "schema_change", _process_mem_tracker.get());
_column_pool_mem_tracker = regist_tracker(-1, "column_pool", _process_mem_tracker.get());
_page_cache_mem_tracker = regist_tracker(-1, "page_cache", _process_mem_tracker.get());
_jit_cache_mem_tracker = regist_tracker(-1, "jit_cache", _process_mem_tracker.get());
int32_t update_mem_percent = std::max(std::min(100, config::update_memory_limit_percent), 0);
Expand All @@ -248,8 +246,6 @@ Status GlobalEnv::_init_mem_tracker() {

MemChunkAllocator::init_instance(_chunk_allocator_mem_tracker.get(), config::chunk_reserved_bytes_limit);

SetMemTrackerForColumnPool op(_column_pool_mem_tracker);
ForEach<ColumnPoolList>(op);
_init_storage_page_cache(); // TODO: move to StorageEngine
return Status::OK();
}
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class GlobalEnv {
MemTracker* short_key_index_mem_tracker() { return _short_key_index_mem_tracker.get(); }
MemTracker* compaction_mem_tracker() { return _compaction_mem_tracker.get(); }
MemTracker* schema_change_mem_tracker() { return _schema_change_mem_tracker.get(); }
MemTracker* column_pool_mem_tracker() { return _column_pool_mem_tracker.get(); }
MemTracker* page_cache_mem_tracker() { return _page_cache_mem_tracker.get(); }
MemTracker* jit_cache_mem_tracker() { return _jit_cache_mem_tracker.get(); }
MemTracker* update_mem_tracker() { return _update_mem_tracker.get(); }
Expand Down Expand Up @@ -199,9 +198,6 @@ class GlobalEnv {
// The memory used for schema change
std::shared_ptr<MemTracker> _schema_change_mem_tracker;

// The memory used for column pool
std::shared_ptr<MemTracker> _column_pool_mem_tracker;

// The memory used for page cache
std::shared_ptr<MemTracker> _page_cache_mem_tracker;

Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,6 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, c
query_options.query_type = TQueryType::EXTERNAL;
// For spark sql / flink sql, we dont use page cache.
query_options.use_page_cache = false;
query_options.use_column_pool = false;
query_options.enable_profile = config::enable_profile_for_external_plan;
exec_fragment_params.__set_query_options(query_options);
VLOG_ROW << "external exec_plan_fragment params is "
Expand Down
11 changes: 0 additions & 11 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,17 +287,6 @@ bool RuntimeState::use_page_cache() {
return true;
}

bool RuntimeState::use_column_pool() const {
if (config::disable_column_pool) {
return false;
}

if (_query_options.__isset.use_column_pool) {
return _query_options.use_column_pool;
}
return true;
}

Status RuntimeState::set_mem_limit_exceeded(MemTracker* tracker, int64_t failed_allocation_size, std::string_view msg) {
DCHECK_GE(failed_allocation_size, 0);
{
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ class RuntimeState {
void set_desc_tbl(DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
int chunk_size() const { return _query_options.batch_size; }
void set_chunk_size(int chunk_size) { _query_options.batch_size = chunk_size; }
bool use_column_pool() const;
bool abort_on_default_limit_exceeded() const { return _query_options.abort_on_default_limit_exceeded; }
int64_t timestamp_ms() const { return _timestamp_us / 1000; }
int64_t timestamp_us() const { return _timestamp_us; }
Expand Down
1 change: 0 additions & 1 deletion be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ void bind_exec_env(ForeignModule& m) {
REG_METHOD(GlobalEnv, metadata_mem_tracker);
REG_METHOD(GlobalEnv, compaction_mem_tracker);
REG_METHOD(GlobalEnv, schema_change_mem_tracker);
REG_METHOD(GlobalEnv, column_pool_mem_tracker);
REG_METHOD(GlobalEnv, page_cache_mem_tracker);
REG_METHOD(GlobalEnv, jit_cache_mem_tracker);
REG_METHOD(GlobalEnv, update_mem_tracker);
Expand Down
Loading

0 comments on commit 37d72ac

Please sign in to comment.