Skip to content

Commit

Permalink
[Refactor](query) refactor lock in fragment mgr and change std::unord…
Browse files Browse the repository at this point in the history
…er_map to phmap (#45069)

### What problem does this PR solve?

Related PR: #44821
  • Loading branch information
HappenLee authored Dec 19, 2024
1 parent e91a810 commit 9272c65
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
71 changes: 47 additions & 24 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,17 @@ void FragmentMgr::stop() {
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.clear();
_query_ctx_map.clear();
for (auto& pipeline : _pipeline_map) {
pipeline.second->close_sink();
}
_pipeline_map.clear();
}

{
std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.clear();
}

_async_report_thread_pool->shutdown();
}

Expand Down Expand Up @@ -620,11 +625,11 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());

LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id()));

if (all_done && query_ctx) {
_query_ctx_map.erase(query_ctx->query_id());
LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
}
}
if (all_done && query_ctx) {
std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.erase(query_ctx->query_id());
LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
}

// Callback after remove from this id
Expand Down Expand Up @@ -713,8 +718,10 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
query_id.__set_lo(request->query_id().lo());
std::shared_ptr<QueryContext> q_ctx = nullptr;
{
std::lock_guard<std::mutex> lock(_lock);

TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
std::shared_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
return Status::InternalError(
Expand All @@ -732,22 +739,24 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
auto* q_context = f_context->get_query_ctx();
bool all_done = false;
TUniqueId query_id = f_context->get_query_id();
{
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
all_done = q_context->countdown(ins_ids.size());
for (const auto& ins_id : ins_ids) {
LOG_INFO("Removing query {} instance {}, all done? {}", print_id(query_id),
print_id(ins_id), all_done);
_pipeline_map.erase(ins_id);
g_pipeline_fragment_instances_count << -1;
}
if (all_done) {
LOG_INFO("Query {} finished", print_id(query_id));
_query_ctx_map.erase(query_id);
}
}
if (all_done) {
std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.erase(query_id);
LOG_INFO("Query {} finished", print_id(query_id));
}
}

Expand All @@ -759,7 +768,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
{ return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
return Status::InternalError(
Expand All @@ -771,7 +780,16 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
} else {
// Find _query_ctx_map, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
{
std::shared_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
query_ctx = search->second;
return Status::OK();
}
}

std::unique_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
query_ctx = search->second;
Expand Down Expand Up @@ -1170,7 +1188,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query
}

std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& query_id) {
std::lock_guard<std::mutex> state_lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto ctx = _query_ctx_map.find(query_id);
if (ctx != _query_ctx_map.end()) {
return ctx->second;
Expand All @@ -1184,7 +1202,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
std::shared_ptr<QueryContext> query_ctx;
std::vector<TUniqueId> all_instance_ids;
{
std::lock_guard<std::mutex> state_lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto ctx_iter = _query_ctx_map.find(query_id);

if (ctx_iter == _query_ctx_map.end()) {
Expand Down Expand Up @@ -1251,7 +1269,7 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id,

void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
const PPlanFragmentCancelReason& reason, const std::string& msg) {
std::unique_lock<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto q_ctx_iter = _query_ctx_map.find(query_id);
if (q_ctx_iter != _query_ctx_map.end()) {
// Has to use value to keep the shared ptr not deconstructed.
Expand Down Expand Up @@ -1315,6 +1333,9 @@ void FragmentMgr::cancel_worker() {
pipeline_itr.second->clear_finished_tasks();
}
}
}
{
std::unique_lock lock(_query_ctx_map_lock);
for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
if (it->second->is_timeout(now)) {
LOG_WARNING("Query {} is timeout", print_id(it->first));
Expand All @@ -1335,7 +1356,9 @@ void FragmentMgr::cancel_worker() {
++it;
}
}

}
{
std::shared_lock lock(_query_ctx_map_lock);
// We use a very conservative cancel strategy.
// 0. If there are no running frontends, do not cancel any queries.
// 1. If query's process uuid is zero, do not cancel
Expand Down Expand Up @@ -1773,7 +1796,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished",
Expand All @@ -1796,7 +1819,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}", queryid.to_string());
Expand All @@ -1819,7 +1842,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}", queryid.to_string());
Expand Down Expand Up @@ -1914,7 +1937,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag

void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
{
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
for (const auto& q : _query_ctx_map) {
WorkloadQueryInfo workload_query_info;
workload_query_info.query_id = print_id(q.first);
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class FragmentMgr : public RestMonitorIface {
std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);

int32_t running_query_num() {
std::unique_lock<std::mutex> ctx_lock(_lock);
std::shared_lock ctx_lock(_query_ctx_map_lock);
return _query_ctx_map.size();
}

Expand Down Expand Up @@ -201,8 +201,9 @@ class FragmentMgr : public RestMonitorIface {

std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;

std::shared_mutex _query_ctx_map_lock;
// query id -> QueryContext
std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;

CountDownLatch _stop_background_threads_latch;
Expand Down

0 comments on commit 9272c65

Please sign in to comment.