From ab0a9484e23d5abeebd6c36a90c13d69e70e68ea Mon Sep 17 00:00:00 2001
From: HappenLee
Date: Sun, 1 Dec 2024 00:23:37 +0800
Subject: [PATCH] [Refactor](query) refactor lock in fragment mgr and change
 std::unordered_map to phmap

---
 be/src/runtime/fragment_mgr.cpp  | 163 +++++++++++++++----------------
 be/src/runtime/fragment_mgr.h    |  24 ++--
 be/src/runtime/load_channel.cpp  |   3 +-
 be/src/runtime/load_stream.cpp   |   2 +-
 be/src/runtime/runtime_state.cpp |   2 +-
 5 files changed, 95 insertions(+), 99 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1e72fa756d3dd3c..0ccb90a0f89f700 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -269,8 +269,11 @@ void FragmentMgr::stop() {
 
     // Only me can delete
     {
-        std::lock_guard lock(_lock);
+        std::unique_lock lock(_query_ctx_map_mutex);
         _query_ctx_map.clear();
+    }
+    {
+        std::unique_lock lock(_pipeline_map_mutex);
         _pipeline_map.clear();
     }
     _thread_pool->shutdown();
@@ -583,11 +586,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
     TUniqueId query_id;
     query_id.__set_hi(request->query_id().hi());
     query_id.__set_lo(request->query_id().lo());
-    std::shared_ptr<QueryContext> q_ctx = nullptr;
-    {
-        std::lock_guard lock(_lock);
-        q_ctx = _get_or_erase_query_ctx(query_id);
-    }
+    auto q_ctx = get_query_ctx(query_id);
     if (q_ctx) {
         q_ctx->set_ready_to_execute(Status::OK());
         LOG_INFO("Query {} start execution", print_id(query_id));
@@ -602,37 +601,44 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
 
 void FragmentMgr::remove_pipeline_context(
         std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
-    {
-        std::lock_guard lock(_lock);
-        auto query_id = f_context->get_query_id();
-        int64 now = duration_cast<std::chrono::milliseconds>(
-                            std::chrono::system_clock::now().time_since_epoch())
-                            .count();
-        g_fragment_executing_count << -1;
-        g_fragment_last_active_time.set_value(now);
-        _pipeline_map.erase({query_id, f_context->get_fragment_id()});
-    }
+    auto query_id = f_context->get_query_id();
+    int64 now = duration_cast<std::chrono::milliseconds>(
+                        std::chrono::system_clock::now().time_since_epoch())
+                        .count();
+    g_fragment_executing_count << -1;
+    g_fragment_last_active_time.set_value(now);
+
+    std::unique_lock lock(_pipeline_map_mutex);
+    _pipeline_map.erase({query_id, f_context->get_fragment_id()});
 }
 
-std::shared_ptr<QueryContext> FragmentMgr::_get_or_erase_query_ctx(const TUniqueId& query_id) {
+std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
+    std::shared_lock lock(_query_ctx_map_mutex);
     auto search = _query_ctx_map.find(query_id);
     if (search != _query_ctx_map.end()) {
         if (auto q_ctx = search->second.lock()) {
             return q_ctx;
-        } else {
-            LOG(WARNING) << "Query context (query id = " << print_id(query_id)
-                         << ") has been released.";
-            _query_ctx_map.erase(search);
-            return nullptr;
         }
     }
     return nullptr;
 }
 
-std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock(
-        const TUniqueId& query_id) {
-    std::unique_lock lock(_lock);
-    return _get_or_erase_query_ctx(query_id);
+Result<std::shared_ptr<QueryContext>> FragmentMgr::_get_or_insert_query_ctx(
+        const doris::TUniqueId& query_id, std::shared_ptr<QueryContext>& query_ctx) {
+    std::unique_lock lock(_query_ctx_map_mutex);
+    auto search = _query_ctx_map.find(query_id);
+    if (search != _query_ctx_map.end()) {
+        if (auto q_ctx = search->second.lock()) {
+            return q_ctx;
+        } else {
+            return ResultError(Status::InternalError(
+                    "Query {} is already in query_ctx_map, but the weak_ptr is expired",
+                    print_id(query_id)));
+        }
+    } else {
+        _query_ctx_map.insert({query_id, query_ctx});
+    }
+    return query_ctx;
 }
 template <typename Params>
@@ -645,8 +651,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
     });
     if (params.is_simplified_param) {
         // Get common components from _query_ctx_map
-        std::lock_guard lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+        if (auto q_ctx = get_query_ctx(query_id)) {
             query_ctx = q_ctx;
         } else {
             return Status::InternalError(
@@ -657,8 +662,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
     } else {
         // Find _query_ctx_map, in case some other request has already
         // create the query fragments context.
-        std::lock_guard lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+        if (auto q_ctx = get_query_ctx(query_id)) {
             query_ctx = q_ctx;
             return Status::OK();
         }
@@ -709,7 +713,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         }
         // There is some logic in query ctx's dctor, we could not check if exists and delete the
        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
-        _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
+        query_ctx = DORIS_TRY(_get_or_insert_query_ctx(query_ctx->query_id(), query_ctx));
     }
     return Status::OK();
 }
@@ -723,13 +727,13 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
     fmt::memory_buffer debug_string_buffer;
     size_t i = 0;
     {
-        std::lock_guard lock(_lock);
         fmt::format_to(debug_string_buffer,
                        "{} pipeline fragment contexts are still running! duration_limit={}\n",
                        _pipeline_map.size(), duration);
-
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
+
+        std::shared_lock lock(_pipeline_map_mutex);
         for (auto& it : _pipeline_map) {
             auto elapsed = it.second->elapsed_time() / 1000000000.0;
             if (elapsed < duration) {
@@ -748,7 +752,7 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
 }
 
 std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
-    if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+    if (auto q_ctx = get_query_ctx(query_id)) {
         return q_ctx->print_all_pipeline_context();
     } else {
         return fmt::format(
@@ -800,16 +804,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     }
 
     {
-        // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
-        std::lock_guard lock(_lock);
         for (const auto& local_param : params.local_params) {
             const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
-            auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
-            if (iter != _pipeline_map.end()) {
-                return Status::InternalError(
-                        "exec_plan_fragment query_id({}) input duplicated fragment_id({})",
-                        print_id(params.query_id), params.fragment_id);
-            }
             query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
         }
 
@@ -818,7 +814,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         int64 now = duration_cast<std::chrono::milliseconds>(
                             std::chrono::system_clock::now().time_since_epoch())
                             .count();
         g_fragment_executing_count << 1;
         g_fragment_last_active_time.set_value(now);
-        // TODO: simplify this mapping
+
+        // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
+        std::unique_lock lock(_pipeline_map_mutex);
+        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
+        if (iter != _pipeline_map.end()) {
+            return Status::InternalError(
+                    "exec_plan_fragment query_id({}) input duplicated fragment_id({})",
+                    print_id(params.query_id), params.fragment_id);
+        }
         _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
     }
 
@@ -848,8 +852,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
     std::shared_ptr<QueryContext> query_ctx = nullptr;
     std::vector<TUniqueId> all_instance_ids;
     {
-        std::lock_guard state_lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+        if (auto q_ctx = get_query_ctx(query_id)) {
             query_ctx = q_ctx;
             // Copy instanceids to avoid concurrent modification.
             // And to reduce the scope of lock.
@@ -862,7 +865,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
     }
     query_ctx->cancel(reason);
     {
-        std::lock_guard state_lock(_lock);
+        std::unique_lock l(_query_ctx_map_mutex);
         _query_ctx_map.erase(query_id);
     }
     LOG(INFO) << "Query " << print_id(query_id)
@@ -898,7 +901,7 @@ void FragmentMgr::cancel_worker() {
 
             std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
             {
-                std::lock_guard lock(_lock);
+                std::shared_lock lock(_pipeline_map_mutex);
                 ctx.reserve(_pipeline_map.size());
                 for (auto& pipeline_itr : _pipeline_map) {
                     ctx.push_back(pipeline_itr.second);
@@ -910,29 +913,34 @@ void FragmentMgr::cancel_worker() {
             std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem>
                     brpc_stub_with_queries;
             {
-                std::lock_guard lock(_lock);
-                for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
-                    if (auto q_ctx = it->second.lock()) {
-                        if (q_ctx->is_timeout(now)) {
-                            LOG_WARNING("Query {} is timeout", print_id(it->first));
-                            queries_timeout.push_back(it->first);
-                        } else if (config::enable_brpc_connection_check) {
-                            auto brpc_stubs = q_ctx->get_using_brpc_stubs();
-                            for (auto& item : brpc_stubs) {
-                                if (!brpc_stub_with_queries.contains(item.second)) {
-                                    brpc_stub_with_queries.emplace(item.second,
-                                                                   BrpcItem {item.first, {q_ctx}});
-                                } else {
-                                    brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
+            {
+                // TODO: Currently only the cancel worker GCs _query_ctx_map; each query
+                // should erase itself from _query_ctx_map once it finishes. Rethink
+                // whether this logic is OK.
+                std::unique_lock lock(_query_ctx_map_mutex);
+                for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
+                    if (auto q_ctx = it->second.lock()) {
+                        if (q_ctx->is_timeout(now)) {
+                            LOG_WARNING("Query {} is timeout", print_id(it->first));
+                            queries_timeout.push_back(it->first);
+                        } else if (config::enable_brpc_connection_check) {
+                            auto brpc_stubs = q_ctx->get_using_brpc_stubs();
+                            for (auto& item : brpc_stubs) {
+                                if (!brpc_stub_with_queries.contains(item.second)) {
+                                    brpc_stub_with_queries.emplace(item.second,
+                                                                   BrpcItem {item.first, {q_ctx}});
+                                } else {
+                                    brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
+                                }
                             }
                         }
+                        ++it;
+                    } else {
+                        it = _query_ctx_map.erase(it);
                     }
-                    ++it;
-                } else {
-                    it = _query_ctx_map.erase(it);
                 }
             }
+
+            std::shared_lock lock(_query_ctx_map_mutex);
             // We use a very conservative cancel strategy.
             // 0. If there are no running frontends, do not cancel any queries.
            // 1. If query's process uuid is zero, do not cancel
@@ -1215,7 +1223,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
 
     const auto& fragment_ids = request->fragment_ids();
     {
-        std::unique_lock lock(_lock);
+        std::shared_lock lock(_pipeline_map_mutex);
         for (auto fragment_id : fragment_ids) {
             auto iter =
                     _pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id});
@@ -1267,8 +1275,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
     TUniqueId query_id;
     query_id.__set_hi(queryid.hi);
     query_id.__set_lo(queryid.lo);
-    std::lock_guard lock(_lock);
-    if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+    if (auto q_ctx = get_query_ctx(query_id)) {
         query_ctx = q_ctx;
     } else {
         return Status::EndOfFile(
@@ -1291,8 +1298,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
     TUniqueId query_id;
     query_id.__set_hi(queryid.hi);
     query_id.__set_lo(queryid.lo);
-    std::lock_guard lock(_lock);
-    if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+    if (auto q_ctx = get_query_ctx(query_id)) {
         query_ctx = q_ctx;
     } else {
         return Status::EndOfFile(
@@ -1312,8 +1318,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
     TUniqueId query_id;
     query_id.__set_hi(queryid.hi);
     query_id.__set_lo(queryid.lo);
-    std::lock_guard lock(_lock);
-    if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+    if (auto q_ctx = get_query_ctx(query_id)) {
         query_ctx = q_ctx;
     } else {
         return Status::EndOfFile(
@@ -1330,7 +1335,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
 
 void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
     {
-        std::lock_guard lock(_lock);
+        std::unique_lock lock(_query_ctx_map_mutex);
         for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) {
             if (auto q_ctx = iter->second.lock()) {
                 WorkloadQueryInfo workload_query_info;
@@ -1353,19 +1358,9 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
         return Status::InvalidArgument("exes_status is nullptr");
     }
 
-    std::shared_ptr<QueryContext> query_context = nullptr;
-
-    {
-        std::lock_guard lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
-            query_context = q_ctx;
-        } else {
-            return Status::NotFound("Query {} has been released", print_id(query_id));
-        }
-    }
-
+    std::shared_ptr<QueryContext> query_context = get_query_ctx(query_id);
     if (query_context == nullptr) {
-        return Status::NotFound("Query {} not found", print_id(query_id));
+        return Status::NotFound("Query {} not found or released", print_id(query_id));
     }
 
     *exec_status = query_context->get_realtime_exec_status();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0eac0469683961e..6f6ae85a1533c7b 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -133,7 +133,7 @@ class FragmentMgr : public RestMonitorIface {
     ThreadPool* get_thread_pool() { return _thread_pool.get(); }
 
     int32_t running_query_num() {
-        std::unique_lock ctx_lock(_lock);
+        std::shared_lock lock(_query_ctx_map_mutex);
         return _query_ctx_map.size();
     }
 
@@ -145,7 +145,7 @@ class FragmentMgr : public RestMonitorIface {
     Status get_realtime_exec_status(const TUniqueId& query_id,
                                     TReportExecStatusParams* exec_status);
 
-    std::shared_ptr<QueryContext> get_or_erase_query_ctx_with_lock(const TUniqueId& query_id);
+    std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
 
 private:
     struct BrpcItem {
@@ -153,7 +153,8 @@ class FragmentMgr : public RestMonitorIface {
         std::vector<std::weak_ptr<QueryContext>> queries;
     };
 
-    std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId& query_id);
+    Result<std::shared_ptr<QueryContext>> _get_or_insert_query_ctx(
+            const TUniqueId& query_id, std::shared_ptr<QueryContext>& query_ctx);
 
     template <typename Param>
     void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
 
@@ -168,20 +169,21 @@ class FragmentMgr : public RestMonitorIface {
 
     // This is input params
     ExecEnv* _exec_env = nullptr;
 
+    // The lock protects `_pipeline_map`
+    std::shared_mutex _pipeline_map_mutex;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    phmap::flat_hash_map<std::pair<TUniqueId, int>,
+                         std::shared_ptr<pipeline::PipelineFragmentContext>>
+            _pipeline_map;
+
     // The lock should only be used to protect the structures in fragment manager. Has to be
     // used in a very small scope because it may dead lock. For example, if the _lock is used
     // in prepare stage, the call path is prepare --> expr prepare --> may call allocator
     // when allocate failed, allocator may call query_is_cancelled, query is cancelled will also
     // call _lock, so that there is dead lock.
-    std::mutex _lock;
-
-    // (QueryID, FragmentID) -> PipelineFragmentContext
-    std::unordered_map<std::pair<TUniqueId, int>,
-                       std::shared_ptr<pipeline::PipelineFragmentContext>>
-            _pipeline_map;
-
+    std::shared_mutex _query_ctx_map_mutex;
     // query id -> QueryContext
-    std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
+    phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
 
     std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
 
     CountDownLatch _stop_background_threads_latch;
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 9369c0c833c53cc..4ff83ff93dfe6f8 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -45,8 +45,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
           _backend_id(backend_id),
           _enable_profile(enable_profile) {
     std::shared_ptr<QueryContext> query_context =
-            ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(
-                    _load_id.to_thrift());
+            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift());
 
     std::shared_ptr<MemTrackerLimiter> mem_tracker = nullptr;
     WorkloadGroupPtr wg_ptr = nullptr;
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 752e2ff95b29174..60da45fa685fbf0 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -428,7 +428,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e
     TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
 #ifndef BE_TEST
     std::shared_ptr<QueryContext> query_context =
-            ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(load_tid);
+            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
     if (query_context != nullptr) {
         _query_thread_context = {load_tid, query_context->query_mem_tracker,
                                  query_context->workload_group()};
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 344180bad771ac5..072c6e87f740a69 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -295,7 +295,7 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
 }
 
 std::weak_ptr<QueryContext> RuntimeState::get_query_ctx_weak() {
-    return _exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id());
+    return _exec_env->fragment_mgr()->get_query_ctx(_query_ctx->query_id());
 }
 
 void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId& id) {
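
Reviewer note (illustration, not part of the patch): the core idiom above is replacing the single coarse std::mutex _lock with two std::shared_mutex instances, so hot lookup paths (get_query_ctx, apply_filterv2, dump_pipeline_tasks, running_query_num) take only a reader lock while insert/erase/GC paths take the writer lock, and _query_ctx_map keeps std::weak_ptr values so the map never extends a query's lifetime. Below is a minimal self-contained C++17 sketch of that pattern, under stated assumptions: the names QueryCtx and QueryCtxMap are hypothetical, std::unordered_map stands in for phmap::flat_hash_map, and plain return values replace Doris' Status/Result plumbing.

// lock_split_sketch.cpp -- illustrative only, simplified from the patch above.
#include <iostream>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

struct QueryCtx { int id; };

class QueryCtxMap {
public:
    // Read path: many threads may look up concurrently under a shared lock.
    std::shared_ptr<QueryCtx> get(int query_id) {
        std::shared_lock lock(_mutex);
        auto it = _map.find(query_id);
        // weak_ptr::lock() yields nullptr if the context already died. Unlike
        // the old _get_or_erase_query_ctx, the read path no longer erases the
        // stale entry (that would need a writer lock); GC does it instead.
        return it == _map.end() ? nullptr : it->second.lock();
    }

    // Write path: exclusive lock. Returns the existing live context if another
    // thread won the race, otherwise registers `ctx`. (The patch reports an
    // InternalError when it finds an expired entry here; this sketch just
    // overwrites it to stay dependency-free.)
    std::shared_ptr<QueryCtx> get_or_insert(int query_id, std::shared_ptr<QueryCtx> ctx) {
        std::unique_lock lock(_mutex);
        auto it = _map.find(query_id);
        if (it != _map.end()) {
            if (auto existing = it->second.lock()) {
                return existing;
            }
        }
        _map[query_id] = ctx;
        return ctx;
    }

    // GC path, analogous to the sweep the patch leaves to cancel_worker():
    // exclusive lock, drop entries whose weak_ptr has expired.
    void gc() {
        std::unique_lock lock(_mutex);
        for (auto it = _map.begin(); it != _map.end();) {
            if (it->second.expired()) {
                it = _map.erase(it);
            } else {
                ++it;
            }
        }
    }

private:
    std::shared_mutex _mutex;                               // readers share, writers exclude
    std::unordered_map<int, std::weak_ptr<QueryCtx>> _map;  // weak_ptr: map never owns
};

int main() {
    QueryCtxMap map;
    auto ctx = std::make_shared<QueryCtx>(QueryCtx{42});
    map.get_or_insert(42, ctx);
    std::cout << (map.get(42) != nullptr) << '\n';  // 1: context alive
    ctx.reset();                                    // last owner drops it
    std::cout << (map.get(42) != nullptr) << '\n';  // 0: weak_ptr expired
    map.gc();                                       // sweep removes the dead entry
}

The same reasoning motivates splitting into two mutexes rather than one shared_mutex: on the paths this diff touches, _pipeline_map and _query_ctx_map are never locked simultaneously, so giving each map its own lock removes false contention between fragment registration and query-context lookups.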