[Refactor](query) refactor lock in fragment mgr and change std::unordered_map to phmap
HappenLee committed Dec 1, 2024
1 parent 8751287 commit ab0a948
Showing 5 changed files with 95 additions and 99 deletions.
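The heart of the change in fragment_mgr is a locking refactor: the single coarse std::mutex _lock is split into two std::shared_mutex members (_query_ctx_map_mutex and _pipeline_map_mutex), read-only paths take std::shared_lock while mutating paths take std::unique_lock, and both maps move from std::unordered_map to phmap::flat_hash_map. Below is a minimal, self-contained sketch of that pattern; the class and member names are invented for illustration, it assumes the parallel-hashmap library is available as <parallel_hashmap/phmap.h>, and it is not the actual Doris code.

    #include <cstdint>
    #include <memory>
    #include <mutex>
    #include <shared_mutex>

    #include <parallel_hashmap/phmap.h>  // provides phmap::flat_hash_map

    struct QueryContext {};  // stand-in for the real Doris QueryContext

    class FragmentManagerSketch {
    public:
        // Read path: many threads can look up a context concurrently.
        std::shared_ptr<QueryContext> get(int64_t query_id) {
            std::shared_lock lock(_ctx_map_mutex);        // shared (reader) lock
            auto it = _ctx_map.find(query_id);
            if (it == _ctx_map.end()) {
                return nullptr;
            }
            return it->second.lock();                     // nullptr if already released
        }

        // Write paths: insert/erase take the lock exclusively.
        void insert(int64_t query_id, const std::shared_ptr<QueryContext>& ctx) {
            std::unique_lock lock(_ctx_map_mutex);        // exclusive (writer) lock
            _ctx_map.emplace(query_id, ctx);
        }

        void erase(int64_t query_id) {
            std::unique_lock lock(_ctx_map_mutex);
            _ctx_map.erase(query_id);
        }

    private:
        std::shared_mutex _ctx_map_mutex;
        // flat_hash_map keeps entries in one contiguous table, which is generally
        // faster and more cache-friendly than std::unordered_map's node buckets.
        phmap::flat_hash_map<int64_t, std::weak_ptr<QueryContext>> _ctx_map;
    };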
163 changes: 79 additions & 84 deletions be/src/runtime/fragment_mgr.cpp
@@ -269,8 +269,11 @@ void FragmentMgr::stop() {

// Only me can delete
{
std::lock_guard<std::mutex> lock(_lock);
std::unique_lock lock(_query_ctx_map_mutex);
_query_ctx_map.clear();
}
{
std::unique_lock lock(_pipeline_map_mutex);
_pipeline_map.clear();
}
_thread_pool->shutdown();
@@ -583,11 +586,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
std::shared_ptr<QueryContext> q_ctx = nullptr;
{
std::lock_guard<std::mutex> lock(_lock);
q_ctx = _get_or_erase_query_ctx(query_id);
}
auto q_ctx = get_query_ctx(query_id);
if (q_ctx) {
q_ctx->set_ready_to_execute(Status::OK());
LOG_INFO("Query {} start execution", print_id(query_id));
@@ -602,37 +601,44 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r

void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
{
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_executing_count << -1;
g_fragment_last_active_time.set_value(now);
_pipeline_map.erase({query_id, f_context->get_fragment_id()});
}
auto query_id = f_context->get_query_id();
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_executing_count << -1;
g_fragment_last_active_time.set_value(now);

std::unique_lock lock(_pipeline_map_mutex);
_pipeline_map.erase({query_id, f_context->get_fragment_id()});
}

std::shared_ptr<QueryContext> FragmentMgr::_get_or_erase_query_ctx(const TUniqueId& query_id) {
std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
std::shared_lock lock(_query_ctx_map_mutex);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
if (auto q_ctx = search->second.lock()) {
return q_ctx;
} else {
LOG(WARNING) << "Query context (query id = " << print_id(query_id)
<< ") has been released.";
_query_ctx_map.erase(search);
return nullptr;
}
}
return nullptr;
}

std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock(
const TUniqueId& query_id) {
std::unique_lock<std::mutex> lock(_lock);
return _get_or_erase_query_ctx(query_id);
Result<std::shared_ptr<QueryContext>> FragmentMgr::_get_or_insert_query_ctx(
const doris::TUniqueId& query_id, std::shared_ptr<QueryContext>& query_ctx) {
std::unique_lock lock(_query_ctx_map_mutex);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
if (auto q_ctx = search->second.lock()) {
return q_ctx;
} else {
return ResultError(Status::InternalError(
"Query {} is already in query_ctx_map, but the weak_ptr is expired",
print_id(query_id)));
}
} else {
_query_ctx_map.insert({query_id, query_ctx});
}
return query_ctx;
}

template <typename Params>
@@ -645,8 +651,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
});
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
std::lock_guard<std::mutex> lock(_lock);
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InternalError(
@@ -657,8 +662,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
} else {
// Find _query_ctx_map, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
return Status::OK();
}
@@ -709,7 +713,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
}
// There is some logic in query ctx's dctor, we could not check if exists and delete the
// temp query ctx now. For example, the query id maybe removed from workload group's queryset.
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
query_ctx = DORIS_TRY(_get_or_insert_query_ctx(query_ctx->query_id(), query_ctx));
}
return Status::OK();
}
@@ -723,13 +727,13 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
fmt::memory_buffer debug_string_buffer;
size_t i = 0;
{
std::lock_guard<std::mutex> lock(_lock);
fmt::format_to(debug_string_buffer,
"{} pipeline fragment contexts are still running! duration_limit={}\n",
_pipeline_map.size(), duration);

timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);

std::shared_lock lock(_pipeline_map_mutex);
for (auto& it : _pipeline_map) {
auto elapsed = it.second->elapsed_time() / 1000000000.0;
if (elapsed < duration) {
@@ -748,7 +752,7 @@ }
}

std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
if (auto q_ctx = get_query_ctx(query_id)) {
return q_ctx->print_all_pipeline_context();
} else {
return fmt::format(
@@ -800,16 +804,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}

{
// (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
std::lock_guard<std::mutex> lock(_lock);
for (const auto& local_param : params.local_params) {
const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
if (iter != _pipeline_map.end()) {
return Status::InternalError(
"exec_plan_fragment query_id({}) input duplicated fragment_id({})",
print_id(params.query_id), params.fragment_id);
}
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}

@@ -818,7 +814,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
.count();
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
// TODO: simplify this mapping

// (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
std::unique_lock lock(_pipeline_map_mutex);
auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
if (iter != _pipeline_map.end()) {
return Status::InternalError(
"exec_plan_fragment query_id({}) input duplicated fragment_id({})",
print_id(params.query_id), params.fragment_id);
}
_pipeline_map.insert({{params.query_id, params.fragment_id}, context});
}

@@ -848,8 +852,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
std::shared_ptr<QueryContext> query_ctx = nullptr;
std::vector<TUniqueId> all_instance_ids;
{
std::lock_guard<std::mutex> state_lock(_lock);
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
// Copy instanceids to avoid concurrent modification.
// And to reduce the scope of lock.
@@ -862,7 +865,7 @@ }
}
query_ctx->cancel(reason);
{
std::lock_guard<std::mutex> state_lock(_lock);
std::unique_lock l(_query_ctx_map_mutex);
_query_ctx_map.erase(query_id);
}
LOG(INFO) << "Query " << print_id(query_id)
@@ -898,7 +901,7 @@ void FragmentMgr::cancel_worker() {

std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
{
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_pipeline_map_mutex);
ctx.reserve(_pipeline_map.size());
for (auto& pipeline_itr : _pipeline_map) {
ctx.push_back(pipeline_itr.second);
@@ -910,29 +913,34 @@

std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
{
std::lock_guard<std::mutex> lock(_lock);
for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
if (auto q_ctx = it->second.lock()) {
if (q_ctx->is_timeout(now)) {
LOG_WARNING("Query {} is timeout", print_id(it->first));
queries_timeout.push_back(it->first);
} else if (config::enable_brpc_connection_check) {
auto brpc_stubs = q_ctx->get_using_brpc_stubs();
for (auto& item : brpc_stubs) {
if (!brpc_stub_with_queries.contains(item.second)) {
brpc_stub_with_queries.emplace(item.second,
BrpcItem {item.first, {q_ctx}});
} else {
brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
{
        // TODO: Currently only the cancel worker garbage-collects _query_ctx_map. Each
        // query should erase its own entry from _query_ctx_map when it finishes; rethink
        // whether this logic is sound.
std::unique_lock lock(_query_ctx_map_mutex);
for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
if (auto q_ctx = it->second.lock()) {
if (q_ctx->is_timeout(now)) {
LOG_WARNING("Query {} is timeout", print_id(it->first));
queries_timeout.push_back(it->first);
} else if (config::enable_brpc_connection_check) {
auto brpc_stubs = q_ctx->get_using_brpc_stubs();
for (auto& item : brpc_stubs) {
if (!brpc_stub_with_queries.contains(item.second)) {
brpc_stub_with_queries.emplace(item.second,
BrpcItem {item.first, {q_ctx}});
} else {
brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
}
}
}
++it;
} else {
it = _query_ctx_map.erase(it);
}
++it;
} else {
it = _query_ctx_map.erase(it);
}
}

std::shared_lock lock(_query_ctx_map_mutex);
// We use a very conservative cancel strategy.
// 0. If there are no running frontends, do not cancel any queries.
// 1. If query's process uuid is zero, do not cancel
@@ -1215,7 +1223,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,

const auto& fragment_ids = request->fragment_ids();
{
std::unique_lock<std::mutex> lock(_lock);
std::shared_lock lock(_pipeline_map_mutex);
for (auto fragment_id : fragment_ids) {
auto iter =
_pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id});
@@ -1267,8 +1275,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::EndOfFile(
@@ -1291,8 +1298,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::EndOfFile(
@@ -1312,8 +1318,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::EndOfFile(
@@ -1330,7 +1335,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,

void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
{
std::lock_guard<std::mutex> lock(_lock);
std::unique_lock lock(_query_ctx_map_mutex);
for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) {
if (auto q_ctx = iter->second.lock()) {
WorkloadQueryInfo workload_query_info;
@@ -1353,19 +1358,9 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
return Status::InvalidArgument("exes_status is nullptr");
}

std::shared_ptr<QueryContext> query_context = nullptr;

{
std::lock_guard<std::mutex> lock(_lock);
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_context = q_ctx;
} else {
return Status::NotFound("Query {} has been released", print_id(query_id));
}
}

std::shared_ptr<QueryContext> query_context = get_query_ctx(query_id);
if (query_context == nullptr) {
return Status::NotFound("Query {} not found", print_id(query_id));
return Status::NotFound("Query {} not found or released", print_id(query_id));
}

*exec_status = query_context->get_realtime_exec_status();
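Note the new private helper _get_or_insert_query_ctx above: because readers and writers now take different lock modes, the "check then insert" step has to happen under one exclusive lock, and the helper reports an error when it finds a stale weak_ptr instead of silently erasing it. A rough sketch of that get-or-insert contract follows, using a simplified stand-in for Doris's Result type (the real code returns Result<...> and is unwrapped with DORIS_TRY); names and types here are illustrative only.

    #include <cstdint>
    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <string>
    #include <variant>

    #include <parallel_hashmap/phmap.h>

    struct QueryContext {};

    // Simplified stand-in for Doris's Result<T>: holds either a value or an error message.
    template <typename T>
    using Result = std::variant<T, std::string>;

    class QueryCtxRegistry {
    public:
        // Returns the already-registered context if it is still alive, registers and
        // returns the caller's context if the id is unknown, and reports an error if
        // a stale (expired) weak_ptr is still sitting in the map.
        Result<std::shared_ptr<QueryContext>> get_or_insert(
                int64_t query_id, const std::shared_ptr<QueryContext>& query_ctx) {
            std::unique_lock lock(_mutex);  // exclusive: lookup and insert must be atomic
            auto it = _map.find(query_id);
            if (it != _map.end()) {
                if (auto existing = it->second.lock()) {
                    return existing;  // another request registered this query first
                }
                return std::string("query already in map but its context has expired");
            }
            _map.emplace(query_id, query_ctx);
            return query_ctx;
        }

    private:
        std::shared_mutex _mutex;
        phmap::flat_hash_map<int64_t, std::weak_ptr<QueryContext>> _map;
    };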
24 changes: 13 additions & 11 deletions be/src/runtime/fragment_mgr.h
@@ -133,7 +133,7 @@ class FragmentMgr : public RestMonitorIface {
ThreadPool* get_thread_pool() { return _thread_pool.get(); }

int32_t running_query_num() {
std::unique_lock<std::mutex> ctx_lock(_lock);
std::shared_lock lock(_query_ctx_map_mutex);
return _query_ctx_map.size();
}

@@ -145,15 +145,16 @@ class FragmentMgr : public RestMonitorIface {
Status get_realtime_exec_status(const TUniqueId& query_id,
TReportExecStatusParams* exec_status);

std::shared_ptr<QueryContext> get_or_erase_query_ctx_with_lock(const TUniqueId& query_id);
std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);

private:
struct BrpcItem {
TNetworkAddress network_address;
std::vector<std::weak_ptr<QueryContext>> queries;
};

std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId& query_id);
Result<std::shared_ptr<QueryContext>> _get_or_insert_query_ctx(
const TUniqueId& query_id, std::shared_ptr<QueryContext>& query_ctx);

template <typename Param>
void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
@@ -168,20 +169,21 @@ class FragmentMgr : public RestMonitorIface {
// This is input params
ExecEnv* _exec_env = nullptr;

// The lock protect the `_pipeline_map`
std::shared_mutex _pipeline_map_mutex;
// (QueryID, FragmentID) -> PipelineFragmentContext
phmap::flat_hash_map<std::pair<TUniqueId, int>,
std::shared_ptr<pipeline::PipelineFragmentContext>>
_pipeline_map;

// The lock should only be used to protect the structures in fragment manager. Has to be
// used in a very small scope because it may dead lock. For example, if the _lock is used
// in prepare stage, the call path is prepare --> expr prepare --> may call allocator
// when allocate failed, allocator may call query_is_cancelled, query is callced will also
// call _lock, so that there is dead lock.
std::mutex _lock;

// (QueryID, FragmentID) -> PipelineFragmentContext
std::unordered_map<std::pair<TUniqueId, int>,
std::shared_ptr<pipeline::PipelineFragmentContext>>
_pipeline_map;

std::shared_mutex _query_ctx_map_mutex;
// query id -> QueryContext
std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;

CountDownLatch _stop_background_threads_latch;
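The comment deleted from the header above explained why the old single _lock could only be held in very small scopes: code that already holds it (for example the prepare path) may re-enter FragmentMgr through an allocation-failure callback that checks cancellation, and re-locking a non-recursive std::mutex on the same thread deadlocks (formally, undefined behavior). The toy example below, with invented names, is deliberately broken to show that re-entrancy hazard; splitting the state behind two dedicated mutexes held in tight scopes is what removes it.

    #include <mutex>

    class CoarseLockManager {
    public:
        bool query_is_cancelled() {
            std::lock_guard<std::mutex> guard(_lock);  // second acquisition on the same thread
            return _cancelled;
        }

        void prepare() {
            std::lock_guard<std::mutex> guard(_lock);  // first acquisition
            // Imagine an allocation failure inside prepare that consults cancellation state:
            (void)query_is_cancelled();                // re-locks _lock -> deadlock / UB
        }

    private:
        std::mutex _lock;
        bool _cancelled = false;
    };

    int main() {
        CoarseLockManager mgr;
        mgr.prepare();  // never returns
        return 0;
    }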
3 changes: 1 addition & 2 deletions be/src/runtime/load_channel.cpp
@@ -45,8 +45,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(
_load_id.to_thrift());
ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift());
std::shared_ptr<MemTrackerLimiter> mem_tracker = nullptr;
WorkloadGroupPtr wg_ptr = nullptr;
